From 3973174c83120df7debebdfd1436c8dbb8fa9ff6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?=
Date: Thu, 27 Nov 2025 21:50:03 +0800
Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=BC=98=E5=8C=96?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README.md | 17 ++++
 agent/config_cache.py | 6 +-
 agent/custom_mcp_manager.py | 4 +-
 agent/file_loaded_agent_manager.py | 4 +-
 agent/modified_assistant.py | 4 +-
 agent/prompt_loader.py | 19 ++--
 agent/sharded_agent_manager.py | 4 +-
 embedding/embedding.py | 134 +++++++++++++++--------------
 embedding/manager.py | 3 +-
 fastapi_app.py | 16 +++-
 utils/logger.py => logger.py | 0
 routes/chat.py | 38 ++++----
 routes/file_manager.py | 9 +-
 routes/files.py | 25 +++---
 routes/projects.py | 15 ++--
 routes/system.py | 25 +++---
 task_queue/config.py | 6 +-
 task_queue/manager.py | 28 +++---
 task_queue/optimized_consumer.py | 85 +++++++++---------
 task_queue/tasks.py | 36 ++++----
 utils/agent_pool.py | 4 +-
 utils/async_file_ops.py | 12 ++-
 utils/data_merger.py | 28 +++---
 utils/dataset_manager.py | 42 +++++----
 utils/excel_csv_processor.py | 14 +--
 utils/fastapi_utils.py | 77 ++++++++++-------
 utils/file_manager.py | 11 ++-
 utils/file_utils.py | 34 ++++----
 utils/log_util/context.py | 32 +++++++
 utils/log_util/decorator.py | 32 +++++++
 utils/log_util/logger.py | 93 ++++++++++++++++++++
 utils/multi_project_manager.py | 78 +++++++++--------
 utils/organize_dataset_files.py | 22 +++--
 utils/project_manager.py | 18 ++--
 utils/single_file_processor.py | 26 +++---
 utils/system_optimizer.py | 34 ++++----
 zip_project_handler.py | 36 ++++----
 37 files changed, 684 insertions(+), 387 deletions(-)
 rename utils/logger.py => logger.py (100%)
 create mode 100644 utils/log_util/context.py
 create mode 100644 utils/log_util/decorator.py
 create mode 100644 utils/log_util/logger.py

