"""
Hook loader for Claude Plugins mode.

Supports configuring hooks and mcpServers through .claude-plugin/plugin.json.
"""
import os
import json
import copy
import logging
import asyncio
import subprocess
from typing import List, Dict, Optional, Any, Iterator, Tuple

logger = logging.getLogger('app')

# Hook type definitions
HOOK_TYPES = {
    'PrePrompt': 'Inject content when loading the system prompt',
    'PostAgent': 'Process output after agent execution',
    'PreSave': 'Process content before saving messages',
    'PreMemoryPrompt': 'Inject content when loading the memory extraction prompt',
}


def _iter_skill_plugins(bot_id: str) -> Iterator[Tuple[str, str, str]]:
    """Yield every skill that ships a .claude-plugin/plugin.json.

    Skills are visited in sorted name order. os.listdir() returns entries in
    arbitrary order, and both the hook execution order and the precedence of
    merged mcpServers depend on the visiting order, so sorting keeps results
    reproducible across runs and machines.

    Args:
        bot_id: Bot ID

    Yields:
        (skill_name, skill_path, plugin_json_path) tuples.
    """
    for skill_dir in _get_skill_dirs(bot_id):
        # The directory may have disappeared (or may not be a directory at all)
        # since _get_skill_dirs() looked at it.
        if not os.path.isdir(skill_dir):
            continue
        try:
            skill_names = sorted(os.listdir(skill_dir))
        except OSError as e:
            logger.error(f"Failed to list skill directory {skill_dir}: {e}")
            continue

        for skill_name in skill_names:
            skill_path = os.path.join(skill_dir, skill_name)
            if not os.path.isdir(skill_path):
                continue
            plugin_json = os.path.join(skill_path, '.claude-plugin', 'plugin.json')
            if os.path.exists(plugin_json):
                yield skill_name, skill_path, plugin_json


async def execute_hooks(hook_type: str, config, **kwargs) -> Any:
    """
    Execute all hooks of the specified type.

    Only hook entries with ``"type": "command"`` and a non-empty ``"command"``
    are executed; each command runs with the skill directory as its working
    directory. Every hook receives the same, original keyword arguments
    (hooks are not chained).

    Args:
        hook_type: Hook type (PrePrompt, PreMemoryPrompt, PostAgent, PreSave)
        config: AgentConfig object
        **kwargs: Hook-specific parameters
            - PrePrompt / PreMemoryPrompt: no extra parameters
            - PostAgent: response (str), metadata (dict)
            - PreSave: content (str), role (str)

    Returns:
        - PrePrompt / PreMemoryPrompt: str, all non-empty hook outputs joined
          by a blank line ("" when no hook produced output)
        - PreSave: str, the output of the last hook that produced output, or
          the original ``content`` when none did
        - any other type (including PostAgent): None
    """
    hook_results = []
    bot_id = getattr(config, 'bot_id', '')

    for skill_name, skill_path, plugin_json in _iter_skill_plugins(bot_id):
        try:
            plugin_config = _load_plugin_config(plugin_json)
            # `or` fallbacks tolerate explicit nulls such as "hooks": null.
            hooks_by_type = plugin_config.get('hooks') or {}
            hooks = hooks_by_type.get(hook_type) or []

            for hook_config in hooks:
                # Skip malformed entries instead of aborting the remaining
                # hooks of this plugin.
                if not isinstance(hook_config, dict):
                    continue
                if hook_config.get('type') != 'command':
                    continue
                command = hook_config.get('command')
                if not command:
                    continue

                # Run the command inside the skill directory
                result = await _execute_command(
                    skill_path, command, hook_type, config, **kwargs
                )
                if result:
                    hook_results.append(result)
                    logger.info(f"Executed {hook_type} hook from {skill_name}")
        except Exception as e:
            # Best effort: one broken plugin must not prevent the others from running.
            logger.error(f"Failed to load hooks from {plugin_json}: {e}")

    # Return values based on hook type
    if hook_type in ('PrePrompt', 'PreMemoryPrompt'):
        return "\n\n".join(hook_results)
    elif hook_type == 'PreSave':
        # PreSave returns processed content:
        # if any hook returned content, use the last hook result,
        # otherwise return the original content.
        return hook_results[-1] if hook_results else kwargs.get('content', '')
    return None


