import os import re import json import shutil import zipfile import logging import asyncio import yaml from typing import List, Optional from dataclasses import dataclass from fastapi import APIRouter, HTTPException, Query, UploadFile, File, Form from pydantic import BaseModel from utils.settings import SKILLS_DIR import aiofiles logger = logging.getLogger('app') router = APIRouter() class SkillItem(BaseModel): name: str description: str user_skill: bool = False class SkillListResponse(BaseModel): skills: List[SkillItem] total: int @dataclass class SkillValidationResult: """Skill 格式验证结果""" valid: bool name: Optional[str] = None description: Optional[str] = None error_message: Optional[str] = None # ============ 安全常量 ============ MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB 最大上传文件大小 MAX_UNCOMPRESSED_SIZE = 500 * 1024 * 1024 # 500MB 解压后最大大小 MAX_COMPRESSION_RATIO = 100 # 最大压缩比例 100:1 MAX_ZIP_ENTRIES = 1000 # zip 文件中最多文件数量 def validate_bot_id(bot_id: str) -> str: """验证 bot_id 格式,防止路径遍历攻击""" if not bot_id: raise HTTPException(status_code=400, detail="bot_id 不能为空") # 检查路径遍历字符 if '..' in bot_id or '/' in bot_id or '\\' in bot_id: raise HTTPException(status_code=400, detail="bot_id 包含非法字符") # 验证 UUID 格式(可选,根据实际需求) uuid_pattern = r'^[a-fA-F0-9-]{36}$' if not re.match(uuid_pattern, bot_id): logger.warning(f"bot_id 格式可能无效: {bot_id}") return bot_id def validate_skill_name(skill_name: str) -> str: """验证 skill_name 格式,防止路径遍历攻击""" if not skill_name: raise HTTPException(status_code=400, detail="skill_name 不能为空") # 检查路径遍历字符 if '..' in skill_name or '/' in skill_name or '\\' in skill_name: raise HTTPException(status_code=400, detail="skill_name 包含非法字符") return skill_name async def validate_upload_file_size(file: UploadFile) -> int: """验证上传文件大小,返回实际文件大小""" file_size = 0 chunk_size = 8192 # 保存当前位置以便后续重置 await file.seek(0) while chunk := await file.read(chunk_size): file_size += len(chunk) if file_size > MAX_FILE_SIZE: await file.seek(0) # 重置文件指针 raise HTTPException( status_code=413, detail=f"文件过大,最大允许 {MAX_FILE_SIZE // (1024*1024)}MB" ) await file.seek(0) # 重置文件指针供后续使用 return file_size def has_skill_metadata_files(dir_path: str) -> bool: """检查目录是否包含 skill 元数据文件(SKILL.md 或 .claude-plugin/plugin.json) Args: dir_path: 要检查的目录路径 Returns: bool: 如果包含元数据文件则返回 True """ skill_md = os.path.join(dir_path, 'SKILL.md') plugin_json = os.path.join(dir_path, '.claude-plugin', 'plugin.json') return os.path.exists(skill_md) or os.path.exists(plugin_json) def detect_skill_structure(extract_dir: str) -> str: """检测解压后目录中 skill 元数据的位置 优先检查根目录是否直接包含 SKILL.md 或 .claude-plugin/plugin.json, 如果没有,再检查第二级子目录。 Args: extract_dir: 解压后的目录路径 Returns: "root" - 根目录直接包含 SKILL.md 或 .claude-plugin/plugin.json "subdirs" - 第二级子目录包含 skill 元数据 "unknown" - 未找到有效的 skill 元数据 """ # 第一步:检查根目录 if has_skill_metadata_files(extract_dir): logger.info(f"Skill metadata found at root level: {extract_dir}") return "root" # 第二步:检查第二级子目录 try: for item in os.listdir(extract_dir): item_path = os.path.join(extract_dir, item) if os.path.isdir(item_path) and item != '__MACOSX': if has_skill_metadata_files(item_path): logger.info(f"Skill metadata found in subdirectory: {item}") return "subdirs" except OSError as e: logger.warning(f"Error scanning directory {extract_dir}: {e}") logger.warning(f"No skill metadata found in {extract_dir}") return "unknown" async def safe_extract_zip(zip_path: str, extract_dir: str) -> None: """安全地解压 zip 文件,防止 ZipSlip 和 zip 炸弹攻击 Args: zip_path: zip 文件路径 extract_dir: 解压目标目录 Raises: HTTPException: 如果检测到恶意文件 """ try: with zipfile.ZipFile(zip_path, 'r') as zip_ref: # 检查文件数量 file_list = zip_ref.infolist() if len(file_list) > MAX_ZIP_ENTRIES: raise zipfile.BadZipFile(f"zip 文件包含过多文件: {len(file_list)}") # 检查压缩比例和总大小 compressed_size = sum(z.file_size for z in file_list) uncompressed_size = sum(z.compress_size for z in file_list) if uncompressed_size > MAX_UNCOMPRESSED_SIZE: raise zipfile.BadZipFile( f"解压后大小 {uncompressed_size // (1024*1024)}MB 超过限制 " f"{MAX_UNCOMPRESSED_SIZE // (1024*1024)}MB" ) # 检查压缩比例(防止 zip 炸弹) if compressed_size > 0: ratio = uncompressed_size / compressed_size if ratio > MAX_COMPRESSION_RATIO: raise zipfile.BadZipFile( f"压缩比例 {ratio:.1f}:1 超过限制 {MAX_COMPRESSION_RATIO}:1," f"可能是 zip 炸弹攻击" ) # 规范化目标目录路径 extract_dir_real = os.path.realpath(extract_dir) # 安全地解压每个文件 for zip_info in file_list: # 检查路径遍历攻击 if '..' in zip_info.filename or zip_info.filename.startswith('/'): raise zipfile.BadZipFile( f"检测到路径遍历攻击: {zip_info.filename}" ) # 构建完整的目标路径 target_path = os.path.realpath(os.path.join(extract_dir, zip_info.filename)) # 确保目标路径在解压目录内 if not target_path.startswith(extract_dir_real + os.sep): if target_path != extract_dir_real: # 允许目录本身 raise zipfile.BadZipFile( f"文件将被解压到目标目录之外: {zip_info.filename}" ) # 检查符号链接(兼容 Python 3.8+) # is_symlink() 方法在 Python 3.9+ 才有,使用 hasattr 兼容旧版本 is_symlink = ( hasattr(zip_info, 'is_symlink') and zip_info.is_symlink() ) or ( # 通过 external_attr 检查(兼容所有版本) (zip_info.external_attr >> 16) & 0o170000 == 0o120000 ) if is_symlink: raise zipfile.BadZipFile( f"不允许符号链接: {zip_info.filename}" ) # 解压文件(使用线程池避免阻塞) await asyncio.to_thread(zip_ref.extract, zip_info, extract_dir) except zipfile.BadZipFile as e: raise HTTPException(status_code=400, detail=f"无效的 zip 文件: {str(e)}") async def save_upload_file_async(file: UploadFile, destination: str) -> None: """异步保存上传文件到目标路径""" async with aiofiles.open(destination, 'wb') as f: chunk_size = 8192 while chunk := await file.read(chunk_size): await f.write(chunk) def parse_plugin_json(plugin_json_path: str) -> SkillValidationResult: """Parse the plugin.json file for name and description Args: plugin_json_path: Path to the plugin.json file Returns: SkillValidationResult with validation result and error message if invalid """ try: with open(plugin_json_path, 'r', encoding='utf-8') as f: plugin_config = json.load(f) if not isinstance(plugin_config, dict): logger.warning(f"Invalid plugin.json format in {plugin_json_path}") return SkillValidationResult( valid=False, error_message="plugin.json 格式不正确:文件内容必须是一个 JSON 对象" ) # Check for required fields missing_fields = [] if 'name' not in plugin_config: missing_fields.append('name') if 'description' not in plugin_config: missing_fields.append('description') if missing_fields: logger.warning(f"Missing fields {missing_fields} in {plugin_json_path}") return SkillValidationResult( valid=False, error_message=f"plugin.json 缺少必需字段:请确保包含 {', '.join(missing_fields)} 字段" ) return SkillValidationResult( valid=True, name=plugin_config['name'], description=plugin_config['description'] ) except json.JSONDecodeError as e: logger.error(f"JSON parse error in {plugin_json_path}: {e}") return SkillValidationResult( valid=False, error_message="plugin.json 格式不正确:请确保文件是有效的 JSON 格式" ) except Exception as e: logger.error(f"Error parsing {plugin_json_path}: {e}") return SkillValidationResult( valid=False, error_message="读取 plugin.json 时发生未知错误,请检查文件权限或格式" ) def parse_skill_frontmatter(skill_md_path: str) -> SkillValidationResult: """Parse the YAML frontmatter from SKILL.md file Args: skill_md_path: Path to the SKILL.md file Returns: SkillValidationResult with validation result and error message if invalid """ try: with open(skill_md_path, 'r', encoding='utf-8') as f: content = f.read() # Match YAML frontmatter between --- delimiters frontmatter_match = re.match(r'^---\s*\n(.*?)\n---', content, re.DOTALL) if not frontmatter_match: logger.warning(f"No frontmatter found in {skill_md_path}") return SkillValidationResult( valid=False, error_message="SKILL.md 格式不正确:文件开头需要包含 YAML frontmatter(以 --- 开始和结束),并包含 name 和 description 字段" ) frontmatter = frontmatter_match.group(1) # Parse YAML using yaml.safe_load metadata = yaml.safe_load(frontmatter) if not isinstance(metadata, dict): logger.warning(f"Invalid frontmatter format in {skill_md_path}") return SkillValidationResult( valid=False, error_message="SKILL.md frontmatter 格式不正确:YAML 内容必须是一个对象" ) # Check for required fields missing_fields = [] if 'name' not in metadata: missing_fields.append('name') if 'description' not in metadata: missing_fields.append('description') if missing_fields: logger.warning(f"Missing fields {missing_fields} in {skill_md_path}") return SkillValidationResult( valid=False, error_message=f"SKILL.md 缺少必需字段:请确保 frontmatter 中包含 {', '.join(missing_fields)} 字段" ) return SkillValidationResult( valid=True, name=metadata['name'], description=metadata['description'] ) except yaml.YAMLError as e: logger.error(f"YAML parse error in {skill_md_path}: {e}") return SkillValidationResult( valid=False, error_message="SKILL.md frontmatter 格式不正确:请确保 YAML 格式有效" ) except Exception as e: logger.error(f"Error parsing {skill_md_path}: {e}") return SkillValidationResult( valid=False, error_message="读取 SKILL.md 时发生未知错误,请检查文件权限或格式" ) def get_skill_metadata(skill_path: str) -> SkillValidationResult: """Get skill metadata, trying plugin.json first, then SKILL.md Args: skill_path: Path to the skill directory Returns: SkillValidationResult with validation result and error message if invalid """ plugin_json_path = os.path.join(skill_path, '.claude-plugin', 'plugin.json') skill_md_path = os.path.join(skill_path, 'SKILL.md') has_plugin_json = os.path.exists(plugin_json_path) has_skill_md = os.path.exists(skill_md_path) # Check if at least one metadata file exists if not has_plugin_json and not has_skill_md: return SkillValidationResult( valid=False, error_message="Skill 格式不正确:请确保 skill 包含 SKILL.md 文件(包含 YAML frontmatter)或 .claude-plugin/plugin.json 文件" ) # Try plugin.json first if has_plugin_json: result = parse_plugin_json(plugin_json_path) if result.valid: return result # If plugin.json exists but is invalid, return its error # (unless SKILL.md also exists and might be valid) if not has_skill_md: return result # If both exist, prefer plugin.json error message skill_md_result = parse_skill_frontmatter(skill_md_path) if skill_md_result.valid: return skill_md_result # Both invalid, return plugin.json error return result # Fallback to SKILL.md if has_skill_md: return parse_skill_frontmatter(skill_md_path) return SkillValidationResult( valid=False, error_message="Skill 格式不正确:无法读取有效的元数据" ) def get_skill_metadata_legacy(skill_path: str) -> Optional[dict]: """Legacy function for backward compatibility - returns dict or None Args: skill_path: Path to the skill directory Returns: dict with 'name' and 'description' if found, None otherwise """ result = get_skill_metadata(skill_path) if result.valid: return { 'name': result.name, 'description': result.description } return None def get_official_skills(base_dir: str) -> List[SkillItem]: """Get all official skills from the skills directory Args: base_dir: Base directory of the project Returns: List of SkillItem objects """ skills = [] # Use SKILLS_DIR from settings, relative to base_dir if os.path.isabs(SKILLS_DIR): official_skills_dir = SKILLS_DIR else: official_skills_dir = os.path.join(base_dir, SKILLS_DIR) if not os.path.exists(official_skills_dir): logger.warning(f"Official skills directory not found: {official_skills_dir}") return skills for skill_name in os.listdir(official_skills_dir): skill_path = os.path.join(official_skills_dir, skill_name) if os.path.isdir(skill_path): metadata = get_skill_metadata_legacy(skill_path) if metadata: skills.append(SkillItem( name=metadata['name'], description=metadata['description'], user_skill=False )) logger.debug(f"Found official skill: {metadata['name']}") return skills def get_user_skills(base_dir: str, bot_id: str) -> List[SkillItem]: """Get all user uploaded skills for a specific bot Args: base_dir: Base directory of the project bot_id: Bot ID to look up user skills for Returns: List of SkillItem objects """ skills = [] user_skills_dir = os.path.join(base_dir, 'projects', 'uploads', bot_id, 'skills') if not os.path.exists(user_skills_dir): logger.info(f"No user skills directory found for bot {bot_id}: {user_skills_dir}") return skills for skill_name in os.listdir(user_skills_dir): skill_path = os.path.join(user_skills_dir, skill_name) if os.path.isdir(skill_path): metadata = get_skill_metadata_legacy(skill_path) if metadata: skills.append(SkillItem( name=metadata['name'], description=metadata['description'], user_skill=True )) logger.debug(f"Found user skill: {metadata['name']}") return skills @router.get("/api/v1/skill/list", response_model=SkillListResponse) async def list_skills( bot_id: str = Query(..., description="Bot ID to fetch user skills for") ): """ Get list of all available skills (official + user uploaded) Args: bot_id: Bot ID to fetch user uploaded skills for Returns: SkillListResponse containing all skills Notes: - Official skills are read from the /skills directory - User skills are read from /projects/uploads/{bot_id}/skills directory - User skills are marked with user_skill: true """ try: # Get the project base directory base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Get official skills official_skills = get_official_skills(base_dir) # Get user skills for the specific bot user_skills = get_user_skills(base_dir, bot_id) # Combine both lists (user skills first) all_skills = user_skills + official_skills logger.info(f"Found {len(official_skills)} official skills and {len(user_skills)} user skills for bot {bot_id}") return SkillListResponse( skills=all_skills, total=len(all_skills) ) except Exception as e: import traceback error_details = traceback.format_exc() logger.error(f"Error in list_skills: {str(e)}") logger.error(f"Full traceback: {error_details}") raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.post("/api/v1/skill/upload") async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = Form(None)): """ Skill文件上传API接口,上传zip文件到 ./projects/uploads/ 目录下并自动解压 安全改进: - P1-001: ZipSlip 路径遍历防护 - 检查每个文件的解压路径 - P1-004: 文件大小限制 - 最大 50MB - P1-005: Zip 炸弹防护 - 检查压缩比例(最大 100:1)和解压后大小(最大 500MB) - P1-008: 异步 I/O - 使用 aiofiles 和 asyncio.to_thread Args: file: 上传的zip文件 bot_id: Bot ID,用于创建用户专属的skills目录 Returns: dict: 包含文件路径、解压信息的响应 Notes: - 仅支持.zip格式的skill文件 - 上传后会自动解压到 projects/uploads/{bot_id}/skills/{skill_name}/ 目录 - 文件大小限制: 50MB - 解压后大小限制: 500MB """ file_path = None # 初始化以便在异常处理中使用 try: # 验证 bot_id (P1-006 路径遍历防护) if not bot_id: raise HTTPException(status_code=400, detail="bot_id 不能为空") bot_id = validate_bot_id(bot_id) # 验证文件名 if not file.filename: raise HTTPException(status_code=400, detail="文件名不能为空") logger.info(f"Skill upload - bot_id: {bot_id}, filename: {file.filename}") # 验证是否为zip文件 original_filename = file.filename name_without_ext, file_extension = os.path.splitext(original_filename) if file_extension.lower() != '.zip': raise HTTPException(status_code=400, detail="仅支持上传.zip格式的skill文件") # P1-004: 验证文件大小(异步读取,不阻塞事件循环) file_size = await validate_upload_file_size(file) logger.info(f"File size: {file_size // 1024}KB") folder_name = name_without_ext # 创建上传目录(先保存 zip 文件) upload_dir = os.path.join("projects", "uploads", bot_id, "skill_zip") await asyncio.to_thread(os.makedirs, upload_dir, exist_ok=True) # 保存zip文件路径 file_path = os.path.join(upload_dir, original_filename) # P1-008: 异步保存文件(使用 aiofiles,不阻塞事件循环) await save_upload_file_async(file, file_path) logger.info(f"Saved zip file: {file_path}") # 统一解压到临时目录 tmp_extract_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp", folder_name) await asyncio.to_thread(os.makedirs, tmp_extract_dir, exist_ok=True) # P1-001, P1-005: 安全解压(防止 ZipSlip 和 zip 炸弹) await safe_extract_zip(file_path, tmp_extract_dir) logger.info(f"Extracted to tmp dir: {tmp_extract_dir}") # 清理 macOS 自动生成的 __MACOSX 目录 macosx_dir = os.path.join(tmp_extract_dir, "__MACOSX") if os.path.exists(macosx_dir): await asyncio.to_thread(shutil.rmtree, macosx_dir) logger.info(f"Cleaned up __MACOSX directory: {macosx_dir}") # 基于 skill 元数据文件位置检测结构 skill_structure = await asyncio.to_thread(detect_skill_structure, tmp_extract_dir) logger.info(f"Detected skill structure: {skill_structure}") skills_dir = os.path.join("projects", "uploads", bot_id, "skills") await asyncio.to_thread(os.makedirs, skills_dir, exist_ok=True) skill_dirs_to_validate = [] if skill_structure == "root": # 根目录直接包含 skill 元数据,整体作为一个 skill result = await asyncio.to_thread(get_skill_metadata, tmp_extract_dir) if result.valid and result.name: skill_name = result.name else: skill_name = folder_name target_dir = os.path.join(skills_dir, skill_name) # 如果目标已存在,先删除 if os.path.exists(target_dir): await asyncio.to_thread(shutil.rmtree, target_dir) await asyncio.to_thread(shutil.move, tmp_extract_dir, target_dir) skill_dirs_to_validate.append(target_dir) logger.info(f"Moved skill to: {target_dir}") elif skill_structure == "subdirs": # 第二级子目录包含 skill 元数据,逐个移动 for item in os.listdir(tmp_extract_dir): item_path = os.path.join(tmp_extract_dir, item) if not os.path.isdir(item_path) or item == '__MACOSX': continue if has_skill_metadata_files(item_path): result = await asyncio.to_thread(get_skill_metadata, item_path) if result.valid and result.name: skill_name = result.name else: skill_name = item target_dir = os.path.join(skills_dir, skill_name) if os.path.exists(target_dir): await asyncio.to_thread(shutil.rmtree, target_dir) await asyncio.to_thread(shutil.move, item_path, target_dir) skill_dirs_to_validate.append(target_dir) logger.info(f"Moved skill '{skill_name}' to: {target_dir}") # 清理临时目录 if os.path.exists(tmp_extract_dir): await asyncio.to_thread(shutil.rmtree, tmp_extract_dir) else: # unknown - 未找到有效的 skill 元数据 await asyncio.to_thread(shutil.rmtree, tmp_extract_dir) raise HTTPException( status_code=400, detail="Skill 格式不正确:请确保 skill 包含 SKILL.md 文件(包含 YAML frontmatter)或 .claude-plugin/plugin.json 文件" ) # 验证每个 skill 目录的格式 validation_errors = [] for skill_dir in skill_dirs_to_validate: validation_result = await asyncio.to_thread(get_skill_metadata, skill_dir) if not validation_result.valid: skill_dir_name = os.path.basename(skill_dir) validation_errors.append(f"{skill_dir_name}: {validation_result.error_message}") logger.warning(f"Skill format validation failed for {skill_dir}: {validation_result.error_message}") # 如果有验证错误,清理已解压的文件并返回错误 if validation_errors: for skill_dir in skill_dirs_to_validate: try: await asyncio.to_thread(shutil.rmtree, skill_dir) logger.info(f"Cleaned up invalid skill directory: {skill_dir}") except Exception as cleanup_error: logger.error(f"Failed to cleanup skill directory {skill_dir}: {cleanup_error}") if len(validation_errors) == 1: error_detail = validation_errors[0] else: error_detail = "多个 skill 格式验证失败:\n" + "\n".join(validation_errors) raise HTTPException(status_code=400, detail=error_detail) # 获取最终的 skill 名称 if len(skill_dirs_to_validate) == 1: final_extract_path = skill_dirs_to_validate[0] final_skill_name = os.path.basename(final_extract_path) else: final_extract_path = skills_dir final_skill_name = folder_name return { "success": True, "message": f"Skill文件上传并解压成功", "file_path": file_path, "extract_path": final_extract_path, "original_filename": original_filename, "skill_name": final_skill_name } except HTTPException: # 清理已上传的文件和临时目录 if file_path and os.path.exists(file_path): try: await asyncio.to_thread(os.remove, file_path) logger.info(f"Cleaned up file: {file_path}") except Exception as cleanup_error: logger.error(f"Failed to cleanup file: {cleanup_error}") tmp_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp") if bot_id else None if tmp_dir and os.path.exists(tmp_dir): try: await asyncio.to_thread(shutil.rmtree, tmp_dir) except Exception: pass raise except Exception as e: # 清理已上传的文件和临时目录 if file_path and os.path.exists(file_path): try: await asyncio.to_thread(os.remove, file_path) logger.info(f"Cleaned up file: {file_path}") except Exception as cleanup_error: logger.error(f"Failed to cleanup file: {cleanup_error}") tmp_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp") if bot_id else None if tmp_dir and os.path.exists(tmp_dir): try: await asyncio.to_thread(shutil.rmtree, tmp_dir) except Exception: pass logger.error(f"Error uploading skill file: {str(e)}") # 不暴露详细错误信息给客户端(安全考虑) raise HTTPException(status_code=500, detail="Skill文件上传失败") @router.delete("/api/v1/skill/remove") async def remove_skill( bot_id: str = Query(..., description="Bot ID"), skill_name: str = Query(..., description="Skill name to remove") ): """ 删除用户上传的 skill Args: bot_id: Bot ID skill_name: 要删除的 skill 名称 Returns: dict: 删除结果 Notes: - 只能删除用户上传的 skills,不能删除官方 skills - 删除路径: projects/uploads/{bot_id}/skills/{skill_name} """ try: # 验证参数(防止路径遍历攻击) bot_id = validate_bot_id(bot_id) skill_name = validate_skill_name(skill_name) logger.info(f"Skill remove - bot_id: {bot_id}, skill_name: {skill_name}") # 构建删除目录路径 base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) skill_dir = os.path.join(base_dir, "projects", "uploads", bot_id, "skills", skill_name) # 规范化路径并确保在允许的目录内 skill_dir_real = os.path.realpath(skill_dir) allowed_base = os.path.realpath(os.path.join(base_dir, "projects", "uploads", bot_id, "skills")) if not skill_dir_real.startswith(allowed_base + os.sep): raise HTTPException(status_code=403, detail="非法的删除路径") # 检查目录是否存在 if not os.path.exists(skill_dir_real): raise HTTPException(status_code=404, detail="Skill 不存在") # 检查是否为目录 if not os.path.isdir(skill_dir_real): raise HTTPException(status_code=400, detail="目标路径不是目录") # 使用线程池删除目录(避免阻塞事件循环) await asyncio.to_thread(shutil.rmtree, skill_dir_real) logger.info(f"Successfully removed skill directory: {skill_dir_real}") return { "success": True, "message": f"Skill '{skill_name}' 删除成功", "bot_id": bot_id, "skill_name": skill_name } except HTTPException: raise except Exception as e: import traceback error_details = traceback.format_exc() logger.error(f"Error removing skill: {str(e)}") logger.error(f"Full traceback: {error_details}") raise HTTPException(status_code=500, detail="删除 Skill 失败")