qwen_agent/agent/prompt_loader.py
2026-05-07 19:36:27 +08:00

317 lines
13 KiB
Python

#!/usr/bin/env python3
"""
System prompt and MCP settings loader utilities
"""
import os
import json
import asyncio
from typing import List, Dict, Optional, Any
from datetime import datetime, timezone, timedelta
import logging
from utils.settings import BACKEND_HOST, MASTERKEY, DAYTONA_ENABLED
logger = logging.getLogger('app')
from .plugin_hook_loader import execute_hooks, merge_skill_mcp_configs
from pathlib import Path
def format_datetime_by_language(language: str) -> str:
    """
    Format the current time as a display string for the given language.

    The current UTC time is shifted by a fixed per-language offset (no
    daylight-saving handling) and rendered in a language-specific layout:

    * ``'zh'``         -> ``2024年01月15日 14:30 (UTC+8 北京时间)``
    * ``'ja'``/``'jp'`` -> ``2024年01月15日 14:30 (JST UTC+9)``
    * ``'en'``         -> ``January 15, 2024 14:30 UTC (UTC+0)``
    * anything else    -> ``2024-01-15 14:30:30 (UTC+0)``

    Args:
        language: Language code, such as 'zh', 'en', 'ja', or 'jp'.

    Returns:
        str: A formatted time string that includes timezone information.
    """
    # Fixed UTC offsets per language; unknown languages use the 'en' (UTC) entry.
    language_timezone_map = {
        'zh': {'offset': 8, 'name': 'CST', 'display': '北京时间'},
        'ja': {'offset': 9, 'name': 'JST', 'display': '日本時間'},
        'jp': {'offset': 9, 'name': 'JST', 'display': '日本時間'},
        'en': {'offset': 0, 'name': 'UTC', 'display': 'UTC'},
        # Other languages can be extended here.
    }
    # Shared by Chinese and Japanese. The day marker and the separating space
    # are required; without them the day and the hour run together.
    cjk_pattern = "%Y年%m月%d日 %H:%M"
    english_pattern = "%B %d, %Y %H:%M"
    default_pattern = "%Y-%m-%d %H:%M:%S"

    try:
        utc_now = datetime.now(timezone.utc)
        tz_info = language_timezone_map.get(language, language_timezone_map['en'])
        offset_hours = tz_info['offset']
        tz_name = tz_info['name']
        local_time = utc_now + timedelta(hours=offset_hours)

        if language == 'zh':
            return local_time.strftime(cjk_pattern) + f" (UTC{offset_hours:+d} {tz_info['display']})"
        if language in ('ja', 'jp'):
            return local_time.strftime(cjk_pattern) + f" ({tz_name} UTC{offset_hours:+d})"
        if language == 'en':
            return local_time.strftime(english_pattern) + f" {tz_name} (UTC{offset_hours:+d})"
        # Unknown language: ISO-like layout with seconds, shown at UTC+0.
        return local_time.strftime(default_pattern) + f" (UTC{offset_hours:+d})"
    except Exception:
        # Deliberate best-effort: building a prompt timestamp must never fail
        # (e.g. an unhashable ``language`` breaks the dict lookup above), so
        # fall back to plain UTC in the closest matching layout.
        utc_now = datetime.now(timezone.utc)
        if language in ('zh', 'ja', 'jp'):
            pattern = cjk_pattern
        elif language == 'en':
            pattern = english_pattern
        else:
            pattern = default_pattern
        return utc_now.strftime(pattern) + " UTC"
