167 lines
5.2 KiB
Python
167 lines
5.2 KiB
Python
"""
|
||
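Skill hook loader and executor.

Automatically runs each skill's pre-processing logic before the agent
executes, in order to dynamically enrich the system prompt or context.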
|
||
"""
|
||
import os
|
||
import asyncio
|
||
import logging
|
||
import re
|
||
import yaml
|
||
from pathlib import Path
|
||
from typing import List, Optional
|
||
|
||
# Module-level logger; every message in this file is emitted through the
# logger registered under the name 'app'.
logger = logging.getLogger('app')
|
||
|
||
|
||
async def execute_skill_hooks(config) -> str:
    """Run every skill's ``pre_prompt`` hook and collect the text to inject.

    Each directory returned by ``_get_skill_dirs`` is scanned for
    sub-directories that contain a ``SKILL.md`` whose frontmatter declares
    ``hook: pre_prompt``; the hook of each such skill is executed. A hook
    that fails is logged and skipped so it cannot block the remaining ones.

    Args:
        config: AgentConfig object. Only ``bot_id`` is read here; the whole
            object is forwarded unchanged to each hook.

    Returns:
        str: Content to inject into the system prompt. Outputs of the
        individual hooks are separated by a blank line (``"\\n\\n"``); an
        empty string is returned when no hook produced any content.
    """
    hook_contents = []
    bot_id = getattr(config, 'bot_id', '')

    for skill_dir in _get_skill_dirs(bot_id):
        try:
            # os.listdir order is arbitrary; sort so that the injected prompt
            # is assembled in the same order on every run.
            skill_names = sorted(os.listdir(skill_dir))
        except OSError as e:
            # Missing, unreadable, or not a directory: skip this root only.
            logger.warning(f"Failed to list skill directory {skill_dir}: {e}")
            continue

        for skill_name in skill_names:
            skill_path = os.path.join(skill_dir, skill_name)
            if not os.path.isdir(skill_path):
                continue

            # Only skills with a SKILL.md that declares a pre_prompt hook run.
            skill_md = os.path.join(skill_path, 'SKILL.md')
            if not os.path.exists(skill_md):
                continue
            if _parse_hook_type(skill_md) != 'pre_prompt':
                continue

            try:
                content = await _execute_hook(skill_path, config)
            except Exception as e:
                logger.error(f"Hook execution failed for {skill_name}: {e}")
                continue

            if not content:
                continue
            if not isinstance(content, str):
                # A non-string would make the final str.join raise and lose
                # the output of every other hook, so drop it here instead.
                logger.warning(
                    f"Hook {skill_name} returned non-string content "
                    f"({type(content).__name__}); ignored"
                )
                continue

            hook_contents.append(content)
            logger.info(f"Executed hook: {skill_name}")

    return "\n\n".join(hook_contents)
|
||
|
||
|
||
def _get_skill_dirs(bot_id: str) -> List[str]:
    """Return the skill root directories that should be scanned for hooks.

    Paths are relative to the current working directory. A candidate is
    included only when it is an existing directory, so callers can list it
    without hitting a regular file that happens to carry the same name.

    Args:
        bot_id: Identifier of the robot project. When empty, only the
            official skills directory is considered.

    Returns:
        List[str]: The official ``skills`` directory first (if present),
        followed by ``projects/robot/<bot_id>/skills`` (if present).
    """
    dirs = []

    # Official skills shipped with the application.
    official_skills = "skills"
    if os.path.isdir(official_skills):
        dirs.append(official_skills)

    # Skills stored inside the robot project that belongs to this bot.
    # NOTE(review): bot_id is interpolated into the path unvalidated; confirm
    # that callers never pass a value containing path separators or "..".
    if bot_id:
        robot_skills = f"projects/robot/{bot_id}/skills"
        if os.path.isdir(robot_skills):
            dirs.append(robot_skills)

    return dirs
|
||
|
||
|
||
def _parse_hook_type(skill_md_path: str) -> Optional[str]:
    """Read the ``hook`` field from the YAML frontmatter of a SKILL.md file.

    The frontmatter is the block delimited by ``---`` lines at the very start
    of the file.

    Args:
        skill_md_path: Path of the SKILL.md file to inspect.

    Returns:
        Optional[str]: The value of the ``hook`` key when it is a string;
        ``None`` when the file has no frontmatter, the frontmatter is not a
        mapping, the key is absent or not a string, or the file cannot be
        read or parsed (the failure is logged as a warning).
    """
    try:
        # utf-8-sig drops a leading byte-order mark, which would otherwise
        # sit in front of the opening '---' and defeat the anchored match.
        with open(skill_md_path, 'r', encoding='utf-8-sig') as f:
            content = f.read()

        frontmatter_match = re.match(r'^---\s*\n(.*?)\n---', content, re.DOTALL)
        if not frontmatter_match:
            return None

        metadata = yaml.safe_load(frontmatter_match.group(1))
        if not isinstance(metadata, dict):
            return None

        hook_type = metadata.get('hook')
        return hook_type if isinstance(hook_type, str) else None

    except Exception as e:
        logger.warning(f"Failed to parse hook type from {skill_md_path}: {e}")
        return None
|
||
|
||
|
||
async def _execute_hook(skill_path: str, config) -> Optional[str]:
    """Produce the hook output for a single skill directory.

    Precedence: a ``hooks/pre_prompt.py`` script wins; when the skill has no
    such script, the hook text is taken from the SKILL.md frontmatter.

    Args:
        skill_path: Directory of the skill.
        config: Object forwarded to the hook script.

    Returns:
        Optional[str]: Whatever the selected source yields, or ``None``.
    """
    script_path = os.path.join(skill_path, 'hooks', 'pre_prompt.py')

    if not os.path.exists(script_path):
        # No script available: fall back to the content declared in SKILL.md.
        frontmatter_source = os.path.join(skill_path, 'SKILL.md')
        return _get_hook_content_from_frontmatter(frontmatter_source)

    # Script available: load it dynamically and run it.
    return await _execute_hook_script(script_path, config)
|
||
|
||
|
||
async def _execute_hook_script(hook_py_path: str, config) -> Optional[str]:
    """Load a hook script from disk and call its ``execute(config)`` function.

    The script is imported under a module name derived from its absolute
    path, so different hooks never overwrite each other in ``sys.modules``.
    A synchronous ``execute`` runs in a worker thread; an ``async def``
    ``execute`` is awaited on the running event loop.

    Args:
        hook_py_path: Path of the Python file that defines ``execute``.
        config: Object passed as the only argument to ``execute``.

    Returns:
        Optional[str]: The value returned by ``execute`` when it is a string;
        ``None`` when the script cannot be loaded, has no ``execute``,
        returns a non-string, or raises (the error is logged).
    """
    import hashlib
    import importlib.util
    import inspect
    import sys

    try:
        # One stable, unique module name per script path. A shared name would
        # let hooks clobber each other's sys.modules entry.
        path_digest = hashlib.sha256(
            os.path.abspath(hook_py_path).encode('utf-8')
        ).hexdigest()[:16]
        module_name = f"hook_module_{path_digest}"

        spec = importlib.util.spec_from_file_location(module_name, hook_py_path)
        if spec is None or spec.loader is None:
            logger.warning(f"Failed to load hook module from {hook_py_path}")
            return None

        module = importlib.util.module_from_spec(spec)
        # Registered before execution so code in the script that looks itself
        # up through sys.modules during import can find the module.
        sys.modules[module_name] = module
        try:
            await asyncio.to_thread(spec.loader.exec_module, module)
        except BaseException:
            # Do not leave a half-initialised module registered.
            sys.modules.pop(module_name, None)
            raise

        execute = getattr(module, 'execute', None)
        if execute is None:
            logger.warning(f"Hook script {hook_py_path} missing 'execute' function")
            return None

        if inspect.iscoroutinefunction(execute):
            # Running a coroutine function through to_thread would only hand
            # back an un-awaited coroutine object; await it directly instead.
            result = await execute(config)
        else:
            result = await asyncio.to_thread(execute, config)

        return result if isinstance(result, str) else None

    except Exception as e:
        logger.error(f"Error executing hook script {hook_py_path}: {e}", exc_info=True)
        return None
|
||
|
||
|
||
def _get_hook_content_from_frontmatter(skill_md_path: str) -> Optional[str]:
    """Return the static hook text declared in a SKILL.md frontmatter.

    ``hook_content`` is preferred; ``hook_content_template`` is used when the
    former is missing, empty, or not a string.

    Args:
        skill_md_path: Path of the SKILL.md file to read.

    Returns:
        Optional[str]: The first non-empty string found under the keys
        above; ``None`` when there is no frontmatter, the frontmatter is not
        a mapping, neither key holds a non-empty string, or the file cannot
        be read or parsed (the failure is logged as a warning).
    """
    try:
        # utf-8-sig drops a leading byte-order mark, which would otherwise
        # sit in front of the opening '---' and defeat the anchored match.
        with open(skill_md_path, 'r', encoding='utf-8-sig') as f:
            content = f.read()

        frontmatter_match = re.match(r'^---\s*\n(.*?)\n---', content, re.DOTALL)
        if not frontmatter_match:
            return None

        metadata = yaml.safe_load(frontmatter_match.group(1))
        if not isinstance(metadata, dict):
            return None

        # Only strings are returned: YAML can yield lists, numbers or
        # mappings here, and the result is later joined into the prompt.
        for key in ('hook_content', 'hook_content_template'):
            value = metadata.get(key)
            if isinstance(value, str) and value:
                return value

        return None

    except Exception as e:
        logger.warning(f"Failed to get hook content from {skill_md_path}: {e}")
        return None
|