qwen_agent/routes/skill_manager.py
朱潮 6b9ae7f86a fix: skill 删除时同步清理 robot 目录,解压时跳过已存在的 skill
1. remove_skill 删除 uploads 下的 skill 后,同步删除 projects/robot/{bot_id}/skills/ 下的副本
2. _extract_skills_to_robot 不再每次全量清空重建,已存在的 skill 直接跳过

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 15:47:28 +08:00

812 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import json
import shutil
import zipfile
import logging
import asyncio
import yaml
from typing import List, Optional
from dataclasses import dataclass
from fastapi import APIRouter, HTTPException, Query, UploadFile, File, Form
from pydantic import BaseModel
from utils.settings import SKILLS_DIR
import aiofiles
# Module-level logger and API router shared by all skill-management endpoints.
logger = logging.getLogger('app')
router = APIRouter()
class SkillItem(BaseModel):
    """A single skill entry returned by the skill list endpoint."""

    name: str
    description: str
    # True when the skill was uploaded by a user, False for official skills.
    user_skill: bool = False
class SkillListResponse(BaseModel):
    """Response payload for the skill list endpoint."""

    # Combined user + official skills; total is the length of that list.
    skills: List[SkillItem]
    total: int
@dataclass
class SkillValidationResult:
    """Result of validating a skill's metadata (SKILL.md / plugin.json)."""

    # True when metadata was parsed successfully; name/description are then set.
    valid: bool
    name: Optional[str] = None
    description: Optional[str] = None
    # User-facing explanation when valid is False.
    error_message: Optional[str] = None
# ============ Security constants ============
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB maximum upload file size
MAX_UNCOMPRESSED_SIZE = 500 * 1024 * 1024  # 500MB maximum size after extraction
MAX_COMPRESSION_RATIO = 100  # maximum allowed compression ratio, 100:1
MAX_ZIP_ENTRIES = 1000  # maximum number of entries in a zip archive
def validate_bot_id(bot_id: str) -> str:
    """Validate the bot_id format and guard against path traversal.

    Args:
        bot_id: Raw bot id from the request.

    Returns:
        The bot_id, unchanged, when it passes validation.

    Raises:
        HTTPException: 400 if the id is empty or contains traversal characters.
    """
    if not bot_id:
        raise HTTPException(status_code=400, detail="bot_id 不能为空")
    # Reject path traversal characters outright.
    if '..' in bot_id or '/' in bot_id or '\\' in bot_id:
        raise HTTPException(status_code=400, detail="bot_id 包含非法字符")
    # Strict UUID shape (8-4-4-4-12 hex groups). The previous pattern
    # (^[a-fA-F0-9-]{36}$) only required 36 hex-or-dash characters and
    # accepted e.g. a string of 36 dashes. Non-UUID ids still only log a
    # warning, so this stays backward compatible.
    uuid_pattern = (
        r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}'
        r'-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$'
    )
    if not re.match(uuid_pattern, bot_id):
        logger.warning(f"bot_id 格式可能无效: {bot_id}")
    return bot_id
def validate_skill_name(skill_name: str) -> str:
    """Validate skill_name and guard against path traversal.

    Args:
        skill_name: Raw skill name from the request.

    Returns:
        The skill_name, unchanged, when it passes validation.

    Raises:
        HTTPException: 400 if the name is empty or contains traversal characters.
    """
    if not skill_name:
        raise HTTPException(status_code=400, detail="skill_name 不能为空")
    # Any parent-directory reference or path separator is rejected.
    forbidden = ('..', '/', '\\')
    if any(token in skill_name for token in forbidden):
        raise HTTPException(status_code=400, detail="skill_name 包含非法字符")
    return skill_name
async def validate_upload_file_size(file: UploadFile) -> int:
    """Measure the uploaded file's size in chunks, enforcing MAX_FILE_SIZE.

    Args:
        file: The uploaded file; its stream is rewound before and after.

    Returns:
        The total size of the file in bytes.

    Raises:
        HTTPException: 413 when the file exceeds MAX_FILE_SIZE.
    """
    chunk_size = 8192
    total = 0
    # Rewind so counting always starts at the beginning of the stream.
    await file.seek(0)
    while True:
        chunk = await file.read(chunk_size)
        if not chunk:
            break
        total += len(chunk)
        if total > MAX_FILE_SIZE:
            await file.seek(0)  # leave the stream rewound for the caller
            raise HTTPException(
                status_code=413,
                detail=f"文件过大,最大允许 {MAX_FILE_SIZE // (1024*1024)}MB"
            )
    await file.seek(0)  # rewind so later code can re-read the file
    return total
