diff --git a/routes/chat.py b/routes/chat.py
index e6f81c0..12ffe7d 100644
--- a/routes/chat.py
+++ b/routes/chat.py
@@ -177,7 +177,7 @@ async def create_agent_and_generate_response(
             terms_analysis = process_terms_with_embedding(terms_list, bot_id, query_text)
             if terms_analysis:
                 # Also add the terms analysis result to the messages
-                messages = append_user_last_message(messages, f"\n\nRelevant Terms:\n{terms_analysis}")
+                system_prompt = system_prompt.replace("{terms}", terms_analysis)
                 print(f"Generated terms analysis: {terms_analysis[:200]}...")  # Only print the first 200 characters
         except Exception as e:
             print(f"Error processing terms with embedding: {e}")
diff --git a/utils/fastapi_utils.py b/utils/fastapi_utils.py
index 59284fb..c7a3160 100644
--- a/utils/fastapi_utils.py
+++ b/utils/fastapi_utils.py
@@ -3,6 +3,7 @@ import re
 import hashlib
 import json
 import asyncio
+from concurrent.futures import ThreadPoolExecutor
 from typing import List, Dict, Optional, Union, Any
 import aiohttp
 from qwen_agent.llm.schema import ASSISTANT, FUNCTION
@@ -10,6 +11,12 @@ from qwen_agent.llm.oai import TextChatAtOAI
 from fastapi import HTTPException
 from utils.logger import logger
 
+# Global thread pool executor used to run synchronous HTTP calls
+thread_pool = ThreadPoolExecutor(max_workers=10)
+
+# Concurrency semaphore that limits the number of simultaneous API calls
+api_semaphore = asyncio.Semaphore(8)  # At most 8 API calls at the same time
+
 
 def get_versioned_filename(upload_dir: str, name_without_ext: str, file_extension: str) -> tuple[str, int]:
     """
@@ -362,6 +369,33 @@ async def fetch_bot_config(bot_id: str) -> Dict[str, Any]:
         )
 
 
+def _sync_call_guideline_llm(llm_config, messages) -> str:
+    """Synchronous helper that calls the LLM; executed in the thread pool."""
+    llm_instance = TextChatAtOAI(llm_config)
+    try:
+        # Use stream=False to get a non-streaming response
+        response = llm_instance.chat(messages=messages, stream=False)
+
+        # Handle the response
+        if isinstance(response, list) and response:
+            # If a list of Message objects is returned, extract the content
+            if hasattr(response[0], 'content'):
+                return response[0].content
+            elif isinstance(response[0], dict) and 'content' in response[0]:
+                return response[0]['content']
+
+        # If it is a string, return it directly
+        if isinstance(response, str):
+            return response
+
+        # Handle any other type
+        return str(response) if response else ""
+
+    except Exception as e:
+        print(f"Error calling guideline LLM: {e}")
+        return ""
+
+
 async def call_guideline_llm(chat_history: str, guidelines_text: str, terms:str, model_name: str, api_key: str, model_server: str) -> str:
     """Call the large language model to perform the guideline analysis
 
@@ -393,31 +427,17 @@ async def call_guideline_llm(chat_history: str, guidelines_text: str, terms:str, model_name: str, api_key: str, model_server: str) -> str:
         'model_server': model_server,  # Use the model_server parameter passed in
     }
 
-    # Create the LLM instance
-    llm_instance = TextChatAtOAI(llm_config)
-
     # Call the model
     messages = [{'role': 'user', 'content': system_prompt}]
 
     try:
-        # Use stream=False to get a non-streaming response
-        response = llm_instance.chat(messages=messages, stream=False)
-
-        # Handle the response
-        if isinstance(response, list) and response:
-            # If a list of Message objects is returned, extract the content
-            if hasattr(response[0], 'content'):
-                return response[0].content
-            elif isinstance(response[0], dict) and 'content' in response[0]:
-                return response[0]['content']
-
-        # If it is a string, return it directly
-        if isinstance(response, str):
+        # Use the semaphore to limit the number of concurrent API calls
+        async with api_semaphore:
+            # Run the synchronous HTTP call in the thread pool so it does not block the event loop
+            loop = asyncio.get_event_loop()
+            response = await loop.run_in_executor(thread_pool, _sync_call_guideline_llm, llm_config, messages)
             return response
 
-        # Handle any other type
-        return str(response) if response else ""
-
     except Exception as e:
         print(f"Error calling guideline LLM: {e}")
         return ""
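
A minimal, self-contained sketch of the concurrency pattern this patch relies on (illustrative only; slow_call, pool, and the counts below are assumptions, not code from the repository): a blocking call is handed to a ThreadPoolExecutor via loop.run_in_executor so the event loop keeps running, while an asyncio.Semaphore caps how many such calls are in flight at once.

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=10)  # mirrors thread_pool in the patch

def slow_call(i: int) -> str:
    # Hypothetical stand-in for the blocking LLM/HTTP call
    time.sleep(0.2)
    return f"result-{i}"

async def guarded_call(semaphore: asyncio.Semaphore, i: int) -> str:
    # At most 8 coroutines get past this point at a time
    async with semaphore:
        loop = asyncio.get_event_loop()
        # The blocking call runs in a worker thread, so the event loop stays responsive
        return await loop.run_in_executor(pool, slow_call, i)

async def main() -> None:
    semaphore = asyncio.Semaphore(8)  # mirrors api_semaphore in the patch
    results = await asyncio.gather(*(guarded_call(semaphore, i) for i in range(20)))
    print(results[:3])

if __name__ == "__main__":
    asyncio.run(main())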