async def merge_skill_mcp_configs(bot_id: str) -> List[Dict]:
    """
    Read and merge mcpServers from plugin.json in all skill directories.

    Servers are merged by name; when two skills define the same server name,
    the skill visited later (sorted by skill directory name) wins.

    Args:
        bot_id: Bot ID

    Returns:
        List[Dict]: ``[{"mcpServers": {...}}]`` when at least one server was
        found, otherwise an empty list.
    """
    merged_servers = {}

    for skill_name, skill_path, plugin_json in _iter_skill_plugins(bot_id):
        try:
            with open(plugin_json, 'r', encoding='utf-8') as f:
                plugin_config = json.load(f)

            servers = plugin_config.get('mcpServers', {})
            if servers:
                normalized_servers = _normalize_skill_mcp_servers(servers, skill_path)
                merged_servers.update(normalized_servers)
                logger.info(f"Loaded MCP config from skill: {skill_name}")
        except Exception as e:
            # Best effort: an unreadable or malformed plugin.json is skipped.
            logger.error(f"Failed to load mcpServers from {skill_name}: {e}")

    if merged_servers:
        return [{"mcpServers": merged_servers}]
    return []


def collect_main_agent_hidden_tools(bot_id: str) -> set:
    """Collect tool names that must be hidden from the main agent.

    Scans every skill's plugin.json for a top-level "mainAgentHiddenTools"
    list and merges them into a single set. Entries that are not non-blank
    strings are ignored; names are stripped of surrounding whitespace.

    Args:
        bot_id: Bot ID

    Returns:
        set[str]: Union of all hidden tool names. Empty set if none configured.
    """
    hidden_tools = set()

    for skill_name, _skill_path, plugin_json in _iter_skill_plugins(bot_id):
        try:
            plugin_config = _load_plugin_config(plugin_json)
            names = plugin_config.get('mainAgentHiddenTools', [])
            if not isinstance(names, list):
                logger.warning(
                    f"Invalid 'mainAgentHiddenTools' in {skill_name}, expected list"
                )
                continue
            hidden_tools.update(
                name.strip()
                for name in names
                if isinstance(name, str) and name.strip()
            )
        except Exception as e:
            logger.error(f"Failed to load mainAgentHiddenTools from {skill_name}: {e}")

    return hidden_tools


def _normalize_skill_mcp_servers(servers: Dict[str, Any], skill_path: str) -> Dict[str, Any]:
    """Normalize relative paths in stdio MCP servers to absolute paths based on the skill directory.

    The input mapping is deep-copied, so the caller's data is never mutated.
    A server without an explicit ``transport`` is treated as ``http`` when it
    has a ``url`` key and as ``stdio`` otherwise; only stdio servers have
    their ``command`` and string ``args`` rewritten.
    """
    normalized_servers = copy.deepcopy(servers)

    for server_name, server_config in normalized_servers.items():
        if not isinstance(server_config, dict):
            continue

        transport = server_config.get('transport')
        if not transport:
            transport = 'http' if 'url' in server_config else 'stdio'
        if transport != 'stdio':
            continue

        command = server_config.get('command')
        if isinstance(command, str):
            server_config['command'] = _resolve_skill_relative_path(command, skill_path)

        args = server_config.get('args')
        if isinstance(args, list):
            server_config['args'] = [
                _resolve_skill_relative_path(arg, skill_path) if isinstance(arg, str) else arg
                for arg in args
            ]

    return normalized_servers


def _resolve_skill_relative_path(value: str, skill_path: str) -> str:
    """Convert placeholder-free paths starting with ./ or ../ into absolute paths based on the skill directory.

    Values containing ``{`` or ``}`` and values that do not start with ``./``
    or ``../`` are returned unchanged.
    """
    if '{' in value or '}' in value:
        return value
    if not value.startswith(('./', '../')):
        return value

    normalized_path = os.path.abspath(os.path.join(skill_path, value))
    logger.debug(f"Resolved skill MCP path: {value} -> {normalized_path}")
    return normalized_path