def has_skill_metadata_files(dir_path: str) -> bool:
    """Check whether a directory holds skill metadata.

    A directory qualifies when it contains SKILL.md or
    .claude-plugin/plugin.json.

    Args:
        dir_path: Directory to inspect.

    Returns:
        bool: True when either metadata file exists.
    """
    candidates = (
        os.path.join(dir_path, 'SKILL.md'),
        os.path.join(dir_path, '.claude-plugin', 'plugin.json'),
    )
    return any(os.path.exists(path) for path in candidates)
def detect_skill_structure(extract_dir: str) -> str:
    """Locate the skill metadata inside an extracted archive.

    The archive root is checked first; if it carries no metadata, each
    second-level directory is checked instead.

    Args:
        extract_dir: Path to the extracted archive.

    Returns:
        "root"    - metadata sits directly at the archive root
        "subdirs" - metadata found inside a second-level subdirectory
        "unknown" - no valid skill metadata anywhere
    """
    # Root level wins when present.
    if has_skill_metadata_files(extract_dir):
        logger.info(f"Skill metadata found at root level: {extract_dir}")
        return "root"
    # Otherwise scan one level down, skipping the macOS junk folder.
    try:
        entries = os.listdir(extract_dir)
    except OSError as e:
        logger.warning(f"Error scanning directory {extract_dir}: {e}")
        entries = []
    for entry in entries:
        if entry == '__MACOSX':
            continue
        entry_path = os.path.join(extract_dir, entry)
        if os.path.isdir(entry_path) and has_skill_metadata_files(entry_path):
            logger.info(f"Skill metadata found in subdirectory: {entry}")
            return "subdirs"
    logger.warning(f"No skill metadata found in {extract_dir}")
    return "unknown"
async def safe_extract_zip(zip_path: str, extract_dir: str) -> None:
    """Safely extract a zip file, guarding against ZipSlip and zip bombs.

    Args:
        zip_path: Path to the zip file.
        extract_dir: Target directory for extraction.

    Raises:
        HTTPException: 400 if the archive is malformed or malicious.
    """
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            # Limit the number of entries.
            file_list = zip_ref.infolist()
            if len(file_list) > MAX_ZIP_ENTRIES:
                raise zipfile.BadZipFile(f"zip 文件包含过多文件: {len(file_list)}")
            # BUG FIX: the two sums were swapped. ZipInfo.file_size is the
            # UNCOMPRESSED size and ZipInfo.compress_size the compressed one,
            # so the 500MB cap was checked against the compressed total and
            # the bomb ratio was inverted — the zip-bomb guard never fired.
            uncompressed_size = sum(z.file_size for z in file_list)
            compressed_size = sum(z.compress_size for z in file_list)
            if uncompressed_size > MAX_UNCOMPRESSED_SIZE:
                raise zipfile.BadZipFile(
                    f"解压后大小 {uncompressed_size // (1024*1024)}MB 超过限制 "
                    f"{MAX_UNCOMPRESSED_SIZE // (1024*1024)}MB"
                )
            # Compression-ratio check (zip bomb protection).
            if compressed_size > 0:
                ratio = uncompressed_size / compressed_size
                if ratio > MAX_COMPRESSION_RATIO:
                    raise zipfile.BadZipFile(
                        f"压缩比例 {ratio:.1f}:1 超过限制 {MAX_COMPRESSION_RATIO}:1"
                        f"可能是 zip 炸弹攻击"
                    )
            # Normalized extraction root used for containment checks below.
            extract_dir_real = os.path.realpath(extract_dir)
            for zip_info in file_list:
                # Path traversal check on the raw entry name.
                if '..' in zip_info.filename or zip_info.filename.startswith('/'):
                    raise zipfile.BadZipFile(
                        f"检测到路径遍历攻击: {zip_info.filename}"
                    )
                # Resolve the final target and ensure it stays inside extract_dir.
                target_path = os.path.realpath(os.path.join(extract_dir, zip_info.filename))
                if not target_path.startswith(extract_dir_real + os.sep):
                    if target_path != extract_dir_real:  # the directory itself is allowed
                        raise zipfile.BadZipFile(
                            f"文件将被解压到目标目录之外: {zip_info.filename}"
                        )
                # Reject symlinks. ZipInfo.is_symlink() is not available on
                # older Pythons, so also check the Unix mode bits stored in
                # the upper 16 bits of external_attr (0o120000 == S_IFLNK).
                is_symlink = (
                    hasattr(zip_info, 'is_symlink') and zip_info.is_symlink()
                ) or (
                    (zip_info.external_attr >> 16) & 0o170000 == 0o120000
                )
                if is_symlink:
                    raise zipfile.BadZipFile(
                        f"不允许符号链接: {zip_info.filename}"
                    )
                # Extract on a worker thread to avoid blocking the event loop.
                await asyncio.to_thread(zip_ref.extract, zip_info, extract_dir)
    except zipfile.BadZipFile as e:
        raise HTTPException(status_code=400, detail=f"无效的 zip 文件: {str(e)}")