async def load_system_prompt_async(config) -> str:
    """Async version of the system prompt loader.

    Reads the default template ``prompt/system_prompt.md`` through
    ``config_cache``, optionally reads ``README.md`` from the project
    directory, runs the ``PrePrompt`` hooks, and fills the template with
    ``str.format``. The template may reference these fields: ``readme``,
    ``extra_prompt``, ``language``, ``user_identifier``, ``datetime``,
    ``agent_dir_path``, ``trace_id`` and ``hook_content``.

    Args:
        config: AgentConfig object containing all initialization parameters.
            ``project_dir``, ``language``, ``system_prompt``,
            ``user_identifier`` and ``trace_id`` are read with defaults;
            ``bot_id`` is read directly when ``DAYTONA_ENABLED`` is falsy.

    Returns:
        str: The loaded system prompt content, or an empty string when the
        default template could not be loaded.
    """
    from agent.config_cache import config_cache

    # Get parameters from config.
    project_dir = getattr(config, 'project_dir', None)
    language = getattr(config, 'language', None)
    system_prompt = getattr(config, 'system_prompt', None)
    user_identifier = getattr(config, 'user_identifier', '')
    trace_id = getattr(config, 'trace_id', '')

    # Display name of the language; unknown codes are shown as-is and a
    # missing language is shown as English.
    language_display_map = {
        'zh': '中文',
        'en': 'English',
        'ja': '日本語',
        'jp': '日本語'
    }
    language_display = language_display_map.get(language, language if language else 'English')

    # Formatted current time; a missing language falls back to the 'en' layout.
    datetime_str = format_datetime_by_language(language) if language else format_datetime_by_language('en')

    system_prompt_default = ""
    try:
        # Read the default prompt file through the cache.
        default_prompt_file = os.path.join("prompt", "system_prompt.md")
        # `or ""` mirrors the README load below: if the cache hands back a
        # falsy non-string, the later .format() call would otherwise raise
        # AttributeError instead of yielding an empty prompt.
        system_prompt_default = await config_cache.get_text_file(default_prompt_file) or ""
        if system_prompt_default:
            logger.info("Using cached default system prompt ")
    except Exception as e:
        # Best-effort: a failed template load produces an empty prompt.
        logger.error(f"Failed to load default system prompt: {str(e)}")
        system_prompt_default = ""

    readme = ""
    # Only try to read README.md when project_dir is not None.
    if project_dir is not None:
        readme_path = os.path.join(project_dir, "README.md")
        readme = await config_cache.get_text_file(readme_path) or ""

    # ============ Execute PrePrompt hooks ============
    hook_content = await execute_hooks('PrePrompt', config)

    # agent_dir_path is effectively mapped to project_dir; this path is mainly shown to the AI.
    agent_dir_path = "/workspace" if DAYTONA_ENABLED else f"{Path.cwd()}/projects/robot/{config.bot_id}"

    # NOTE(review): str.format raises on templates with unescaped literal
    # braces or unknown field names; the template is assumed to be
    # format-safe - confirm against prompt/system_prompt.md.
    prompt = system_prompt_default.format(
        readme=str(readme),
        extra_prompt=system_prompt or "",
        language=language_display,
        user_identifier=user_identifier,
        datetime=datetime_str,
        agent_dir_path=agent_dir_path,
        trace_id=trace_id or "",
        hook_content=f"# Context from Skills\n\n{hook_content}" if hook_content else ""
    )
    return prompt or ""
def replace_mcp_placeholders(mcp_settings: List[Dict], dataset_dir: str, bot_id: str, dataset_ids: List[str], shell_env: Optional[Dict[str, str]] = None) -> List[Dict]:
    """
    Replace ``{name}`` placeholders in MCP configuration.

    Supported placeholder sources, from highest to lowest priority:
    1. Built-in variables: {dataset_dir}, {bot_id}, {dataset_ids}
    2. Custom environment variables in shell_env
    3. System environment variables (os.environ)

    Placeholders with no matching source (or whose value is ``None``) are
    left in the string unchanged. Non-string values are converted with
    ``str()`` before substitution.

    Args:
        mcp_settings: List of MCP setting dicts. Dicts inside it are updated
            in place; lists are rebuilt.
        dataset_dir: Value substituted for ``{dataset_dir}``.
        bot_id: Value substituted for ``{bot_id}``.
        dataset_ids: Ids joined with ``,`` and substituted for ``{dataset_ids}``.
        shell_env: Optional extra variables that override ``os.environ``.

    Returns:
        List[Dict]: The settings with placeholders replaced. A falsy or
        non-list ``mcp_settings`` is returned untouched.
    """
    import re

    if not mcp_settings or not isinstance(mcp_settings, list):
        return mcp_settings

    # Coerce each id to text so non-string ids do not make str.join raise.
    dataset_id_str = ','.join(str(dataset_id) for dataset_id in dataset_ids) if dataset_ids else ''

    # Build the placeholder mapping: system env < shell_env < built-in variables.
    # NOTE(review): any {NAME} in a supplied config expands to the process
    # environment variable of that name - confirm configs come from a
    # trusted source.
    placeholders = {}
    placeholders.update(os.environ)
    if shell_env:
        placeholders.update(shell_env)
    placeholders.update({
        'dataset_dir': dataset_dir,
        'bot_id': bot_id,
        'dataset_ids': dataset_id_str,
    })

    placeholder_pattern = re.compile(r'\{(\w+)\}')

    def _replacer(match):
        """Return the replacement text for one ``{name}`` occurrence."""
        value = placeholders.get(match.group(1))
        if value is None:
            # Unknown (or None-valued) placeholder: keep the original text.
            return match.group(0)
        # re.sub requires the replacer to return str; without this coercion a
        # non-string value raises TypeError and no placeholder in the whole
        # string gets expanded.
        return str(value)

    def _safe_format(s: str) -> str:
        """Replace placeholders in a string while leaving unmatched placeholders unchanged."""
        return placeholder_pattern.sub(_replacer, s)

    def replace_placeholders_in_obj(obj):
        """Recursively replace placeholders inside an object."""
        if isinstance(obj, str):
            return _safe_format(obj)
        if isinstance(obj, dict):
            # Only existing keys are reassigned, so iterating while assigning is safe.
            for key, value in obj.items():
                if key == 'args' and isinstance(value, list):
                    # 'args' is treated as a flat list: only its string items
                    # are expanded, nested containers are left as they are.
                    obj[key] = [_safe_format(item) if isinstance(item, str) else item
                                for item in value]
                else:
                    obj[key] = replace_placeholders_in_obj(value)
            return obj
        if isinstance(obj, list):
            return [replace_placeholders_in_obj(item) for item in obj]
        return obj

    return replace_placeholders_in_obj(mcp_settings)
