refactor: 改用基于元数据文件位置的 skill 结构检测

将 skill 上传检测逻辑从基于 zip 文件结构(是否有顶级目录)改为基于
SKILL.md 和 .claude-plugin/plugin.json 的实际位置:先检查解压根目录,
再检查第二级子目录。统一解压到临时目录后再按结构移动到 skills 目录。

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
朱潮 2026-03-26 15:30:17 +08:00
parent e13405ba29
commit 1b4fcb3d71

View File

@ -95,43 +95,53 @@ async def validate_upload_file_size(file: UploadFile) -> int:
return file_size return file_size
def detect_zip_has_top_level_dirs(zip_path: str) -> bool:
"""检测 zip 文件是否包含顶级目录(而非直接包含文件) def has_skill_metadata_files(dir_path: str) -> bool:
"""检查目录是否包含 skill 元数据文件SKILL.md 或 .claude-plugin/plugin.json
Args: Args:
zip_path: zip 文件路径 dir_path: 要检查的目录路径
Returns: Returns:
bool: 如果 zip 包含顶级目录则返回 True bool: 如果包含元数据文件则返回 True
""" """
skill_md = os.path.join(dir_path, 'SKILL.md')
plugin_json = os.path.join(dir_path, '.claude-plugin', 'plugin.json')
return os.path.exists(skill_md) or os.path.exists(plugin_json)
def detect_skill_structure(extract_dir: str) -> str:
"""检测解压后目录中 skill 元数据的位置
优先检查根目录是否直接包含 SKILL.md .claude-plugin/plugin.json
如果没有再检查第二级子目录
Args:
extract_dir: 解压后的目录路径
Returns:
"root" - 根目录直接包含 SKILL.md .claude-plugin/plugin.json
"subdirs" - 第二级子目录包含 skill 元数据
"unknown" - 未找到有效的 skill 元数据
"""
# 第一步:检查根目录
if has_skill_metadata_files(extract_dir):
logger.info(f"Skill metadata found at root level: {extract_dir}")
return "root"
# 第二步:检查第二级子目录
try: try:
with zipfile.ZipFile(zip_path, 'r') as zip_ref: for item in os.listdir(extract_dir):
# 获取所有顶级路径(第一层目录/文件) item_path = os.path.join(extract_dir, item)
top_level_paths = set() if os.path.isdir(item_path) and item != '__MACOSX':
for name in zip_ref.namelist(): if has_skill_metadata_files(item_path):
# 跳过空目录项(以 / 结尾的空路径) logger.info(f"Skill metadata found in subdirectory: {item}")
if not name or name == '/': return "subdirs"
continue except OSError as e:
# 提取顶级路径(第一层) logger.warning(f"Error scanning directory {extract_dir}: {e}")
parts = name.split('/')
if parts[0]: # 忽略空字符串
top_level_paths.add(parts[0])
logger.info(f"Zip top-level paths: {top_level_paths}") logger.warning(f"No skill metadata found in {extract_dir}")
return "unknown"
# 检查是否有目录(目录项以 / 结尾,或路径中包含 /
for path in top_level_paths:
# 如果路径中包含 /,说明是目录
# 或者检查 namelist 中是否有以该路径/ 开头的项
for full_name in zip_ref.namelist():
if full_name.startswith(f"{path}/"):
return True
return False
except Exception as e:
logger.warning(f"Error detecting zip structure: {e}")
return False
async def safe_extract_zip(zip_path: str, extract_dir: str) -> None: async def safe_extract_zip(zip_path: str, extract_dir: str) -> None:
@ -211,67 +221,6 @@ async def safe_extract_zip(zip_path: str, extract_dir: str) -> None:
raise HTTPException(status_code=400, detail=f"无效的 zip 文件: {str(e)}") raise HTTPException(status_code=400, detail=f"无效的 zip 文件: {str(e)}")
async def validate_and_rename_skill_folder(
extract_dir: str,
has_top_level_dirs: bool
) -> str:
"""验证并重命名解压后的 skill 文件夹
检查解压后文件夹名称是否与 skill metadata (plugin.json SKILL.md) 中的 name 匹配
如果不匹配则重命名文件夹
Args:
extract_dir: 解压目标目录
has_top_level_dirs: zip 是否包含顶级目录
Returns:
str: 最终的解压路径可能因为重命名而改变
"""
try:
if has_top_level_dirs:
# zip 包含目录,检查每个目录
for folder_name in os.listdir(extract_dir):
folder_path = os.path.join(extract_dir, folder_name)
if os.path.isdir(folder_path):
result = await asyncio.to_thread(
get_skill_metadata, folder_path
)
if result.valid and result.name:
expected_name = result.name
if folder_name != expected_name:
new_folder_path = os.path.join(extract_dir, expected_name)
await asyncio.to_thread(
shutil.move, folder_path, new_folder_path
)
logger.info(
f"Renamed skill folder: {folder_name} -> {expected_name}"
)
return extract_dir
else:
# zip 直接包含文件,检查当前目录的 metadata
result = await asyncio.to_thread(
get_skill_metadata, extract_dir
)
if result.valid and result.name:
expected_name = result.name
# 获取当前文件夹名称
current_name = os.path.basename(extract_dir)
if current_name != expected_name:
parent_dir = os.path.dirname(extract_dir)
new_folder_path = os.path.join(parent_dir, expected_name)
await asyncio.to_thread(
shutil.move, extract_dir, new_folder_path
)
logger.info(
f"Renamed skill folder: {current_name} -> {expected_name}"
)
return new_folder_path
return extract_dir
except Exception as e:
logger.warning(f"Failed to validate/rename skill folder: {e}")
# 不抛出异常,允许上传继续
return extract_dir
async def save_upload_file_async(file: UploadFile, destination: str) -> None: async def save_upload_file_async(file: UploadFile, destination: str) -> None:
@ -644,51 +593,73 @@ async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = For
await save_upload_file_async(file, file_path) await save_upload_file_async(file, file_path)
logger.info(f"Saved zip file: {file_path}") logger.info(f"Saved zip file: {file_path}")
# 检测 zip 文件结构:是否包含顶级目录 # 统一解压到临时目录
has_top_level_dirs = await asyncio.to_thread( tmp_extract_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp", folder_name)
detect_zip_has_top_level_dirs, file_path await asyncio.to_thread(os.makedirs, tmp_extract_dir, exist_ok=True)
)
logger.info(f"Zip contains top-level directories: {has_top_level_dirs}")
# 根据检测结果决定解压目标目录
if has_top_level_dirs:
# zip 包含目录(如 a-skill/, b-skill/),解压到 skills/ 目录
extract_target = os.path.join("projects", "uploads", bot_id, "skills")
logger.info(f"Detected directories in zip, extracting to: {extract_target}")
else:
# zip 直接包含文件,解压到 skills/{folder_name}/ 目录
extract_target = os.path.join("projects", "uploads", bot_id, "skills", folder_name)
logger.info(f"No directories in zip, extracting to: {extract_target}")
# 使用线程池避免阻塞
await asyncio.to_thread(os.makedirs, extract_target, exist_ok=True)
# P1-001, P1-005: 安全解压(防止 ZipSlip 和 zip 炸弹) # P1-001, P1-005: 安全解压(防止 ZipSlip 和 zip 炸弹)
await safe_extract_zip(file_path, extract_target) await safe_extract_zip(file_path, tmp_extract_dir)
logger.info(f"Extracted to: {extract_target}") logger.info(f"Extracted to tmp dir: {tmp_extract_dir}")
# 清理 macOS 自动生成的 __MACOSX 目录 # 清理 macOS 自动生成的 __MACOSX 目录
macosx_dir = os.path.join(extract_target, "__MACOSX") macosx_dir = os.path.join(tmp_extract_dir, "__MACOSX")
if os.path.exists(macosx_dir): if os.path.exists(macosx_dir):
await asyncio.to_thread(shutil.rmtree, macosx_dir) await asyncio.to_thread(shutil.rmtree, macosx_dir)
logger.info(f"Cleaned up __MACOSX directory: {macosx_dir}") logger.info(f"Cleaned up __MACOSX directory: {macosx_dir}")
# 验证并重命名文件夹以匹配 SKILL.md 中的 name # 基于 skill 元数据文件位置检测结构
final_extract_path = await validate_and_rename_skill_folder( skill_structure = await asyncio.to_thread(detect_skill_structure, tmp_extract_dir)
extract_target, has_top_level_dirs logger.info(f"Detected skill structure: {skill_structure}")
)
skills_dir = os.path.join("projects", "uploads", bot_id, "skills")
await asyncio.to_thread(os.makedirs, skills_dir, exist_ok=True)
# 验证 skill 格式
# 如果 zip 包含多个顶<E4B8AA><E9A1B6><EFBFBD>目录需要验证每个目录
skill_dirs_to_validate = [] skill_dirs_to_validate = []
if has_top_level_dirs:
# 获取所有解压后的 skill 目录 if skill_structure == "root":
for item in os.listdir(final_extract_path): # 根目录直接包含 skill 元数据,整体作为一个 skill
item_path = os.path.join(final_extract_path, item) result = await asyncio.to_thread(get_skill_metadata, tmp_extract_dir)
if os.path.isdir(item_path): if result.valid and result.name:
skill_dirs_to_validate.append(item_path) skill_name = result.name
else:
skill_name = folder_name
target_dir = os.path.join(skills_dir, skill_name)
# 如果目标已存在,先删除
if os.path.exists(target_dir):
await asyncio.to_thread(shutil.rmtree, target_dir)
await asyncio.to_thread(shutil.move, tmp_extract_dir, target_dir)
skill_dirs_to_validate.append(target_dir)
logger.info(f"Moved skill to: {target_dir}")
elif skill_structure == "subdirs":
# 第二级子目录包含 skill 元数据,逐个移动
for item in os.listdir(tmp_extract_dir):
item_path = os.path.join(tmp_extract_dir, item)
if not os.path.isdir(item_path) or item == '__MACOSX':
continue
if has_skill_metadata_files(item_path):
result = await asyncio.to_thread(get_skill_metadata, item_path)
if result.valid and result.name:
skill_name = result.name
else:
skill_name = item
target_dir = os.path.join(skills_dir, skill_name)
if os.path.exists(target_dir):
await asyncio.to_thread(shutil.rmtree, target_dir)
await asyncio.to_thread(shutil.move, item_path, target_dir)
skill_dirs_to_validate.append(target_dir)
logger.info(f"Moved skill '{skill_name}' to: {target_dir}")
# 清理临时目录
if os.path.exists(tmp_extract_dir):
await asyncio.to_thread(shutil.rmtree, tmp_extract_dir)
else: else:
skill_dirs_to_validate.append(final_extract_path) # unknown - 未找到有效的 skill 元数据
await asyncio.to_thread(shutil.rmtree, tmp_extract_dir)
raise HTTPException(
status_code=400,
detail="Skill 格式不正确:请确保 skill 包含 SKILL.md 文件(包含 YAML frontmatter或 .claude-plugin/plugin.json 文件"
)
# 验证每个 skill 目录的格式 # 验证每个 skill 目录的格式
validation_errors = [] validation_errors = []
@ -701,7 +672,6 @@ async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = For
# 如果有验证错误,清理已解压的文件并返回错误 # 如果有验证错误,清理已解压的文件并返回错误
if validation_errors: if validation_errors:
# 清理解压的目录
for skill_dir in skill_dirs_to_validate: for skill_dir in skill_dirs_to_validate:
try: try:
await asyncio.to_thread(shutil.rmtree, skill_dir) await asyncio.to_thread(shutil.rmtree, skill_dir)
@ -709,7 +679,6 @@ async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = For
except Exception as cleanup_error: except Exception as cleanup_error:
logger.error(f"Failed to cleanup skill directory {skill_dir}: {cleanup_error}") logger.error(f"Failed to cleanup skill directory {skill_dir}: {cleanup_error}")
# 如果只有一个错误,直接返回该错误
if len(validation_errors) == 1: if len(validation_errors) == 1:
error_detail = validation_errors[0] error_detail = validation_errors[0]
else: else:
@ -718,10 +687,12 @@ async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = For
raise HTTPException(status_code=400, detail=error_detail) raise HTTPException(status_code=400, detail=error_detail)
# 获取最终的 skill 名称 # 获取最终的 skill 名称
if has_top_level_dirs: if len(skill_dirs_to_validate) == 1:
final_skill_name = folder_name final_extract_path = skill_dirs_to_validate[0]
else:
final_skill_name = os.path.basename(final_extract_path) final_skill_name = os.path.basename(final_extract_path)
else:
final_extract_path = skills_dir
final_skill_name = folder_name
return { return {
"success": True, "success": True,
@ -733,23 +704,35 @@ async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = For
} }
except HTTPException: except HTTPException:
# 清理已上传的文件 # 清理已上传的文件和临时目录
if file_path and os.path.exists(file_path): if file_path and os.path.exists(file_path):
try: try:
await asyncio.to_thread(os.remove, file_path) await asyncio.to_thread(os.remove, file_path)
logger.info(f"Cleaned up file: {file_path}") logger.info(f"Cleaned up file: {file_path}")
except Exception as cleanup_error: except Exception as cleanup_error:
logger.error(f"Failed to cleanup file: {cleanup_error}") logger.error(f"Failed to cleanup file: {cleanup_error}")
tmp_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp") if bot_id else None
if tmp_dir and os.path.exists(tmp_dir):
try:
await asyncio.to_thread(shutil.rmtree, tmp_dir)
except Exception:
pass
raise raise
except Exception as e: except Exception as e:
# 清理已上传的文件 # 清理已上传的文件和临时目录
if file_path and os.path.exists(file_path): if file_path and os.path.exists(file_path):
try: try:
await asyncio.to_thread(os.remove, file_path) await asyncio.to_thread(os.remove, file_path)
logger.info(f"Cleaned up file: {file_path}") logger.info(f"Cleaned up file: {file_path}")
except Exception as cleanup_error: except Exception as cleanup_error:
logger.error(f"Failed to cleanup file: {cleanup_error}") logger.error(f"Failed to cleanup file: {cleanup_error}")
tmp_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp") if bot_id else None
if tmp_dir and os.path.exists(tmp_dir):
try:
await asyncio.to_thread(shutil.rmtree, tmp_dir)
except Exception:
pass
logger.error(f"Error uploading skill file: {str(e)}") logger.error(f"Error uploading skill file: {str(e)}")
# 不暴露详细错误信息给客户端(安全考虑) # 不暴露详细错误信息给客户端(安全考虑)