async def save_upload_file_async(file: UploadFile, destination: str) -> None:
    """Stream an uploaded file to *destination* in 8KB chunks without blocking."""
    chunk_size = 8192
    async with aiofiles.open(destination, 'wb') as out:
        while True:
            data = await file.read(chunk_size)
            if not data:
                break
            await out.write(data)
def parse_plugin_json(plugin_json_path: str) -> SkillValidationResult:
    """Read name and description from a plugin.json file.

    Args:
        plugin_json_path: Path to the plugin.json file.

    Returns:
        SkillValidationResult; on failure ``valid`` is False and
        ``error_message`` carries a user-facing explanation.
    """
    try:
        with open(plugin_json_path, 'r', encoding='utf-8') as fh:
            plugin_config = json.load(fh)
        # The top-level JSON value must be an object.
        if not isinstance(plugin_config, dict):
            logger.warning(f"Invalid plugin.json format in {plugin_json_path}")
            return SkillValidationResult(
                valid=False,
                error_message="plugin.json 格式不正确:文件内容必须是一个 JSON 对象"
            )
        # Both name and description are mandatory.
        missing_fields = [key for key in ('name', 'description') if key not in plugin_config]
        if missing_fields:
            logger.warning(f"Missing fields {missing_fields} in {plugin_json_path}")
            return SkillValidationResult(
                valid=False,
                error_message=f"plugin.json 缺少必需字段:请确保包含 {', '.join(missing_fields)} 字段"
            )
        return SkillValidationResult(
            valid=True,
            name=plugin_config['name'],
            description=plugin_config['description']
        )
    except json.JSONDecodeError as e:
        logger.error(f"JSON parse error in {plugin_json_path}: {e}")
        return SkillValidationResult(
            valid=False,
            error_message="plugin.json 格式不正确:请确保文件是有效的 JSON 格式"
        )
    except Exception as e:
        logger.error(f"Error parsing {plugin_json_path}: {e}")
        return SkillValidationResult(
            valid=False,
            error_message="读取 plugin.json 时发生未知错误,请检查文件权限或格式"
        )
