Convert all Chinese comments, docstrings, logger/print output, HTTPException detail messages, and API response messages to English across the entire codebase. Functional zh/ja localized strings (e.g. prompt templates, timezone display names, date formats) are preserved as-is. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
181 lines
8.6 KiB
Python
181 lines
8.6 KiB
Python
from langchain.agents.middleware import AgentState, AgentMiddleware, ModelRequest, ModelResponse
|
|
from langchain_core.messages import convert_to_openai_messages
|
|
from agent.prompt_loader import load_guideline_prompt
|
|
from utils.fastapi_utils import (extract_block_from_system_prompt, format_messages_to_chat_history, get_user_last_message_content)
|
|
from langchain.chat_models import BaseChatModel
|
|
from langgraph.runtime import Runtime
|
|
|
|
from langchain_core.messages import SystemMessage, HumanMessage
|
|
from typing import Any, Callable
|
|
from langchain_core.callbacks import BaseCallbackHandler
|
|
from langchain_core.outputs import LLMResult
|
|
from .agent_config import AgentConfig
|
|
import logging
|
|
import re
|
|
|
|
logger = logging.getLogger('app')
|
|
|
|
|
|
class GuidelineMiddleware(AgentMiddleware):
|
|
def __init__(self, model:BaseChatModel, config:AgentConfig, prompt: str):
|
|
self.model = model
|
|
self.config = config # Preserve the full config to access _mem0_context
|
|
self.bot_id = config.bot_id
|
|
|
|
processed_system_prompt, guidelines, tool_description, scenarios, terms_list = extract_block_from_system_prompt(prompt)
|
|
|
|
self.processed_system_prompt = processed_system_prompt
|
|
self.guidelines = guidelines
|
|
self.tool_description = tool_description
|
|
self.scenarios = scenarios
|
|
|
|
self.language = config.language
|
|
self.user_identifier = config.user_identifier
|
|
|
|
self.terms_list = terms_list
|
|
self.messages = config.messages
|
|
|
|
if not self.guidelines:
|
|
self.guidelines = """
|
|
1. General Inquiries
|
|
Condition: User inquiries about products, policies, troubleshooting, factual questions, definitions, workflows, data lookups, or other knowledge-seeking requests.
|
|
Action: First choose the most suitable 【Knowledge Base Retrieval】 tool by scenario. Use table_rag_retrieve first for structured data, lists, statistics, comparisons, extraction, mixed requests, or unclear cases. Use rag_retrieve first only for clearly pure concept / definition / workflow / policy explanation questions. If the first retrieval result is empty, errored, irrelevant, or only partially answers the request, call the other retrieval tool before replying. Only reply that no relevant information was found after both retrieval tools have been tried and still provide no sufficient evidence.
|
|
|
|
2.Social Dialogue
|
|
Condition: User intent involves small talk, greetings, expressions of thanks, compliments, or other non-substantive conversations.
|
|
Action: Provide concise, friendly, and personified natural responses.
|
|
"""
|
|
if not self.tool_description:
|
|
self.tool_description = """
|
|
- **Knowledge Base Retrieval**: Choose retrieval order by scenario. Default to `table_rag_retrieve -> rag_retrieve` for structured, list, mixed, or unclear requests. Use `rag_retrieve -> table_rag_retrieve` only for clearly pure concept or workflow questions. Do not answer with "no result" until both tools have been tried when retrieval is needed.
|
|
"""
|
|
|
|
def get_guideline_prompt(self, config: AgentConfig) -> str:
|
|
"""Generate the guideline prompt.
|
|
|
|
Args:
|
|
config: AgentConfig object containing _session_history and _mem0_context
|
|
|
|
Returns:
|
|
str: The generated guideline prompt
|
|
"""
|
|
messages = convert_to_openai_messages(config._session_history)
|
|
memory_text = config._mem0_context
|
|
|
|
# Process terms and update self.processed_system_prompt
|
|
self.get_term_analysis(messages)
|
|
|
|
guideline_prompt = ""
|
|
if self.guidelines:
|
|
chat_history = format_messages_to_chat_history(messages)
|
|
guideline_prompt = load_guideline_prompt(chat_history, memory_text, self.guidelines, self.tool_description, self.scenarios, self.language, self.user_identifier)
|
|
|
|
return guideline_prompt
|
|
|
|
def get_term_analysis(self, messages: list[dict[str, Any]]) -> str:
|
|
## Process terms
|
|
terms_analysis = ""
|
|
if self.terms_list:
|
|
logger.info(f"Processing terms: {len(self.terms_list)} terms")
|
|
try:
|
|
from embedding.embedding import process_terms_with_embedding
|
|
query_text = get_user_last_message_content(messages)
|
|
terms_analysis = process_terms_with_embedding(terms_list, self.bot_id, query_text)
|
|
if terms_analysis:
|
|
self.processed_system_prompt = self.processed_system_prompt.replace("#terms#", terms_analysis)
|
|
logger.info(f"Terms analysis completed: {len(terms_analysis)} chars")
|
|
except Exception as e:
|
|
logger.error(f"Error processing terms with embedding: {e}")
|
|
terms_analysis = ""
|
|
else:
|
|
# When terms_list is empty, delete the corresponding pkl cache file
|
|
try:
|
|
import os
|
|
cache_file = f"projects/cache/{self.bot_id}_terms.pkl"
|
|
if os.path.exists(cache_file):
|
|
os.remove(cache_file)
|
|
logger.info(f"Removed empty terms cache file: {cache_file}")
|
|
except Exception as e:
|
|
logger.error(f"Error removing terms cache file: {e}")
|
|
return terms_analysis
|
|
|
|
|
|
|
|
def before_agent(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
|
|
if not self.guidelines:
|
|
return None
|
|
|
|
guideline_prompt = self.get_guideline_prompt(self.config)
|
|
# Prepare the full message list
|
|
messages = state['messages'].copy()
|
|
|
|
# Add guideline_prompt to the message list as a system message
|
|
system_message = SystemMessage(content=guideline_prompt)
|
|
|
|
messages = [system_message,messages[-1]]
|
|
|
|
# Call the model using the callback handler
|
|
response = self.model.invoke(
|
|
messages,
|
|
config={"metadata": {"message_tag": "THINK"}}
|
|
)
|
|
|
|
response.additional_kwargs["message_tag"] = "THINK"
|
|
response.content = f"<think>{response.content}</think>"
|
|
|
|
# Add the response to the original message list and append a HumanMessage to ensure the last message is from the user
|
|
# Some models do not support assistant message prefilling and require the last message to be from the user
|
|
state['messages'] = state['messages'] + [response, HumanMessage(content=self._get_follow_up_prompt())]
|
|
return state
|
|
|
|
async def abefore_agent(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
|
|
if not self.guidelines:
|
|
return None
|
|
# Prepare the full message list
|
|
messages = state['messages'].copy()
|
|
|
|
guideline_prompt = self.get_guideline_prompt(self.config)
|
|
|
|
# Add guideline_prompt to the message list as a system message
|
|
system_message = SystemMessage(content=guideline_prompt)
|
|
messages = [system_message,messages[-1]]
|
|
|
|
# Call the model using the callback handler
|
|
response = await self.model.ainvoke(
|
|
messages,
|
|
config={"metadata": {"message_tag": "THINK"}}
|
|
)
|
|
response.additional_kwargs["message_tag"] = "THINK"
|
|
response.content = f"<think>{response.content}</think>"
|
|
|
|
# Add the response to the original message list and append a HumanMessage to ensure the last message is from the user
|
|
# Some models do not support assistant message prefilling and require the last message to be from the user
|
|
state['messages'] = state['messages'] + [response, HumanMessage(content=self._get_follow_up_prompt())]
|
|
return state
|
|
|
|
def _get_follow_up_prompt(self) -> str:
|
|
"""Return a prompt that guides the main agent response based on language."""
|
|
prompts = {
|
|
"ja": "以上の分析に基づいて、ユーザーに返信してください。",
|
|
"jp": "以上の分析に基づいて、ユーザーに返信してください。",
|
|
"zh": "请根据以上分析,回复用户。",
|
|
"zh-TW": "請根據以上分析,回覆用戶。",
|
|
"ko": "위 분석을 바탕으로 사용자에게 답변해 주세요.",
|
|
"en": "Based on the above analysis, please respond to the user.",
|
|
}
|
|
return prompts.get(self.language, prompts["en"])
|
|
|
|
def wrap_model_call(
|
|
self,
|
|
request: ModelRequest,
|
|
handler: Callable[[ModelRequest], ModelResponse],
|
|
) -> ModelResponse:
|
|
return handler(request.override(system_prompt=self.processed_system_prompt))
|
|
|
|
async def awrap_model_call(
|
|
self,
|
|
request: ModelRequest,
|
|
handler: Callable[[ModelRequest], ModelResponse],
|
|
) -> ModelResponse:
|
|
return await handler(request.override(system_prompt=self.processed_system_prompt))
|