def _load_plugin_config(plugin_json_path: str) -> Dict:
    """Load plugin.json configuration.

    Never raises: an unreadable file, invalid JSON, or a JSON document whose
    top level is not an object is logged and reported as an empty dict, so
    callers can always use ``.get`` on the result.
    """
    try:
        with open(plugin_json_path, 'r', encoding='utf-8') as f:
            plugin_config = json.load(f)
    except Exception as e:
        logger.error(f"Failed to load plugin.json: {e}")
        return {}

    if not isinstance(plugin_config, dict):
        logger.error(
            f"Failed to load plugin.json: expected a JSON object in {plugin_json_path}"
        )
        return {}
    return plugin_config


def _get_skill_dirs(bot_id: str) -> List[str]:
    """Get the list of skill directories that should be scanned.

    The path is relative, so it is resolved against the current working
    directory of the process.
    """
    dirs = []

    # Skills directory uploaded by the user
    if bot_id:
        robot_skills = f"projects/robot/{bot_id}/skills"
        if os.path.exists(robot_skills):
            dirs.append(robot_skills)

    return dirs


async def _execute_command(skill_path: str, command: str, hook_type: str, config, **kwargs) -> Optional[str]:
    """Execute a hook command.

    The command string comes from plugin.json and is run through the shell
    with the skill directory as working directory. Context is handed to the
    command through environment variables (ASSISTANT_ID, USER_IDENTIFIER,
    TRACE_ID, ENABLE_SELF_KNOWLEDGE, SESSION_ID, LANGUAGE, HOOK_TYPE, any
    ``config.shell_env`` entries, plus CONTENT/ROLE for PreSave and
    RESPONSE/METADATA for PostAgent).

    Args:
        skill_path: Skill directory path used as the working directory
        command: Command to execute
        hook_type: Hook type
        config: AgentConfig object
        **kwargs: Extra parameters

    Returns:
        str: Stripped stdout of the command when it wrote anything to stdout;
        None when stdout was empty or the command could not be executed.
    """
    try:
        # Set environment variables and pass them to the subprocess.
        # subprocess requires all env keys and values to be strings.
        # getattr may return None when the attribute exists but its value is None,
        # so every value is converted with str().
        env = os.environ.copy()
        env['ASSISTANT_ID'] = str(getattr(config, 'bot_id', ''))
        env['USER_IDENTIFIER'] = str(getattr(config, 'user_identifier', ''))
        env['TRACE_ID'] = str(getattr(config, 'trace_id', ''))
        env['ENABLE_SELF_KNOWLEDGE'] = str(getattr(config, 'enable_self_knowledge', False)).lower()
        env['SESSION_ID'] = str(getattr(config, 'session_id', ''))
        env['LANGUAGE'] = str(getattr(config, 'language', ''))
        env['HOOK_TYPE'] = hook_type

        # Merge custom shell environment variables from config
        shell_env = getattr(config, 'shell_env', None)
        if shell_env:
            # Ensure all custom environment variable names and values are strings
            env.update({
                str(k): str(v) if v is not None else ''
                for k, v in shell_env.items()
            })

        # For PreSave, pass content
        if hook_type == 'PreSave':
            env['CONTENT'] = str(kwargs.get('content', '') or '')
            env['ROLE'] = str(kwargs.get('role', '') or '')

        # For PostAgent, pass response
        if hook_type == 'PostAgent':
            env['RESPONSE'] = str(kwargs.get('response', '') or '')
            metadata = kwargs.get('metadata', {})
            env['METADATA'] = json.dumps(metadata) if metadata else ''

        # Execute the command with subprocess and capture stdout
        process = await asyncio.create_subprocess_shell(
            command,
            cwd=skill_path,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        stdout, stderr = await process.communicate()

        # Report a failing command before returning, so the diagnostic is not
        # lost when the command also produced stdout.
        if stderr and process.returncode != 0:
            logger.warning(
                f"Hook command stderr: {stderr.decode('utf-8', errors='replace')}"
            )

        if stdout:
            # errors='replace' keeps hook output usable even when the command
            # emits bytes that are not valid UTF-8.
            return stdout.decode('utf-8', errors='replace').strip()

        return None

    except Exception as e:
        logger.error(f"Error executing hook command '{command}': {e}")
        return None