def parse_skill_frontmatter(skill_md_path: str) -> SkillValidationResult:
    """Extract name and description from a SKILL.md YAML frontmatter.

    Args:
        skill_md_path: Path to the SKILL.md file.

    Returns:
        SkillValidationResult; on failure ``valid`` is False and
        ``error_message`` carries a user-facing explanation.
    """
    try:
        with open(skill_md_path, 'r', encoding='utf-8') as fh:
            content = fh.read()
        # Frontmatter is the text between the leading pair of '---' markers.
        match = re.match(r'^---\s*\n(.*?)\n---', content, re.DOTALL)
        if match is None:
            logger.warning(f"No frontmatter found in {skill_md_path}")
            return SkillValidationResult(
                valid=False,
                error_message="SKILL.md 格式不正确:文件开头需要包含 YAML frontmatter以 --- 开始和结束),并包含 name 和 description 字段"
            )
        # safe_load prevents arbitrary object construction from YAML.
        metadata = yaml.safe_load(match.group(1))
        if not isinstance(metadata, dict):
            logger.warning(f"Invalid frontmatter format in {skill_md_path}")
            return SkillValidationResult(
                valid=False,
                error_message="SKILL.md frontmatter 格式不正确YAML 内容必须是一个对象"
            )
        # Both name and description are mandatory.
        missing_fields = [key for key in ('name', 'description') if key not in metadata]
        if missing_fields:
            logger.warning(f"Missing fields {missing_fields} in {skill_md_path}")
            return SkillValidationResult(
                valid=False,
                error_message=f"SKILL.md 缺少必需字段:请确保 frontmatter 中包含 {', '.join(missing_fields)} 字段"
            )
        return SkillValidationResult(
            valid=True,
            name=metadata['name'],
            description=metadata['description']
        )
    except yaml.YAMLError as e:
        logger.error(f"YAML parse error in {skill_md_path}: {e}")
        return SkillValidationResult(
            valid=False,
            error_message="SKILL.md frontmatter 格式不正确:请确保 YAML 格式有效"
        )
    except Exception as e:
        logger.error(f"Error parsing {skill_md_path}: {e}")
        return SkillValidationResult(
            valid=False,
            error_message="读取 SKILL.md 时发生未知错误,请检查文件权限或格式"
        )
def get_skill_metadata(skill_path: str) -> SkillValidationResult:
    """Resolve skill metadata, preferring plugin.json over SKILL.md.

    Args:
        skill_path: Path to the skill directory.

    Returns:
        SkillValidationResult describing the skill, or an invalid result
        carrying a user-facing error message.
    """
    plugin_json_path = os.path.join(skill_path, '.claude-plugin', 'plugin.json')
    skill_md_path = os.path.join(skill_path, 'SKILL.md')
    has_plugin_json = os.path.exists(plugin_json_path)
    has_skill_md = os.path.exists(skill_md_path)
    # Neither metadata file is present: nothing to parse.
    if not (has_plugin_json or has_skill_md):
        return SkillValidationResult(
            valid=False,
            error_message="Skill 格式不正确:请确保 skill 包含 SKILL.md 文件(包含 YAML frontmatter或 .claude-plugin/plugin.json 文件"
        )
    if has_plugin_json:
        plugin_result = parse_plugin_json(plugin_json_path)
        if plugin_result.valid:
            return plugin_result
        # plugin.json is broken: fall back to SKILL.md when available, but
        # report the plugin.json error if SKILL.md turns out broken as well.
        if has_skill_md:
            md_result = parse_skill_frontmatter(skill_md_path)
            if md_result.valid:
                return md_result
        return plugin_result
    if has_skill_md:
        return parse_skill_frontmatter(skill_md_path)
    # Defensive fallback; unreachable given the existence checks above.
    return SkillValidationResult(
        valid=False,
        error_message="Skill 格式不正确:无法读取有效的元数据"
    )
def get_skill_metadata_legacy(skill_path: str) -> Optional[dict]:
    """Backward-compatible wrapper around get_skill_metadata.

    Args:
        skill_path: Path to the skill directory.

    Returns:
        dict with 'name' and 'description' when metadata is valid, else None.
    """
    result = get_skill_metadata(skill_path)
    if not result.valid:
        return None
    return {'name': result.name, 'description': result.description}
def get_official_skills(base_dir: str) -> List[SkillItem]:
    """Collect every official skill from the configured skills directory.

    Args:
        base_dir: Project base directory (used when SKILLS_DIR is relative).

    Returns:
        List of SkillItem objects with user_skill=False.
    """
    # SKILLS_DIR may be absolute or relative to the project root.
    if os.path.isabs(SKILLS_DIR):
        skills_root = SKILLS_DIR
    else:
        skills_root = os.path.join(base_dir, SKILLS_DIR)
    if not os.path.exists(skills_root):
        logger.warning(f"Official skills directory not found: {skills_root}")
        return []
    found: List[SkillItem] = []
    for entry in os.listdir(skills_root):
        entry_path = os.path.join(skills_root, entry)
        if not os.path.isdir(entry_path):
            continue
        meta = get_skill_metadata_legacy(entry_path)
        if meta is None:
            continue
        found.append(SkillItem(
            name=meta['name'],
            description=meta['description'],
            user_skill=False
        ))
        logger.debug(f"Found official skill: {meta['name']}")
    return found
