Merge branch 'bugfix/20260212-skill-format-validation' into master

This commit is contained in:
朱潮 2026-02-13 11:52:09 +08:00
commit 3655e0ca9b

View File

@ -1,11 +1,13 @@
import os
import re
import json
import shutil
import zipfile
import logging
import asyncio
import yaml
from typing import List, Optional
from dataclasses import dataclass
from fastapi import APIRouter, HTTPException, Query, UploadFile, File, Form
from pydantic import BaseModel
from utils.settings import SKILLS_DIR
@ -27,6 +29,15 @@ class SkillListResponse(BaseModel):
total: int
@dataclass
class SkillValidationResult:
    """Outcome of validating a skill's metadata file.

    Instances are returned by the metadata parsers instead of ``None`` so
    that callers can surface a specific, user-facing reason when a skill
    is rejected.
    """

    # True when the metadata file was parsed and the required fields exist.
    valid: bool
    # Skill name read from the metadata; only set when ``valid`` is True.
    name: Optional[str] = None
    # Skill description read from the metadata; only set when ``valid`` is True.
    description: Optional[str] = None
    # Human-readable failure reason; only set when ``valid`` is False.
    error_message: Optional[str] = None
# ============ 安全常量 ============
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB 最大上传文件大小
MAX_UNCOMPRESSED_SIZE = 500 * 1024 * 1024 # 500MB 解压后最大大小
@ -222,11 +233,11 @@ async def validate_and_rename_skill_folder(
for folder_name in os.listdir(extract_dir):
folder_path = os.path.join(extract_dir, folder_name)
if os.path.isdir(folder_path):
metadata = await asyncio.to_thread(
result = await asyncio.to_thread(
get_skill_metadata, folder_path
)
if metadata and 'name' in metadata:
expected_name = metadata['name']
if result.valid and result.name:
expected_name = result.name
if folder_name != expected_name:
new_folder_path = os.path.join(extract_dir, expected_name)
await asyncio.to_thread(
@ -238,11 +249,11 @@ async def validate_and_rename_skill_folder(
return extract_dir
else:
# zip 直接包含文件,检查当前目录的 metadata
metadata = await asyncio.to_thread(
result = await asyncio.to_thread(
get_skill_metadata, extract_dir
)
if metadata and 'name' in metadata:
expected_name = metadata['name']
if result.valid and result.name:
expected_name = result.name
# 获取当前文件夹名称
current_name = os.path.basename(extract_dir)
if current_name != expected_name:
@ -271,47 +282,68 @@ async def save_upload_file_async(file: UploadFile, destination: str) -> None:
await f.write(chunk)
def parse_plugin_json(plugin_json_path: str) -> SkillValidationResult:
    """Parse a plugin.json file and check that it carries the required metadata.

    Args:
        plugin_json_path: Path to the plugin.json file.

    Returns:
        SkillValidationResult with ``valid=True`` plus ``name`` and
        ``description`` when both keys are present in the JSON object;
        otherwise ``valid=False`` with a user-facing ``error_message``.
        This function does not raise: read and parse failures are logged
        and reported through the returned result.
    """
    # Only the read/parse step can raise; keep the try body minimal.
    try:
        with open(plugin_json_path, 'r', encoding='utf-8') as f:
            plugin_config = json.load(f)
    except json.JSONDecodeError as e:
        logger.error(f"JSON parse error in {plugin_json_path}: {e}")
        return SkillValidationResult(
            valid=False,
            error_message="plugin.json 格式不正确:请确保文件是有效的 JSON 格式"
        )
    except Exception as e:
        # Covers I/O and decoding failures (missing file, permissions, bad encoding).
        logger.error(f"Error parsing {plugin_json_path}: {e}")
        return SkillValidationResult(
            valid=False,
            error_message="读取 plugin.json 时发生未知错误,请检查文件权限或格式"
        )

    # Valid JSON may still be a list/str/number; only an object is accepted.
    if not isinstance(plugin_config, dict):
        logger.warning(f"Invalid plugin.json format in {plugin_json_path}")
        return SkillValidationResult(
            valid=False,
            error_message="plugin.json 格式不正确:文件内容必须是一个 JSON 对象"
        )

    # Report every missing required key at once so the user can fix them together.
    missing_fields = [
        field for field in ('name', 'description') if field not in plugin_config
    ]
    if missing_fields:
        logger.warning(f"Missing fields {missing_fields} in {plugin_json_path}")
        return SkillValidationResult(
            valid=False,
            error_message=f"plugin.json 缺少必需字段:请确保包含 {', '.join(missing_fields)} 字段"
        )

    return SkillValidationResult(
        valid=True,
        name=plugin_config['name'],
        description=plugin_config['description']
    )
def parse_skill_frontmatter(skill_md_path: str) -> Optional[dict]:
def parse_skill_frontmatter(skill_md_path: str) -> SkillValidationResult:
"""Parse the YAML frontmatter from SKILL.md file
Args:
skill_md_path: Path to the SKILL.md file
Returns:
dict with 'name' and 'description' if found, None otherwise
SkillValidationResult with validation result and error message if invalid
"""
try:
with open(skill_md_path, 'r', encoding='utf-8') as f:
@ -321,7 +353,10 @@ def parse_skill_frontmatter(skill_md_path: str) -> Optional[dict]:
frontmatter_match = re.match(r'^---\s*\n(.*?)\n---', content, re.DOTALL)
if not frontmatter_match:
logger.warning(f"No frontmatter found in {skill_md_path}")
return None
return SkillValidationResult(
valid=False,
error_message="SKILL.md 格式不正确:文件开头需要包含 YAML frontmatter以 --- 开始和结束),并包含 name 和 description 字段"
)
frontmatter = frontmatter_match.group(1)
@ -329,46 +364,108 @@ def parse_skill_frontmatter(skill_md_path: str) -> Optional[dict]:
metadata = yaml.safe_load(frontmatter)
if not isinstance(metadata, dict):
logger.warning(f"Invalid frontmatter format in {skill_md_path}")
return None
return SkillValidationResult(
valid=False,
error_message="SKILL.md frontmatter 格式不正确YAML 内容必须是一个对象"
)
# Return name and description if both exist
if 'name' in metadata and 'description' in metadata:
return {
'name': metadata['name'],
'description': metadata['description']
}
# Check for required fields
missing_fields = []
if 'name' not in metadata:
missing_fields.append('name')
if 'description' not in metadata:
missing_fields.append('description')
logger.warning(f"Missing name or description in {skill_md_path}")
return None
if missing_fields:
logger.warning(f"Missing fields {missing_fields} in {skill_md_path}")
return SkillValidationResult(
valid=False,
error_message=f"SKILL.md 缺少必需字段:请确保 frontmatter 中包含 {', '.join(missing_fields)} 字段"
)
return SkillValidationResult(
valid=True,
name=metadata['name'],
description=metadata['description']
)
except yaml.YAMLError as e:
logger.error(f"YAML parse error in {skill_md_path}: {e}")
return SkillValidationResult(
valid=False,
error_message="SKILL.md frontmatter 格式不正确:请确保 YAML 格式有效"
)
except Exception as e:
logger.error(f"Error parsing {skill_md_path}: {e}")
return None
return SkillValidationResult(
valid=False,
error_message="读取 SKILL.md 时发生未知错误,请检查文件权限或格式"
)
def get_skill_metadata(skill_path: str) -> SkillValidationResult:
    """Get skill metadata, trying plugin.json first, then SKILL.md.

    Args:
        skill_path: Path to the skill directory.

    Returns:
        SkillValidationResult describing the first valid metadata source.
        When neither file exists, or no existing file is valid, the result
        has ``valid=False`` and an ``error_message``. If both files exist
        and both are invalid, the plugin.json error is the one reported.
    """
    plugin_json_path = os.path.join(skill_path, '.claude-plugin', 'plugin.json')
    skill_md_path = os.path.join(skill_path, 'SKILL.md')

    has_plugin_json = os.path.exists(plugin_json_path)
    has_skill_md = os.path.exists(skill_md_path)

    # Neither metadata file is present: nothing to parse.
    if not has_plugin_json and not has_skill_md:
        return SkillValidationResult(
            valid=False,
            error_message="Skill 格式不正确:请确保 skill 包含 SKILL.md 文件(包含 YAML frontmatter)或 .claude-plugin/plugin.json 文件"
        )

    # Only SKILL.md exists: its result (valid or not) is the answer.
    if not has_plugin_json:
        return parse_skill_frontmatter(skill_md_path)

    # plugin.json takes precedence whenever it is valid, or when it is the
    # only metadata file available.
    plugin_result = parse_plugin_json(plugin_json_path)
    if plugin_result.valid or not has_skill_md:
        return plugin_result

    # plugin.json is invalid but SKILL.md exists: accept SKILL.md if it is
    # valid, otherwise report the plugin.json error.
    skill_md_result = parse_skill_frontmatter(skill_md_path)
    if skill_md_result.valid:
        return skill_md_result
    return plugin_result
def get_skill_metadata_legacy(skill_path: str) -> Optional[dict]:
    """Legacy adapter for callers that expect a dict or None.

    Delegates to ``get_skill_metadata`` and converts its result to the
    previous return shape so existing callers can keep indexing
    ``metadata['name']`` / ``metadata['description']``.

    Args:
        skill_path: Path to the skill directory.

    Returns:
        dict with 'name' and 'description' when the skill metadata is
        valid, None otherwise. The failure reason is not exposed here.
    """
    result = get_skill_metadata(skill_path)
    if not result.valid:
        return None
    return {
        'name': result.name,
        'description': result.description
    }
@ -395,7 +492,7 @@ def get_official_skills(base_dir: str) -> List[SkillItem]:
for skill_name in os.listdir(official_skills_dir):
skill_path = os.path.join(official_skills_dir, skill_name)
if os.path.isdir(skill_path):
metadata = get_skill_metadata(skill_path)
metadata = get_skill_metadata_legacy(skill_path)
if metadata:
skills.append(SkillItem(
name=metadata['name'],
@ -427,7 +524,7 @@ def get_user_skills(base_dir: str, bot_id: str) -> List[SkillItem]:
for skill_name in os.listdir(user_skills_dir):
skill_path = os.path.join(user_skills_dir, skill_name)
if os.path.isdir(skill_path):
metadata = get_skill_metadata(skill_path)
metadata = get_skill_metadata_legacy(skill_path)
if metadata:
skills.append(SkillItem(
name=metadata['name'],
@ -575,6 +672,45 @@ async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = For
extract_target, has_top_level_dirs
)
# 验证 skill 格式
# 如果 zip 包含多个顶层目录,需要验证每个目录
skill_dirs_to_validate = []
if has_top_level_dirs:
# 获取所有解压后的 skill 目录
for item in os.listdir(final_extract_path):
item_path = os.path.join(final_extract_path, item)
if os.path.isdir(item_path):
skill_dirs_to_validate.append(item_path)
else:
skill_dirs_to_validate.append(final_extract_path)
# 验证每个 skill 目录的格式
validation_errors = []
for skill_dir in skill_dirs_to_validate:
validation_result = await asyncio.to_thread(get_skill_metadata, skill_dir)
if not validation_result.valid:
skill_dir_name = os.path.basename(skill_dir)
validation_errors.append(f"{skill_dir_name}: {validation_result.error_message}")
logger.warning(f"Skill format validation failed for {skill_dir}: {validation_result.error_message}")
# 如果有验证错误,清理已解压的文件并返回错误
if validation_errors:
# 清理解压的目录
for skill_dir in skill_dirs_to_validate:
try:
await asyncio.to_thread(shutil.rmtree, skill_dir)
logger.info(f"Cleaned up invalid skill directory: {skill_dir}")
except Exception as cleanup_error:
logger.error(f"Failed to cleanup skill directory {skill_dir}: {cleanup_error}")
# 如果只有一个错误,直接返回该错误
if len(validation_errors) == 1:
error_detail = validation_errors[0]
else:
error_detail = "多个 skill 格式验证失败:\n" + "\n".join(validation_errors)
raise HTTPException(status_code=400, detail=error_detail)
# 获取最终的 skill 名称
if has_top_level_dirs:
final_skill_name = folder_name