qwen_agent/agent/guideline_middleware.py
朱潮 425f3c5bb4 chore: replace Chinese comments and log messages with English
Convert all Chinese comments, docstrings, logger/print output,
HTTPException detail messages, and API response messages to English
across the entire codebase. Functional zh/ja localized strings
(e.g. prompt templates, timezone display names, date formats) are
preserved as-is.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-30 19:45:35 +08:00

181 lines
8.6 KiB
Python

from langchain.agents.middleware import AgentState, AgentMiddleware, ModelRequest, ModelResponse
from langchain_core.messages import convert_to_openai_messages
from agent.prompt_loader import load_guideline_prompt
from utils.fastapi_utils import (extract_block_from_system_prompt, format_messages_to_chat_history, get_user_last_message_content)
from langchain.chat_models import BaseChatModel
from langgraph.runtime import Runtime
from langchain_core.messages import SystemMessage, HumanMessage
from typing import Any, Callable
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from .agent_config import AgentConfig
import logging
import re
logger = logging.getLogger('app')
class GuidelineMiddleware(AgentMiddleware):
def __init__(self, model:BaseChatModel, config:AgentConfig, prompt: str):
self.model = model
self.config = config # Preserve the full config to access _mem0_context
self.bot_id = config.bot_id
processed_system_prompt, guidelines, tool_description, scenarios, terms_list = extract_block_from_system_prompt(prompt)
self.processed_system_prompt = processed_system_prompt
self.guidelines = guidelines
self.tool_description = tool_description
self.scenarios = scenarios
self.language = config.language
self.user_identifier = config.user_identifier
self.terms_list = terms_list
self.messages = config.messages
if not self.guidelines:
self.guidelines = """
1. General Inquiries
Condition: User inquiries about products, policies, troubleshooting, factual questions, definitions, workflows, data lookups, or other knowledge-seeking requests.
Action: First choose the most suitable 【Knowledge Base Retrieval】 tool by scenario. Use table_rag_retrieve first for structured data, lists, statistics, comparisons, extraction, mixed requests, or unclear cases. Use rag_retrieve first only for clearly pure concept / definition / workflow / policy explanation questions. If the first retrieval result is empty, errored, irrelevant, or only partially answers the request, call the other retrieval tool before replying. Only reply that no relevant information was found after both retrieval tools have been tried and still provide no sufficient evidence.
2.Social Dialogue
Condition: User intent involves small talk, greetings, expressions of thanks, compliments, or other non-substantive conversations.
Action: Provide concise, friendly, and personified natural responses.
"""
if not self.tool_description:
self.tool_description = """
- **Knowledge Base Retrieval**: Choose retrieval order by scenario. Default to `table_rag_retrieve -> rag_retrieve` for structured, list, mixed, or unclear requests. Use `rag_retrieve -> table_rag_retrieve` only for clearly pure concept or workflow questions. Do not answer with "no result" until both tools have been tried when retrieval is needed.
"""
def get_guideline_prompt(self, config: AgentConfig) -> str:
"""Generate the guideline prompt.
Args:
config: AgentConfig object containing _session_history and _mem0_context
Returns:
str: The generated guideline prompt
"""
messages = convert_to_openai_messages(config._session_history)
memory_text = config._mem0_context
# Process terms and update self.processed_system_prompt
self.get_term_analysis(messages)
guideline_prompt = ""
if self.guidelines:
chat_history = format_messages_to_chat_history(messages)
guideline_prompt = load_guideline_prompt(chat_history, memory_text, self.guidelines, self.tool_description, self.scenarios, self.language, self.user_identifier)
return guideline_prompt
def get_term_analysis(self, messages: list[dict[str, Any]]) -> str:
## Process terms
terms_analysis = ""
if self.terms_list:
logger.info(f"Processing terms: {len(self.terms_list)} terms")
try:
from embedding.embedding import process_terms_with_embedding
query_text = get_user_last_message_content(messages)
terms_analysis = process_terms_with_embedding(terms_list, self.bot_id, query_text)
if terms_analysis:
self.processed_system_prompt = self.processed_system_prompt.replace("#terms#", terms_analysis)
logger.info(f"Terms analysis completed: {len(terms_analysis)} chars")
except Exception as e:
logger.error(f"Error processing terms with embedding: {e}")
terms_analysis = ""
else:
# When terms_list is empty, delete the corresponding pkl cache file
try:
import os
cache_file = f"projects/cache/{self.bot_id}_terms.pkl"
if os.path.exists(cache_file):
os.remove(cache_file)
logger.info(f"Removed empty terms cache file: {cache_file}")
except Exception as e:
logger.error(f"Error removing terms cache file: {e}")
return terms_analysis
def before_agent(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
if not self.guidelines:
return None
guideline_prompt = self.get_guideline_prompt(self.config)
# Prepare the full message list
messages = state['messages'].copy()
# Add guideline_prompt to the message list as a system message
system_message = SystemMessage(content=guideline_prompt)
messages = [system_message,messages[-1]]
# Call the model using the callback handler
response = self.model.invoke(
messages,
config={"metadata": {"message_tag": "THINK"}}
)
response.additional_kwargs["message_tag"] = "THINK"
response.content = f"<think>{response.content}</think>"
# Add the response to the original message list and append a HumanMessage to ensure the last message is from the user
# Some models do not support assistant message prefilling and require the last message to be from the user
state['messages'] = state['messages'] + [response, HumanMessage(content=self._get_follow_up_prompt())]
return state
async def abefore_agent(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
if not self.guidelines:
return None
# Prepare the full message list
messages = state['messages'].copy()
guideline_prompt = self.get_guideline_prompt(self.config)
# Add guideline_prompt to the message list as a system message
system_message = SystemMessage(content=guideline_prompt)
messages = [system_message,messages[-1]]
# Call the model using the callback handler
response = await self.model.ainvoke(
messages,
config={"metadata": {"message_tag": "THINK"}}
)
response.additional_kwargs["message_tag"] = "THINK"
response.content = f"<think>{response.content}</think>"
# Add the response to the original message list and append a HumanMessage to ensure the last message is from the user
# Some models do not support assistant message prefilling and require the last message to be from the user
state['messages'] = state['messages'] + [response, HumanMessage(content=self._get_follow_up_prompt())]
return state
def _get_follow_up_prompt(self) -> str:
"""Return a prompt that guides the main agent response based on language."""
prompts = {
"ja": "以上の分析に基づいて、ユーザーに返信してください。",
"jp": "以上の分析に基づいて、ユーザーに返信してください。",
"zh": "请根据以上分析,回复用户。",
"zh-TW": "請根據以上分析,回覆用戶。",
"ko": "위 분석을 바탕으로 사용자에게 답변해 주세요.",
"en": "Based on the above analysis, please respond to the user.",
}
return prompts.get(self.language, prompts["en"])
def wrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
return handler(request.override(system_prompt=self.processed_system_prompt))
async def awrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
return await handler(request.override(system_prompt=self.processed_system_prompt))