Compare commits

..

No commits in common. "6befae11fb80b07d93271474e1729d2e9baea955" and "423fdc5c0cce69dbb7d97b02afce2be6489ee521" have entirely different histories.

2 changed files with 51 additions and 187 deletions

View File

@ -66,7 +66,7 @@ class LoggingCallbackHandler(BaseCallbackHandler):
tool_name = 'unknown_tool'
else:
tool_name = serialized.get('name', 'unknown_tool')
self.logger.info(f"🔧 Tool Start - {tool_name} with input: {str(input_str)[:300]}")
self.logger.info(f"🔧 Tool Start - {tool_name} with input: {str(input_str)[:100]}")
def on_tool_end(self, output: str, **kwargs: Any) -> None:
"""当工具调用结束时调用"""
@ -76,4 +76,4 @@ class LoggingCallbackHandler(BaseCallbackHandler):
self, error: Exception, **kwargs: Any
) -> None:
"""当工具调用出错时调用"""
self.logger.error(f"❌ Tool Error: {error}")
self.logger.error(f"❌ Tool Error: {error}")

View File

@ -1,13 +1,11 @@
import os
import re
import json
import shutil
import zipfile
import logging
import asyncio
import yaml
from typing import List, Optional
from dataclasses import dataclass
from fastapi import APIRouter, HTTPException, Query, UploadFile, File, Form
from pydantic import BaseModel
from utils.settings import SKILLS_DIR
@ -29,15 +27,6 @@ class SkillListResponse(BaseModel):
total: int
@dataclass
class SkillValidationResult:
"""Skill 格式验证结果"""
valid: bool
name: Optional[str] = None
description: Optional[str] = None
error_message: Optional[str] = None
# ============ 安全常量 ============
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB 最大上传文件大小
MAX_UNCOMPRESSED_SIZE = 500 * 1024 * 1024 # 500MB 解压后最大大小
@ -233,11 +222,11 @@ async def validate_and_rename_skill_folder(
for folder_name in os.listdir(extract_dir):
folder_path = os.path.join(extract_dir, folder_name)
if os.path.isdir(folder_path):
result = await asyncio.to_thread(
metadata = await asyncio.to_thread(
get_skill_metadata, folder_path
)
if result.valid and result.name:
expected_name = result.name
if metadata and 'name' in metadata:
expected_name = metadata['name']
if folder_name != expected_name:
new_folder_path = os.path.join(extract_dir, expected_name)
await asyncio.to_thread(
@ -249,11 +238,11 @@ async def validate_and_rename_skill_folder(
return extract_dir
else:
# zip 直接包含文件,检查当前目录的 metadata
result = await asyncio.to_thread(
metadata = await asyncio.to_thread(
get_skill_metadata, extract_dir
)
if result.valid and result.name:
expected_name = result.name
if metadata and 'name' in metadata:
expected_name = metadata['name']
# 获取当前文件夹名称
current_name = os.path.basename(extract_dir)
if current_name != expected_name:
@ -282,68 +271,47 @@ async def save_upload_file_async(file: UploadFile, destination: str) -> None:
await f.write(chunk)
def parse_plugin_json(plugin_json_path: str) -> SkillValidationResult:
def parse_plugin_json(plugin_json_path: str) -> Optional[dict]:
"""Parse the plugin.json file for name and description
Args:
plugin_json_path: Path to the plugin.json file
Returns:
SkillValidationResult with validation result and error message if invalid
dict with 'name' and 'description' if found, None otherwise
"""
try:
import json
with open(plugin_json_path, 'r', encoding='utf-8') as f:
plugin_config = json.load(f)
if not isinstance(plugin_config, dict):
logger.warning(f"Invalid plugin.json format in {plugin_json_path}")
return SkillValidationResult(
valid=False,
error_message="plugin.json 格式不正确:文件内容必须是一个 JSON 对象"
)
return None
# Check for required fields
missing_fields = []
if 'name' not in plugin_config:
missing_fields.append('name')
if 'description' not in plugin_config:
missing_fields.append('description')
# Return name and description if both exist
if 'name' in plugin_config and 'description' in plugin_config:
return {
'name': plugin_config['name'],
'description': plugin_config['description']
}
if missing_fields:
logger.warning(f"Missing fields {missing_fields} in {plugin_json_path}")
return SkillValidationResult(
valid=False,
error_message=f"plugin.json 缺少必需字段:请确保包含 {', '.join(missing_fields)} 字段"
)
logger.warning(f"Missing name or description in {plugin_json_path}")
return None
return SkillValidationResult(
valid=True,
name=plugin_config['name'],
description=plugin_config['description']
)
except json.JSONDecodeError as e:
logger.error(f"JSON parse error in {plugin_json_path}: {e}")
return SkillValidationResult(
valid=False,
error_message="plugin.json 格式不正确:请确保文件是有效的 JSON 格式"
)
except Exception as e:
logger.error(f"Error parsing {plugin_json_path}: {e}")
return SkillValidationResult(
valid=False,
error_message="读取 plugin.json 时发生未知错误,请检查文件权限或格式"
)
return None
def parse_skill_frontmatter(skill_md_path: str) -> SkillValidationResult:
def parse_skill_frontmatter(skill_md_path: str) -> Optional[dict]:
"""Parse the YAML frontmatter from SKILL.md file
Args:
skill_md_path: Path to the SKILL.md file
Returns:
SkillValidationResult with validation result and error message if invalid
dict with 'name' and 'description' if found, None otherwise
"""
try:
with open(skill_md_path, 'r', encoding='utf-8') as f:
@ -353,10 +321,7 @@ def parse_skill_frontmatter(skill_md_path: str) -> SkillValidationResult:
frontmatter_match = re.match(r'^---\s*\n(.*?)\n---', content, re.DOTALL)
if not frontmatter_match:
logger.warning(f"No frontmatter found in {skill_md_path}")
return SkillValidationResult(
valid=False,
error_message="SKILL.md 格式不正确:文件开头需要包含 YAML frontmatter以 --- 开始和结束),并包含 name 和 description 字段"
)
return None
frontmatter = frontmatter_match.group(1)
@ -364,108 +329,46 @@ def parse_skill_frontmatter(skill_md_path: str) -> SkillValidationResult:
metadata = yaml.safe_load(frontmatter)
if not isinstance(metadata, dict):
logger.warning(f"Invalid frontmatter format in {skill_md_path}")
return SkillValidationResult(
valid=False,
error_message="SKILL.md frontmatter 格式不正确YAML 内容必须是一个对象"
)
return None
# Check for required fields
missing_fields = []
if 'name' not in metadata:
missing_fields.append('name')
if 'description' not in metadata:
missing_fields.append('description')
# Return name and description if both exist
if 'name' in metadata and 'description' in metadata:
return {
'name': metadata['name'],
'description': metadata['description']
}
if missing_fields:
logger.warning(f"Missing fields {missing_fields} in {skill_md_path}")
return SkillValidationResult(
valid=False,
error_message=f"SKILL.md 缺少必需字段:请确保 frontmatter 中包含 {', '.join(missing_fields)} 字段"
)
logger.warning(f"Missing name or description in {skill_md_path}")
return None
return SkillValidationResult(
valid=True,
name=metadata['name'],
description=metadata['description']
)
except yaml.YAMLError as e:
logger.error(f"YAML parse error in {skill_md_path}: {e}")
return SkillValidationResult(
valid=False,
error_message="SKILL.md frontmatter 格式不正确:请确保 YAML 格式有效"
)
except Exception as e:
logger.error(f"Error parsing {skill_md_path}: {e}")
return SkillValidationResult(
valid=False,
error_message="读取 SKILL.md 时发生未知错误,请检查文件权限或格式"
)
return None
def get_skill_metadata(skill_path: str) -> SkillValidationResult:
def get_skill_metadata(skill_path: str) -> Optional[dict]:
"""Get skill metadata, trying plugin.json first, then SKILL.md
Args:
skill_path: Path to the skill directory
Returns:
SkillValidationResult with validation result and error message if invalid
"""
plugin_json_path = os.path.join(skill_path, '.claude-plugin', 'plugin.json')
skill_md_path = os.path.join(skill_path, 'SKILL.md')
has_plugin_json = os.path.exists(plugin_json_path)
has_skill_md = os.path.exists(skill_md_path)
# Check if at least one metadata file exists
if not has_plugin_json and not has_skill_md:
return SkillValidationResult(
valid=False,
error_message="Skill 格式不正确:请确保 skill 包含 SKILL.md 文件(包含 YAML frontmatter或 .claude-plugin/plugin.json 文件"
)
# Try plugin.json first
if has_plugin_json:
result = parse_plugin_json(plugin_json_path)
if result.valid:
return result
# If plugin.json exists but is invalid, return its error
# (unless SKILL.md also exists and might be valid)
if not has_skill_md:
return result
# If both exist, prefer plugin.json error message
skill_md_result = parse_skill_frontmatter(skill_md_path)
if skill_md_result.valid:
return skill_md_result
# Both invalid, return plugin.json error
return result
# Fallback to SKILL.md
if has_skill_md:
return parse_skill_frontmatter(skill_md_path)
return SkillValidationResult(
valid=False,
error_message="Skill 格式不正确:无法读取有效的元数据"
)
def get_skill_metadata_legacy(skill_path: str) -> Optional[dict]:
"""Legacy function for backward compatibility - returns dict or None
Args:
skill_path: Path to the skill directory
Returns:
dict with 'name' and 'description' if found, None otherwise
"""
result = get_skill_metadata(skill_path)
if result.valid:
return {
'name': result.name,
'description': result.description
}
# Try plugin.json first
plugin_json_path = os.path.join(skill_path, '.claude-plugin', 'plugin.json')
if os.path.exists(plugin_json_path):
metadata = parse_plugin_json(plugin_json_path)
if metadata:
return metadata
# Fallback to SKILL.md
skill_md_path = os.path.join(skill_path, 'SKILL.md')
if os.path.exists(skill_md_path):
metadata = parse_skill_frontmatter(skill_md_path)
if metadata:
return metadata
return None
@ -492,7 +395,7 @@ def get_official_skills(base_dir: str) -> List[SkillItem]:
for skill_name in os.listdir(official_skills_dir):
skill_path = os.path.join(official_skills_dir, skill_name)
if os.path.isdir(skill_path):
metadata = get_skill_metadata_legacy(skill_path)
metadata = get_skill_metadata(skill_path)
if metadata:
skills.append(SkillItem(
name=metadata['name'],
@ -524,7 +427,7 @@ def get_user_skills(base_dir: str, bot_id: str) -> List[SkillItem]:
for skill_name in os.listdir(user_skills_dir):
skill_path = os.path.join(user_skills_dir, skill_name)
if os.path.isdir(skill_path):
metadata = get_skill_metadata_legacy(skill_path)
metadata = get_skill_metadata(skill_path)
if metadata:
skills.append(SkillItem(
name=metadata['name'],
@ -672,45 +575,6 @@ async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = For
extract_target, has_top_level_dirs
)
# 验证 skill 格式
# 如果 zip 包含多个顶<E4B8AA><E9A1B6><EFBFBD>目录需要验证每个目录
skill_dirs_to_validate = []
if has_top_level_dirs:
# 获取所有解压后的 skill 目录
for item in os.listdir(final_extract_path):
item_path = os.path.join(final_extract_path, item)
if os.path.isdir(item_path):
skill_dirs_to_validate.append(item_path)
else:
skill_dirs_to_validate.append(final_extract_path)
# 验证每个 skill 目录的格式
validation_errors = []
for skill_dir in skill_dirs_to_validate:
validation_result = await asyncio.to_thread(get_skill_metadata, skill_dir)
if not validation_result.valid:
skill_dir_name = os.path.basename(skill_dir)
validation_errors.append(f"{skill_dir_name}: {validation_result.error_message}")
logger.warning(f"Skill format validation failed for {skill_dir}: {validation_result.error_message}")
# 如果有验证错误,清理已解压的文件并返回错误
if validation_errors:
# 清理解压的目录
for skill_dir in skill_dirs_to_validate:
try:
await asyncio.to_thread(shutil.rmtree, skill_dir)
logger.info(f"Cleaned up invalid skill directory: {skill_dir}")
except Exception as cleanup_error:
logger.error(f"Failed to cleanup skill directory {skill_dir}: {cleanup_error}")
# 如果只有一个错误,直接返回该错误
if len(validation_errors) == 1:
error_detail = validation_errors[0]
else:
error_detail = "多个 skill 格式验证失败:\n" + "\n".join(validation_errors)
raise HTTPException(status_code=400, detail=error_detail)
# 获取最终的 skill 名称
if has_top_level_dirs:
final_skill_name = folder_name