qwen_agent/routes/skill_manager.py
2026-05-26 16:13:26 +08:00

833 lines
32 KiB
Python

import os
import re
import json
import shutil
import zipfile
import logging
import asyncio
import yaml
from typing import List, Optional
from dataclasses import dataclass
from fastapi import APIRouter, HTTPException, Query, UploadFile, File, Form
from pydantic import BaseModel
from utils.settings import SKILLS_DIR, PROJECT_NAME
import aiofiles
logger = logging.getLogger('app')
router = APIRouter()
class SkillItem(BaseModel):
name: str
description: str
user_skill: bool = False
category: str = "other"
class SkillListResponse(BaseModel):
skills: List[SkillItem]
total: int
@dataclass
class SkillValidationResult:
"""Skill format validation result"""
valid: bool
name: Optional[str] = None
description: Optional[str] = None
category: Optional[str] = None
error_message: Optional[str] = None
# ============ Security constants ============
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB maximum upload file size
MAX_UNCOMPRESSED_SIZE = 500 * 1024 * 1024 # 500MB maximum extracted size
MAX_COMPRESSION_RATIO = 100 # maximum compression ratio of 100:1
MAX_ZIP_ENTRIES = 1000 # maximum number of files in a zip archive
def validate_bot_id(bot_id: str) -> str:
"""Validate the bot_id format to prevent directory traversal attacks"""
if not bot_id:
raise HTTPException(status_code=400, detail="bot_id cannot be empty")
# Check for directory traversal characters
if '..' in bot_id or '/' in bot_id or '\\' in bot_id:
raise HTTPException(status_code=400, detail="bot_id contains illegal characters")
# Validate UUID format if needed
uuid_pattern = r'^[a-fA-F0-9-]{36}$'
if not re.match(uuid_pattern, bot_id):
logger.warning(f"bot_id format may be invalid: {bot_id}")
return bot_id
def validate_skill_name(skill_name: str) -> str:
"""Validate the skill_name format to prevent directory traversal attacks"""
if not skill_name:
raise HTTPException(status_code=400, detail="skill_name cannot be empty")
# Check for directory traversal characters
if '..' in skill_name or '/' in skill_name or '\\' in skill_name:
raise HTTPException(status_code=400, detail="skill_name contains illegal characters")
return skill_name
async def validate_upload_file_size(file: UploadFile) -> int:
"""Validate uploaded file size and return the actual file size"""
file_size = 0
chunk_size = 8192
# Save the current position so it can be reset later
await file.seek(0)
while chunk := await file.read(chunk_size):
file_size += len(chunk)
if file_size > MAX_FILE_SIZE:
await file.seek(0) # Reset the file pointer
raise HTTPException(
status_code=413,
detail=f"File is too large; maximum allowed is {MAX_FILE_SIZE // (1024*1024)}MB"
)
await file.seek(0) # Reset the file pointer for later use
return file_size
def has_skill_metadata_files(dir_path: str) -> bool:
"""Check whether the directory contains skill metadata files (SKILL.md or .claude-plugin/plugin.json)
Args:
dir_path: directory path to inspect
Returns:
bool: returns True if metadata files are present
"""
skill_md = os.path.join(dir_path, 'SKILL.md')
plugin_json = os.path.join(dir_path, '.claude-plugin', 'plugin.json')
return os.path.exists(skill_md) or os.path.exists(plugin_json)
def detect_skill_structure(extract_dir: str) -> str:
"""Detect where skill metadata is located in the extracted directory
First check whether the root directory directly contains SKILL.md or .claude-plugin/plugin.json,
and if not, then inspect second-level subdirectories.
Args:
extract_dir: path to the extracted directory
Returns:
"root" - the root directory directly contains SKILL.md or .claude-plugin/plugin.json
"subdirs" - second-level subdirectories contain skill metadata
"unknown" - no valid skill metadata was found
"""
# Step 1: check the root directory
if has_skill_metadata_files(extract_dir):
logger.info(f"Skill metadata found at root level: {extract_dir}")
return "root"
# Step 2: check second-level subdirectories
try:
for item in os.listdir(extract_dir):
item_path = os.path.join(extract_dir, item)
if os.path.isdir(item_path) and item != '__MACOSX':
if has_skill_metadata_files(item_path):
logger.info(f"Skill metadata found in subdirectory: {item}")
return "subdirs"
except OSError as e:
logger.warning(f"Error scanning directory {extract_dir}: {e}")
logger.warning(f"No skill metadata found in {extract_dir}")
return "unknown"
async def safe_extract_zip(zip_path: str, extract_dir: str) -> None:
"""Safely extract a zip file and prevent ZipSlip and zip bomb attacks
Args:
zip_path: zip file path
extract_dir: target extraction directory
Raises:
HTTPException: if a malicious file is detected
"""
try:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
# Check the number of files
file_list = zip_ref.infolist()
if len(file_list) > MAX_ZIP_ENTRIES:
raise zipfile.BadZipFile(f"zip archive contains too many files: {len(file_list)}")
# Check compression ratio and total size
compressed_size = sum(z.file_size for z in file_list)
uncompressed_size = sum(z.compress_size for z in file_list)
if uncompressed_size > MAX_UNCOMPRESSED_SIZE:
raise zipfile.BadZipFile(
f"Extracted size {uncompressed_size // (1024*1024)}MB exceeds the limit of "
f"{MAX_UNCOMPRESSED_SIZE // (1024*1024)}MB"
)
# Check the compression ratio to prevent zip bombs
if compressed_size > 0:
ratio = uncompressed_size / compressed_size
if ratio > MAX_COMPRESSION_RATIO:
raise zipfile.BadZipFile(
f"Compression ratio {ratio:.1f}:1 exceeds the limit of {MAX_COMPRESSION_RATIO}:1, "
f"which may indicate a zip bomb attack"
)
# Normalize the target directory path
extract_dir_real = os.path.realpath(extract_dir)
# Safely extract each file
for zip_info in file_list:
# Check for directory traversal attacks
if '..' in zip_info.filename or zip_info.filename.startswith('/'):
raise zipfile.BadZipFile(
f"Detected a directory traversal attack: {zip_info.filename}"
)
# Build the full target path
target_path = os.path.realpath(os.path.join(extract_dir, zip_info.filename))
# Ensure the target path stays within the extraction directory
if not target_path.startswith(extract_dir_real + os.sep):
if target_path != extract_dir_real: # allow the directory itself
raise zipfile.BadZipFile(
f"File would be extracted outside the target directory: {zip_info.filename}"
)
# Check for symbolic links in a way compatible with Python 3.8+
# The is_symlink() method is available only in Python 3.9+, so use hasattr for older versions
is_symlink = (
hasattr(zip_info, 'is_symlink') and zip_info.is_symlink()
) or (
# Check via external_attr for compatibility with all versions
(zip_info.external_attr >> 16) & 0o170000 == 0o120000
)
if is_symlink:
raise zipfile.BadZipFile(
f"Symbolic links are not allowed: {zip_info.filename}"
)
# Extract files using a thread pool to avoid blocking
await asyncio.to_thread(zip_ref.extract, zip_info, extract_dir)
except zipfile.BadZipFile as e:
raise HTTPException(status_code=400, detail=f"Invalid zip file: {str(e)}")
async def save_upload_file_async(file: UploadFile, destination: str) -> None:
"""Asynchronously save the uploaded file to the destination path"""
async with aiofiles.open(destination, 'wb') as f:
chunk_size = 8192
while chunk := await file.read(chunk_size):
await f.write(chunk)
def parse_plugin_json(plugin_json_path: str) -> SkillValidationResult:
"""Parse the plugin.json file for name and description
Args:
plugin_json_path: Path to the plugin.json file
Returns:
SkillValidationResult with validation result and error message if invalid
"""
try:
with open(plugin_json_path, 'r', encoding='utf-8') as f:
plugin_config = json.load(f)
if not isinstance(plugin_config, dict):
logger.warning(f"Invalid plugin.json format in {plugin_json_path}")
return SkillValidationResult(
valid=False,
error_message="Invalid plugin.json format: file content must be a JSON object"
)
# Check for required fields
missing_fields = []
if 'name' not in plugin_config:
missing_fields.append('name')
if 'description' not in plugin_config:
missing_fields.append('description')
if missing_fields:
logger.warning(f"Missing fields {missing_fields} in {plugin_json_path}")
return SkillValidationResult(
valid=False,
error_message=f"plugin.json is missing required fields: make sure it includes {', '.join(missing_fields)} field(s)"
)
return SkillValidationResult(
valid=True,
name=plugin_config['name'],
description=plugin_config['description'],
category=plugin_config.get('category'),
)
except json.JSONDecodeError as e:
logger.error(f"JSON parse error in {plugin_json_path}: {e}")
return SkillValidationResult(
valid=False,
error_message="Invalid plugin.json format: make sure the file is valid JSON"
)
except Exception as e:
logger.error(f"Error parsing {plugin_json_path}: {e}")
return SkillValidationResult(
valid=False,
error_message="An unknown error occurred while reading plugin.json; check file permissions or format"
)
def parse_skill_frontmatter(skill_md_path: str) -> SkillValidationResult:
"""Parse the YAML frontmatter from SKILL.md file
Args:
skill_md_path: Path to the SKILL.md file
Returns:
SkillValidationResult with validation result and error message if invalid
"""
try:
with open(skill_md_path, 'r', encoding='utf-8') as f:
content = f.read()
# Match YAML frontmatter between --- delimiters
frontmatter_match = re.match(r'^---\s*\n(.*?)\n---', content, re.DOTALL)
if not frontmatter_match:
logger.warning(f"No frontmatter found in {skill_md_path}")
return SkillValidationResult(
valid=False,
error_message="Invalid SKILL.md format: the file must start with YAML frontmatter delimited by --- and include name and description fields"
)
frontmatter = frontmatter_match.group(1)
# Parse YAML using yaml.safe_load
metadata = yaml.safe_load(frontmatter)
if not isinstance(metadata, dict):
logger.warning(f"Invalid frontmatter format in {skill_md_path}")
return SkillValidationResult(
valid=False,
error_message="Invalid SKILL.md frontmatter format: YAML content must be an object"
)
# Check for required fields
missing_fields = []
if 'name' not in metadata:
missing_fields.append('name')
if 'description' not in metadata:
missing_fields.append('description')
if missing_fields:
logger.warning(f"Missing fields {missing_fields} in {skill_md_path}")
return SkillValidationResult(
valid=False,
error_message=f"SKILL.md is missing required fields: make sure the frontmatter includes {', '.join(missing_fields)} field(s)"
)
return SkillValidationResult(
valid=True,
name=metadata['name'],
description=metadata['description'],
category=metadata.get('category'),
)
except yaml.YAMLError as e:
logger.error(f"YAML parse error in {skill_md_path}: {e}")
return SkillValidationResult(
valid=False,
error_message="Invalid SKILL.md frontmatter format: make sure the YAML is valid"
)
except Exception as e:
logger.error(f"Error parsing {skill_md_path}: {e}")
return SkillValidationResult(
valid=False,
error_message="An unknown error occurred while reading SKILL.md; check file permissions or format"
)
def get_skill_metadata(skill_path: str) -> SkillValidationResult:
"""Get skill metadata, trying plugin.json first, then SKILL.md
Args:
skill_path: Path to the skill directory
Returns:
SkillValidationResult with validation result and error message if invalid
"""
plugin_json_path = os.path.join(skill_path, '.claude-plugin', 'plugin.json')
skill_md_path = os.path.join(skill_path, 'SKILL.md')
has_plugin_json = os.path.exists(plugin_json_path)
has_skill_md = os.path.exists(skill_md_path)
# Check if at least one metadata file exists
if not has_plugin_json and not has_skill_md:
return SkillValidationResult(
valid=False,
error_message="Invalid skill format: make sure the skill contains SKILL.md with YAML frontmatter or .claude-plugin/plugin.json"
)
# Try plugin.json first
if has_plugin_json:
result = parse_plugin_json(plugin_json_path)
if result.valid:
return result
# If plugin.json exists but is invalid, return its error
# (unless SKILL.md also exists and might be valid)
if not has_skill_md:
return result
# If both exist, prefer plugin.json error message
skill_md_result = parse_skill_frontmatter(skill_md_path)
if skill_md_result.valid:
return skill_md_result
# Both invalid, return plugin.json error
return result
# Fallback to SKILL.md
if has_skill_md:
return parse_skill_frontmatter(skill_md_path)
return SkillValidationResult(
valid=False,
error_message="Invalid skill format: unable to read valid metadata"
)
def get_skill_metadata_legacy(skill_path: str) -> Optional[dict]:
"""Legacy function for backward compatibility - returns dict or None
Args:
skill_path: Path to the skill directory
Returns:
dict with 'name' and 'description' if found, None otherwise
"""
result = get_skill_metadata(skill_path)
if result.valid:
ret = {
'name': result.name,
'description': result.description,
}
if result.category:
ret['category'] = result.category
return ret
return None
def get_official_skills(base_dir: str) -> List[SkillItem]:
"""Get all official skills from the skills directory
Args:
base_dir: Base directory of the project
Returns:
List of SkillItem objects
"""
skills = []
skill_names = set()
# Use SKILLS_DIR from settings, relative to base_dir
if os.path.isabs(SKILLS_DIR):
skills_root_dir = SKILLS_DIR
else:
skills_root_dir = os.path.join(base_dir, SKILLS_DIR)
official_skills_dirs = [
os.path.join(skills_root_dir, "common"),
os.path.join(skills_root_dir, PROJECT_NAME),
]
for official_skills_dir in official_skills_dirs:
if not os.path.exists(official_skills_dir):
logger.warning(f"Official skills directory not found: {official_skills_dir}")
continue
for skill_name in os.listdir(official_skills_dir):
if skill_name in skill_names:
continue
skill_path = os.path.join(official_skills_dir, skill_name)
if os.path.isdir(skill_path):
metadata = get_skill_metadata_legacy(skill_path)
if metadata:
skills.append(SkillItem(
name=metadata['name'],
description=metadata['description'],
user_skill=False,
category=metadata.get('category', 'other'),
))
skill_names.add(skill_name)
logger.debug(f"Found official skill: {metadata['name']} from {official_skills_dir}")
return skills
def get_user_skills(base_dir: str, bot_id: str) -> List[SkillItem]:
"""Get all user uploaded skills for a specific bot
Args:
base_dir: Base directory of the project
bot_id: Bot ID to look up user skills for
Returns:
List of SkillItem objects
"""
skills = []
user_skills_dir = os.path.join(base_dir, 'projects', 'uploads', bot_id, 'skills')
if not os.path.exists(user_skills_dir):
logger.info(f"No user skills directory found for bot {bot_id}: {user_skills_dir}")
return skills
for skill_name in os.listdir(user_skills_dir):
skill_path = os.path.join(user_skills_dir, skill_name)
if os.path.isdir(skill_path):
metadata = get_skill_metadata_legacy(skill_path)
if metadata:
skills.append(SkillItem(
name=metadata['name'],
description=metadata['description'],
user_skill=True,
category=metadata.get('category', 'custom'),
))
logger.debug(f"Found user skill: {metadata['name']}")
return skills
@router.get("/api/v1/skill/list", response_model=SkillListResponse)
async def list_skills(
bot_id: str = Query(..., description="Bot ID to fetch user skills for")
):
"""
Get list of all available skills (official + user uploaded)
Args:
bot_id: Bot ID to fetch user uploaded skills for
Returns:
SkillListResponse containing all skills
Notes:
- Official skills are read from /skills/common and /skills/{PROJECT_NAME}
- User skills are read from /projects/uploads/{bot_id}/skills directory
- User skills are marked with user_skill: true
"""
try:
# Get the project base directory
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Get official skills
official_skills = get_official_skills(base_dir)
# Get user skills for the specific bot
user_skills = get_user_skills(base_dir, bot_id)
# Combine both lists (user skills first)
all_skills = user_skills + official_skills
logger.info(f"Found {len(official_skills)} official skills and {len(user_skills)} user skills for bot {bot_id}")
return SkillListResponse(
skills=all_skills,
total=len(all_skills)
)
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error in list_skills: {str(e)}")
logger.error(f"Full traceback: {error_details}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.post("/api/v1/skill/upload")
async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = Form(None)):
"""
Skill upload API endpoint that uploads a zip file to ./projects/uploads/ and extracts it automatically
Security improvements:
- P1-001: ZipSlip path traversal protection - check each extracted file path
- P1-004: File size limit - maximum 50 MB
- P1-005: Zip bomb protection - check compression ratio (max 100:1) and extracted size (max 500 MB)
- P1-008: Async I/O - use aiofiles and asyncio.to_thread
Args:
file: uploaded zip file
bot_id: Bot ID used to create the user-specific skills directory
Returns:
dict: response containing file path and extraction information
Notes:
- Only .zip skill files are supported
- After upload, files are automatically extracted to projects/uploads/{bot_id}/skills/{skill_name}/
- File size limit: 50 MB
- Extracted size limit: 500 MB
"""
file_path = None # Initialize for use in exception handling
try:
# Validate bot_id (P1-006 path traversal protection)
if not bot_id:
raise HTTPException(status_code=400, detail="bot_id cannot be empty")
bot_id = validate_bot_id(bot_id)
# Validate the file name
if not file.filename:
raise HTTPException(status_code=400, detail="File name cannot be empty")
logger.info(f"Skill upload - bot_id: {bot_id}, filename: {file.filename}")
# Validate that the file is a zip file
original_filename = file.filename
name_without_ext, file_extension = os.path.splitext(original_filename)
if file_extension.lower() != '.zip':
raise HTTPException(status_code=400, detail="Only .zip skill files can be uploaded")
# P1-004: Validate file size asynchronously without blocking the event loop
file_size = await validate_upload_file_size(file)
logger.info(f"File size: {file_size // 1024}KB")
folder_name = name_without_ext
# Create the upload directory and store the zip file first
upload_dir = os.path.join("projects", "uploads", bot_id, "skill_zip")
await asyncio.to_thread(os.makedirs, upload_dir, exist_ok=True)
# Store the zip file path
file_path = os.path.join(upload_dir, original_filename)
# P1-008: Save the file asynchronously using aiofiles without blocking the event loop
await save_upload_file_async(file, file_path)
logger.info(f"Saved zip file: {file_path}")
# Extract everything into a temporary directory
tmp_extract_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp", folder_name)
await asyncio.to_thread(os.makedirs, tmp_extract_dir, exist_ok=True)
# P1-001, P1-005: Safely extract to prevent ZipSlip and zip bombs
await safe_extract_zip(file_path, tmp_extract_dir)
logger.info(f"Extracted to tmp dir: {tmp_extract_dir}")
# Clean up the __MACOSX directory automatically created by macOS
macosx_dir = os.path.join(tmp_extract_dir, "__MACOSX")
if os.path.exists(macosx_dir):
await asyncio.to_thread(shutil.rmtree, macosx_dir)
logger.info(f"Cleaned up __MACOSX directory: {macosx_dir}")
# Detect the structure based on the location of skill metadata files
skill_structure = await asyncio.to_thread(detect_skill_structure, tmp_extract_dir)
logger.info(f"Detected skill structure: {skill_structure}")
skills_dir = os.path.join("projects", "uploads", bot_id, "skills")
await asyncio.to_thread(os.makedirs, skills_dir, exist_ok=True)
skill_dirs_to_validate = []
if skill_structure == "root":
# If the root directory directly contains skill metadata, treat the whole directory as one skill
result = await asyncio.to_thread(get_skill_metadata, tmp_extract_dir)
if result.valid and result.name:
skill_name = result.name
else:
skill_name = folder_name
target_dir = os.path.join(skills_dir, skill_name)
# If the target already exists, delete it first
if os.path.exists(target_dir):
await asyncio.to_thread(shutil.rmtree, target_dir)
await asyncio.to_thread(shutil.move, tmp_extract_dir, target_dir)
skill_dirs_to_validate.append(target_dir)
logger.info(f"Moved skill to: {target_dir}")
elif skill_structure == "subdirs":
# If second-level subdirectories contain skill metadata, move them one by one
for item in os.listdir(tmp_extract_dir):
item_path = os.path.join(tmp_extract_dir, item)
if not os.path.isdir(item_path) or item == '__MACOSX':
continue
if has_skill_metadata_files(item_path):
result = await asyncio.to_thread(get_skill_metadata, item_path)
if result.valid and result.name:
skill_name = result.name
else:
skill_name = item
target_dir = os.path.join(skills_dir, skill_name)
if os.path.exists(target_dir):
await asyncio.to_thread(shutil.rmtree, target_dir)
await asyncio.to_thread(shutil.move, item_path, target_dir)
skill_dirs_to_validate.append(target_dir)
logger.info(f"Moved skill '{skill_name}' to: {target_dir}")
# Clean up the temporary directory
if os.path.exists(tmp_extract_dir):
await asyncio.to_thread(shutil.rmtree, tmp_extract_dir)
else:
# unknown - no valid skill metadata was found
await asyncio.to_thread(shutil.rmtree, tmp_extract_dir)
raise HTTPException(
status_code=400,
detail="Invalid skill format: make sure the skill contains SKILL.md with YAML frontmatter or .claude-plugin/plugin.json"
)
# Validate the format of each skill directory
validation_errors = []
for skill_dir in skill_dirs_to_validate:
validation_result = await asyncio.to_thread(get_skill_metadata, skill_dir)
if not validation_result.valid:
skill_dir_name = os.path.basename(skill_dir)
validation_errors.append(f"{skill_dir_name}: {validation_result.error_message}")
logger.warning(f"Skill format validation failed for {skill_dir}: {validation_result.error_message}")
# If validation errors exist, clean up extracted files and return an error
if validation_errors:
for skill_dir in skill_dirs_to_validate:
try:
await asyncio.to_thread(shutil.rmtree, skill_dir)
logger.info(f"Cleaned up invalid skill directory: {skill_dir}")
except Exception as cleanup_error:
logger.error(f"Failed to cleanup skill directory {skill_dir}: {cleanup_error}")
if len(validation_errors) == 1:
error_detail = validation_errors[0]
else:
error_detail = "Multiple skill format validations failed:\n" + "\n".join(validation_errors)
raise HTTPException(status_code=400, detail=error_detail)
# Get the final skill name
if len(skill_dirs_to_validate) == 1:
final_extract_path = skill_dirs_to_validate[0]
final_skill_name = os.path.basename(final_extract_path)
else:
final_extract_path = skills_dir
final_skill_name = folder_name
return {
"success": True,
"message": f"Skill file uploaded and extracted successfully",
"file_path": file_path,
"extract_path": final_extract_path,
"original_filename": original_filename,
"skill_name": final_skill_name
}
except HTTPException:
# Clean up uploaded files and temporary directories
if file_path and os.path.exists(file_path):
try:
await asyncio.to_thread(os.remove, file_path)
logger.info(f"Cleaned up file: {file_path}")
except Exception as cleanup_error:
logger.error(f"Failed to cleanup file: {cleanup_error}")
tmp_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp") if bot_id else None
if tmp_dir and os.path.exists(tmp_dir):
try:
await asyncio.to_thread(shutil.rmtree, tmp_dir)
except Exception:
pass
raise
except Exception as e:
# Clean up uploaded files and temporary directories
if file_path and os.path.exists(file_path):
try:
await asyncio.to_thread(os.remove, file_path)
logger.info(f"Cleaned up file: {file_path}")
except Exception as cleanup_error:
logger.error(f"Failed to cleanup file: {cleanup_error}")
tmp_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp") if bot_id else None
if tmp_dir and os.path.exists(tmp_dir):
try:
await asyncio.to_thread(shutil.rmtree, tmp_dir)
except Exception:
pass
logger.error(f"Error uploading skill file: {str(e)}")
# Do not expose detailed error information to the client for security reasons
raise HTTPException(status_code=500, detail="Skill file upload failed")
@router.delete("/api/v1/skill/remove")
async def remove_skill(
bot_id: str = Query(..., description="Bot ID"),
skill_name: str = Query(..., description="Skill name to remove")
):
"""
Delete a user-uploaded skill
Args:
bot_id: Bot ID
skill_name: skill name to delete
Returns:
dict: deletion result
Notes:
- Only user-uploaded skills can be deleted; official skills cannot be deleted
- Deletion path: projects/uploads/{bot_id}/skills/{skill_name}
"""
try:
# Validate parameters to prevent directory traversal attacks
bot_id = validate_bot_id(bot_id)
skill_name = validate_skill_name(skill_name)
logger.info(f"Skill remove - bot_id: {bot_id}, skill_name: {skill_name}")
# Build the deletion directory path
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
skill_dir = os.path.join(base_dir, "projects", "uploads", bot_id, "skills", skill_name)
# Normalize the path and ensure it stays within the allowed directory
skill_dir_real = os.path.realpath(skill_dir)
allowed_base = os.path.realpath(os.path.join(base_dir, "projects", "uploads", bot_id, "skills"))
if not skill_dir_real.startswith(allowed_base + os.sep):
raise HTTPException(status_code=403, detail="Illegal deletion path")
# Check whether the directory exists
if not os.path.exists(skill_dir_real):
raise HTTPException(status_code=404, detail="Skill does not exist")
# Check whether the path is a directory
if not os.path.isdir(skill_dir_real):
raise HTTPException(status_code=400, detail="Target path is not a directory")
# Delete the directory using a thread pool to avoid blocking the event loop
await asyncio.to_thread(shutil.rmtree, skill_dir_real)
logger.info(f"Successfully removed skill directory: {skill_dir_real}")
# Synchronously delete the skill copy under the robot directory
robot_skill_dir = os.path.join(base_dir, "projects", "robot", bot_id, "skills", skill_name)
if os.path.exists(robot_skill_dir):
await asyncio.to_thread(shutil.rmtree, robot_skill_dir)
logger.info(f"Also removed robot skill directory: {robot_skill_dir}")
return {
"success": True,
"message": f"Skill '{skill_name}' deleted successfully",
"bot_id": bot_id,
"skill_name": skill_name
}
except HTTPException:
raise
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error removing skill: {str(e)}")
logger.error(f"Full traceback: {error_details}")
raise HTTPException(status_code=500, detail="Failed to delete skill")