diff --git a/README.md b/README.md
index d9b2b34..3b5a427 100644
--- a/README.md
+++ b/README.md
@@ -668,3 +668,20 @@ qwen-agent/
 ```
 现在您可以开始使用 Catalog Agent 进行智能数据检索了!🚀
+
+
+
+## 提示词工程
+
+指南
+```guideline
+1) Condition: 用户意图为“执行设备控制”。 Action: 如果用户在当前交互中已回复肯定确认(例如,同意、确认、好的等),则立即调用 Iot Control-dxcore_update_device_status 工具执行设备操作。否则(用户尚未确认),则向用户发送确认询问:“即将为您[操作内容][设备名称][具体参数],是否确认?”并等待用户回复。
+3) Condition: 当用户意图为查询设备状态或信息时。 Action: 调用 Iot Control 设备相关的工具进行查询(不调用 rag_retrieve-rag_retrieve)。
+4) Condition: 当用户意图为非设备相关的问题(如提问、遇到难处、咨询事实性问题、价格等)时。 Action: 优先调用 rag_retrieve-rag_retrieve 查询知识库;如果无结果,则调用 WebSearch-web_search;最后综合两个工具的内容进行回复。
+```
+术语表
+```terms
+1) Name: term_name1, Description: desc, Synonyms: syn1, syn2
+2) Name: term_name2, Description: desc, Synonyms: syn1, syn2
+```
+
diff --git a/agent/config_cache.py b/agent/config_cache.py
index d663d00..c9f3a16 100644
--- a/agent/config_cache.py
+++ b/agent/config_cache.py
@@ -7,7 +7,9 @@ import asyncio
 import os
 import json
 from typing import Dict, Tuple, Optional, Any
-from utils.logger import logger
+import logging
+
+logger = logging.getLogger('app')
 class ConfigFileCache:
@@ -97,4 +99,4 @@ class ConfigFileCache:
 # 全局缓存实例
-config_cache = ConfigFileCache()
\ No newline at end of file
+config_cache = ConfigFileCache()
diff --git a/agent/custom_mcp_manager.py b/agent/custom_mcp_manager.py
index fa31e47..371caa9 100644
--- a/agent/custom_mcp_manager.py
+++ b/agent/custom_mcp_manager.py
@@ -24,7 +24,9 @@ from typing import Dict, Optional, Union
 from dotenv import load_dotenv
-from qwen_agent.log import logger
+import logging
+
+logger = logging.getLogger('app')
 from qwen_agent.tools.base import
BaseTool diff --git a/agent/file_loaded_agent_manager.py b/agent/file_loaded_agent_manager.py index 77e6e6d..5a40cd6 100644 --- a/agent/file_loaded_agent_manager.py +++ b/agent/file_loaded_agent_manager.py @@ -21,7 +21,9 @@ import asyncio from typing import Dict, List, Optional from qwen_agent.agents import Assistant -from qwen_agent.log import logger +import logging + +logger = logging.getLogger('app') from agent.modified_assistant import init_modified_agent_service_with_files, update_agent_llm from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async diff --git a/agent/modified_assistant.py b/agent/modified_assistant.py index ec233dd..21be174 100644 --- a/agent/modified_assistant.py +++ b/agent/modified_assistant.py @@ -24,9 +24,11 @@ from qwen_agent.llm.schema import ASSISTANT, FUNCTION, Message from qwen_agent.llm.oai import TextChatAtOAI from qwen_agent.tools import BaseTool from agent.custom_mcp_manager import CustomMCPManager +import logging +logger = logging.getLogger('app') # 设置工具日志记录器 -tool_logger = logging.getLogger('tool_logger') +tool_logger = logging.getLogger('app') class ModifiedAssistant(Assistant): """ diff --git a/agent/prompt_loader.py b/agent/prompt_loader.py index 18f15a8..f043430 100644 --- a/agent/prompt_loader.py +++ b/agent/prompt_loader.py @@ -7,6 +7,9 @@ import json import asyncio from typing import List, Dict, Optional, Any from datetime import datetime, timezone, timedelta +import logging + +logger = logging.getLogger('app') def safe_replace(text: str, placeholder: str, value: Any) -> str: @@ -142,9 +145,9 @@ async def load_system_prompt_async(project_dir: str, language: str = None, syste default_prompt_file = os.path.join("prompt", f"system_prompt_{robot_type}.md") system_prompt_default = await config_cache.get_text_file(default_prompt_file) if system_prompt_default: - print(f"Using cached default system prompt for {robot_type} from prompt folder") + logger.info(f"Using cached default system prompt for {robot_type} from prompt folder") except Exception as e: - print(f"Failed to load default system prompt for {robot_type}: {str(e)}") + logger.error(f"Failed to load default system prompt for {robot_type}: {str(e)}") system_prompt_default = None readme = "" @@ -244,11 +247,11 @@ async def load_mcp_settings_async(project_dir: str, mcp_settings: list=None, bot default_mcp_file = os.path.join("mcp", f"mcp_settings_{robot_type}.json") default_mcp_settings = await config_cache.get_json_file(default_mcp_file) or [] if default_mcp_settings: - print(f"Using cached default mcp_settings_{robot_type} from mcp folder") + logger.info(f"Using cached default mcp_settings_{robot_type} from mcp folder") else: - print(f"No default mcp_settings_{robot_type} found, using empty default settings") + logger.warning(f"No default mcp_settings_{robot_type} found, using empty default settings") except Exception as e: - print(f"Failed to load default mcp_settings_{robot_type}: {str(e)}") + logger.error(f"Failed to load default mcp_settings_{robot_type}: {str(e)}") default_mcp_settings = [] # 遍历mcpServers工具,给每个工具增加env参数 @@ -269,7 +272,7 @@ async def load_mcp_settings_async(project_dir: str, mcp_settings: list=None, bot input_mcp_settings = mcp_settings elif mcp_settings: input_mcp_settings = [mcp_settings] - print(f"Warning: mcp_settings is not a list, converting to list format") + logger.warning(f"Warning: mcp_settings is not a list, converting to list format") # 3. 
合并默认设置和传入设置 merged_settings = [] @@ -286,7 +289,7 @@ async def load_mcp_settings_async(project_dir: str, mcp_settings: list=None, bot # 合并mcpServers对象,传入的设置覆盖默认设置中相同的key default_mcp_servers.update(input_mcp_servers) merged_settings[0]['mcpServers'] = default_mcp_servers - print(f"Merged mcpServers: default + {len(input_mcp_servers)} input servers") + logger.info(f"Merged mcpServers: default + {len(input_mcp_servers)} input servers") # 如果没有默认设置但有传入设置,直接使用传入设置 elif input_mcp_settings: @@ -296,7 +299,7 @@ async def load_mcp_settings_async(project_dir: str, mcp_settings: list=None, bot if not merged_settings: merged_settings = [] elif not isinstance(merged_settings, list): - print(f"Warning: merged_settings is not a list, converting to list format") + logger.warning(f"Warning: merged_settings is not a list, converting to list format") merged_settings = [merged_settings] if merged_settings else [] # 计算 dataset_dir 用于替换 MCP 配置中的占位符 diff --git a/agent/sharded_agent_manager.py b/agent/sharded_agent_manager.py index c0729f7..da9eb77 100644 --- a/agent/sharded_agent_manager.py +++ b/agent/sharded_agent_manager.py @@ -24,7 +24,9 @@ import threading from collections import defaultdict from qwen_agent.agents import Assistant -from qwen_agent.log import logger +import logging + +logger = logging.getLogger('app') from agent.modified_assistant import init_modified_agent_service_with_files, update_agent_llm from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async diff --git a/embedding/embedding.py b/embedding/embedding.py index 4ffaf2c..1cc61b6 100644 --- a/embedding/embedding.py +++ b/embedding/embedding.py @@ -7,6 +7,10 @@ import requests import asyncio import hashlib import json +import logging + +# Configure logger +logger = logging.getLogger('app') def encode_texts_via_api(texts, batch_size=32): """通过 API 接口编码文本""" @@ -35,18 +39,18 @@ def encode_texts_via_api(texts, batch_size=32): if result_data.get("success"): embeddings_list = result_data.get("embeddings", []) - print(f"API编码成功,处理了 {len(texts)} 个文本,embedding维度: {len(embeddings_list[0]) if embeddings_list else 0}") + logger.info(f"API编码成功,处理了 {len(texts)} 个文本,embedding维度: {len(embeddings_list[0]) if embeddings_list else 0}") return np.array(embeddings_list) else: error_msg = result_data.get('error', '未知错误') - print(f"API编码失败: {error_msg}") + logger.error(f"API编码失败: {error_msg}") raise Exception(f"API编码失败: {error_msg}") else: - print(f"API请求失败: {response.status_code} - {response.text}") + logger.error(f"API请求失败: {response.status_code} - {response.text}") raise Exception(f"API请求失败: {response.status_code}") except Exception as e: - print(f"API编码异常: {e}") + logger.error(f"API编码异常: {e}") raise def clean_text(text): @@ -144,10 +148,10 @@ def embed_document(input_file='document.txt', output_file='embedding.pkl', if is_meaningful_line(cleaned_text): chunks.append(cleaned_text) - print(f"使用按行分块策略") - print(f"原始行数: {original_count}") - print(f"清理后有效句子数: {len(chunks)}") - print(f"过滤比例: {((original_count - len(chunks)) / original_count * 100):.1f}%") + logger.info(f"使用按行分块策略") + logger.info(f"原始行数: {original_count}") + logger.info(f"清理后有效句子数: {len(chunks)}") + logger.info(f"过滤比例: {((original_count - len(chunks)) / original_count * 100):.1f}%") elif chunking_strategy == 'paragraph': # 新的段落级分块策略 @@ -166,13 +170,13 @@ def embed_document(input_file='document.txt', output_file='embedding.pkl', # 使用段落分块 chunks = paragraph_chunking(cleaned_content, **params) - print(f"使用段落级分块策略") - print(f"文档总长度: {len(content)} 字符") - print(f"分块数量: 
{len(chunks)}") + logger.info(f"使用段落级分块策略") + logger.info(f"文档总长度: {len(content)} 字符") + logger.info(f"分块数量: {len(chunks)}") if chunks: - print(f"平均chunk大小: {sum(len(chunk) for chunk in chunks) / len(chunks):.1f} 字符") - print(f"最大chunk大小: {max(len(chunk) for chunk in chunks)} 字符") - print(f"最小chunk大小: {min(len(chunk) for chunk in chunks)} 字符") + logger.debug(f"平均chunk大小: {sum(len(chunk) for chunk in chunks) / len(chunks):.1f} 字符") + logger.debug(f"最大chunk大小: {max(len(chunk) for chunk in chunks)} 字符") + logger.debug(f"最小chunk大小: {min(len(chunk) for chunk in chunks)} 字符") elif chunking_strategy == 'smart': # 智能分块策略,自动检测文档格式 @@ -186,25 +190,25 @@ def embed_document(input_file='document.txt', output_file='embedding.pkl', # 使用智能分块 chunks = smart_chunking(content, **params) - print(f"使用智能分块策略") - print(f"文档总长度: {len(content)} 字符") - print(f"分块数量: {len(chunks)}") + logger.info(f"使用智能分块策略") + logger.info(f"文档总长度: {len(content)} 字符") + logger.info(f"分块数量: {len(chunks)}") if chunks: - print(f"平均chunk大小: {sum(len(chunk) for chunk in chunks) / len(chunks):.1f} 字符") - print(f"最大chunk大小: {max(len(chunk) for chunk in chunks)} 字符") - print(f"最小chunk大小: {min(len(chunk) for chunk in chunks)} 字符") + logger.debug(f"平均chunk大小: {sum(len(chunk) for chunk in chunks) / len(chunks):.1f} 字符") + logger.debug(f"最大chunk大小: {max(len(chunk) for chunk in chunks)} 字符") + logger.debug(f"最小chunk大小: {min(len(chunk) for chunk in chunks)} 字符") else: raise ValueError(f"不支持的分块策略: {chunking_strategy}") if not chunks: - print("警告:没有找到有效的内容块!") + logger.warning("警告:没有找到有效的内容块!") return None - print(f"正在处理 {len(chunks)} 个内容块...") + logger.info(f"正在处理 {len(chunks)} 个内容块...") # 使用API接口进行编码 - print("使用API接口进行编码...") + logger.info("使用API接口进行编码...") chunk_embeddings = encode_texts_via_api(chunks, batch_size=32) embedding_data = { @@ -218,14 +222,14 @@ def embed_document(input_file='document.txt', output_file='embedding.pkl', with open(output_file, 'wb') as f: pickle.dump(embedding_data, f) - print(f"已保存嵌入向量到 {output_file}") + logger.info(f"已保存嵌入向量到 {output_file}") return embedding_data except FileNotFoundError: - print(f"错误:找不到文件 {input_file}") + logger.error(f"错误:找不到文件 {input_file}") return None except Exception as e: - print(f"处理文档时出错:{e}") + logger.error(f"处理文档时出错:{e}") return None @@ -611,21 +615,21 @@ def split_document_by_pages(input_file='document.txt', output_file='pagination.t if page_content: pages.append(page_content) - print(f"总共分割出 {len(pages)} 页") + logger.info(f"总共分割出 {len(pages)} 页") # 写入序列化文件 with open(output_file, 'w', encoding='utf-8') as f: for i, page_content in enumerate(pages, 1): f.write(f"{page_content}\n") - print(f"已将页面内容序列化到 {output_file}") + logger.info(f"已将页面内容序列化到 {output_file}") return pages except FileNotFoundError: - print(f"错误:找不到文件 {input_file}") + logger.error(f"错误:找不到文件 {input_file}") return [] except Exception as e: - print(f"分割文档时出错:{e}") + logger.error(f"分割文档时出错:{e}") return [] def test_chunking_strategies(): @@ -645,54 +649,54 @@ def test_chunking_strategies(): 第五段:这是最后一个段落。它用来测试分块策略的完整性和准确性。 """ - print("=" * 60) - print("分块策略测试") - print("=" * 60) + logger.debug("=" * 60) + logger.debug("分块策略测试") + logger.debug("=" * 60) # 测试1: 段落级分块(小chunk) - print("\n1. 段落级分块 - 小chunk (max_size=200):") + logger.debug("\n1. 
段落级分块 - 小chunk (max_size=200):") chunks_small = paragraph_chunking(test_text, max_chunk_size=200, overlap=50) for i, chunk in enumerate(chunks_small): - print(f"Chunk {i+1} (长度: {len(chunk)}): {chunk[:50]}...") + logger.debug(f"Chunk {i+1} (长度: {len(chunk)}): {chunk[:50]}...") # 测试2: 段落级分块(大chunk) - print("\n2. 段落级分块 - 大chunk (max_size=500):") + logger.debug("\n2. 段落级分块 - 大chunk (max_size=500):") chunks_large = paragraph_chunking(test_text, max_chunk_size=500, overlap=100) for i, chunk in enumerate(chunks_large): - print(f"Chunk {i+1} (长度: {len(chunk)}): {chunk[:50]}...") + logger.debug(f"Chunk {i+1} (长度: {len(chunk)}): {chunk[:50]}...") # 测试3: 段落级分块(无重叠) - print("\n3. 段落级分块 - 无重叠:") + logger.debug("\n3. 段落级分块 - 无重叠:") chunks_no_overlap = paragraph_chunking(test_text, max_chunk_size=300, overlap=0) for i, chunk in enumerate(chunks_no_overlap): - print(f"Chunk {i+1} (长度: {len(chunk)}): {chunk[:50]}...") + logger.debug(f"Chunk {i+1} (长度: {len(chunk)}): {chunk[:50]}...") - print(f"\n测试总结:") - print(f"- 小chunk策略: {len(chunks_small)} 个chunks") - print(f"- 大chunk策略: {len(chunks_large)} 个chunks") - print(f"- 无重叠策略: {len(chunks_no_overlap)} 个chunks") + logger.debug(f"\n测试总结:") + logger.debug(f"- 小chunk策略: {len(chunks_small)} 个chunks") + logger.debug(f"- 大chunk策略: {len(chunks_large)} 个chunks") + logger.debug(f"- 无重叠策略: {len(chunks_no_overlap)} 个chunks") def demo_usage(): """ 演示如何使用新的分块功能 """ - print("=" * 60) - print("使用示例") - print("=" * 60) + logger.debug("=" * 60) + logger.debug("使用示例") + logger.debug("=" * 60) - print("\n1. 使用传统的按行分块:") - print("embed_document('document.txt', 'line_embedding.pkl', chunking_strategy='line')") + logger.debug("\n1. 使用传统的按行分块:") + logger.debug("embed_document('document.txt', 'line_embedding.pkl', chunking_strategy='line')") - print("\n2. 使用段落级分块(默认参数):") - print("embed_document('document.txt', 'paragraph_embedding.pkl', chunking_strategy='paragraph')") + logger.debug("\n2. 使用段落级分块(默认参数):") + logger.debug("embed_document('document.txt', 'paragraph_embedding.pkl', chunking_strategy='paragraph')") - print("\n3. 使用自定义参数的段落级分块:") - print("embed_document('document.txt', 'custom_embedding.pkl',") - print(" chunking_strategy='paragraph',") - print(" max_chunk_size=1500,") - print(" overlap=200,") - print(" min_chunk_size=300)") + logger.debug("\n3. 
使用自定义参数的段落级分块:") + logger.debug("embed_document('document.txt', 'custom_embedding.pkl',") + logger.debug(" chunking_strategy='paragraph',") + logger.debug(" max_chunk_size=1500,") + logger.debug(" overlap=200,") + logger.debug(" min_chunk_size=300)") @@ -737,10 +741,10 @@ def cache_terms_embeddings(bot_id: str, terms_list: List[Dict[str, Any]]) -> Dic # 验证缓存数据是否匹配当前的terms current_hash = _generate_terms_hash(terms_list) if cached_data.get('hash') == current_hash: - print(f"Using cached terms embeddings for {cache_key}") + logger.info(f"Using cached terms embeddings for {cache_key}") return cached_data except Exception as e: - print(f"Error loading cache: {e}") + logger.error(f"Error loading cache: {e}") # 准备要编码的文本 term_texts = [] @@ -793,11 +797,11 @@ def cache_terms_embeddings(bot_id: str, terms_list: List[Dict[str, Any]]) -> Dic with open(cache_file, 'wb') as f: pickle.dump(cache_data, f) - print(f"Cached {len(term_texts)} terms embeddings to {cache_file}") + logger.info(f"Cached {len(term_texts)} terms embeddings to {cache_file}") return cache_data except Exception as e: - print(f"Error generating terms embeddings: {e}") + logger.error(f"Error generating terms embeddings: {e}") return {} @@ -826,14 +830,14 @@ def search_similar_terms(query_text: str, cached_terms_data: Dict[str, Any]) -> term_info = cached_terms_data['term_info'] # 添加调试信息 - print(f"DEBUG: Query text: '{query_text}'") - print(f"DEBUG: Query vector shape: {query_vector.shape}, norm: {np.linalg.norm(query_vector)}") + logger.debug(f"DEBUG: Query text: '{query_text}'") + logger.debug(f"DEBUG: Query vector shape: {query_vector.shape}, norm: {np.linalg.norm(query_vector)}") # 计算cos相似度 similarities = _cosine_similarity(query_vector, term_embeddings) - print(f"DEBUG: Similarities: {similarities}") - print(f"DEBUG: Max similarity: {np.max(similarities):.3f}, Mean similarity: {np.mean(similarities):.3f}") + logger.debug(f"DEBUG: Similarities: {similarities}") + logger.debug(f"DEBUG: Max similarity: {np.max(similarities):.3f}, Mean similarity: {np.mean(similarities):.3f}") # 获取所有terms的相似度 matches = [] @@ -852,7 +856,7 @@ def search_similar_terms(query_text: str, cached_terms_data: Dict[str, Any]) -> return matches[:5] except Exception as e: - print(f"Error in similarity search: {e}") + logger.error(f"Error in similarity search: {e}") return [] diff --git a/embedding/manager.py b/embedding/manager.py index dd854f6..9fb506d 100644 --- a/embedding/manager.py +++ b/embedding/manager.py @@ -18,8 +18,9 @@ import psutil import numpy as np from sentence_transformers import SentenceTransformer +import logging -logger = logging.getLogger(__name__) +logger = logging.getLogger('app') class GlobalModelManager: diff --git a/fastapi_app.py b/fastapi_app.py index ca6cfa2..d1d721a 100644 --- a/fastapi_app.py +++ b/fastapi_app.py @@ -10,7 +10,11 @@ from fastapi import FastAPI from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware from routes.file_manager import router as file_manager_router -from utils.logger import logger +import logging + + +from utils.log_util.logger import init_with_fastapi + # Import route modules from routes import chat, files, projects, system @@ -20,6 +24,10 @@ from routes.system import agent_manager, connection_pool, file_cache app = FastAPI(title="Database Assistant API", version="1.0.0") +init_with_fastapi(app) + +logger = logging.getLogger('app') + # 挂载public文件夹为静态文件服务 app.mount("/public", StaticFiles(directory="public"), name="static") @@ -47,7 +55,7 @@ 
app.include_router(file_manager_router) if __name__ == "__main__": # 启动 FastAPI 应用 - print("Starting FastAPI server...") - print("File Manager API available at: http://localhost:8001/api/v1/files") - print("Web Interface available at: http://localhost:8001/public/file-manager.html") + logger.info("Starting FastAPI server...") + logger.info("File Manager API available at: http://localhost:8001/api/v1/files") + logger.info("Web Interface available at: http://localhost:8001/public/file-manager.html") uvicorn.run(app, host="0.0.0.0", port=8001) diff --git a/utils/logger.py b/logger.py similarity index 100% rename from utils/logger.py rename to logger.py diff --git a/routes/chat.py b/routes/chat.py index 12ffe7d..919b1f0 100644 --- a/routes/chat.py +++ b/routes/chat.py @@ -5,7 +5,9 @@ from typing import Union, Optional from fastapi import APIRouter, HTTPException, Header from fastapi.responses import StreamingResponse from pydantic import BaseModel +import logging +logger = logging.getLogger('app') from utils import ( Message, ChatRequest, ChatResponse ) @@ -79,6 +81,7 @@ async def generate_stream_response(agent, messages, thought_list, tool_response: } yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n" + logger.info(f"开始生成Agent流式响应, model: {model}") for response in agent.run(messages=messages): previous_content = accumulated_content accumulated_content = get_content_from_messages(response, tool_response=tool_response) @@ -92,6 +95,8 @@ async def generate_stream_response(agent, messages, thought_list, tool_response: # 只有当有新内容时才发送chunk if new_content: + if chunk_id == 0: + logger.info(f"Agent首个Token已生成, 开始流式输出") chunk_id += 1 # 构造OpenAI格式的流式响应 chunk_data = { @@ -126,10 +131,11 @@ async def generate_stream_response(agent, messages, thought_list, tool_response: # 发送结束标记 yield "data: [DONE]\n\n" + logger.info(f"Agent流式响应完成, 总共生成 {chunk_id} 个chunks") except Exception as e: import traceback error_details = traceback.format_exc() - from utils.logger import logger + logger.error(f"Error in generate_stream_response: {str(e)}") logger.error(f"Full traceback: {error_details}") @@ -168,7 +174,7 @@ async def create_agent_and_generate_response( # 2. 
如果有terms内容,先进行embedding(embedding需要缓存起来,这个可以tmp文件缓存,以{bot_id}_terms作为key)embedding实现情参考 @embedding/embedding.py 文件,可以在里面实现。拿到embedding后,可以进行相似性检索,检索方式先使用cos相似度,找到阈值相似性>0.7的匹配项,重新整理为terms_analysis,格式:1) Name: term_name1, Description: desc, Synonyms: syn1, syn2。 terms_analysis = "" if terms_list: - print(f"terms_list: {terms_list}") + logger.info(f"terms_list: {terms_list}") # 从messages中提取用户的查询文本用于相似性检索 query_text = get_user_last_message_content(messages) # 使用embedding进行terms处理 @@ -178,9 +184,9 @@ async def create_agent_and_generate_response( if terms_analysis: # 将terms分析结果也添加到消息中 system_prompt = system_prompt.replace("{terms}", terms_analysis) - print(f"Generated terms analysis: {terms_analysis[:200]}...") # 只打印前200个字符 + logger.info(f"Generated terms analysis: {terms_analysis[:200]}...") # 只打印前200个字符 except Exception as e: - print(f"Error processing terms with embedding: {e}") + logger.error(f"Error processing terms with embedding: {e}") terms_analysis = "" else: # 当terms_list为空时,删除对应的pkl缓存文件 @@ -189,14 +195,14 @@ async def create_agent_and_generate_response( cache_file = f"projects/cache/{bot_id}_terms.pkl" if os.path.exists(cache_file): os.remove(cache_file) - print(f"Removed empty terms cache file: {cache_file}") + logger.info(f"Removed empty terms cache file: {cache_file}") except Exception as e: - print(f"Error removing terms cache file: {e}") + logger.error(f"Error removing terms cache file: {e}") # 3. 如果有guideline内容,进行并发处理 guideline_analysis = "" if guidelines_list: - print(f"guidelines_list: {guidelines_list}") + logger.info(f"guidelines_list: {guidelines_list}") guidelines_count = len(guidelines_list) if guidelines_count > 0: @@ -223,7 +229,7 @@ async def create_agent_and_generate_response( batches[-2].extend(batches[-1]) batches.pop() - print(f"Processing {guidelines_count} guidelines in {len(batches)} batches with {batch_count} concurrent batches") + logger.info(f"Processing {guidelines_count} guidelines in {len(batches)} batches with {batch_count} concurrent batches") # 准备chat_history chat_history = format_messages_to_chat_history(messages) @@ -265,13 +271,13 @@ async def create_agent_and_generate_response( # 处理结果:最后一个结果是agent,前面的是guideline批次结果 agent = all_results[-1] # agent创建的结果 batch_results = all_results[:-1] # guideline批次的结果 - print(f"batch_results:{batch_results}") + logger.info(f"batch_results:{batch_results}") # 合并guideline分析结果,使用JSON格式的checks数组 all_checks = [] for i, result in enumerate(batch_results): if isinstance(result, Exception): - print(f"Guideline batch {i} failed: {result}") + logger.error(f"Guideline batch {i} failed: {result}") continue if result and isinstance(result, dict) and 'checks' in result: # 如果是JSON对象且包含checks数组,只保留applies为true的checks @@ -279,13 +285,13 @@ async def create_agent_and_generate_response( all_checks.extend(applicable_checks) elif result and isinstance(result, str) and result.strip(): # 如果是普通文本,保留原有逻辑 - print(f"Non-JSON result from batch {i}: {result}") + logger.info(f"Non-JSON result from batch {i}: {result}") if all_checks: # 将checks数组格式化为JSON字符串 guideline_analysis = "\n".join([item["rationale"] for item in all_checks]) # guideline_analysis = json.dumps({"checks": all_checks}, ensure_ascii=False) - print(f"Merged guideline analysis result: {guideline_analysis}") + logger.info(f"Merged guideline analysis result: {guideline_analysis}") # 将分析结果添加到最后一个消息的内容中 if guideline_analysis: @@ -430,8 +436,8 @@ async def chat_completions(request: ChatRequest, authorization: Optional[str] = except Exception as e: import traceback error_details = 
traceback.format_exc() - print(f"Error in chat_completions: {str(e)}") - print(f"Full traceback: {error_details}") + logger.error(f"Error in chat_completions: {str(e)}") + logger.error(f"Full traceback: {error_details}") raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @@ -526,6 +532,6 @@ async def chat_completions_v2(request: ChatRequestV2, authorization: Optional[st except Exception as e: import traceback error_details = traceback.format_exc() - print(f"Error in chat_completions_v2: {str(e)}") - print(f"Full traceback: {error_details}") + logger.error(f"Error in chat_completions_v2: {str(e)}") + logger.error(f"Full traceback: {error_details}") raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") diff --git a/routes/file_manager.py b/routes/file_manager.py index 28f98c6..17d73f8 100644 --- a/routes/file_manager.py +++ b/routes/file_manager.py @@ -13,6 +13,9 @@ from fastapi.responses import FileResponse, StreamingResponse import mimetypes import json import zipfile +import logging + +logger = logging.getLogger('app') import tempfile import io import math @@ -485,7 +488,7 @@ async def download_folder_as_zip(request: Dict[str, str]): zipf.write(file_path, arcname) except (OSError, IOError) as e: # 跳过无法读取的文件,但记录警告 - print(f"Warning: Skipping file {file_path}: {e}") + logger.warning(f"Warning: Skipping file {file_path}: {e}") continue zip_buffer.seek(0) @@ -586,7 +589,7 @@ async def download_multiple_items_as_zip(request: Dict[str, Any]): try: zipf.write(target_path, target_path.name) except (OSError, IOError) as e: - print(f"Warning: Skipping file {target_path}: {e}") + logger.warning(f"Warning: Skipping file {target_path}: {e}") continue elif target_path.is_dir(): # 文件夹 @@ -597,7 +600,7 @@ async def download_multiple_items_as_zip(request: Dict[str, Any]): arcname = f"{target_path.name}/{file_path.relative_to(target_path)}" zipf.write(file_path, arcname) except (OSError, IOError) as e: - print(f"Warning: Skipping file {file_path}: {e}") + logger.warning(f"Warning: Skipping file {file_path}: {e}") continue zip_buffer.seek(0) diff --git a/routes/files.py b/routes/files.py index b65f6c3..e8f59a8 100644 --- a/routes/files.py +++ b/routes/files.py @@ -5,6 +5,9 @@ from datetime import datetime from typing import Optional, List from fastapi import APIRouter, HTTPException, Header, UploadFile, File, Form from pydantic import BaseModel +import logging + +logger = logging.getLogger('app') from utils import ( DatasetRequest, QueueTaskRequest, IncrementalTaskRequest, QueueTaskResponse, @@ -83,7 +86,7 @@ async def process_files_async_endpoint(request: QueueTaskRequest, authorization: except HTTPException: raise except Exception as e: - print(f"Error submitting async file processing task: {str(e)}") + logger.error(f"Error submitting async file processing task: {str(e)}") raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @@ -146,7 +149,7 @@ async def process_files_incremental_endpoint(request: IncrementalTaskRequest, au except HTTPException: raise except Exception as e: - print(f"Error submitting incremental file processing task: {str(e)}") + logger.error(f"Error submitting incremental file processing task: {str(e)}") raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @@ -260,7 +263,7 @@ async def cleanup_project_async_endpoint(dataset_id: str, remove_all: bool = Fal "action": "remove_all" if remove_all else "cleanup_logs" } except Exception as e: - print(f"Error submitting cleanup task: 
{str(e)}") + logger.error(f"Error submitting cleanup task: {str(e)}") raise HTTPException(status_code=500, detail=f"提交清理任务失败: {str(e)}") @@ -281,8 +284,8 @@ async def upload_file(file: UploadFile = File(...), folder: Optional[str] = Form """ try: # 调试信息 - print(f"Received folder parameter: {folder}") - print(f"File received: {file.filename if file else 'None'}") + logger.info(f"Received folder parameter: {folder}") + logger.info(f"File received: {file.filename if file else 'None'}") # 确定上传文件夹 if folder: @@ -345,7 +348,7 @@ async def upload_file(file: UploadFile = File(...), folder: Optional[str] = Form except HTTPException: raise except Exception as e: - print(f"Error uploading file: {str(e)}") + logger.error(f"Error uploading file: {str(e)}") raise HTTPException(status_code=500, detail=f"文件上传失败: {str(e)}") @@ -377,7 +380,7 @@ async def get_task_status(task_id: str): } except Exception as e: - print(f"Error getting task status: {str(e)}") + logger.error(f"Error getting task status: {str(e)}") raise HTTPException(status_code=500, detail=f"获取任务状态失败: {str(e)}") @@ -399,7 +402,7 @@ async def delete_task(task_id: str): "task_id": task_id } except Exception as e: - print(f"Error deleting task: {str(e)}") + logger.error(f"Error deleting task: {str(e)}") raise HTTPException(status_code=500, detail=f"删除任务记录失败: {str(e)}") @@ -428,7 +431,7 @@ async def list_tasks(status: Optional[str] = None, dataset_id: Optional[str] = N } except Exception as e: - print(f"Error listing tasks: {str(e)}") + logger.error(f"Error listing tasks: {str(e)}") raise HTTPException(status_code=500, detail=f"获取任务列表失败: {str(e)}") @@ -445,7 +448,7 @@ async def get_task_statistics(): } except Exception as e: - print(f"Error getting statistics: {str(e)}") + logger.error(f"Error getting statistics: {str(e)}") raise HTTPException(status_code=500, detail=f"获取统计信息失败: {str(e)}") @@ -463,5 +466,5 @@ async def cleanup_tasks(older_than_days: int = 7): } except Exception as e: - print(f"Error cleaning up tasks: {str(e)}") + logger.error(f"Error cleaning up tasks: {str(e)}") raise HTTPException(status_code=500, detail=f"清理任务记录失败: {str(e)}") \ No newline at end of file diff --git a/routes/projects.py b/routes/projects.py index c068133..87edc50 100644 --- a/routes/projects.py +++ b/routes/projects.py @@ -2,6 +2,9 @@ import os import json from typing import Optional from fastapi import APIRouter, HTTPException +import logging + +logger = logging.getLogger('app') from task_queue.task_status import task_status_store @@ -45,7 +48,7 @@ async def list_all_projects(): "updated_at": os.path.getmtime(item_path) }) except Exception as e: - print(f"Error reading robot project {item}: {str(e)}") + logger.error(f"Error reading robot project {item}: {str(e)}") robot_projects.append({ "id": item, "name": item, @@ -95,7 +98,7 @@ async def list_all_projects(): "updated_at": os.path.getmtime(item_path) }) except Exception as e: - print(f"Error reading dataset {item}: {str(e)}") + logger.error(f"Error reading dataset {item}: {str(e)}") datasets.append({ "id": item, "name": f"数据集 - {item[:8]}...", @@ -118,7 +121,7 @@ async def list_all_projects(): } except Exception as e: - print(f"Error listing projects: {str(e)}") + logger.error(f"Error listing projects: {str(e)}") raise HTTPException(status_code=500, detail=f"获取项目列表失败: {str(e)}") @@ -134,7 +137,7 @@ async def list_robot_projects(): "projects": response["robot_projects"] } except Exception as e: - print(f"Error listing robot projects: {str(e)}") + logger.error(f"Error listing robot projects: {str(e)}") raise 
HTTPException(status_code=500, detail=f"获取机器人项目列表失败: {str(e)}") @@ -150,7 +153,7 @@ async def list_datasets(): "projects": response["datasets"] } except Exception as e: - print(f"Error listing datasets: {str(e)}") + logger.error(f"Error listing datasets: {str(e)}") raise HTTPException(status_code=500, detail=f"获取数据集列表失败: {str(e)}") @@ -169,5 +172,5 @@ async def get_project_tasks(dataset_id: str): } except Exception as e: - print(f"Error getting project tasks: {str(e)}") + logger.error(f"Error getting project tasks: {str(e)}") raise HTTPException(status_code=500, detail=f"获取项目任务失败: {str(e)}") \ No newline at end of file diff --git a/routes/system.py b/routes/system.py index 2c67e93..30ef999 100644 --- a/routes/system.py +++ b/routes/system.py @@ -19,6 +19,9 @@ except ImportError: from utils.fastapi_utils import get_content_from_messages from embedding import get_model_manager from pydantic import BaseModel +import logging + +logger = logging.getLogger('app') router = APIRouter() @@ -38,7 +41,7 @@ class EncodeResponse(BaseModel): # 系统优化设置初始化 -print("正在初始化系统优化...") +logger.info("正在初始化系统优化...") system_optimizer = setup_system_optimizations() # 全局助手管理器配置(使用优化后的配置) @@ -66,10 +69,10 @@ file_cache = init_global_file_cache( ttl=int(os.getenv("FILE_CACHE_TTL", "300")) ) -print("系统优化初始化完成") -print(f"- 分片Agent管理器: {shard_count} 个分片,最多缓存 {max_cached_agents} 个agent") -print(f"- 连接池: 每主机100连接,总计500连接") -print(f"- 文件缓存: 1000个文件,TTL 300秒") +logger.info("系统优化初始化完成") +logger.info(f"- 分片Agent管理器: {shard_count} 个分片,最多缓存 {max_cached_agents} 个agent") +logger.info(f"- 连接池: 每主机100连接,总计500连接") +logger.info(f"- 文件缓存: 1000个文件,TTL 300秒") @router.get("/api/health") @@ -128,7 +131,7 @@ async def get_performance_stats(): } except Exception as e: - print(f"Error getting performance stats: {str(e)}") + logger.error(f"Error getting performance stats: {str(e)}") raise HTTPException(status_code=500, detail=f"获取性能统计失败: {str(e)}") @@ -146,7 +149,7 @@ async def optimize_system(profile: str = "balanced"): } except Exception as e: - print(f"Error applying optimization profile: {str(e)}") + logger.error(f"Error applying optimization profile: {str(e)}") raise HTTPException(status_code=500, detail=f"应用优化配置失败: {str(e)}") @@ -175,7 +178,7 @@ async def clear_system_cache(cache_type: Optional[str] = None): } except Exception as e: - print(f"Error clearing cache: {str(e)}") + logger.error(f"Error clearing cache: {str(e)}") raise HTTPException(status_code=500, detail=f"清理缓存失败: {str(e)}") @@ -197,7 +200,7 @@ async def get_system_config(): } except Exception as e: - print(f"Error getting system config: {str(e)}") + logger.error(f"Error getting system config: {str(e)}") raise HTTPException(status_code=500, detail=f"获取系统配置失败: {str(e)}") @@ -260,7 +263,7 @@ async def encode_texts(request: EncodeRequest): ) except Exception as e: - from utils.logger import logger + logger.error(f"文本编码 API 错误: {e}") return EncodeResponse( success=False, @@ -269,4 +272,4 @@ async def encode_texts(request: EncodeRequest): processing_time=0.0, total_texts=len(request.texts) if request else 0, error=f"编码失败: {str(e)}" - ) \ No newline at end of file + ) diff --git a/task_queue/config.py b/task_queue/config.py index 736f3fb..fa47763 100644 --- a/task_queue/config.py +++ b/task_queue/config.py @@ -4,9 +4,13 @@ Queue configuration using SqliteHuey for asynchronous file processing. 
""" import os +import logging from huey import SqliteHuey from datetime import timedelta +# 配置日志 +logger = logging.getLogger('app') + # 确保projects/queue_data目录存在 queue_data_dir = os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data') os.makedirs(queue_data_dir, exist_ok=True) @@ -24,4 +28,4 @@ huey.store_errors = True # 存储错误信息 huey.max_retries = 3 # 最大重试次数 huey.retry_delay = timedelta(seconds=60) # 重试延迟 -print(f"SqliteHuey队列已初始化,数据库路径: {os.path.join(queue_data_dir, 'huey.db')}") \ No newline at end of file +logger.info(f"SqliteHuey队列已初始化,数据库路径: {os.path.join(queue_data_dir, 'huey.db')}") \ No newline at end of file diff --git a/task_queue/manager.py b/task_queue/manager.py index 9c8f5ff..1e61a88 100644 --- a/task_queue/manager.py +++ b/task_queue/manager.py @@ -6,11 +6,15 @@ Queue manager for handling file processing queues. import os import json import time +import logging from typing import Dict, List, Optional, Any from huey import Huey from huey.api import Task from datetime import datetime, timedelta +# 配置日志 +logger = logging.getLogger('app') + from .config import huey from .tasks import process_file_async, process_multiple_files_async, process_zip_file_async, cleanup_processed_files @@ -20,7 +24,7 @@ class QueueManager: def __init__(self): self.huey = huey - print(f"队列管理器已初始化,使用数据库: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}") + logger.info(f"队列管理器已初始化,使用数据库: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}") def enqueue_file( self, @@ -49,7 +53,7 @@ class QueueManager: else: task = process_file_async(project_id, file_path, original_filename) - print(f"文件已加入队列: {file_path}, 任务ID: {task.id}") + logger.info(f"文件已加入队列: {file_path}, 任务ID: {task.id}") return task.id def enqueue_multiple_files( @@ -79,7 +83,7 @@ class QueueManager: else: task = process_multiple_files_async(project_id, file_paths, original_filenames) - print(f"批量文件已加入队列: {len(file_paths)} 个文件, 任务ID: {task.id}") + logger.info(f"批量文件已加入队列: {len(file_paths)} 个文件, 任务ID: {task.id}") return [task.id] def enqueue_zip_file( @@ -109,7 +113,7 @@ class QueueManager: else: task = process_zip_file_async(project_id, zip_path, extract_to) - print(f"zip文件已加入队列: {zip_path}, 任务ID: {task.id}") + logger.info(f"zip文件已加入队列: {zip_path}, 任务ID: {task.id}") return task.id def get_task_status(self, task_id: str) -> Dict[str, Any]: @@ -201,7 +205,7 @@ class QueueManager: stats["pending_tasks"] = len(pending_tasks) stats["total_tasks"] += len(pending_tasks) except Exception as e: - print(f"获取pending任务失败: {e}") + logger.error(f"获取pending任务失败: {e}") # 尝试获取定时任务数量 try: @@ -209,7 +213,7 @@ class QueueManager: stats["scheduled_tasks"] = len(scheduled_tasks) stats["total_tasks"] += len(scheduled_tasks) except Exception as e: - print(f"获取scheduled任务失败: {e}") + logger.error(f"获取scheduled任务失败: {e}") return stats @@ -237,7 +241,7 @@ class QueueManager: else: return False except Exception as e: - print(f"取消任务失败: {str(e)}") + logger.error(f"取消任务失败: {str(e)}") return False def cleanup_old_tasks(self, older_than_days: int = 7) -> Dict[str, Any]: @@ -292,7 +296,7 @@ class QueueManager: else: task = cleanup_processed_files(project_id, older_than_days) - print(f"清理任务已加入队列: 项目 {project_id}, 任务ID: {task.id}") + logger.info(f"清理任务已加入队列: 项目 {project_id}, 任务ID: {task.id}") return task.id def list_pending_tasks(self, limit: int = 50) -> List[Dict[str, Any]]: @@ -318,7 +322,7 @@ class QueueManager: "status": "pending", }) except Exception as e: - print(f"获取pending任务失败: {e}") + 
logger.error(f"获取pending任务失败: {e}") # 获取scheduled任务 try: @@ -330,12 +334,12 @@ class QueueManager: "status": "scheduled", }) except Exception as e: - print(f"获取scheduled任务失败: {e}") + logger.error(f"获取scheduled任务失败: {e}") return pending_tasks except Exception as e: - print(f"获取待处理任务失败: {str(e)}") + logger.error(f"获取待处理任务失败: {str(e)}") return [] def get_task_result(self, task_id: str) -> Optional[Any]: @@ -354,7 +358,7 @@ class QueueManager: return task.get() return None except Exception as e: - print(f"获取任务结果失败: {str(e)}") + logger.error(f"获取任务结果失败: {str(e)}") return None diff --git a/task_queue/optimized_consumer.py b/task_queue/optimized_consumer.py index 706902e..6c4a3d1 100755 --- a/task_queue/optimized_consumer.py +++ b/task_queue/optimized_consumer.py @@ -9,10 +9,14 @@ import time import signal import argparse import multiprocessing +import logging from pathlib import Path from concurrent.futures import ThreadPoolExecutor import threading +# 配置日志 +logger = logging.getLogger('app') + # 添加项目根目录到Python路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) @@ -50,7 +54,7 @@ class OptimizedQueueConsumer: def _signal_handler(self, signum, frame): """信号处理器,用于优雅关闭""" - print(f"\n收到信号 {signum},正在关闭队列消费者...") + logger.info(f"\n收到信号 {signum},正在关闭队列消费者...") self.running = False if self.consumer: self.consumer.stop() @@ -80,9 +84,9 @@ class OptimizedQueueConsumer: if hasattr(huey, 'always_eager'): huey.always_eager = False - print(f"队列消费者优化设置完成:") - print(f"- 工作类型: {self.worker_type}") - print(f"- 工作线程数: {self.workers}") + logger.info(f"队列消费者优化设置完成:") + logger.info(f"- 工作类型: {self.worker_type}") + logger.info(f"- 工作线程数: {self.workers}") def monitor_performance(self): """性能监控线程""" @@ -93,27 +97,26 @@ class OptimizedQueueConsumer: elapsed = time.time() - self.start_time rate = self.performance_stats['tasks_processed'] / max(1, elapsed) - print(f"\n[性能统计]") - print(f"- 运行时间: {elapsed:.1f}秒") - print(f"- 已处理任务: {self.performance_stats['tasks_processed']}") - print(f"- 失败任务: {self.performance_stats['tasks_failed']}") - print(f"- 平均处理速率: {rate:.2f} 任务/秒") + logger.info(f"\n[性能统计]") + logger.info(f"- 运行时间: {elapsed:.1f}秒") + logger.info(f"- 已处理任务: {self.performance_stats['tasks_processed']}") + logger.info(f"- 失败任务: {self.performance_stats['tasks_failed']}") + logger.info(f"- 平均处理速率: {rate:.2f} 任务/秒") if self.performance_stats['avg_processing_time'] > 0: - print(f"- 平均处理时间: {self.performance_stats['avg_processing_time']:.2f}秒") + logger.info(f"- 平均处理时间: {self.performance_stats['avg_processing_time']:.2f}秒") def start(self): """启动队列消费者""" - print("=" * 60) - print("优化的队列消费者启动") - print("=" * 60) + logger.info("=" * 60) + logger.info("优化的队列消费者启动") + logger.info("=" * 60) # 设置优化 self.setup_optimizations() - print(f"数据库: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}") - print("按 Ctrl+C 停止消费者") - print() + logger.info(f"数据库: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}") + logger.info("按 Ctrl+C 停止消费者") self.running = True self.start_time = time.time() @@ -134,15 +137,15 @@ class OptimizedQueueConsumer: periodic=True, # 启用定期任务 ) - print("队列消费者已启动,等待任务...") + logger.info("队列消费者已启动,等待任务...") # 启动消费者 self.consumer.run() except KeyboardInterrupt: - print("\n收到键盘中断信号") + logger.info("\n收到键盘中断信号") except Exception as e: - print(f"队列消费者运行错误: {e}") + logger.error(f"队列消费者运行错误: {e}") import traceback traceback.print_exc() finally: @@ -150,27 +153,27 @@ class OptimizedQueueConsumer: def shutdown(self): 
"""关闭队列消费者""" - print("\n正在关闭队列消费者...") + logger.info("\n正在关闭队列消费者...") self.running = False if self.consumer: try: self.consumer.stop() - print("队列消费者已停止") + logger.info("队列消费者已停止") except Exception as e: - print(f"停止队列消费者时出错: {e}") + logger.error(f"停止队列消费者时出错: {e}") # 输出最终统计 if self.start_time: elapsed = time.time() - self.start_time - print(f"\n[最终统计]") - print(f"- 总运行时间: {elapsed:.1f}秒") - print(f"- 总处理任务: {self.performance_stats['tasks_processed']}") - print(f"- 总失败任务: {self.performance_stats['tasks_failed']}") + logger.info(f"\n[最终统计]") + logger.info(f"- 总运行时间: {elapsed:.1f}秒") + logger.info(f"- 总处理任务: {self.performance_stats['tasks_processed']}") + logger.info(f"- 总失败任务: {self.performance_stats['tasks_failed']}") if self.performance_stats['tasks_processed'] > 0: rate = self.performance_stats['tasks_processed'] / elapsed - print(f"- 平均处理速率: {rate:.2f} 任务/秒") + logger.info(f"- 平均处理速率: {rate:.2f} 任务/秒") def calculate_optimal_workers(): @@ -191,25 +194,25 @@ def check_queue_status(): try: stats = queue_manager.get_queue_stats() - print("\n[队列状态]") + logger.info("\n[队列状态]") if isinstance(stats, dict): if 'total_tasks' in stats: - print(f"- 总任务数: {stats['total_tasks']}") + logger.info(f"- 总任务数: {stats['total_tasks']}") if 'pending_tasks' in stats: - print(f"- 待处理任务: {stats['pending_tasks']}") + logger.info(f"- 待处理任务: {stats['pending_tasks']}") if 'scheduled_tasks' in stats: - print(f"- 定时任务: {stats['scheduled_tasks']}") + logger.info(f"- 定时任务: {stats['scheduled_tasks']}") # 检查数据库文件 db_path = os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db') if os.path.exists(db_path): size = os.path.getsize(db_path) - print(f"- 数据库大小: {size} 字节") + logger.info(f"- 数据库大小: {size} 字节") else: - print("- 数据库文件: 不存在") + logger.info("- 数据库文件: 不存在") except Exception as e: - print(f"获取队列状态失败: {e}") + logger.error(f"获取队列状态失败: {e}") def main(): @@ -248,11 +251,11 @@ def main(): os.environ['PYTHONOPTIMIZE'] = '1' if args.workers > 2: args.workers = 2 - print(f"低内存模式: 调整工作线程数为 {args.workers}") + logger.info(f"低内存模式: 调整工作线程数为 {args.workers}") elif args.profile == "high_performance": if args.workers < 4: args.workers = 4 - print(f"高性能模式: 调整工作线程数为 {args.workers}") + logger.info(f"高性能模式: 调整工作线程数为 {args.workers}") # 检查队列状态 if args.check_status: @@ -263,12 +266,12 @@ def main(): try: import psutil memory = psutil.virtual_memory() - print(f"[系统信息]") - print(f"- CPU核心数: {multiprocessing.cpu_count()}") - print(f"- 可用内存: {memory.available / (1024**3):.1f}GB") - print(f"- 内存使用率: {memory.percent:.1f}%") + logger.info(f"[系统信息]") + logger.info(f"- CPU核心数: {multiprocessing.cpu_count()}") + logger.info(f"- 可用内存: {memory.available / (1024**3):.1f}GB") + logger.info(f"- 内存使用率: {memory.percent:.1f}%") except ImportError: - print("[提示] 安装 psutil 可显示系统信息: pip install psutil") + logger.info("[提示] 安装 psutil 可显示系统信息: pip install psutil") # 创建并启动队列消费者 consumer = OptimizedQueueConsumer( diff --git a/task_queue/tasks.py b/task_queue/tasks.py index 10a2afd..50f04e6 100644 --- a/task_queue/tasks.py +++ b/task_queue/tasks.py @@ -7,10 +7,14 @@ import os import json import time import shutil +import logging from pathlib import Path from typing import Dict, List, Optional, Any from huey import crontab +# 配置日志 +logger = logging.getLogger('app') + from .config import huey from utils.file_utils import ( extract_zip_file, @@ -42,7 +46,7 @@ def process_file_async( 处理结果字典 """ try: - print(f"开始处理文件: {file_path}") + logger.info(f"开始处理文件: {file_path}") # 确保项目目录存在 project_dir = os.path.join("projects", project_id) @@ -55,7 
+59,7 @@ def process_file_async( # 检查文件是否已处理 processed_log = load_processed_files_log(project_id) if file_hash in processed_log: - print(f"文件已处理,跳过: {file_path}") + logger.info(f"文件已处理,跳过: {file_path}") return { "status": "skipped", "message": "文件已处理", @@ -84,12 +88,12 @@ def process_file_async( result["file_hash"] = file_hash result["project_id"] = project_id - print(f"文件处理完成: {file_path}, 状态: {result['status']}") + logger.info(f"文件处理完成: {file_path}, 状态: {result['status']}") return result except Exception as e: error_msg = f"处理文件时发生错误: {str(e)}" - print(error_msg) + logger.error(error_msg) return { "status": "error", "message": error_msg, @@ -116,7 +120,7 @@ def process_multiple_files_async( 处理结果列表 """ try: - print(f"开始批量处理 {len(file_paths)} 个文件") + logger.info(f"开始批量处理 {len(file_paths)} 个文件") results = [] for i, file_path in enumerate(file_paths): @@ -126,12 +130,12 @@ def process_multiple_files_async( result = process_file_async(project_id, file_path, original_filename) results.append(result) - print(f"批量文件处理任务已提交,共 {len(results)} 个文件") + logger.info(f"批量文件处理任务已提交,共 {len(results)} 个文件") return results except Exception as e: error_msg = f"批量处理文件时发生错误: {str(e)}" - print(error_msg) + logger.error(error_msg) return [{ "status": "error", "message": error_msg, @@ -157,7 +161,7 @@ def process_zip_file_async( 处理结果字典 """ try: - print(f"开始处理zip文件: {zip_path}") + logger.info(f"开始处理zip文件: {zip_path}") # 设置解压目录 if extract_to is None: @@ -191,7 +195,7 @@ def process_zip_file_async( except Exception as e: error_msg = f"处理zip文件时发生错误: {str(e)}" - print(error_msg) + logger.error(error_msg) return { "status": "error", "message": error_msg, @@ -216,7 +220,7 @@ def cleanup_processed_files( 清理结果字典 """ try: - print(f"开始清理项目 {project_id} 中 {older_than_days} 天前的文件") + logger.info(f"开始清理项目 {project_id} 中 {older_than_days} 天前的文件") project_dir = os.path.join("projects", project_id) if not os.path.exists(project_dir): @@ -240,9 +244,9 @@ def cleanup_processed_files( try: os.remove(file_path) cleaned_files.append(file_path) - print(f"已删除旧文件: {file_path}") + logger.info(f"已删除旧文件: {file_path}") except Exception as e: - print(f"删除文件失败 {file_path}: {str(e)}") + logger.error(f"删除文件失败 {file_path}: {str(e)}") # 清理空目录 for root, dirs, files in os.walk(project_dir, topdown=False): @@ -251,9 +255,9 @@ def cleanup_processed_files( try: if not os.listdir(dir_path): os.rmdir(dir_path) - print(f"已删除空目录: {dir_path}") + logger.info(f"已删除空目录: {dir_path}") except Exception as e: - print(f"删除目录失败 {dir_path}: {str(e)}") + logger.error(f"删除目录失败 {dir_path}: {str(e)}") return { "status": "success", @@ -265,7 +269,7 @@ def cleanup_processed_files( except Exception as e: error_msg = f"清理文件时发生错误: {str(e)}" - print(error_msg) + logger.error(error_msg) return { "status": "error", "message": error_msg, @@ -351,6 +355,6 @@ def _process_single_file( @huey.periodic_task(crontab(hour=2, minute=0)) def daily_cleanup(): """每日清理任务""" - print("执行每日清理任务") + logger.info("执行每日清理任务") # 这里可以添加清理逻辑 return {"status": "completed", "message": "每日清理任务完成"} diff --git a/utils/agent_pool.py b/utils/agent_pool.py index 82b9684..660f020 100644 --- a/utils/agent_pool.py +++ b/utils/agent_pool.py @@ -2,7 +2,7 @@ import asyncio from typing import List, Optional import logging -logger = logging.getLogger(__name__) +logger = logging.getLogger('app') class AgentPool: @@ -175,4 +175,4 @@ async def release_agent_to_pool(agent): if _global_agent_pool is None: raise RuntimeError("全局助手实例池未初始化") - await _global_agent_pool.release_agent(agent) \ No newline at end of file + await 
_global_agent_pool.release_agent(agent) diff --git a/utils/async_file_ops.py b/utils/async_file_ops.py index 10bb941..3347592 100644 --- a/utils/async_file_ops.py +++ b/utils/async_file_ops.py @@ -8,6 +8,7 @@ import json import asyncio import aiofiles import aiofiles.os +import logging from typing import Dict, List, Optional, Any from pathlib import Path import weakref @@ -15,6 +16,9 @@ import threading import time from concurrent.futures import ThreadPoolExecutor +# Configure logger +logger = logging.getLogger('app') + class AsyncFileCache: """异步文件缓存管理器""" @@ -70,7 +74,7 @@ class AsyncFileCache: return content except Exception as e: - print(f"Error reading file {abs_path}: {e}") + logger.error(f"Error reading file {abs_path}: {e}") return "" def _read_text_file(self, file_path: str, encoding: str) -> str: @@ -90,7 +94,7 @@ class AsyncFileCache: try: return json.loads(content) except json.JSONDecodeError as e: - print(f"Error parsing JSON from {file_path}: {e}") + logger.error(f"Error parsing JSON from {file_path}: {e}") return {} async def write_file(self, file_path: str, content: str, encoding: str = 'utf-8'): @@ -244,7 +248,7 @@ class ParallelFileReader: content = await task results[file_path] = content except Exception as e: - print(f"Error reading {file_path}: {e}") + logger.error(f"Error reading {file_path}: {e}") results[file_path] = "" return results @@ -269,7 +273,7 @@ class ParallelFileReader: try: results[file_path] = json.loads(content) except json.JSONDecodeError as e: - print(f"Error parsing JSON from {file_path}: {e}") + logger.error(f"Error parsing JSON from {file_path}: {e}") results[file_path] = {} else: results[file_path] = {} diff --git a/utils/data_merger.py b/utils/data_merger.py index 61549ec..59bbd98 100644 --- a/utils/data_merger.py +++ b/utils/data_merger.py @@ -5,15 +5,19 @@ Data merging functions for combining processed file results. 
import os import pickle +import logging from typing import Dict, List, Optional, Tuple import json +# Configure logger +logger = logging.getLogger('app') + # Try to import numpy, but handle if missing try: import numpy as np NUMPY_SUPPORT = True except ImportError: - print("NumPy not available, some embedding features may be limited") + logger.warning("NumPy not available, some embedding features may be limited") NUMPY_SUPPORT = False @@ -66,7 +70,7 @@ def merge_documents_by_group(unique_id: str, group_name: str) -> Dict: result["source_files"].append(filename_stem) except Exception as e: - print(f"Error reading document file {document_path}: {str(e)}") + logger.error(f"Error reading document file {document_path}: {str(e)}") continue if merged_content: @@ -83,7 +87,7 @@ def merge_documents_by_group(unique_id: str, group_name: str) -> Dict: except Exception as e: result["error"] = f"Document merging failed: {str(e)}" - print(f"Error merging documents for group {group_name}: {str(e)}") + logger.error(f"Error merging documents for group {group_name}: {str(e)}") return result @@ -136,7 +140,7 @@ def merge_paginations_by_group(unique_id: str, group_name: str) -> Dict: result["source_files"].append(filename_stem) except Exception as e: - print(f"Error reading pagination file {pagination_path}: {str(e)}") + logger.error(f"Error reading pagination file {pagination_path}: {str(e)}") continue if merged_lines: @@ -153,7 +157,7 @@ def merge_paginations_by_group(unique_id: str, group_name: str) -> Dict: except Exception as e: result["error"] = f"Pagination merging failed: {str(e)}" - print(f"Error merging paginations for group {group_name}: {str(e)}") + logger.error(f"Error merging paginations for group {group_name}: {str(e)}") return result @@ -236,7 +240,7 @@ def merge_embeddings_by_group(unique_id: str, group_name: str) -> Dict: result["source_files"].append(filename_stem) except Exception as e: - print(f"Error loading embedding file {embedding_path}: {str(e)}") + logger.error(f"Error loading embedding file {embedding_path}: {str(e)}") continue if all_chunks and all_embeddings: @@ -259,7 +263,7 @@ def merge_embeddings_by_group(unique_id: str, group_name: str) -> Dict: np_embeddings.append(emb) else: # 如果无法转换,跳过这个文件 - print(f"Warning: Cannot convert embedding to numpy from file {filename_stem}") + logger.warning(f"Warning: Cannot convert embedding to numpy from file {filename_stem}") continue if np_embeddings: @@ -283,7 +287,7 @@ def merge_embeddings_by_group(unique_id: str, group_name: str) -> Dict: elif isinstance(emb, np.ndarray): np_embeddings.append(emb) else: - print(f"Warning: Cannot convert embedding to numpy from file {filename_stem}") + logger.warning(f"Warning: Cannot convert embedding to numpy from file {filename_stem}") continue if np_embeddings: @@ -297,7 +301,7 @@ def merge_embeddings_by_group(unique_id: str, group_name: str) -> Dict: return result except Exception as e: result["error"] = f"Failed to merge embedding tensors: {str(e)}" - print(f"Error merging embedding tensors: {str(e)}") + logger.error(f"Error merging embedding tensors: {str(e)}") return result # Create merged embedding data structure @@ -327,7 +331,7 @@ def merge_embeddings_by_group(unique_id: str, group_name: str) -> Dict: except Exception as e: result["error"] = f"Embedding merging failed: {str(e)}" - print(f"Error merging embeddings for group {group_name}: {str(e)}") + logger.error(f"Error merging embeddings for group {group_name}: {str(e)}") return result @@ -425,11 +429,11 @@ def cleanup_dataset_group(unique_id: 
str, group_name: str) -> bool: if os.path.exists(dataset_group_dir): import shutil shutil.rmtree(dataset_group_dir) - print(f"Cleaned up dataset group: {group_name}") + logger.info(f"Cleaned up dataset group: {group_name}") return True else: return True # Nothing to clean up except Exception as e: - print(f"Error cleaning up dataset group {group_name}: {str(e)}") + logger.error(f"Error cleaning up dataset group {group_name}: {str(e)}") return False diff --git a/utils/dataset_manager.py b/utils/dataset_manager.py index b438188..ef73697 100644 --- a/utils/dataset_manager.py +++ b/utils/dataset_manager.py @@ -6,8 +6,12 @@ New implementation with per-file processing and group merging. import os import json +import logging from typing import Dict, List +# Configure logger +logger = logging.getLogger('app') + # Import new modules from utils.file_manager import ( ensure_directories, sync_files_to_group, cleanup_orphaned_files, @@ -37,13 +41,13 @@ async def download_dataset_files(unique_id: str, files: Dict[str, List[str]], in if not files: return {} - print(f"Starting {'incremental' if incremental_mode else 'full'} file processing for project: {unique_id}") + logger.info(f"Starting {'incremental' if incremental_mode else 'full'} file processing for project: {unique_id}") # Ensure project directories exist ensure_directories(unique_id) # Step 1: Sync files to group directories - print("Step 1: Syncing files to group directories...") + logger.info("Step 1: Syncing files to group directories...") synced_files, failed_files = sync_files_to_group(unique_id, files, incremental_mode) # Step 2: Detect changes and cleanup orphaned files (only in non-incremental mode) @@ -52,14 +56,14 @@ async def download_dataset_files(unique_id: str, files: Dict[str, List[str]], in # Only cleanup orphaned files in non-incremental mode or when files are explicitly removed if not incremental_mode and any(changes["removed"].values()): - print("Step 2: Cleaning up orphaned files...") + logger.info("Step 2: Cleaning up orphaned files...") removed_files = cleanup_orphaned_files(unique_id, changes) - print(f"Removed orphaned files: {removed_files}") + logger.info(f"Removed orphaned files: {removed_files}") elif incremental_mode: - print("Step 2: Skipping cleanup in incremental mode to preserve existing files") + logger.info("Step 2: Skipping cleanup in incremental mode to preserve existing files") # Step 3: Process individual files - print("Step 3: Processing individual files...") + logger.info("Step 3: Processing individual files...") processed_files_by_group = {} processing_results = {} @@ -75,12 +79,12 @@ async def download_dataset_files(unique_id: str, files: Dict[str, List[str]], in # Skip if file doesn't exist (might be remote file that failed to download) if not os.path.exists(local_path) and not file_path.startswith(('http://', 'https://')): - print(f"Skipping non-existent file: {filename}") + logger.warning(f"Skipping non-existent file: {filename}") continue # Check if already processed if check_file_already_processed(unique_id, group_name, filename): - print(f"Skipping already processed file: {filename}") + logger.info(f"Skipping already processed file: {filename}") processed_files_by_group[group_name].append(filename) processing_results[group_name].append({ "filename": filename, @@ -89,18 +93,18 @@ async def download_dataset_files(unique_id: str, files: Dict[str, List[str]], in continue # Process the file - print(f"Processing file: {filename} (group: {group_name})") + logger.info(f"Processing file: {filename} (group: 
{group_name})") result = await process_single_file(unique_id, group_name, filename, file_path, local_path) processing_results[group_name].append(result) if result["success"]: processed_files_by_group[group_name].append(filename) - print(f" Successfully processed {filename}") + logger.info(f" Successfully processed {filename}") else: - print(f" Failed to process {filename}: {result['error']}") + logger.error(f" Failed to process {filename}: {result['error']}") # Step 4: Merge results by group - print("Step 4: Merging results by group...") + logger.info("Step 4: Merging results by group...") merge_results = {} for group_name in processed_files_by_group.keys(): @@ -108,20 +112,20 @@ async def download_dataset_files(unique_id: str, files: Dict[str, List[str]], in group_files = get_group_files_list(unique_id, group_name) if group_files: - print(f"Merging group: {group_name} with {len(group_files)} files") + logger.info(f"Merging group: {group_name} with {len(group_files)} files") merge_result = merge_all_data_by_group(unique_id, group_name) merge_results[group_name] = merge_result if merge_result["success"]: - print(f" Successfully merged group {group_name}") + logger.info(f" Successfully merged group {group_name}") else: - print(f" Failed to merge group {group_name}: {merge_result['errors']}") + logger.error(f" Failed to merge group {group_name}: {merge_result['errors']}") # Step 5: Save processing log - print("Step 5: Saving processing log...") + logger.info("Step 5: Saving processing log...") await save_processing_log(unique_id, files, synced_files, processing_results, merge_results) - print(f"File processing completed for project: {unique_id}") + logger.info(f"File processing completed for project: {unique_id}") return processed_files_by_group @@ -156,9 +160,9 @@ async def save_processing_log( try: with open(log_file_path, 'w', encoding='utf-8') as f: json.dump(log_data, f, ensure_ascii=False, indent=2) - print(f"Processing log saved to: {log_file_path}") + logger.info(f"Processing log saved to: {log_file_path}") except Exception as e: - print(f"Error saving processing log: {str(e)}") + logger.error(f"Error saving processing log: {str(e)}") def generate_dataset_structure(unique_id: str) -> str: diff --git a/utils/excel_csv_processor.py b/utils/excel_csv_processor.py index d04bee8..8fa6ed3 100644 --- a/utils/excel_csv_processor.py +++ b/utils/excel_csv_processor.py @@ -5,8 +5,12 @@ Excel and CSV file processor for converting data to document.txt and pagination. 
import os import pandas as pd +import logging from typing import List, Dict, Any, Tuple +# 配置日志 +logger = logging.getLogger('app') + def read_excel_sheets(file_path: str) -> Dict[str, List[Dict[str, Any]]]: """ @@ -48,16 +52,16 @@ def read_excel_sheets(file_path: str) -> Dict[str, List[Dict[str, Any]]]: sheet_data.append(row_dict) sheets_data[sheet_name] = sheet_data - print(f"读取 Excel sheet '{sheet_name}': {len(sheet_data)} 行数据") + logger.info(f"读取 Excel sheet '{sheet_name}': {len(sheet_data)} 行数据") except Exception as e: - print(f"读取 Excel sheet '{sheet_name}' 失败: {str(e)}") + logger.error(f"读取 Excel sheet '{sheet_name}' 失败: {str(e)}") continue return sheets_data except Exception as e: - print(f"读取 Excel 文件失败: {str(e)}") + logger.error(f"读取 Excel 文件失败: {str(e)}") return {} @@ -105,11 +109,11 @@ def read_csv_file(file_path: str, encoding: str = 'utf-8') -> List[Dict[str, Any if any(v.strip() for v in row_dict.values()): csv_data.append(row_dict) - print(f"读取 CSV 文件: {len(csv_data)} 行数据") + logger.info(f"读取 CSV 文件: {len(csv_data)} 行数据") return csv_data except Exception as e: - print(f"读取 CSV 文件失败: {str(e)}") + logger.error(f"读取 CSV 文件失败: {str(e)}") return [] diff --git a/utils/fastapi_utils.py b/utils/fastapi_utils.py index f96957d..19a95cf 100644 --- a/utils/fastapi_utils.py +++ b/utils/fastapi_utils.py @@ -9,7 +9,9 @@ import aiohttp from qwen_agent.llm.schema import ASSISTANT, FUNCTION from qwen_agent.llm.oai import TextChatAtOAI from fastapi import HTTPException -from utils.logger import logger +import logging + +logger = logging.getLogger('app') # 创建全局线程池执行器,用于执行同步的HTTP调用 thread_pool = ThreadPoolExecutor(max_workers=10) @@ -61,9 +63,9 @@ def get_versioned_filename(upload_dir: str, name_without_ext: str, file_extensio file_to_delete = os.path.join(upload_dir, filename) try: os.remove(file_to_delete) - print(f"已删除文件: {file_to_delete}") + logger.info(f"已删除文件: {file_to_delete}") except OSError as e: - print(f"删除文件失败 {file_to_delete}: {e}") + logger.error(f"删除文件失败 {file_to_delete}: {e}") # 确定下一个版本号 if existing_versions: @@ -301,7 +303,7 @@ def create_project_directory(dataset_ids: Optional[List[str]], bot_id: str, robo from utils.multi_project_manager import create_robot_project return create_robot_project(dataset_ids, bot_id) except Exception as e: - print(f"Error creating project directory: {e}") + logger.error(f"Error creating project directory: {e}") return None @@ -392,7 +394,7 @@ def _sync_call_guideline_llm(llm_config, messages) -> str: return str(response) if response else "" except Exception as e: - print(f"Error calling guideline LLM: {e}") + logger.error(f"Error calling guideline LLM: {e}") return "" @@ -414,7 +416,7 @@ async def call_guideline_llm(chat_history: str, guidelines_text: str, terms:str, with open('./prompt/guideline_prompt.md', 'r', encoding='utf-8') as f: guideline_template = f.read() except Exception as e: - print(f"Error reading guideline prompt template: {e}") + logger.error(f"Error reading guideline prompt template: {e}") return "" # 替换模板中的占位符 @@ -439,7 +441,7 @@ async def call_guideline_llm(chat_history: str, guidelines_text: str, terms:str, return response except Exception as e: - print(f"Error calling guideline LLM: {e}") + logger.error(f"Error calling guideline LLM: {e}") return "" @@ -466,28 +468,43 @@ async def process_guideline_batch( model_server: str ) -> str: """处理单个guideline批次""" - try: - # 调用LLM分析这批guidelines - batch_guidelines_text = "\n".join(guidelines_batch) - batch_analysis = await call_guideline_llm(chat_history, batch_guidelines_text, terms, 
model_name, api_key, model_server) + max_retries = 3 - # 从响应中提取 ```json 和 ``` 包裹的内容 - json_pattern = r'```json\s*\n(.*?)\n```' - json_matches = re.findall(json_pattern, batch_analysis, re.DOTALL) - - if json_matches: - try: - # 解析第一个找到的JSON对象 - json_data = json.loads(json_matches[0]) - return json_data # 返回解析后的JSON对象 - except json.JSONDecodeError as e: - print(f"Error parsing JSON from guideline analysis: {e}") - return batch_analysis # 如果JSON解析失败,返回原始文本 - else: - return batch_analysis # 如果没有找到JSON格式,返回原始文本 - except Exception as e: - print(f"Error processing guideline batch: {e}") - return "" + for attempt in range(max_retries): + try: + # 调用LLM分析这批guidelines + batch_guidelines_text = "\n".join(guidelines_batch) + logger.info(f"Start processed guideline batch on attempt {attempt + 1}") + batch_analysis = await call_guideline_llm(chat_history, batch_guidelines_text, terms, model_name, api_key, model_server) + + # 从响应中提取 ```json 和 ``` 包裹的内容 + json_pattern = r'```json\s*\n(.*?)\n```' + json_matches = re.findall(json_pattern, batch_analysis, re.DOTALL) + + if json_matches: + try: + # 解析第一个找到的JSON对象 + json_data = json.loads(json_matches[0]) + logger.info(f"Successfully processed guideline batch on attempt {attempt + 1}") + return json_data # 返回解析后的JSON对象 + except json.JSONDecodeError as e: + logger.error(f"Error parsing JSON from guideline analysis on attempt {attempt + 1}: {e}") + if attempt == max_retries - 1: + return batch_analysis # 最后一次尝试失败,返回原始文本 + continue + else: + logger.warning(f"No JSON format found in guideline analysis on attempt {attempt + 1}") + if attempt == max_retries - 1: + return batch_analysis # 最后一次尝试失败,返回原始文本 + continue + + except Exception as e: + logger.error(f"Error processing guideline batch on attempt {attempt + 1}: {e}") + if attempt == max_retries - 1: + return "" # 最后一次尝试失败,返回空字符串 + + # 这里不应该到达,但为了完整性 + return "" def extract_block_from_system_prompt(system_prompt: Optional[str]) -> tuple[str, List[Dict[str, Any]], List[Dict[str, Any]]]: @@ -519,7 +536,7 @@ def extract_block_from_system_prompt(system_prompt: Optional[str]) -> tuple[str, guidelines_list.extend(guidelines) blocks_to_remove.append(match.group(0)) except Exception as e: - print(f"Error parsing guidelines: {e}") + logger.error(f"Error parsing guidelines: {e}") elif block_type == 'terms': try: @@ -527,7 +544,7 @@ def extract_block_from_system_prompt(system_prompt: Optional[str]) -> tuple[str, terms_list.extend(terms) blocks_to_remove.append(match.group(0)) except Exception as e: - print(f"Error parsing terms: {e}") + logger.error(f"Error parsing terms: {e}") # 从system_prompt中移除这些已解析的块 cleaned_prompt = system_prompt diff --git a/utils/file_manager.py b/utils/file_manager.py index dcb0394..191e45f 100644 --- a/utils/file_manager.py +++ b/utils/file_manager.py @@ -8,6 +8,9 @@ import json import shutil from typing import Dict, List, Set, Tuple from pathlib import Path +import logging + +logger = logging.getLogger('app') def get_existing_files(unique_id: str) -> Dict[str, Set[str]]: @@ -204,11 +207,11 @@ def cleanup_orphaned_files(unique_id: str, changes: Dict) -> Dict[str, List[str] removed_files[group_name].append(f"processed: {Path(filename).stem}") except Exception as e: - print(f"Error cleaning up {filename}: {str(e)}") + logger.error(f"Error cleaning up {filename}: {str(e)}") # Handle completely removed groups for group_name in changes.get("removed_groups", set()): - print(f"Cleaning up completely removed group: {group_name}") + logger.info(f"Cleaning up completely removed group: {group_name}") 
removed_files[group_name] = [] try: @@ -230,10 +233,10 @@ def cleanup_orphaned_files(unique_id: str, changes: Dict) -> Dict[str, List[str] shutil.rmtree(dataset_group_dir) removed_files[group_name].append("dataset group directory") - print(f"Completely removed group '{group_name}' and all its data") + logger.info(f"Completely removed group '{group_name}' and all its data") except Exception as e: - print(f"Error cleaning up group {group_name}: {str(e)}") + logger.error(f"Error cleaning up group {group_name}: {str(e)}") return removed_files diff --git a/utils/file_utils.py b/utils/file_utils.py index 404c20f..8495d61 100644 --- a/utils/file_utils.py +++ b/utils/file_utils.py @@ -10,9 +10,13 @@ import aiohttp import shutil import zipfile import tempfile +import logging from typing import Dict, List, Optional from pathlib import Path +# 配置日志 +logger = logging.getLogger('app') + async def download_file(url: str, destination_path: str) -> bool: """Download file from URL asynchronously""" @@ -25,10 +29,10 @@ async def download_file(url: str, destination_path: str) -> bool: await f.write(chunk) return True else: - print(f"Failed to download {url}, status code: {response.status}") + logger.error(f"Failed to download {url}, status code: {response.status}") return False except Exception as e: - print(f"Error downloading {url}: {str(e)}") + logger.error(f"Error downloading {url}: {str(e)}") return False @@ -45,11 +49,11 @@ def remove_file_or_directory(path: str): os.remove(path) elif os.path.isdir(path): shutil.rmtree(path) - print(f"Removed: {path}") + logger.info(f"Removed: {path}") else: - print(f"Path does not exist: {path}") + logger.warning(f"Path does not exist: {path}") except Exception as e: - print(f"Error removing {path}: {str(e)}") + logger.error(f"Error removing {path}: {str(e)}") def extract_zip_file(zip_path: str, extract_dir: str) -> List[str]: @@ -65,11 +69,11 @@ def extract_zip_file(zip_path: str, extract_dir: str) -> List[str]: if file.lower().endswith(('.txt', '.md', '.xlsx', '.xls', '.csv')): extracted_files.append(os.path.join(root, file)) - print(f"Extracted {len(extracted_files)} txt/md/xlsx/csv files from {zip_path}") + logger.info(f"Extracted {len(extracted_files)} txt/md/xlsx/csv files from {zip_path}") return extracted_files except Exception as e: - print(f"Error extracting zip file {zip_path}: {str(e)}") + logger.error(f"Error extracting zip file {zip_path}: {str(e)}") return [] @@ -110,7 +114,7 @@ def load_processed_files_log(unique_id: str) -> Dict[str, Dict]: with open(log_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: - print(f"Error loading processed files log: {e}") + logger.error(f"Error loading processed files log: {e}") return {} @@ -123,7 +127,7 @@ def save_processed_files_log(unique_id: str, processed_log: Dict[str, Dict]): with open(log_file, 'w', encoding='utf-8') as f: json.dump(processed_log, f, ensure_ascii=False, indent=2) except Exception as e: - print(f"Error saving processed files log: {e}") + logger.error(f"Error saving processed files log: {e}") def get_processing_log(unique_id: str) -> Dict: @@ -135,7 +139,7 @@ def get_processing_log(unique_id: str) -> Dict: with open(log_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: - print(f"Error loading processing log: {e}") + logger.error(f"Error loading processing log: {e}") return {} @@ -148,7 +152,7 @@ def save_project_status(unique_id: str, status: Dict): with open(status_file, 'w', encoding='utf-8') as f: json.dump(status, f, ensure_ascii=False, 
indent=2) except Exception as e: - print(f"Error saving project status: {e}") + logger.error(f"Error saving project status: {e}") def load_project_status(unique_id: str) -> Dict: @@ -160,7 +164,7 @@ def load_project_status(unique_id: str) -> Dict: with open(status_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: - print(f"Error loading project status: {e}") + logger.error(f"Error loading project status: {e}") return {} @@ -212,7 +216,7 @@ def update_file_processing_status(unique_id: str, group_name: str, filename: str json.dump(file_status, f, ensure_ascii=False, indent=2) except Exception as e: - print(f"Error updating file processing status: {e}") + logger.error(f"Error updating file processing status: {e}") def get_file_processing_status(unique_id: str, group_name: str = None, filename: str = None) -> Dict: @@ -240,7 +244,7 @@ def get_file_processing_status(unique_id: str, group_name: str = None, filename: return file_status except Exception as e: - print(f"Error getting file processing status: {e}") + logger.error(f"Error getting file processing status: {e}") return {} @@ -254,7 +258,7 @@ def calculate_directory_size(directory_path: str) -> int: if os.path.exists(file_path): total_size += os.path.getsize(file_path) except Exception as e: - print(f"Error calculating directory size: {e}") + logger.error(f"Error calculating directory size: {e}") return total_size diff --git a/utils/log_util/context.py b/utils/log_util/context.py new file mode 100644 index 0000000..0af8687 --- /dev/null +++ b/utils/log_util/context.py @@ -0,0 +1,32 @@ +# author: askfiy + +import contextvars + + +class GlobalContext: + def __init__(self): + super().__setattr__('_contextvar', contextvars.ContextVar(f"{__class__.__name__}_g")) + + def __getattr__(self, k: str): + ctx = self._contextvar.get() + return ctx[k] + + def __setattr__(self, k: str, v): + try: + ctx = self._contextvar.get() + except LookupError: + ctx = {} + + ctx.update({k: v}) + self._contextvar.set(ctx) + + def get_context(self): + return self._contextvar.get() + + def update_context(self, ctx): + self._contextvar.set(ctx) + + +g = GlobalContext() + + diff --git a/utils/log_util/decorator.py b/utils/log_util/decorator.py new file mode 100644 index 0000000..dc905a8 --- /dev/null +++ b/utils/log_util/decorator.py @@ -0,0 +1,32 @@ +import time +import logging +from functools import wraps +from functools import update_wrapper + +from .context import g + + +app_logger = logging.getLogger('app') + + +def safe_network(func): + @wraps(func) + def wrapper(*args, **kwargs): + while True: + try: + return func(*args, **kwargs) + except Exception as exc: + app_logger.error(exc) + time.sleep(1) + + return wrapper + + +def copy_threading_context(func): + main_thridng_g_ctx = g.get_context() + + def wrapper(*args, **kwargs): + g.update_context(main_thridng_g_ctx) + return func(*args, **kwargs) + + return update_wrapper(wrapper, func) diff --git a/utils/log_util/logger.py b/utils/log_util/logger.py new file mode 100644 index 0000000..fc168c6 --- /dev/null +++ b/utils/log_util/logger.py @@ -0,0 +1,93 @@ +import logging +import uuid +import json +from datetime import datetime + +from fastapi import Request, FastAPI +from starlette.responses import JSONResponse +from starlette.requests import HTTPConnection +from typing import Callable, Awaitable + +from .context import g + + +def add_request_routes(app: FastAPI): + @app.middleware("http") + async def before_request(request: Request, call_next: Callable[[Request], 
Awaitable[JSONResponse]]): + + # 先从header中获取X-Trace-Id,如果没有则生成新的 + trace_id = request.headers.get('X-Trace-Id') + if not trace_id: + trace_id = "generate_" + str(uuid.uuid4()) + + # user_id = "未知的 user_id" + + g.trace_id = trace_id + # g.user_id = user_id + response = await call_next(request) + + response.headers['X-Trace-Id'] = g.trace_id + return response + + +class Formatter(logging.Formatter): + def formatTime(self, record, datefmt=None): + # 将时间戳转换为datetime对象 + dt = datetime.fromtimestamp(record.created) + # 格式化时间戳到毫秒 + if datefmt: + s = dt.strftime(datefmt) + # 去除微秒的后三位,保留毫秒部分 + s = s[:-3] + else: + # 去除微秒的后三位,保留毫秒部分 + s = dt.strftime("%H:%M:%S.%f")[:-3] + return s + + def format(self, record): + # 处理 trace_id + if not hasattr(record, "trace_id"): + record.trace_id = getattr(g, "trace_id") + # 处理 user_id + # if not hasattr(record, "user_id"): + # record.user_id = getattr(g, "user_id") + + # 格式化时间戳 + record.timestamp = self.formatTime(record, self.datefmt) + + return super().format(record) + + +# 这里注册session 上下文追踪一次就可以了 +def init_logger_once(name,level): + logger = logging.getLogger(name) + logger.setLevel(level=level) + formatter = Formatter("%(timestamp)s | %(levelname)-5s | %(trace_id)s | %(name)s:%(funcName)s:%(lineno)s - %(message)s", datefmt='%Y-%m-%d %H:%M:%S.%f') + handler = logging.StreamHandler() + handler.setFormatter(formatter) + logger.addHandler(handler) + + +def init_with_fastapi(app,level=logging.INFO): + init_logger_once("app",level) + add_request_routes(app) + +def info(message, *args, **kwargs): + app_logger = logging.getLogger('app') + app_logger.info(message, *args, **kwargs) + +def debug(message, *args, **kwargs): + app_logger = logging.getLogger('app') + app_logger.debug(message, *args, **kwargs) + +def warning(message, *args, **kwargs): + app_logger = logging.getLogger('app') + app_logger.warning(message, *args, **kwargs) + +def error(message, *args, **kwargs): + app_logger = logging.getLogger('app') + app_logger.error(message, *args, **kwargs) + +def critical(message, *args, **kwargs): + app_logger = logging.getLogger('app') + app_logger.critical(message, *args, **kwargs) diff --git a/utils/multi_project_manager.py b/utils/multi_project_manager.py index 5321c2c..76cc16f 100644 --- a/utils/multi_project_manager.py +++ b/utils/multi_project_manager.py @@ -6,10 +6,14 @@ import os import shutil import json +import logging from pathlib import Path from typing import List, Dict, Optional from datetime import datetime +# Configure logger +logger = logging.getLogger('app') + from utils.file_utils import get_document_preview @@ -177,11 +181,11 @@ def copy_dataset_folder(source_project_id: str, target_dataset_dir: Path, folder shutil.copytree(source_folder, target_folder) result["success"] = True - print(f" Copied: {source_folder} -> {target_folder}") + logger.info(f" Copied: {source_folder} -> {target_folder}") except Exception as e: result["error"] = str(e) - print(f" Error copying {folder_name}: {str(e)}") + logger.error(f" Error copying {folder_name}: {str(e)}") return result @@ -237,7 +241,7 @@ def generate_robot_readme(robot_id: str, dataset_ids: List[str], copy_results: L if item_path.is_dir(): doc_dirs.append(item) except Exception as e: - print(f"Error listing dataset directories: {str(e)}") + logger.error(f"Error listing dataset directories: {str(e)}") if not doc_dirs: readme_content += "No document directories found.\n" @@ -292,7 +296,7 @@ def generate_robot_readme(robot_id: str, dataset_ids: List[str], copy_results: L with open(readme_path, 'w', 
encoding='utf-8') as f: f.write(readme_content) - print(f"Generated README: {readme_path}") + logger.info(f"Generated README: {readme_path}") return str(readme_path) @@ -314,14 +318,14 @@ def should_rebuild_robot_project(dataset_ids: List[str], bot_id: str) -> bool: # 如果机器人项目不存在,需要创建 if not robot_dir.exists(): - print(f"Robot project does not exist, need to create: {bot_id}") - return True + logger.info(f"Robot project does not exist, need to create: {bot_id}") + return True # 检查机器人项目的配置信息 config_file = robot_dir / "robot_config.json" if not config_file.exists(): - print(f"Robot config file not found, need to rebuild: {bot_id}") - return True + logger.info(f"Robot config file not found, need to rebuild: {bot_id}") + return True # 读取配置信息 try: @@ -329,8 +333,8 @@ def should_rebuild_robot_project(dataset_ids: List[str], bot_id: str) -> bool: config = json.load(f) cached_dataset_ids = set(config.get("dataset_ids", [])) except Exception as e: - print(f"Error reading robot config: {e}, need to rebuild") - return True + logger.error(f"Error reading robot config: {e}, need to rebuild") + return True # 检查dataset_ids是否有变化 current_dataset_ids = set(dataset_ids) @@ -338,14 +342,14 @@ def should_rebuild_robot_project(dataset_ids: List[str], bot_id: str) -> bool: # 如果有新增的dataset_id new_ids = current_dataset_ids - cached_dataset_ids if new_ids: - print(f"Found new dataset_ids: {new_ids}, need to rebuild") - return True + logger.info(f"Found new dataset_ids: {new_ids}, need to rebuild") + return True # 如果有删除的dataset_id removed_ids = cached_dataset_ids - current_dataset_ids if removed_ids: - print(f"Removed dataset_ids: {removed_ids}, need to rebuild") - return True + logger.info(f"Removed dataset_ids: {removed_ids}, need to rebuild") + return True # 获取机器人项目的最后修改时间 robot_mod_time = robot_dir.stat().st_mtime @@ -355,17 +359,17 @@ def should_rebuild_robot_project(dataset_ids: List[str], bot_id: str) -> bool: log_file = Path("projects") / "data" / source_project_id / "processing_log.json" if not log_file.exists(): - print(f"Processing log file not found for project {source_project_id}, will rebuild") - return True + logger.info(f"Processing log file not found for project {source_project_id}, will rebuild") + return True log_mod_time = log_file.stat().st_mtime # 如果任何一个processing_log.json文件比机器人项目新,需要重建 if log_mod_time > robot_mod_time: - print(f"Processing log updated for project {source_project_id}, need to rebuild") - return True + logger.info(f"Processing log updated for project {source_project_id}, need to rebuild") + return True - print(f"Robot project {bot_id} is up to date, no rebuild needed") + logger.info(f"Robot project {bot_id} is up to date, no rebuild needed") return False @@ -381,12 +385,12 @@ def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: boo Returns: str: 机器人项目目录路径 """ - print(f"Creating robot project: {bot_id} from sources: {dataset_ids}") + logger.info(f"Creating robot project: {bot_id} from sources: {dataset_ids}") # 检查是否需要重建 if not force_rebuild and not should_rebuild_robot_project(dataset_ids, bot_id): robot_dir = Path("projects") / "robot" / bot_id - print(f"Using existing robot project: {robot_dir}") + logger.info(f"Using existing robot project: {robot_dir}") return str(robot_dir) # 创建机器人目录结构 @@ -395,8 +399,8 @@ def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: boo # 清理已存在的目录(如果需要) if robot_dir.exists(): - print(f"Robot directory already exists, cleaning up: {robot_dir}") - shutil.rmtree(robot_dir) + logger.info(f"Robot directory 
already exists, cleaning up: {robot_dir}") + shutil.rmtree(robot_dir) robot_dir.mkdir(parents=True, exist_ok=True) dataset_dir.mkdir(parents=True, exist_ok=True) @@ -405,19 +409,19 @@ def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: boo # 遍历每个源项目 for source_project_id in dataset_ids: - print(f"\nProcessing source project: {source_project_id}") + logger.info(f"\nProcessing source project: {source_project_id}") source_dataset_dir = Path("projects") / "data" / source_project_id / "dataset" if not source_dataset_dir.exists(): - print(f" Warning: Dataset directory not found for project {source_project_id}") + logger.warning(f" Warning: Dataset directory not found for project {source_project_id}") continue # 获取所有子文件夹 folders = [f for f in source_dataset_dir.iterdir() if f.is_dir()] if not folders: - print(f" Warning: No folders found in dataset directory for project {source_project_id}") + logger.warning(f" Warning: No folders found in dataset directory for project {source_project_id}") continue # 复制每个文件夹 @@ -442,12 +446,12 @@ def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: boo # 统计信息 successful_copies = sum(1 for r in copy_results if r["success"]) - print(f"\nRobot project creation completed:") - print(f" Robot directory: {robot_dir}") - print(f" Total folders processed: {len(copy_results)}") - print(f" Successful copies: {successful_copies}") - print(f" Config saved: {config_file}") - print(f" README generated: {readme_path}") + logger.info(f"\nRobot project creation completed:") + logger.info(f" Robot directory: {robot_dir}") + logger.info(f" Total folders processed: {len(copy_results)}") + logger.info(f" Successful copies: {successful_copies}") + logger.info(f" Config saved: {config_file}") + logger.info(f" README generated: {readme_path}") return str(robot_dir) @@ -513,14 +517,14 @@ def cleanup_robot_project(bot_id: str) -> bool: if robot_dir.exists(): shutil.rmtree(robot_dir) - print(f"Cleaned up robot project: {bot_id}") + logger.info(f"Cleaned up robot project: {bot_id}") return True else: - print(f"Robot project does not exist: {bot_id}") + logger.info(f"Robot project does not exist: {bot_id}") return True except Exception as e: - print(f"Error cleaning up robot project {bot_id}: {str(e)}") + logger.error(f"Error cleaning up robot project {bot_id}: {str(e)}") return False @@ -530,7 +534,7 @@ if __name__ == "__main__": test_bot_id = "test-robot-001" robot_dir = create_robot_project(test_dataset_ids, test_bot_id) - print(f"Created robot project at: {robot_dir}") + logger.info(f"Created robot project at: {robot_dir}") info = get_robot_project_info(test_bot_id) - print(f"Robot project info: {json.dumps(info, indent=2, ensure_ascii=False)}") + logger.info(f"Robot project info: {json.dumps(info, indent=2, ensure_ascii=False)}") diff --git a/utils/organize_dataset_files.py b/utils/organize_dataset_files.py index 542d36b..4cff748 100644 --- a/utils/organize_dataset_files.py +++ b/utils/organize_dataset_files.py @@ -1,8 +1,12 @@ #!/usr/bin/env python3 import os import shutil +import logging from pathlib import Path +# 配置日志 +logger = logging.getLogger('app') + def is_file_already_processed(target_file: Path, pagination_file: Path, embeddings_file: Path) -> bool: """Check if a file has already been processed (document.txt, pagination.txt, and embeddings exist)""" if not target_file.exists(): @@ -22,22 +26,22 @@ def organize_single_project_files(unique_id: str, skip_processed=True): project_dir = Path("projects") / "data" / unique_id if not 
project_dir.exists(): - print(f"Project directory not found: {project_dir}") + logger.error(f"Project directory not found: {project_dir}") return - print(f"Organizing files for project: {unique_id} (skip_processed={skip_processed})") + logger.info(f"Organizing files for project: {unique_id} (skip_processed={skip_processed})") files_dir = project_dir / "files" dataset_dir = project_dir / "dataset" # Check if files directory exists and has files if not files_dir.exists(): - print(f" No files directory found, skipping...") + logger.info(f" No files directory found, skipping...") return files = list(files_dir.glob("*")) if not files: - print(f" Files directory is empty, skipping...") + logger.info(f" Files directory is empty, skipping...") return # Create dataset directory if it doesn't exist @@ -55,10 +59,10 @@ def organize_single_project_files(unique_id: str, skip_processed=True): # Check if file is already processed if skip_processed and is_file_already_processed(target_file, pagination_file, embeddings_file): - print(f" Skipping already processed file: {file_path.name}") + logger.info(f" Skipping already processed file: {file_path.name}") continue - print(f" Copying {file_path.name} -> {target_file.relative_to(project_dir)}") + logger.info(f" Copying {file_path.name} -> {target_file.relative_to(project_dir)}") # Create target directory target_dir.mkdir(exist_ok=True) @@ -140,12 +144,12 @@ def organize_dataset_files(): # Check if files directory exists and has files if not files_dir.exists(): - print(f" No files directory found, skipping...") + logger.info(f" No files directory found, skipping...") continue files = list(files_dir.glob("*")) if not files: - print(f" Files directory is empty, skipping...") + logger.info(f" Files directory is empty, skipping...") continue # Create dataset directory if it doesn't exist @@ -159,7 +163,7 @@ def organize_dataset_files(): target_dir = dataset_dir / file_name_without_ext target_file = target_dir / "document.txt" - print(f" Copying {file_path.name} -> {target_file.relative_to(project_dir)}") + logger.info(f" Copying {file_path.name} -> {target_file.relative_to(project_dir)}") # Create target directory target_dir.mkdir(exist_ok=True) diff --git a/utils/project_manager.py b/utils/project_manager.py index b9443d8..e9e4f8d 100644 --- a/utils/project_manager.py +++ b/utils/project_manager.py @@ -5,9 +5,13 @@ Project management functions for handling projects, README generation, and statu import os import json +import logging from typing import Dict, List, Optional from pathlib import Path +# Configure logger +logger = logging.getLogger('app') + from utils.file_utils import get_document_preview, load_processed_files_log @@ -145,7 +149,7 @@ This project contains processed documents and their associated embeddings for se if os.path.isdir(item_path): doc_dirs.append(item) except Exception as e: - print(f"Error listing dataset directories: {str(e)}") + logger.error(f"Error listing dataset directories: {str(e)}") if not doc_dirs: readme_content += "No document directories found.\n" @@ -198,10 +202,10 @@ def save_project_readme(unique_id: str): os.makedirs(os.path.dirname(readme_path), exist_ok=True) with open(readme_path, 'w', encoding='utf-8') as f: f.write(readme_content) - print(f"Generated README.md for project {unique_id}") + logger.info(f"Generated README.md for project {unique_id}") return readme_path except Exception as e: - print(f"Error generating README for project {unique_id}: {str(e)}") + logger.error(f"Error generating README for project 
{unique_id}: {str(e)}") return None @@ -264,13 +268,13 @@ def remove_project(unique_id: str) -> bool: if os.path.exists(project_dir): import shutil shutil.rmtree(project_dir) - print(f"Removed project directory: {project_dir}") + logger.info(f"Removed project directory: {project_dir}") return True else: - print(f"Project directory not found: {project_dir}") + logger.warning(f"Project directory not found: {project_dir}") return False except Exception as e: - print(f"Error removing project {unique_id}: {str(e)}") + logger.error(f"Error removing project {unique_id}: {str(e)}") return False @@ -284,7 +288,7 @@ def list_projects() -> List[str]: return [item for item in os.listdir(projects_dir) if os.path.isdir(os.path.join(projects_dir, item))] except Exception as e: - print(f"Error listing projects: {str(e)}") + logger.error(f"Error listing projects: {str(e)}") return [] diff --git a/utils/single_file_processor.py b/utils/single_file_processor.py index b68caf4..88d92b9 100644 --- a/utils/single_file_processor.py +++ b/utils/single_file_processor.py @@ -6,9 +6,13 @@ Single file processing functions for handling individual files. import os import tempfile import zipfile +import logging from typing import Dict, List, Tuple, Optional from pathlib import Path +# Configure logger +logger = logging.getLogger('app') + from utils.file_utils import download_file # Try to import excel/csv processor, but handle if dependencies are missing @@ -18,7 +22,7 @@ try: ) EXCEL_CSV_SUPPORT = True except ImportError as e: - print(f"Excel/CSV processing not available: {e}") + logger.warning(f"Excel/CSV processing not available: {e}") EXCEL_CSV_SUPPORT = False # Fallback functions @@ -71,7 +75,7 @@ async def process_single_file( # Download file if it's remote and not yet downloaded if original_path.startswith(('http://', 'https://')): if not os.path.exists(local_path): - print(f"Downloading {original_path} -> {local_path}") + logger.info(f"Downloading {original_path} -> {local_path}") success = await download_file(original_path, local_path) if not success: result["error"] = "Failed to download file" @@ -112,11 +116,11 @@ async def process_single_file( except Exception as e: result["error"] = f"Embedding generation failed: {str(e)}" - print(f"Failed to generate embeddings for {filename}: {str(e)}") + logger.error(f"Failed to generate embeddings for {filename}: {str(e)}") except Exception as e: result["error"] = f"File processing failed: {str(e)}" - print(f"Error processing file {filename}: {str(e)}") + logger.error(f"Error processing file {filename}: {str(e)}") return result @@ -167,14 +171,14 @@ async def extract_from_zip(zip_path: str, filename: str) -> Tuple[str, List[str] pagination_lines.extend(file_pagination) except Exception as e: - print(f"Error processing extracted file {file}: {str(e)}") + logger.error(f"Error processing extracted file {file}: {str(e)}") # Clean up temporary directory import shutil shutil.rmtree(temp_dir) except Exception as e: - print(f"Error extracting zip file {filename}: {str(e)}") + logger.error(f"Error extracting zip file {filename}: {str(e)}") return "", [] return '\n\n'.join(content_parts), pagination_lines @@ -192,7 +196,7 @@ async def extract_from_excel(file_path: str, filename: str) -> Tuple[str, List[s return "", [] except Exception as e: - print(f"Error processing Excel file {filename}: {str(e)}") + logger.error(f"Error processing Excel file {filename}: {str(e)}") return "", [] @@ -208,7 +212,7 @@ async def extract_from_csv(file_path: str, filename: str) -> Tuple[str, List[str 
return "", [] except Exception as e: - print(f"Error processing CSV file {filename}: {str(e)}") + logger.error(f"Error processing CSV file {filename}: {str(e)}") return "", [] @@ -224,7 +228,7 @@ async def extract_from_text(file_path: str, filename: str) -> Tuple[str, List[st return "", [] except Exception as e: - print(f"Error reading text file {filename}: {str(e)}") + logger.error(f"Error reading text file {filename}: {str(e)}") return "", [] @@ -248,7 +252,7 @@ def generate_pagination_from_text(document_path: str, pagination_path: str) -> L return pagination_lines except Exception as e: - print(f"Error generating pagination from text: {str(e)}") + logger.error(f"Error generating pagination from text: {str(e)}") return [] @@ -273,7 +277,7 @@ async def generate_embeddings_for_file(document_path: str, embedding_path: str) return None except Exception as e: - print(f"Error generating embeddings: {str(e)}") + logger.error(f"Error generating embeddings: {str(e)}") return None diff --git a/utils/system_optimizer.py b/utils/system_optimizer.py index ca88433..43a4d4c 100644 --- a/utils/system_optimizer.py +++ b/utils/system_optimizer.py @@ -9,9 +9,13 @@ import multiprocessing import threading import time import resource +import logging from typing import Dict, Any, Optional from concurrent.futures import ThreadPoolExecutor +# 配置日志 +logger = logging.getLogger('app') + class SystemOptimizer: """系统优化器 @@ -37,9 +41,9 @@ class SystemOptimizer: new_limit = min(65536, hard) # 增加到65536或硬件限制 resource.setrlimit(resource.RLIMIT_NOFILE, (new_limit, hard)) self.original_settings['RLIMIT_NOFILE'] = (soft, hard) - print(f"文件描述符限制从 {soft} 增加到 {new_limit}") + logger.info(f"文件描述符限制从 {soft} 增加到 {new_limit}") except (ValueError, OSError) as e: - print(f"无法设置文件描述符限制: {e}") + logger.error(f"无法设置文件描述符限制: {e}") # 2. 优化线程栈大小 try: @@ -47,9 +51,9 @@ class SystemOptimizer: new_stack = min(8 * 1024 * 1024, hard) # 8MB栈大小 resource.setrlimit(resource.RLIMIT_STACK, (new_stack, hard)) self.original_settings['RLIMIT_STACK'] = (soft, hard) - print(f"线程栈大小设置为 {new_stack // (1024*1024)}MB") + logger.info(f"线程栈大小设置为 {new_stack // (1024*1024)}MB") except (ValueError, OSError) as e: - print(f"无法设置线程栈大小: {e}") + logger.error(f"无法设置线程栈大小: {e}") # 3. 
环境变量优化 env_vars = { @@ -85,10 +89,10 @@ class SystemOptimizer: for key, value in env_vars.items(): if key not in os.environ: os.environ[key] = value - print(f"设置环境变量: {key}={value}") + logger.info(f"设置环境变量: {key}={value}") self.optimized = True - print("系统优化完成") + logger.info("系统优化完成") def _backup_original_settings(self): """备份原始设置""" @@ -115,20 +119,20 @@ class SystemOptimizer: if not self.original_settings: return - print("恢复原始系统设置...") + logger.info("恢复原始系统设置...") # 恢复资源限制 if 'RLIMIT_NOFILE' in self.original_settings: try: resource.setrlimit(resource.RLIMIT_NOFILE, self.original_settings['RLIMIT_NOFILE']) - print(f"恢复文件描述符限制") + logger.info(f"恢复文件描述符限制") except: pass if 'RLIMIT_STACK' in self.original_settings: try: resource.setrlimit(resource.RLIMIT_STACK, self.original_settings['RLIMIT_STACK']) - print(f"恢复线程栈大小") + logger.info(f"恢复线程栈大小") except: pass @@ -137,13 +141,13 @@ class SystemOptimizer: if key.startswith('TOKENIZERS_') or key in ['PYTHONUNBUFFERED', 'PYTHONDONTWRITEBYTECODE']: if key in os.environ: del os.environ[key] - print(f"移除环境变量: {key}") + logger.info(f"移除环境变量: {key}") elif key in ['OMP_NUM_THREADS'] and value is not None: os.environ[key] = value - print(f"恢复环境变量: {key}={value}") + logger.info(f"恢复环境变量: {key}={value}") self.optimized = False - print("系统设置已恢复") + logger.info("系统设置已恢复") class AsyncioOptimizer: @@ -156,9 +160,9 @@ class AsyncioOptimizer: # 尝试使用uvloop(如果可用) import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) - print("使用uvloop事件循环策略") + logger.info("使用uvloop事件循环策略") except ImportError: - print("使用默认事件循环策略") + logger.info("使用默认事件循环策略") # 设置线程池大小 cpu_count = multiprocessing.cpu_count() @@ -166,7 +170,7 @@ class AsyncioOptimizer: # 注意:不能在这里设置默认线程池执行器,因为还没有运行事件循环 # 这个设置会在应用启动时进行 - print(f"建议线程池大小: {thread_pool_size}") + logger.info(f"建议线程池大小: {thread_pool_size}") @staticmethod def optimize_gunicorn_settings() -> Dict[str, Any]: diff --git a/zip_project_handler.py b/zip_project_handler.py index 9ad0b67..aa1e5ae 100644 --- a/zip_project_handler.py +++ b/zip_project_handler.py @@ -9,10 +9,14 @@ import hashlib import zipfile import requests import tempfile +import logging from typing import List, Optional from urllib.parse import urlparse from pathlib import Path +# 配置日志 +logger = logging.getLogger('app') + class ZipProjectHandler: """ZIP项目处理器""" @@ -56,7 +60,7 @@ class ZipProjectHandler: return True except Exception as e: - print(f"下载文件失败: {e}") + logger.error(f"下载文件失败: {e}") return False def _copy_local_file(self, local_path: str, target_path: str) -> bool: @@ -66,7 +70,7 @@ class ZipProjectHandler: shutil.copy2(local_path, target_path) return True except Exception as e: - print(f"复制本地文件失败: {e}") + logger.error(f"复制本地文件失败: {e}") return False def _extract_zip(self, zip_path: str, extract_to: str) -> bool: @@ -76,7 +80,7 @@ class ZipProjectHandler: zip_ref.extractall(extract_to) return True except Exception as e: - print(f"解压ZIP文件失败: {e}") + logger.error(f"解压ZIP文件失败: {e}") return False def get_project_from_zip(self, zip_url: str, unique_id: Optional[str] = None) -> Optional[str]: @@ -91,7 +95,7 @@ class ZipProjectHandler: Optional[str]: 成功时返回项目目录路径,失败时返回None """ if not self._is_valid_url_or_path(zip_url): - print(f"无效的URL或路径: {zip_url}") + logger.error(f"无效的URL或路径: {zip_url}") return None # 使用unique_id作为目录名,如果没有则使用url_hash @@ -104,7 +108,7 @@ class ZipProjectHandler: cached_project_dir = self.projects_dir / project_dir_name if cached_project_dir.exists() and not unique_id: - print(f"使用缓存的项目目录: {cached_project_dir}") + logger.info(f"使用缓存的项目目录: 
{cached_project_dir}") return str(cached_project_dir) # 下载或复制ZIP文件 @@ -125,24 +129,24 @@ class ZipProjectHandler: is_url = False if is_url: - print(f"下载ZIP文件: {zip_url}") + logger.info(f"下载ZIP文件: {zip_url}") if not self._download_file(zip_url, str(zip_path)): return None else: - print(f"复制本地ZIP文件: {zip_url}") + logger.info(f"复制本地ZIP文件: {zip_url}") # 解析相对路径 local_path = Path(zip_url).resolve() if not self._copy_local_file(str(local_path), str(zip_path)): return None else: - print(f"使用缓存的ZIP文件: {zip_path}") + logger.info(f"使用缓存的ZIP文件: {zip_path}") # 解压到项目目录 - print(f"解压ZIP文件到: {cached_project_dir}") + logger.info(f"解压ZIP文件到: {cached_project_dir}") if not self._extract_zip(str(zip_path), str(cached_project_dir)): return None - print(f"项目准备完成: {cached_project_dir}") + logger.info(f"项目准备完成: {cached_project_dir}") return str(cached_project_dir) def collect_document_files(self, project_dir: str) -> List[str]: @@ -159,7 +163,7 @@ class ZipProjectHandler: project_path = Path(project_dir) if not project_path.exists(): - print(f"项目目录不存在: {project_dir}") + logger.error(f"项目目录不存在: {project_dir}") return document_files # 递归搜索所有 document.txt 文件 @@ -167,11 +171,11 @@ class ZipProjectHandler: if file_path.is_file(): document_files.append(str(file_path)) - print(f"在项目目录 {project_dir} 中找到 {len(document_files)} 个 document.txt 文件") + logger.info(f"在项目目录 {project_dir} 中找到 {len(document_files)} 个 document.txt 文件") for file_path in document_files[:5]: # 只打印前5个文件路径作为示例 - print(f" - {file_path}") + logger.info(f" - {file_path}") if len(document_files) > 5: - print(f" ... 还有 {len(document_files) - 5} 个文件") + logger.info(f" ... 还有 {len(document_files) - 5} 个文件") return document_files @@ -182,9 +186,9 @@ class ZipProjectHandler: if self.cache_dir.exists(): shutil.rmtree(self.cache_dir) self.cache_dir.mkdir(exist_ok=True) - print("缓存清理完成") + logger.info("缓存清理完成") except Exception as e: - print(f"清理缓存失败: {e}") + logger.error(f"清理缓存失败: {e}") # 全局ZIP项目处理器实例