def get_user_skills(base_dir: str, bot_id: str) -> List[SkillItem]:
    """Collect the skills uploaded by a specific bot's user.

    Args:
        base_dir: Project base directory.
        bot_id: Bot whose uploaded skills are listed.

    Returns:
        List of SkillItem objects with user_skill=True.
    """
    user_skills_dir = os.path.join(base_dir, 'projects', 'uploads', bot_id, 'skills')
    if not os.path.exists(user_skills_dir):
        logger.info(f"No user skills directory found for bot {bot_id}: {user_skills_dir}")
        return []
    found: List[SkillItem] = []
    for entry in os.listdir(user_skills_dir):
        entry_path = os.path.join(user_skills_dir, entry)
        if not os.path.isdir(entry_path):
            continue
        meta = get_skill_metadata_legacy(entry_path)
        if meta is None:
            continue
        found.append(SkillItem(
            name=meta['name'],
            description=meta['description'],
            user_skill=True
        ))
        logger.debug(f"Found user skill: {meta['name']}")
    return found
@router.get("/api/v1/skill/list", response_model=SkillListResponse)
async def list_skills(
    bot_id: str = Query(..., description="Bot ID to fetch user skills for")
):
    """Return every available skill (official + user uploaded) for a bot.

    Args:
        bot_id: Bot ID whose uploaded skills should be included.

    Returns:
        SkillListResponse with user skills listed before official ones.

    Notes:
        - Official skills are read from the /skills directory.
        - User skills are read from /projects/uploads/{bot_id}/skills.
        - User skills are marked with user_skill: true.
    """
    try:
        # The project root is two directory levels above this module.
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        official_skills = get_official_skills(base_dir)
        user_skills = get_user_skills(base_dir, bot_id)
        # User-uploaded skills are listed first.
        combined = user_skills + official_skills
        logger.info(f"Found {len(official_skills)} official skills and {len(user_skills)} user skills for bot {bot_id}")
        return SkillListResponse(skills=combined, total=len(combined))
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in list_skills: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.post("/api/v1/skill/upload")
async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = Form(None)):
    """Skill upload endpoint: accepts a zip file, stores it under
    ./projects/uploads/ and extracts it automatically.

    Security hardening:
    - P1-001: ZipSlip protection - every entry's extraction path is checked
    - P1-004: file size limit - 50MB max
    - P1-005: zip bomb protection - compression ratio (max 100:1) and
      extracted size (max 500MB) are limited
    - P1-008: async I/O - uses aiofiles and asyncio.to_thread

    Args:
        file: The uploaded zip file.
        bot_id: Bot ID, used to create the user-specific skills directory.

    Returns:
        dict: Response with the saved file path and extraction details.

    Notes:
        - Only .zip skill files are supported.
        - Archives are extracted to projects/uploads/{bot_id}/skills/{skill_name}/.
        - Upload size limit: 50MB; extracted size limit: 500MB.
    """
    file_path = None  # defined up front so both exception handlers can reference it
    try:
        # Validate bot_id (P1-006 path-traversal protection)
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id 不能为空")
        bot_id = validate_bot_id(bot_id)
        # Validate the filename
        if not file.filename:
            raise HTTPException(status_code=400, detail="文件名不能为空")
        logger.info(f"Skill upload - bot_id: {bot_id}, filename: {file.filename}")
        # Only zip archives are accepted
        original_filename = file.filename
        name_without_ext, file_extension = os.path.splitext(original_filename)
        if file_extension.lower() != '.zip':
            raise HTTPException(status_code=400, detail="仅支持上传.zip格式的skill文件")
        # P1-004: validate the file size (read asynchronously, non-blocking)
        file_size = await validate_upload_file_size(file)
        logger.info(f"File size: {file_size // 1024}KB")
        folder_name = name_without_ext
        # Create the upload directory (the zip file is saved there first)
        upload_dir = os.path.join("projects", "uploads", bot_id, "skill_zip")
        await asyncio.to_thread(os.makedirs, upload_dir, exist_ok=True)
        # Path where the zip file will be stored
        file_path = os.path.join(upload_dir, original_filename)
        # P1-008: save asynchronously via aiofiles (does not block the event loop)
        await save_upload_file_async(file, file_path)
        logger.info(f"Saved zip file: {file_path}")
        # Always extract into a temporary directory first
        tmp_extract_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp", folder_name)
        await asyncio.to_thread(os.makedirs, tmp_extract_dir, exist_ok=True)
        # P1-001, P1-005: safe extraction (ZipSlip and zip-bomb protection)
        await safe_extract_zip(file_path, tmp_extract_dir)
        logger.info(f"Extracted to tmp dir: {tmp_extract_dir}")
        # Remove the __MACOSX directory that macOS archives include
        macosx_dir = os.path.join(tmp_extract_dir, "__MACOSX")
        if os.path.exists(macosx_dir):
            await asyncio.to_thread(shutil.rmtree, macosx_dir)
            logger.info(f"Cleaned up __MACOSX directory: {macosx_dir}")
        # Detect the layout based on where the skill metadata files live
        skill_structure = await asyncio.to_thread(detect_skill_structure, tmp_extract_dir)
        logger.info(f"Detected skill structure: {skill_structure}")
        skills_dir = os.path.join("projects", "uploads", bot_id, "skills")
        await asyncio.to_thread(os.makedirs, skills_dir, exist_ok=True)
        skill_dirs_to_validate = []
        if skill_structure == "root":
            # Metadata at the archive root: the whole tree is a single skill
            result = await asyncio.to_thread(get_skill_metadata, tmp_extract_dir)
            if result.valid and result.name:
                skill_name = result.name
            else:
                skill_name = folder_name
            target_dir = os.path.join(skills_dir, skill_name)
            # Replace any existing copy of the skill
            if os.path.exists(target_dir):
                await asyncio.to_thread(shutil.rmtree, target_dir)
            await asyncio.to_thread(shutil.move, tmp_extract_dir, target_dir)
            skill_dirs_to_validate.append(target_dir)
            logger.info(f"Moved skill to: {target_dir}")
        elif skill_structure == "subdirs":
            # Metadata in second-level directories: move each one as its own skill
            for item in os.listdir(tmp_extract_dir):
                item_path = os.path.join(tmp_extract_dir, item)
                if not os.path.isdir(item_path) or item == '__MACOSX':
                    continue
                if has_skill_metadata_files(item_path):
                    result = await asyncio.to_thread(get_skill_metadata, item_path)
                    if result.valid and result.name:
                        skill_name = result.name
                    else:
                        skill_name = item
                    target_dir = os.path.join(skills_dir, skill_name)
                    if os.path.exists(target_dir):
                        await asyncio.to_thread(shutil.rmtree, target_dir)
                    await asyncio.to_thread(shutil.move, item_path, target_dir)
                    skill_dirs_to_validate.append(target_dir)
                    logger.info(f"Moved skill '{skill_name}' to: {target_dir}")
            # Remove the now-empty temporary directory
            if os.path.exists(tmp_extract_dir):
                await asyncio.to_thread(shutil.rmtree, tmp_extract_dir)
        else:
            # unknown - no valid skill metadata was found anywhere
            await asyncio.to_thread(shutil.rmtree, tmp_extract_dir)
            raise HTTPException(
                status_code=400,
                detail="Skill 格式不正确:请确保 skill 包含 SKILL.md 文件(包含 YAML frontmatter或 .claude-plugin/plugin.json 文件"
            )
        # Validate the format of every moved skill directory
        validation_errors = []
        for skill_dir in skill_dirs_to_validate:
            validation_result = await asyncio.to_thread(get_skill_metadata, skill_dir)
            if not validation_result.valid:
                skill_dir_name = os.path.basename(skill_dir)
                validation_errors.append(f"{skill_dir_name}: {validation_result.error_message}")
                logger.warning(f"Skill format validation failed for {skill_dir}: {validation_result.error_message}")
        # On validation failure, remove the extracted files and report the error
        if validation_errors:
            for skill_dir in skill_dirs_to_validate:
                try:
                    await asyncio.to_thread(shutil.rmtree, skill_dir)
                    logger.info(f"Cleaned up invalid skill directory: {skill_dir}")
                except Exception as cleanup_error:
                    logger.error(f"Failed to cleanup skill directory {skill_dir}: {cleanup_error}")
            if len(validation_errors) == 1:
                error_detail = validation_errors[0]
            else:
                error_detail = "多个 skill 格式验证失败:\n" + "\n".join(validation_errors)
            raise HTTPException(status_code=400, detail=error_detail)
        # Work out the final skill name for the response
        if len(skill_dirs_to_validate) == 1:
            final_extract_path = skill_dirs_to_validate[0]
            final_skill_name = os.path.basename(final_extract_path)
        else:
            final_extract_path = skills_dir
            final_skill_name = folder_name
        return {
            "success": True,
            "message": f"Skill文件上传并解压成功",
            "file_path": file_path,
            "extract_path": final_extract_path,
            "original_filename": original_filename,
            "skill_name": final_skill_name
        }
    except HTTPException:
        # Clean up the uploaded file and the temporary directory
        if file_path and os.path.exists(file_path):
            try:
                await asyncio.to_thread(os.remove, file_path)
                logger.info(f"Cleaned up file: {file_path}")
            except Exception as cleanup_error:
                logger.error(f"Failed to cleanup file: {cleanup_error}")
        tmp_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp") if bot_id else None
        if tmp_dir and os.path.exists(tmp_dir):
            try:
                await asyncio.to_thread(shutil.rmtree, tmp_dir)
            except Exception:
                pass
        raise
    except Exception as e:
        # Clean up the uploaded file and the temporary directory
        if file_path and os.path.exists(file_path):
            try:
                await asyncio.to_thread(os.remove, file_path)
                logger.info(f"Cleaned up file: {file_path}")
            except Exception as cleanup_error:
                logger.error(f"Failed to cleanup file: {cleanup_error}")
        tmp_dir = os.path.join("projects", "uploads", bot_id, "skill_tmp") if bot_id else None
        if tmp_dir and os.path.exists(tmp_dir):
            try:
                await asyncio.to_thread(shutil.rmtree, tmp_dir)
            except Exception:
                pass
        logger.error(f"Error uploading skill file: {str(e)}")
        # Do not leak internal error details to the client (security)
        raise HTTPException(status_code=500, detail="Skill文件上传失败")
