import os import re import shutil import zipfile import logging from typing import List, Optional from fastapi import APIRouter, HTTPException, Query, UploadFile, File, Form from pydantic import BaseModel from utils.settings import SKILLS_DIR logger = logging.getLogger('app') router = APIRouter() class SkillItem(BaseModel): name: str description: str user_skill: bool = False class SkillListResponse(BaseModel): skills: List[SkillItem] total: int def parse_skill_frontmatter(skill_md_path: str) -> Optional[dict]: """Parse the YAML frontmatter from SKILL.md file Args: skill_md_path: Path to the SKILL.md file Returns: dict with 'name' and 'description' if found, None otherwise """ try: with open(skill_md_path, 'r', encoding='utf-8') as f: content = f.read() # Match YAML frontmatter between --- delimiters frontmatter_match = re.match(r'^---\s*\n(.*?)\n---', content, re.DOTALL) if not frontmatter_match: logger.warning(f"No frontmatter found in {skill_md_path}") return None frontmatter = frontmatter_match.group(1) metadata = {} # Parse key: value pairs from frontmatter for line in frontmatter.split('\n'): line = line.strip() if ':' in line: key, value = line.split(':', 1) metadata[key.strip()] = value.strip() # Return name and description if both exist if 'name' in metadata and 'description' in metadata: return { 'name': metadata['name'], 'description': metadata['description'] } logger.warning(f"Missing name or description in {skill_md_path}") return None except Exception as e: logger.error(f"Error parsing {skill_md_path}: {e}") return None def get_official_skills(base_dir: str) -> List[SkillItem]: """Get all official skills from the skills directory Args: base_dir: Base directory of the project Returns: List of SkillItem objects """ skills = [] # Use SKILLS_DIR from settings, relative to base_dir if os.path.isabs(SKILLS_DIR): official_skills_dir = SKILLS_DIR else: official_skills_dir = os.path.join(base_dir, SKILLS_DIR) if not os.path.exists(official_skills_dir): logger.warning(f"Official skills directory not found: {official_skills_dir}") return skills for skill_name in os.listdir(official_skills_dir): skill_path = os.path.join(official_skills_dir, skill_name) if os.path.isdir(skill_path): skill_md_path = os.path.join(skill_path, 'SKILL.md') if os.path.exists(skill_md_path): metadata = parse_skill_frontmatter(skill_md_path) if metadata: skills.append(SkillItem( name=metadata['name'], description=metadata['description'], user_skill=False )) logger.debug(f"Found official skill: {metadata['name']}") return skills def get_user_skills(base_dir: str, bot_id: str) -> List[SkillItem]: """Get all user uploaded skills for a specific bot Args: base_dir: Base directory of the project bot_id: Bot ID to look up user skills for Returns: List of SkillItem objects """ skills = [] user_skills_dir = os.path.join(base_dir, 'projects', 'uploads', bot_id, 'skills') if not os.path.exists(user_skills_dir): logger.info(f"No user skills directory found for bot {bot_id}: {user_skills_dir}") return skills for skill_name in os.listdir(user_skills_dir): skill_path = os.path.join(user_skills_dir, skill_name) if os.path.isdir(skill_path): skill_md_path = os.path.join(skill_path, 'SKILL.md') if os.path.exists(skill_md_path): metadata = parse_skill_frontmatter(skill_md_path) if metadata: skills.append(SkillItem( name=metadata['name'], description=metadata['description'], user_skill=True )) logger.debug(f"Found user skill: {metadata['name']}") return skills @router.get("/api/v1/skill/list", response_model=SkillListResponse) async def list_skills( bot_id: str = Query(..., description="Bot ID to fetch user skills for") ): """ Get list of all available skills (official + user uploaded) Args: bot_id: Bot ID to fetch user uploaded skills for Returns: SkillListResponse containing all skills Notes: - Official skills are read from the /skills directory - User skills are read from /projects/uploads/{bot_id}/skills directory - User skills are marked with user_skill: true """ try: # Get the project base directory base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Get official skills official_skills = get_official_skills(base_dir) # Get user skills for the specific bot user_skills = get_user_skills(base_dir, bot_id) # Combine both lists (user skills first) all_skills = user_skills + official_skills logger.info(f"Found {len(official_skills)} official skills and {len(user_skills)} user skills for bot {bot_id}") return SkillListResponse( skills=all_skills, total=len(all_skills) ) except Exception as e: import traceback error_details = traceback.format_exc() logger.error(f"Error in list_skills: {str(e)}") logger.error(f"Full traceback: {error_details}") raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.post("/api/v1/skill/upload") async def upload_skill(file: UploadFile = File(...), bot_id: Optional[str] = Form(None)): """ Skill文件上传API接口,上传zip文件到 ./projects/uploads/ 目录下并自动解压 Args: file: 上传的zip文件 bot_id: Bot ID,用于创建用户专属的skills目录 Returns: dict: 包含文件路径、解压信息的响应 Notes: - 仅支持.zip格式的skill文件 - 上传后会自动解压到 projects/uploads/{bot_id}/skills/{skill_name}/ 目录 """ try: logger.info(f"Skill upload - bot_id parameter: {bot_id}") logger.info(f"File received: {file.filename if file else 'None'}") # 验证bot_id if not bot_id: raise HTTPException(status_code=400, detail="bot_id is required") # 验证文件名 if not file.filename: raise HTTPException(status_code=400, detail="文件名不能为空") # 验证是否为zip文件 original_filename = file.filename name_without_ext, file_extension = os.path.splitext(original_filename) if file_extension.lower() != '.zip': raise HTTPException(status_code=400, detail="仅支持上传.zip格式的skill文件") folder_name = name_without_ext # 创建上传目录 upload_dir = os.path.join("projects", "uploads", bot_id, "skill_zip") extract_target = os.path.join("projects", "uploads", bot_id, "skills", folder_name) os.makedirs(extract_target, exist_ok=True) os.makedirs(upload_dir, exist_ok=True) try: # 保存zip文件(使用原始文件名) file_path = os.path.join(upload_dir, original_filename) with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) logger.info(f"Saved zip file: {file_path}") # 解压zip文件 with zipfile.ZipFile(file_path, 'r') as zip_ref: zip_ref.extractall(extract_target) logger.info(f"Extracted to: {extract_target}") return { "success": True, "message": f"Skill文件上传并解压成功", "file_path": file_path, "extract_path": extract_target, "original_filename": original_filename, "skill_name": folder_name } except zipfile.BadZipFile: # 解压失败,删除已保存的zip文件 if os.path.exists(file_path): os.remove(file_path) raise HTTPException(status_code=400, detail="上传的文件不是有效的zip文件") except Exception as e: # 解压失败,删除已保存的zip文件 if os.path.exists(file_path): os.remove(file_path) logger.error(f"解压失败: {str(e)}") raise HTTPException(status_code=500, detail=f"解压失败: {str(e)}") except HTTPException: raise except Exception as e: import traceback error_details = traceback.format_exc() logger.error(f"Error uploading skill file: {str(e)}") logger.error(f"Full traceback: {error_details}") raise HTTPException(status_code=500, detail=f"Skill文件上传失败: {str(e)}")