feat: 实现 Claude Plugins 模式的 Hook 机制

- 新增 agent/plugin_hook_loader.py:支持通过 .claude-plugin/plugin.json 配置 hooks 和 mcpServers
- 修改 agent/prompt_loader.py:集成 PrePrompt hooks,优先读取 skill MCP 配置
- 修改 routes/chat.py:添加 PostAgent 和 PreSave hooks
- 修改 routes/skill_manager.py:优先从 plugin.json 读取 name/description,fallback 到 SKILL.md
- 删除旧的 agent/skill_hook_loader.py
- 新增示例 skill user-context-loader,演示完整的 hooks 用法

Hook 类型:
- PrePrompt: 在 system_prompt 加载时注入内容
- PostAgent: 在 agent 执行后处理
- PreSave: 在消息保存前处理

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
朱潮 2026-02-06 20:15:54 +08:00
parent e67d50b4fc
commit c7e9f305b7
11 changed files with 682 additions and 345 deletions

208
agent/plugin_hook_loader.py Normal file
View File

@ -0,0 +1,208 @@
"""
Claude Plugins 模式的 Hook 加载器
支持通过 .claude-plugin/plugin.json 配置 hooks mcpServers
"""
import os
import json
import logging
import asyncio
import subprocess
from typing import List, Dict, Optional, Any
logger = logging.getLogger('app')
# Hook 类型定义
HOOK_TYPES = {
'PrePrompt': '在system_prompt加载时注入内容',
'PostAgent': '在agent执行后处理',
'PreSave': '在保存消息前处理',
}
async def execute_hooks(hook_type: str, config, **kwargs) -> Any:
"""
执行指定类型的所有 hooks
Args:
hook_type: hook 类型 (PrePrompt, PostAgent, PreSave)
config: AgentConfig 对象
**kwargs: hook 特定的参数
- PrePrompt: 无额外参数返回 str
- PostAgent: response (str), metadata (dict)返回 None
- PreSave: content (str), role (str)返回 str
Returns:
- PrePrompt: str (注入内容)
- PostAgent: None
- PreSave: str (处理后的内容)
"""
hook_results = []
bot_id = getattr(config, 'bot_id', '')
skill_dirs = _get_skill_dirs(bot_id)
for skill_dir in skill_dirs:
if not os.path.exists(skill_dir):
continue
# 遍历 skill 目录下的每个子文件夹
for skill_name in os.listdir(skill_dir):
skill_path = os.path.join(skill_dir, skill_name)
if not os.path.isdir(skill_path):
continue
plugin_json = os.path.join(skill_path, '.claude-plugin', 'plugin.json')
if not os.path.exists(plugin_json):
continue
try:
plugin_config = _load_plugin_config(plugin_json)
hooks = plugin_config.get('hooks', {}).get(hook_type, [])
for hook_config in hooks:
if hook_config.get('type') == 'command':
command = hook_config.get('command')
if command:
# 在 skill 目录下执行命令
result = await _execute_command(
skill_path, command, hook_type, config, **kwargs
)
if result:
hook_results.append(result)
logger.info(f"Executed {hook_type} hook from {skill_name}")
except Exception as e:
logger.error(f"Failed to load hooks from {plugin_json}: {e}")
# 根据hook类型返回结果
if hook_type == 'PrePrompt':
return "\n\n".join(hook_results)
elif hook_type == 'PreSave':
# PreSave 返回处理后的内容
# 如果有hook返回内容使用最后一个hook的结果
# 否则返回原始内容
return hook_results[-1] if hook_results else kwargs.get('content', '')
return None
async def merge_skill_mcp_configs(bot_id: str) -> List[Dict]:
"""
从所有 skill 目录的 plugin.json 中读取 mcpServers 并合并
Args:
bot_id: Bot ID
Returns:
List[Dict]: 合并后的MCP设置列表
"""
skill_dirs = _get_skill_dirs(bot_id)
merged_servers = {}
for skill_dir in skill_dirs:
if not os.path.exists(skill_dir):
continue
for skill_name in os.listdir(skill_dir):
skill_path = os.path.join(skill_dir, skill_name)
if not os.path.isdir(skill_path):
continue
plugin_json = os.path.join(skill_path, '.claude-plugin', 'plugin.json')
if os.path.exists(plugin_json):
try:
with open(plugin_json, 'r', encoding='utf-8') as f:
plugin_config = json.load(f)
servers = plugin_config.get('mcpServers', {})
if servers:
merged_servers.update(servers)
logger.info(f"Loaded MCP config from skill: {skill_name}")
except Exception as e:
logger.error(f"Failed to load mcpServers from {skill_name}: {e}")
if merged_servers:
return [{"mcpServers": merged_servers}]
return []
def _load_plugin_config(plugin_json_path: str) -> Dict:
"""加载 plugin.json 配置"""
try:
with open(plugin_json_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"Failed to load plugin.json: {e}")
return {}
def _get_skill_dirs(bot_id: str) -> List[str]:
"""获取需要扫描的skill目录列表"""
dirs = []
# 用户上传的skills目录
if bot_id:
robot_skills = f"projects/robot/{bot_id}/skills"
if os.path.exists(robot_skills):
dirs.append(robot_skills)
return dirs
async def _execute_command(skill_path: str, command: str, hook_type: str, config, **kwargs) -> Optional[str]:
"""执行 hook 命令
Args:
skill_path: skill 目录路径作为工作目录
command: 要执行的命令
hook_type: hook 类型
config: AgentConfig 对象
**kwargs: 额外参数
Returns:
str: 命令的 stdout 输出
"""
try:
# 设置环境变量,传递给子进程
env = os.environ.copy()
env['BOT_ID'] = getattr(config, 'bot_id', '')
env['USER_IDENTIFIER'] = getattr(config, 'user_identifier', '')
env['SESSION_ID'] = getattr(config, 'session_id', '')
env['LANGUAGE'] = getattr(config, 'language', '')
env['HOOK_TYPE'] = hook_type
# 对于 PreSave传递 content
if hook_type == 'PreSave':
env['CONTENT'] = kwargs.get('content', '')
env['ROLE'] = kwargs.get('role', '')
# 对于 PostAgent传递 response
if hook_type == 'PostAgent':
env['RESPONSE'] = kwargs.get('response', '')
metadata = kwargs.get('metadata', {})
env['METADATA'] = json.dumps(metadata) if metadata else ''
# 使用 subprocess 执行命令,捕获 stdout
process = await asyncio.create_subprocess_shell(
command,
cwd=skill_path,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
stdout, stderr = await process.communicate()
if stdout:
result = stdout.decode('utf-8').strip()
return result
if stderr and process.returncode != 0:
logger.warning(f"Hook command stderr: {stderr.decode('utf-8')}")
return None
except Exception as e:
logger.error(f"Error executing hook command '{command}': {e}")
return None

View File

@ -10,6 +10,7 @@ from datetime import datetime, timezone, timedelta
import logging
from utils.settings import BACKEND_HOST, MASTERKEY
logger = logging.getLogger('app')
from .plugin_hook_loader import execute_hooks, merge_skill_mcp_configs
def format_datetime_by_language(language: str) -> str:
"""
@ -127,9 +128,8 @@ async def load_system_prompt_async(config) -> str:
trace_id=trace_id or ""
)
# ============ 执行skill hooks ============
from .skill_hook_loader import execute_skill_hooks
hook_content = await execute_skill_hooks(config)
# ============ 执行 PrePrompt hooks ============
hook_content = await execute_hooks('PrePrompt', config)
if hook_content:
# 将hook内容注入到prompt的末尾
prompt = f"{prompt}\n\n## Context from Skills\n\n{hook_content}"
@ -184,23 +184,43 @@ async def load_mcp_settings_async(config) -> List[Dict]:
mcp_settings = getattr(config, 'mcp_settings', None)
bot_id = getattr(config, 'bot_id', '')
# 1. 首先读取默认MCP设置
# 1. ============ 首先合并skill目录下的plugin.json配置不使用缓存确保改动生效============
skill_mcp_settings = await merge_skill_mcp_configs(bot_id)
merged_settings = []
if skill_mcp_settings and len(skill_mcp_settings) > 0:
merged_settings = skill_mcp_settings.copy()
skill_mcp_servers = skill_mcp_settings[0].get('mcpServers', {})
logger.info(f"Loaded {len(skill_mcp_servers)} MCP servers from skills")
# ===========================================================================================
# 2. 读取默认MCP设置使用缓存
default_mcp_settings = []
try:
# 使用缓存读取默认MCP设置文件
default_mcp_file = os.path.join("mcp", f"mcp_settings.json")
default_mcp_settings = await config_cache.get_json_file(default_mcp_file) or []
if default_mcp_settings:
logger.info(f"Using cached default mcp_settings from mcp folder")
else:
logger.warning(f"No default mcp_settings found, using empty default settings")
except Exception as e:
logger.error(f"Failed to load default mcp_settings: {str(e)}")
default_mcp_settings = []
# 遍历mcpServers工具给每个工具增加env参数
# 3. 合并默认设置到merged_settings默认设置被skill覆盖
if default_mcp_settings and len(default_mcp_settings) > 0:
mcp_servers = default_mcp_settings[0].get('mcpServers', {})
default_mcp_servers = default_mcp_settings[0].get('mcpServers', {})
if merged_settings and len(merged_settings) > 0:
# skill配置已存在将默认配置合并进去skill优先
skill_mcp_servers = merged_settings[0].get('mcpServers', {})
# 默认配置中不存在的才添加
for server_name, server_config in default_mcp_servers.items():
if server_name not in skill_mcp_servers:
skill_mcp_servers[server_name] = server_config
else:
# 没有skill配置直接使用默认配置
merged_settings = default_mcp_settings.copy()
# 遍历mcpServers工具给每个工具增加env参数
if merged_settings and len(merged_settings) > 0:
mcp_servers = merged_settings[0].get('mcpServers', {})
for server_name, server_config in mcp_servers.items():
if isinstance(server_config, dict):
# 如果还没有env字段则创建一个
@ -210,7 +230,7 @@ async def load_mcp_settings_async(config) -> List[Dict]:
server_config['env']['BACKEND_HOST'] = BACKEND_HOST
server_config['env']['MASTERKEY'] = MASTERKEY
# 2. 处理传入的mcp_settings参数
# 4. 处理传入的mcp_settings参数优先级最高覆盖所有
input_mcp_settings = []
if mcp_settings is not None:
if isinstance(mcp_settings, list):
@ -219,25 +239,17 @@ async def load_mcp_settings_async(config) -> List[Dict]:
input_mcp_settings = [mcp_settings]
logger.warning(f"Warning: mcp_settings is not a list, converting to list format")
# 3. 合并默认设置和传入设置
merged_settings = []
# 如果有默认设置,以此为基准
if default_mcp_settings:
merged_settings = default_mcp_settings.copy()
# 如果有传入设置合并mcpServers对象
# 5. 合并用户传入的mcp_settings
if input_mcp_settings and len(input_mcp_settings) > 0 and len(merged_settings) > 0:
default_mcp_servers = merged_settings[0].get('mcpServers', {})
merged_mcp_servers = merged_settings[0].get('mcpServers', {})
input_mcp_servers = input_mcp_settings[0].get('mcpServers', {})
# 合并mcpServers对象传入的设置覆盖默认设置中相同的key
default_mcp_servers.update(input_mcp_servers)
merged_settings[0]['mcpServers'] = default_mcp_servers
logger.info(f"Merged mcpServers: default + {len(input_mcp_servers)} input servers")
# 如果没有默认设置但有传入设置,直接使用传入设置
elif input_mcp_settings:
# 合并mcpServers对象传入的设置覆盖已有设置
merged_mcp_servers.update(input_mcp_servers)
merged_settings[0]['mcpServers'] = merged_mcp_servers
logger.info(f"Merged mcpServers: existing + {len(input_mcp_servers)} input servers")
elif input_mcp_settings and not merged_settings:
# 如果没有其他配置,直接使用传入设置
merged_settings = input_mcp_settings.copy()
# 确保返回的是列表格式

View File

@ -1,166 +0,0 @@
"""
Skill Hook 加载和执行器
在agent执行前自动执行skill的预处理逻辑用于动态增强系统提示词或上下文
"""
import os
import asyncio
import logging
import re
import yaml
from pathlib import Path
from typing import List, Optional
logger = logging.getLogger('app')
async def execute_skill_hooks(config) -> str:
"""
执行所有skill的hook返回需要注入到system_prompt的内容
Args:
config: AgentConfig 对象
Returns:
str: 需要注入到system_prompt的内容多个hook用\n\n分隔
"""
hook_contents = []
bot_id = getattr(config, 'bot_id', '')
user_identifier = getattr(config, 'user_identifier', '')
project_dir = getattr(config, 'project_dir', None)
# 遍历skill目录官方skills、用户上传skills、robot项目skills
skill_dirs = _get_skill_dirs(bot_id)
for skill_dir in skill_dirs:
if not os.path.exists(skill_dir):
continue
for skill_name in os.listdir(skill_dir):
skill_path = os.path.join(skill_dir, skill_name)
if not os.path.isdir(skill_path):
continue
# 检查是否有SKILL.md且包含hook配置
skill_md = os.path.join(skill_path, 'SKILL.md')
if not os.path.exists(skill_md):
continue
hook_type = _parse_hook_type(skill_md)
if hook_type != 'pre_prompt':
continue
# 执行hook错误时静默跳过
try:
content = await _execute_hook(skill_path, config)
if content:
hook_contents.append(content)
logger.info(f"Executed hook: {skill_name}")
except Exception as e:
logger.error(f"Hook execution failed for {skill_name}: {e}")
return "\n\n".join(hook_contents)
def _get_skill_dirs(bot_id: str) -> List[str]:
"""获取需要扫描的skill目录列表"""
dirs = []
# 官方skills目录
official_skills = "skills"
if os.path.exists(official_skills):
dirs.append(official_skills)
# 用户上传的skills目录和robot项目的skills目录
if bot_id:
robot_skills = f"projects/robot/{bot_id}/skills"
if os.path.exists(robot_skills):
dirs.append(robot_skills)
return dirs
def _parse_hook_type(skill_md_path: str) -> Optional[str]:
"""从SKILL.md解析hook类型"""
try:
with open(skill_md_path, 'r', encoding='utf-8') as f:
content = f.read()
frontmatter_match = re.match(r'^---\s*\n(.*?)\n---', content, re.DOTALL)
if not frontmatter_match:
return None
metadata = yaml.safe_load(frontmatter_match.group(1))
return metadata.get('hook') if isinstance(metadata, dict) else None
except Exception as e:
logger.warning(f"Failed to parse hook type from {skill_md_path}: {e}")
return None
async def _execute_hook(skill_path: str, config) -> Optional[str]:
"""
执行单个skill的hook
优先级: hooks/pre_prompt.py > frontmatter hook_content
"""
hook_py = os.path.join(skill_path, 'hooks', 'pre_prompt.py')
if os.path.exists(hook_py):
# 动态执行hooks/pre_prompt.py
return await _execute_hook_script(hook_py, config)
else:
# 从frontmatter读取hook内容
skill_md = os.path.join(skill_path, 'SKILL.md')
return _get_hook_content_from_frontmatter(skill_md)
async def _execute_hook_script(hook_py_path: str, config) -> Optional[str]:
"""执行hook脚本"""
import importlib.util
import sys
try:
# 动态加载hook模块
spec = importlib.util.spec_from_file_location("hook_module", hook_py_path)
if spec is None or spec.loader is None:
logger.warning(f"Failed to load hook module from {hook_py_path}")
return None
module = importlib.util.module_from_spec(spec)
sys.modules['hook_module'] = module
await asyncio.to_thread(spec.loader.exec_module, module)
# 调用execute函数传入config
if hasattr(module, 'execute'):
result = await asyncio.to_thread(module.execute, config)
return result if isinstance(result, str) else None
else:
logger.warning(f"Hook script {hook_py_path} missing 'execute' function")
return None
except Exception as e:
logger.error(f"Error executing hook script {hook_py_path}: {e}")
return None
def _get_hook_content_from_frontmatter(skill_md_path: str) -> Optional[str]:
"""从SKILL.md frontmatter获取hook内容"""
try:
with open(skill_md_path, 'r', encoding='utf-8') as f:
content = f.read()
frontmatter_match = re.match(r'^---\s*\n(.*?)\n---', content, re.DOTALL)
if not frontmatter_match:
return None
metadata = yaml.safe_load(frontmatter_match.group(1))
if isinstance(metadata, dict):
# 返回hook_content字段
return metadata.get('hook_content') or metadata.get('hook_content_template')
except Exception as e:
logger.warning(f"Failed to get hook content from {skill_md_path}: {e}")
return None
return None

View File

@ -124,6 +124,12 @@ async def enhanced_generate_stream_response(
# 发送最终chunk
final_chunk = create_stream_chunk(f"chatcmpl-{chunk_id + 1}", config.model_name, finish_reason="stop")
await output_queue.put(("agent", f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"))
# ============ 执行 PostAgent hooks ============
# 注意:这里在单独的异步任务中执行,不阻塞流式输出
full_response = "".join(full_response_content)
asyncio.create_task(_execute_post_agent_hooks(config, full_response))
# ===========================================
await output_queue.put(("agent_done", None))
except Exception as e:
@ -219,6 +225,11 @@ async def create_agent_and_generate_response(
# 使用更新后的 messages
agent_responses = await agent.ainvoke({"messages": config.messages}, config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS)
# ============ 执行 PostAgent hooks ============
# 注意这里在非流式模式下同步执行hooks
await _execute_post_agent_hooks(config, "")
# ===========================================
# 从后往前找第一个 HumanMessage之后的内容都给 append_messages
all_messages = agent_responses["messages"]
first_human_idx = None
@ -285,6 +296,7 @@ async def _save_user_messages(config: AgentConfig) -> None:
try:
from agent.chat_history_manager import get_chat_history_manager
from agent.plugin_hook_loader import execute_hooks
manager = get_chat_history_manager()
@ -294,6 +306,12 @@ async def _save_user_messages(config: AgentConfig) -> None:
role = msg.get("role", "")
content = msg.get("content", "")
if role == "user" and content:
# ============ 执行 PreSave hooks ============
processed_content = await execute_hooks('PreSave', config, content=content, role=role)
if processed_content:
content = processed_content
# ================================================
await manager.manager.save_message(
session_id=config.session_id,
role=role,
@ -326,9 +344,16 @@ async def _save_assistant_response(config: AgentConfig, assistant_response: str)
try:
from agent.chat_history_manager import get_chat_history_manager
from agent.plugin_hook_loader import execute_hooks
manager = get_chat_history_manager()
# ============ 执行 PreSave hooks ============
processed_response = await execute_hooks('PreSave', config, content=assistant_response, role='assistant')
if processed_response:
assistant_response = processed_response
# ================================================
# 保存 AI 助手的响应
await manager.manager.save_message(
session_id=config.session_id,
@ -344,6 +369,31 @@ async def _save_assistant_response(config: AgentConfig, assistant_response: str)
logger.error(f"Failed to save assistant response: {e}")
async def _execute_post_agent_hooks(config: AgentConfig, response: str) -> None:
"""
执行 PostAgent hooks在agent执行后
Args:
config: AgentConfig 对象
response: Agent 的完整响应内容
"""
try:
from agent.plugin_hook_loader import execute_hooks
metadata = {
"bot_id": config.bot_id,
"user_identifier": config.user_identifier,
"session_id": config.session_id,
"language": config.language,
}
await execute_hooks('PostAgent', config, response=response, metadata=metadata)
logger.debug(f"Executed PostAgent hooks for session_id={config.session_id}")
except Exception as e:
# hook执行失败不影响主流程
logger.error(f"Failed to execute PostAgent hooks: {e}")
@router.post("/api/v1/chat/completions")
async def chat_completions(request: ChatRequest, authorization: Optional[str] = Header(None)):
"""

View File

@ -206,7 +206,7 @@ async def validate_and_rename_skill_folder(
) -> str:
"""验证并重命名解压后的 skill 文件夹
检查解压后文件夹名称是否与 SKILL.md 中的 name 匹配
检查解压后文件夹名称是否与 skill metadata (plugin.json SKILL.md) 中的 name 匹配
如果不匹配则重命名文件夹
Args:
@ -222,10 +222,8 @@ async def validate_and_rename_skill_folder(
for folder_name in os.listdir(extract_dir):
folder_path = os.path.join(extract_dir, folder_name)
if os.path.isdir(folder_path):
skill_md_path = os.path.join(folder_path, 'SKILL.md')
if os.path.exists(skill_md_path):
metadata = await asyncio.to_thread(
parse_skill_frontmatter, skill_md_path
get_skill_metadata, folder_path
)
if metadata and 'name' in metadata:
expected_name = metadata['name']
@ -239,11 +237,9 @@ async def validate_and_rename_skill_folder(
)
return extract_dir
else:
# zip 直接包含文件,检查当前目录的 SKILL.md
skill_md_path = os.path.join(extract_dir, 'SKILL.md')
if os.path.exists(skill_md_path):
# zip 直接包含文件,检查当前目录的 metadata
metadata = await asyncio.to_thread(
parse_skill_frontmatter, skill_md_path
get_skill_metadata, extract_dir
)
if metadata and 'name' in metadata:
expected_name = metadata['name']
@ -275,6 +271,39 @@ async def save_upload_file_async(file: UploadFile, destination: str) -> None:
await f.write(chunk)
def parse_plugin_json(plugin_json_path: str) -> Optional[dict]:
"""Parse the plugin.json file for name and description
Args:
plugin_json_path: Path to the plugin.json file
Returns:
dict with 'name' and 'description' if found, None otherwise
"""
try:
import json
with open(plugin_json_path, 'r', encoding='utf-8') as f:
plugin_config = json.load(f)
if not isinstance(plugin_config, dict):
logger.warning(f"Invalid plugin.json format in {plugin_json_path}")
return None
# Return name and description if both exist
if 'name' in plugin_config and 'description' in plugin_config:
return {
'name': plugin_config['name'],
'description': plugin_config['description']
}
logger.warning(f"Missing name or description in {plugin_json_path}")
return None
except Exception as e:
logger.error(f"Error parsing {plugin_json_path}: {e}")
return None
def parse_skill_frontmatter(skill_md_path: str) -> Optional[dict]:
"""Parse the YAML frontmatter from SKILL.md file
@ -317,6 +346,32 @@ def parse_skill_frontmatter(skill_md_path: str) -> Optional[dict]:
return None
def get_skill_metadata(skill_path: str) -> Optional[dict]:
"""Get skill metadata, trying plugin.json first, then SKILL.md
Args:
skill_path: Path to the skill directory
Returns:
dict with 'name' and 'description' if found, None otherwise
"""
# Try plugin.json first
plugin_json_path = os.path.join(skill_path, '.claude-plugin', 'plugin.json')
if os.path.exists(plugin_json_path):
metadata = parse_plugin_json(plugin_json_path)
if metadata:
return metadata
# Fallback to SKILL.md
skill_md_path = os.path.join(skill_path, 'SKILL.md')
if os.path.exists(skill_md_path):
metadata = parse_skill_frontmatter(skill_md_path)
if metadata:
return metadata
return None
def get_official_skills(base_dir: str) -> List[SkillItem]:
"""Get all official skills from the skills directory
@ -340,9 +395,7 @@ def get_official_skills(base_dir: str) -> List[SkillItem]:
for skill_name in os.listdir(official_skills_dir):
skill_path = os.path.join(official_skills_dir, skill_name)
if os.path.isdir(skill_path):
skill_md_path = os.path.join(skill_path, 'SKILL.md')
if os.path.exists(skill_md_path):
metadata = parse_skill_frontmatter(skill_md_path)
metadata = get_skill_metadata(skill_path)
if metadata:
skills.append(SkillItem(
name=metadata['name'],
@ -374,9 +427,7 @@ def get_user_skills(base_dir: str, bot_id: str) -> List[SkillItem]:
for skill_name in os.listdir(user_skills_dir):
skill_path = os.path.join(user_skills_dir, skill_name)
if os.path.isdir(skill_path):
skill_md_path = os.path.join(skill_path, 'SKILL.md')
if os.path.exists(skill_md_path):
metadata = parse_skill_frontmatter(skill_md_path)
metadata = get_skill_metadata(skill_path)
if metadata:
skills.append(SkillItem(
name=metadata['name'],

View File

@ -0,0 +1,31 @@
{
"name": "user-context-loader",
"description": "用户上下文加载器示例 Skill。演示如何使用 Claude Plugins 模式的 hooks 机制在 agent 执行的不同阶段注入自定义逻辑。",
"hooks": {
"PrePrompt": [
{
"type": "command",
"command": "python hooks/pre_prompt.py"
}
],
"PostAgent": [
{
"type": "command",
"command": "python hooks/post_agent.py"
}
],
"PreSave": [
{
"type": "command",
"command": "python hooks/pre_save.py"
}
]
},
"mcpServers": {
"user-context-example": {
"command": "echo",
"args": ["Example MCP server for user context loader"],
"comment": "这是一个示例 MCP 配置,实际使用时替换为真实的 MCP 服务器"
}
}
}

View File

@ -0,0 +1,153 @@
# User Context Loader
用户上下文加载器示例 Skill演示 Claude Plugins 模式的 hooks 机制。
## 功能说明
本 Skill 演示了三种 Hook 类型:
### PrePrompt Hook
在 system_prompt 加载时执行,动态注入用户上下文信息。
- 文件: `hooks/pre_prompt.py`
- 用途: 查询用户信息、偏好设置、历史记录等,注入到 prompt 中
### PostAgent Hook
在 agent 执行完成后执行,用于后处理。
- 文件: `hooks/post_agent.py`
- 用途: 记录分析数据、触发异步任务、发送通知等
### PreSave Hook
在消息保存前执行,用于内容处理。
- 文件: `hooks/pre_save.py`
- 用途: 内容过滤、敏感信息脱敏、格式转换等
## 目录结构
```
user-context-loader/
├── README.md # Skill 说明文档
├── .claude-plugin/
│ └── plugin.json # Hook 和 MCP 配置文件
└── hooks/
├── pre_prompt.py # PrePrompt hook 脚本
├── post_agent.py # PostAgent hook 脚本
└── pre_save.py # PreSave hook 脚本
```
## plugin.json 格式
```json
{
"name": "user-context-loader",
"description": "用户上下文加载器示例 Skill",
"hooks": {
"PrePrompt": [
{
"type": "command",
"command": "python hooks/pre_prompt.py"
}
],
"PostAgent": [
{
"type": "command",
"command": "python hooks/post_agent.py"
}
],
"PreSave": [
{
"type": "command",
"command": "python hooks/pre_save.py"
}
]
},
"mcpServers": {
"server-name": {
"command": "node",
"args": ["path/to/server.js"],
"env": {
"API_KEY": "${API_KEY}"
}
}
}
}
```
## Hook 脚本格式
Hook 脚本通过子进程执行,通过环境变量接收参数,通过 stdout 返回结果。
### 可用环境变量
| 环境变量 | 说明 | 适用于 |
|---------|------|--------|
| `BOT_ID` | Bot ID | 所有 hook |
| `USER_IDENTIFIER` | 用户标识 | 所有 hook |
| `SESSION_ID` | 会话 ID | 所有 hook |
| `LANGUAGE` | 语言代码 | 所有 hook |
| `HOOK_TYPE` | Hook 类型 | 所有 hook |
| `CONTENT` | 消息内容 | PreSave |
| `ROLE` | 消息角色 | PreSave |
| `RESPONSE` | Agent 响应 | PostAgent |
| `METADATA` | 元数据 JSON | PostAgent |
### PrePrompt 示例
```python
#!/usr/bin/env python3
import os
import sys
def main():
user_identifier = os.environ.get('USER_IDENTIFIER', '')
bot_id = os.environ.get('BOT_ID', '')
# 输出要注入到 prompt 中的内容
print(f"## User Context\n\n用户: {user_identifier}")
return 0
if __name__ == '__main__':
sys.exit(main())
```
### PreSave 示例
```python
#!/usr/bin/env python3
import os
import sys
def main():
content = os.environ.get('CONTENT', '')
# 处理内容并输出
print(content) # 输出处理后的内容
return 0
if __name__ == '__main__':
sys.exit(main())
```
### PostAgent 示例
```python
#!/usr/bin/env python3
import os
import sys
def main():
response = os.environ.get('RESPONSE', '')
session_id = os.environ.get('SESSION_ID', '')
# 记录日志(输出到 stderr
print(f"Session {session_id}: Response length {len(response)}", file=sys.stderr)
return 0
if __name__ == '__main__':
sys.exit(main())
```
## 使用场景
1. **PrePrompt**: 用户登录时自动加载其偏好设置、历史订单等
2. **PostAgent**: 记录对话分析数据,触发后续业务流程
3. **PreSave**: 敏感信息脱敏后再存储,如手机号、邮箱等

View File

@ -1,33 +0,0 @@
---
name: user-context-loader
description: Load user context information (location, name, sensor_id) before agent execution
hook: pre_prompt
---
# User Context Loader
This skill automatically loads user context information before agent execution.
## Hook Function
The `pre_prompt` hook will:
1. Query user information by email/identifier
2. Retrieve location, name, sensor_id
3. Inject into system prompt
## Directory Structure
```
user-context-loader/
├── SKILL.md # This file
└── hooks/
└── pre_prompt.py # Hook script that executes before agent
```
## Usage
This skill is automatically executed when the bot has:
- A `bot_id` configured
- A `user_identifier` provided in the request
The hook will call the backend API to fetch user information and inject it into the system prompt, allowing the agent to answer questions about the user without needing to make additional API calls.

View File

@ -0,0 +1,32 @@
#!/usr/bin/env python3
"""
PostAgent Hook - 响应后处理示例
agent 执行完成后执行可用于记录分析数据触发后续流程等
"""
import os
import sys
def main():
"""从环境变量读取参数并处理"""
response = os.environ.get('RESPONSE', '')
metadata = os.environ.get('METADATA', '')
user_identifier = os.environ.get('USER_IDENTIFIER', '')
session_id = os.environ.get('SESSION_ID', '')
# 示例:记录响应长度用于分析
if response:
response_length = len(response)
print(f"PostAgent hook: User={user_identifier}, Session={session_id}, Response Length={response_length}", file=sys.stderr)
# 这里可以添加更多逻辑,例如:
# - 发送分析数据到监控系统
# - 触发异步任务(如发送通知邮件)
# - 记录用户行为用于个性化推荐
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -1,73 +1,34 @@
#!/usr/bin/env python3
"""
Hook script to load user context before agent execution
PrePrompt Hook - 用户上下文加载器示例
This script is executed automatically before the agent processes the user's message.
It fetches user information from the backend API and injects it into the system prompt.
system_prompt 加载时执行可以动态注入用户相关信息到 prompt
"""
import logging
import os
logger = logging.getLogger('app')
import sys
def execute(config) -> str:
def main():
"""从环境变量读取参数并输出注入内容"""
user_identifier = os.environ.get('USER_IDENTIFIER', '')
bot_id = os.environ.get('BOT_ID', '')
# 示例:根据 user_identifier 查询用户上下文
# 这里只是演示,实际应该从数据库或其他服务获取
if user_identifier:
context_info = f"""## User Context
用户标识: {user_identifier}
Bot ID: {bot_id}
> 此内容由 user-context-loader skill PrePrompt hook 注入
> 实际使用时可以在这里查询用户的位置偏好历史记录等信息
"""
Execute hook to load user context
print(context_info)
return 0
Args:
config: AgentConfig 对象
return 0
Returns:
str: Content to inject into system prompt
"""
try:
# 从config获取参数
user_identifier = getattr(config, 'user_identifier', '')
bot_id = getattr(config, 'bot_id', '')
# 如果没有user_identifier返回空
if not user_identifier:
return ""
# 这里可以调用后端API获取用户信息
# 示例代码需要根据实际的后端API调整
#
# import requests
# from utils.settings import BACKEND_HOST, MASTERKEY
#
# url = f"{BACKEND_HOST}/api/user/info"
# headers = {"Authorization": f"Bearer {MASTERKEY}"}
# params = {"identifier": user_identifier}
#
# response = requests.get(url, headers=headers, params=params, timeout=2)
# if response.status_code == 200:
# user_data = response.json()
# else:
# return ""
# 示例模拟从API获取的用户数据
# 实际使用时请替换为真实的API调用
user_data = {
'name': 'Test User',
'email': user_identifier,
'location': 'Tokyo',
'sensor_id': 'sensor-12345'
}
# 构建注入内容
context_lines = [
f"**User Information:**",
f"- Name: {user_data.get('name', 'Unknown')}",
f"- Email: {user_data.get('email', user_identifier)}",
f"- Location: {user_data.get('location', 'Unknown')}",
f"- Sensor ID: {user_data.get('sensor_id', 'Unknown')}",
]
logger.info(f"Loaded user context for {user_identifier}")
return "\n".join(context_lines)
except Exception as e:
logger.error(f"Failed to load user context: {e}")
return ""
return ""
if __name__ == '__main__':
sys.exit(main())

View File

@ -0,0 +1,38 @@
#!/usr/bin/env python3
"""
PreSave Hook - 消息保存前处理示例
在消息保存到数据库前执行可用于内容过滤敏感信息脱敏等
"""
import os
import sys
def main():
"""从环境变量读取参数并处理"""
content = os.environ.get('CONTENT', '')
role = os.environ.get('ROLE', '')
# 示例:可以在这里添加敏感信息脱敏逻辑
# 例如:移除电话号码、邮箱等敏感信息
# 输出处理后的内容(如果不需要修改则输出原始内容)
print(content)
# 这里只是记录日志,不修改内容
if content:
# 示例:脱敏处理(注释掉,实际使用时根据需求启用)
# import re
# processed = content
# # 简单的手机号脱敏
# processed = re.sub(r'1[3-9]\d{9}', '[PHONE]', processed)
# # 邮箱脱敏
# processed = re.sub(r'\b[\w.-]+@[\w.-]+\.\w+\b', '[EMAIL]', processed)
# print(processed)
pass
return 0
if __name__ == '__main__':
sys.exit(main())