async def load_mcp_settings_async(config) -> List[Dict]:
    """Assemble the MCP settings for an agent (async).

    The settings returned by ``merge_skill_mcp_configs(bot_id)`` form the
    base. Every base server that declares a ``command`` gets ``BACKEND_HOST``
    and ``MASTERKEY`` written into its ``env``. ``config.mcp_settings`` is
    then merged on top: servers in its first entry override same-named base
    servers, or it becomes the whole result when there is no base. Finally
    placeholders such as ``{dataset_dir}`` are replaced via
    ``replace_mcp_placeholders``.

    Args:
        config: AgentConfig object. ``project_dir``, ``mcp_settings``,
            ``bot_id``, ``dataset_ids`` and ``shell_env`` are read with
            ``getattr`` defaults.

    Returns:
        List[Dict]: The merged list of MCP settings (empty when nothing is
        configured).
    """
    # Pull the inputs off the config object.
    project_dir = getattr(config, 'project_dir', None)
    incoming = getattr(config, 'mcp_settings', None)
    bot_id = getattr(config, 'bot_id', '')
    dataset_ids = getattr(config, 'dataset_ids', [])

    # ---- Base layer: skill-provided MCP settings --------------------------
    merged_settings = []
    skill_settings = await merge_skill_mcp_configs(bot_id)
    if skill_settings:
        merged_settings = skill_settings.copy()
        skill_servers = skill_settings[0].get('mcpServers', {})
        logger.info(f"Loaded {len(skill_servers)} MCP servers from skills")

        # The copy above is shallow, so these are the same server dicts the
        # merged settings point at.
        for server in skill_servers.values():
            if not isinstance(server, dict) or 'command' not in server:
                continue
            env = server.setdefault('env', {})
            env['BACKEND_HOST'] = BACKEND_HOST
            env['MASTERKEY'] = MASTERKEY

    # ---- Top layer: settings passed in on the config ----------------------
    if isinstance(incoming, list):
        input_settings = incoming
    elif incoming:
        # A single truthy non-list value is wrapped so the merge below can
        # treat it like a one-element list.
        input_settings = [incoming]
        logger.warning("Warning: mcp_settings is not a list, converting to list format")
    else:
        input_settings = []

    if input_settings:
        if merged_settings:
            # Incoming servers win over base servers with the same name.
            base_servers = merged_settings[0].get('mcpServers', {})
            incoming_servers = input_settings[0].get('mcpServers', {})
            base_servers.update(incoming_servers)
            merged_settings[0]['mcpServers'] = base_servers
            logger.info(f"Merged mcpServers: existing + {len(incoming_servers)} input servers")
        else:
            # Nothing from skills: the incoming settings are the result.
            merged_settings = input_settings.copy()

    # Normalize the result to a list.
    if not merged_settings:
        merged_settings = []
    elif not isinstance(merged_settings, list):
        logger.warning("Warning: merged_settings is not a list, converting to list format")
        merged_settings = [merged_settings]

    # {dataset_dir} resolves to <project_dir>/datasets, or "" without a project.
    if project_dir is not None:
        dataset_dir = os.path.join(project_dir, "datasets")
    else:
        dataset_dir = ""

    shell_env = getattr(config, 'shell_env', None) or {}
    return replace_mcp_placeholders(merged_settings, dataset_dir, bot_id, dataset_ids, shell_env)
def load_guideline_prompt(chat_history: str, memory_text: str, guidelines_text: str, tools: str, scenarios: str, language: str, user_identifier: str = "") -> str:
    """
    Build the guideline prompt from ``prompt/guideline_prompt.md``.

    The template file is read as UTF-8 and filled with ``str.format``.

    Args:
        chat_history: Chat history.
        memory_text: Memory text.
        guidelines_text: Guideline text.
        tools: Tool description text.
        scenarios: Scenario description text.
        language: Language code, such as 'zh', 'en', or 'jp'. A falsy value
            is displayed as English and uses the 'en' time layout.
        user_identifier: User identifier. Defaults to an empty string.

    Returns:
        str: The processed guideline prompt.
    """
    template_path = os.path.join("prompt", "guideline_prompt.md")
    with open(template_path, encoding='utf-8') as template_file:
        template = template_file.read()

    # Human-readable language names; unknown codes are shown verbatim.
    display_names = {
        'zh': '中文',
        'en': 'English',
        'ja': '日本語',
        'jp': '日本語'
    }
    shown_language = display_names.get(language, language or 'English')
    now_text = format_datetime_by_language(language or 'en')

    # Values for every field the template may reference.
    fields = {
        'chat_history': chat_history,
        'guidelines_text': guidelines_text,
        'tools': tools,
        'scenarios': scenarios,
        'language': shown_language,
        'user_identifier': user_identifier,
        'datetime': now_text,
        'memory_text': memory_text,
    }
    return template.format(**fields)