diff --git a/routes/skill_manager.py b/routes/skill_manager.py index d6e61f3..72abb07 100644 --- a/routes/skill_manager.py +++ b/routes/skill_manager.py @@ -95,46 +95,53 @@ async def validate_upload_file_size(file: UploadFile) -> int: return file_size -def detect_zip_has_top_level_dirs(zip_path: str) -> bool: - """检测 zip 文件是否包含顶级目录(而非直接包含文件) + +def has_skill_metadata_files(dir_path: str) -> bool: + """检查目录是否包含 skill 元数据文件(SKILL.md 或 .claude-plugin/plugin.json) Args: - zip_path: zip 文件路径 + dir_path: 要检查的目录路径 Returns: - bool: 如果 zip 包含顶级目录则返回 True + bool: 如果包含元数据文件则返回 True """ + skill_md = os.path.join(dir_path, 'SKILL.md') + plugin_json = os.path.join(dir_path, '.claude-plugin', 'plugin.json') + return os.path.exists(skill_md) or os.path.exists(plugin_json) + + +def detect_skill_structure(extract_dir: str) -> str: + """检测解压后目录中 skill 元数据的位置 + + 优先检查根目录是否直接包含 SKILL.md 或 .claude-plugin/plugin.json, + 如果没有,再检查第二级子目录。 + + Args: + extract_dir: 解压后的目录路径 + + Returns: + "root" - 根目录直接包含 SKILL.md 或 .claude-plugin/plugin.json + "subdirs" - 第二级子目录包含 skill 元数据 + "unknown" - 未找到有效的 skill 元数据 + """ + # 第一步:检查根目录 + if has_skill_metadata_files(extract_dir): + logger.info(f"Skill metadata found at root level: {extract_dir}") + return "root" + + # 第二步:检查第二级子目录 try: - with zipfile.ZipFile(zip_path, 'r') as zip_ref: - # 获取所有顶级路径(第一层目录/文件) - top_level_paths = set() - for name in zip_ref.namelist(): - # 跳过空目录项(以 / 结尾的空路径) - if not name or name == '/': - continue - # 跳过 macOS 系统文件夹 - if name.startswith('__MACOSX') or name.startswith('.'): - continue - # 提取顶级路径(第一层) - parts = name.split('/') - if parts[0]: # 忽略空字符串 - top_level_paths.add(parts[0]) + for item in os.listdir(extract_dir): + item_path = os.path.join(extract_dir, item) + if os.path.isdir(item_path) and item != '__MACOSX': + if has_skill_metadata_files(item_path): + logger.info(f"Skill metadata found in subdirectory: {item}") + return "subdirs" + except OSError as e: + logger.warning(f"Error scanning directory {extract_dir}: {e}") - logger.info(f"Zip top-level paths: {top_level_paths}") - - # 检查是否有目录(目录项以 / 结尾,或路径中包含 /) - for path in top_level_paths: - # 如果路径中包含 /,说明是目录 - # 或者检查 namelist 中是否有以该路径/ 开头的项 - for full_name in zip_ref.namelist(): - if full_name.startswith(f"{path}/"): - return True - - return False - - except Exception as e: - logger.warning(f"Error detecting zip structure: {e}") - return False + logger.warning(f"No skill metadata found in {extract_dir}") + return "unknown" async def safe_extract_zip(zip_path: str, extract_dir: str) -> None: @@ -214,71 +221,6 @@ async def safe_extract_zip(zip_path: str, extract_dir: str) -> None: raise HTTPException(status_code=400, detail=f"无效的 zip 文件: {str(e)}") -async def validate_and_rename_skill_folder( - extract_dir: str, - has_top_level_dirs: bool -) -> str: - """验证并重命名解压后的 skill 文件夹 - - 检查解压后文件夹名称是否与 skill metadata (plugin.json 或 SKILL.md) 中的 name 匹配, - 如果不匹配则重命名文件夹。 - - Args: - extract_dir: 解压目标目录 - has_top_level_dirs: zip 是否包含顶级目录 - - Returns: - str: 最终的解压路径(可能因为重命名而改变) - """ - try: - if has_top_level_dirs: - # zip 包含目录,检查每个目录 - for folder_name in os.listdir(extract_dir): - # 跳过 macOS 系统文件夹和隐藏文件夹 - if folder_name.startswith('__MACOSX') or folder_name.startswith('.'): - continue - folder_path = os.path.join(extract_dir, folder_name) - if os.path.isdir(folder_path): - result = await asyncio.to_thread( - get_skill_metadata, folder_path - ) - if result.valid and result.name: - expected_name = result.name - if folder_name != expected_name: - new_folder_path = os.path.join(extract_dir, expected_name) - await asyncio.to_thread( - shutil.move, folder_path, new_folder_path - ) - logger.info( - f"Renamed skill folder: {folder_name} -> {expected_name}" - ) - return extract_dir - else: - # zip 直接包含文件,检查当前目录的 metadata - result = await asyncio.to_thread( - get_skill_metadata, extract_dir - ) - if result.valid and result.name: - expected_name = result.name - # 获取当前文件夹名称 - current_name = os.path.basename(extract_dir) - if current_name != expected_name: - parent_dir = os.path.dirname(extract_dir) - new_folder_path = os.path.join(parent_dir, expected_name) - await asyncio.to_thread( - shutil.move, extract_dir, new_folder_path - ) - logger.info( - f"Renamed skill folder: {current_name} -> {expected_name}" - ) - return new_folder_path - return extract_dir - - except Exception as e: - logger.warning(f"Failed to validate/rename skill folder: {e}") - # 不抛出异常,允许上传继续 - return extract_dir - async def save_upload_file_async(file: UploadFile, destination: str) -> None: """异步保存上传文件到目标路径""" @@ -650,54 +592,74 @@ async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = For await save_upload_file_async(file, file_path) logger.info(f"Saved zip file: {file_path}") - # 检测 zip 文件结构:是否包含顶级目录 - has_top_level_dirs = await asyncio.to_thread( - detect_zip_has_top_level_dirs, file_path - ) - logger.info(f"Zip contains top-level directories: {has_top_level_dirs}") - - # 根据检测结果决定解压目标目录 - if has_top_level_dirs: - # zip 包含目录(如 a-skill/, b-skill/),解压到 skills/ 目录 - extract_target = os.path.join("projects", "uploads", bot_id, "skills") - logger.info(f"Detected directories in zip, extracting to: {extract_target}") - else: - # zip 直接包含文件,解压到 skills/{folder_name}/ 目录 - extract_target = os.path.join("projects", "uploads", bot_id, "skills", folder_name) - logger.info(f"No directories in zip, extracting to: {extract_target}") - - # 使用线程池避免阻塞 - await asyncio.to_thread(os.makedirs, extract_target, exist_ok=True) + # 统一解压到临时目录 + tmp_extract_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp", folder_name) + await asyncio.to_thread(os.makedirs, tmp_extract_dir, exist_ok=True) # P1-001, P1-005: 安全解压(防止 ZipSlip 和 zip 炸弹) - await safe_extract_zip(file_path, extract_target) - logger.info(f"Extracted to: {extract_target}") + await safe_extract_zip(file_path, tmp_extract_dir) + logger.info(f"Extracted to tmp dir: {tmp_extract_dir}") # 清理 macOS 自动生成的 __MACOSX 目录 - macosx_dir = os.path.join(extract_target, "__MACOSX") + macosx_dir = os.path.join(tmp_extract_dir, "__MACOSX") if os.path.exists(macosx_dir): await asyncio.to_thread(shutil.rmtree, macosx_dir) logger.info(f"Cleaned up __MACOSX directory: {macosx_dir}") - # 验证并重命名文件夹以匹配 SKILL.md 中的 name - final_extract_path = await validate_and_rename_skill_folder( - extract_target, has_top_level_dirs - ) + # 基于 skill 元数据文件位置检测结构 + skill_structure = await asyncio.to_thread(detect_skill_structure, tmp_extract_dir) + logger.info(f"Detected skill structure: {skill_structure}") + + skills_dir = os.path.join("projects", "uploads", bot_id, "skills") + await asyncio.to_thread(os.makedirs, skills_dir, exist_ok=True) - # 验证 skill 格式 - # 如果 zip 包含多个顶���目录,需要验证每个目录 skill_dirs_to_validate = [] - if has_top_level_dirs: - # 获取所有解压后的 skill 目录 - for item in os.listdir(final_extract_path): - # 跳过 macOS 系统文件夹和隐藏文件夹 - if item.startswith('__MACOSX') or item.startswith('.'): + + + if skill_structure == "root": + # 根目录直接包含 skill 元数据,整体作为一个 skill + result = await asyncio.to_thread(get_skill_metadata, tmp_extract_dir) + if result.valid and result.name: + skill_name = result.name + else: + skill_name = folder_name + target_dir = os.path.join(skills_dir, skill_name) + # 如果目标已存在,先删除 + if os.path.exists(target_dir): + await asyncio.to_thread(shutil.rmtree, target_dir) + await asyncio.to_thread(shutil.move, tmp_extract_dir, target_dir) + skill_dirs_to_validate.append(target_dir) + logger.info(f"Moved skill to: {target_dir}") + + elif skill_structure == "subdirs": + # 第二级子目录包含 skill 元数据,逐个移动 + for item in os.listdir(tmp_extract_dir): + item_path = os.path.join(tmp_extract_dir, item) + if not os.path.isdir(item_path) or item == '__MACOSX': continue - item_path = os.path.join(final_extract_path, item) - if os.path.isdir(item_path): - skill_dirs_to_validate.append(item_path) + if has_skill_metadata_files(item_path): + result = await asyncio.to_thread(get_skill_metadata, item_path) + if result.valid and result.name: + skill_name = result.name + else: + skill_name = item + target_dir = os.path.join(skills_dir, skill_name) + if os.path.exists(target_dir): + await asyncio.to_thread(shutil.rmtree, target_dir) + await asyncio.to_thread(shutil.move, item_path, target_dir) + skill_dirs_to_validate.append(target_dir) + logger.info(f"Moved skill '{skill_name}' to: {target_dir}") + # 清理临时目录 + if os.path.exists(tmp_extract_dir): + await asyncio.to_thread(shutil.rmtree, tmp_extract_dir) + else: - skill_dirs_to_validate.append(final_extract_path) + # unknown - 未找到有效的 skill 元数据 + await asyncio.to_thread(shutil.rmtree, tmp_extract_dir) + raise HTTPException( + status_code=400, + detail="Skill 格式不正确:请确保 skill 包含 SKILL.md 文件(包含 YAML frontmatter)或 .claude-plugin/plugin.json 文件" + ) # 验证每个 skill 目录的格式 validation_errors = [] @@ -710,7 +672,6 @@ async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = For # 如果有验证错误,清理已解压的文件并返回错误 if validation_errors: - # 清理解压的目录 for skill_dir in skill_dirs_to_validate: try: await asyncio.to_thread(shutil.rmtree, skill_dir) @@ -718,7 +679,6 @@ async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = For except Exception as cleanup_error: logger.error(f"Failed to cleanup skill directory {skill_dir}: {cleanup_error}") - # 如果只有一个错误,直接返回该错误 if len(validation_errors) == 1: error_detail = validation_errors[0] else: @@ -727,10 +687,12 @@ async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = For raise HTTPException(status_code=400, detail=error_detail) # 获取最终的 skill 名称 - if has_top_level_dirs: - final_skill_name = folder_name - else: + if len(skill_dirs_to_validate) == 1: + final_extract_path = skill_dirs_to_validate[0] final_skill_name = os.path.basename(final_extract_path) + else: + final_extract_path = skills_dir + final_skill_name = folder_name return { "success": True, @@ -742,23 +704,35 @@ async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = For } except HTTPException: - # 清理已上传的文件 + # 清理已上传的文件和临时目录 if file_path and os.path.exists(file_path): try: await asyncio.to_thread(os.remove, file_path) logger.info(f"Cleaned up file: {file_path}") except Exception as cleanup_error: logger.error(f"Failed to cleanup file: {cleanup_error}") + tmp_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp") if bot_id else None + if tmp_dir and os.path.exists(tmp_dir): + try: + await asyncio.to_thread(shutil.rmtree, tmp_dir) + except Exception: + pass raise except Exception as e: - # 清理已上传的文件 + # 清理已上传的文件和临时目录 if file_path and os.path.exists(file_path): try: await asyncio.to_thread(os.remove, file_path) logger.info(f"Cleaned up file: {file_path}") except Exception as cleanup_error: logger.error(f"Failed to cleanup file: {cleanup_error}") + tmp_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp") if bot_id else None + if tmp_dir and os.path.exists(tmp_dir): + try: + await asyncio.to_thread(shutil.rmtree, tmp_dir) + except Exception: + pass logger.error(f"Error uploading skill file: {str(e)}") # 不暴露详细错误信息给客户端(安全考虑)