@router.delete("/api/v1/skill/remove")
async def remove_skill(
    bot_id: str = Query(..., description="Bot ID"),
    skill_name: str = Query(..., description="Skill name to remove")
):
    """Delete a user-uploaded skill.

    Args:
        bot_id: Bot ID.
        skill_name: Name of the skill to delete.

    Returns:
        dict: Deletion result.

    Notes:
        - Only user-uploaded skills can be removed, never official ones.
        - Removal path: projects/uploads/{bot_id}/skills/{skill_name}.
    """
    try:
        # Sanitize both parameters against path traversal.
        bot_id = validate_bot_id(bot_id)
        skill_name = validate_skill_name(skill_name)
        logger.info(f"Skill remove - bot_id: {bot_id}, skill_name: {skill_name}")
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        uploads_skills = os.path.join(base_dir, "projects", "uploads", bot_id, "skills")
        # Resolve the target and confirm it stays inside the allowed base.
        skill_dir_real = os.path.realpath(os.path.join(uploads_skills, skill_name))
        allowed_base = os.path.realpath(uploads_skills)
        if not skill_dir_real.startswith(allowed_base + os.sep):
            raise HTTPException(status_code=403, detail="非法的删除路径")
        if not os.path.exists(skill_dir_real):
            raise HTTPException(status_code=404, detail="Skill 不存在")
        if not os.path.isdir(skill_dir_real):
            raise HTTPException(status_code=400, detail="目标路径不是目录")
        # Delete on a worker thread so the event loop is not blocked.
        await asyncio.to_thread(shutil.rmtree, skill_dir_real)
        logger.info(f"Successfully removed skill directory: {skill_dir_real}")
        # Also drop the copy under the robot directory, if one exists.
        robot_skill_dir = os.path.join(base_dir, "projects", "robot", bot_id, "skills", skill_name)
        if os.path.exists(robot_skill_dir):
            await asyncio.to_thread(shutil.rmtree, robot_skill_dir)
            logger.info(f"Also removed robot skill directory: {robot_skill_dir}")
        return {
            "success": True,
            "message": f"Skill '{skill_name}' 删除成功",
            "bot_id": bot_id,
            "skill_name": skill_name
        }
    except HTTPException:
        raise
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error removing skill: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        raise HTTPException(status_code=500, detail="删除 Skill 失败")