From f13b1aaec914b917ce7a350d153d50c124cefdbe Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?=
Date: Tue, 28 Oct 2025 14:48:30 +0800
Subject: [PATCH] add upload

---
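Notes (review aid, not part of the commit message):

The new POST /api/v1/upload endpoint takes a single multipart form field named
"file", stores it under ./projects/uploads with a uuid4-based filename, and
returns "filename", "original_filename" and "file_path" in the JSON body. A
minimal manual check, assuming the app is served locally on port 8000 (for
example via "uvicorn fastapi_app:app") and that the third-party requests
package is installed; both are assumptions, adjust host and port to your
deployment:

    import requests

    # Upload a local file; the multipart field must be named "file".
    with open("example.pdf", "rb") as fh:
        resp = requests.post(
            "http://localhost:8000/api/v1/upload",
            files={"file": ("example.pdf", fh)},
        )
    resp.raise_for_status()
    info = resp.json()
    print(info["file_path"])            # e.g. projects/uploads/<uuid>.pdf
    print(info["original_filename"])    # example.pdf
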
 fastapi_app.py                     | 53 ++++++++++++++++++++++++++----
 mcp/mcp_common.py                  |  2 +-
 mcp/multi_keyword_search_server.py |  8 ++---
 task_queue/integration_tasks.py    |  6 ++--
 utils/data_merger.py               | 16 ++++-----
 utils/dataset_manager.py           | 14 ++++----
 utils/file_manager.py              | 10 +++---
 utils/file_utils.py                | 16 ++++-----
 utils/organize_dataset_files.py    |  8 ++---
 utils/project_manager.py           | 10 +++---
 utils/single_file_processor.py     |  4 +--
 11 files changed, 93 insertions(+), 54 deletions(-)

diff --git a/fastapi_app.py b/fastapi_app.py
index f109eb1..116f11e 100644
--- a/fastapi_app.py
+++ b/fastapi_app.py
@@ -2,11 +2,12 @@ import json
 import os
 import tempfile
 import shutil
+import uuid
 from typing import AsyncGenerator, Dict, List, Optional, Union, Any
 from datetime import datetime
 
 import uvicorn
-from fastapi import FastAPI, HTTPException, Depends, Header
+from fastapi import FastAPI, HTTPException, Depends, Header, UploadFile, File
 from fastapi.responses import StreamingResponse, HTMLResponse, FileResponse
 from fastapi.staticfiles import StaticFiles
 from fastapi.middleware.cors import CORSMiddleware
@@ -440,7 +441,7 @@ async def chat_completions(request: ChatRequest, authorization: Optional[str] =
         raise HTTPException(status_code=400, detail="unique_id is required")
 
     # 使用unique_id获取项目目录
-    project_dir = os.path.join("projects", unique_id)
+    project_dir = os.path.join("projects", "data", unique_id)
     if not os.path.exists(project_dir):
         project_dir = ""
 
@@ -531,6 +532,44 @@ async def chat_completions(request: ChatRequest, authorization: Optional[str] =
         raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
 
 
+@app.post("/api/v1/upload")
+async def upload_file(file: UploadFile = File(...)):
+    """
+    文件上传API接口,上传文件到 ./projects/uploads 目录
+
+    Args:
+        file: 上传的文件
+
+    Returns:
+        dict: 包含文件路径和文件名的响应
+    """
+    try:
+        # 确保上传目录存在
+        upload_dir = os.path.join("projects", "uploads")
+        os.makedirs(upload_dir, exist_ok=True)
+
+        # 生成唯一文件名
+        file_extension = os.path.splitext(file.filename)[1] if file.filename else ""
+        unique_filename = f"{uuid.uuid4()}{file_extension}"
+        file_path = os.path.join(upload_dir, unique_filename)
+
+        # 保存文件
+        with open(file_path, "wb") as buffer:
+            shutil.copyfileobj(file.file, buffer)
+
+        return {
+            "success": True,
+            "message": "文件上传成功",
+            "filename": unique_filename,
+            "original_filename": file.filename,
+            "file_path": file_path
+        }
+
+    except Exception as e:
+        print(f"Error uploading file: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"文件上传失败: {str(e)}")
+
+
 @app.get("/api/health")
 async def health_check():
     """Health check endpoint"""
@@ -617,7 +656,7 @@ async def get_files_processing_status(unique_id: str):
         processed_log = load_processed_files_log(unique_id)
 
         # Get project directory info
-        project_dir = os.path.join("projects", unique_id)
+        project_dir = os.path.join("projects", "data", unique_id)
         project_exists = os.path.exists(project_dir)
 
         # Collect document.txt files
@@ -635,7 +674,7 @@ async def get_files_processing_status(unique_id: str):
             "processed_files": processed_log,
             "document_files_count": len(document_files),
             "document_files": document_files,
-            "log_file_exists": os.path.exists(os.path.join("projects", unique_id, "processed_files.json"))
+            "log_file_exists": os.path.exists(os.path.join("projects", "data", unique_id, "processed_files.json"))
         }
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"获取文件处理状态失败: {str(e)}")
@@ -645,8 +684,8 @@ async def get_files_processing_status(unique_id: str):
 async def reset_files_processing(unique_id: str):
     """重置项目的文件处理状态,删除处理日志和所有文件"""
     try:
-        project_dir = os.path.join("projects", unique_id)
-        log_file = os.path.join("projects", unique_id, "processed_files.json")
+        project_dir = os.path.join("projects", "data", unique_id)
+        log_file = os.path.join("projects", "data", unique_id, "processed_files.json")
 
         # Load processed log to know what files to remove
         processed_log = load_processed_files_log(unique_id)
@@ -668,7 +707,7 @@ async def reset_files_processing(unique_id: str):
             elif 'filename' in file_info:
                 # Fallback to old filename-based structure
                 filename_without_ext = os.path.splitext(file_info['filename'])[0]
-                dataset_dir = os.path.join("projects", unique_id, "dataset", filename_without_ext)
+                dataset_dir = os.path.join("projects", "data", unique_id, "dataset", filename_without_ext)
                 if remove_file_or_directory(dataset_dir):
                     removed_files.append(dataset_dir)
 
diff --git a/mcp/mcp_common.py b/mcp/mcp_common.py
index 1eabc2d..f016fe6 100644
--- a/mcp/mcp_common.py
+++ b/mcp/mcp_common.py
@@ -20,7 +20,7 @@ def get_allowed_directory():
         return os.path.abspath(dataset_dir)
 
     # 从环境变量读取项目数据目录
-    project_dir = os.getenv("PROJECT_DATA_DIR", "./projects")
+    project_dir = os.getenv("PROJECT_DATA_DIR", "./projects/data")
     return os.path.abspath(project_dir)
 
 
diff --git a/mcp/multi_keyword_search_server.py b/mcp/multi_keyword_search_server.py
index 61fd687..b34089e 100644
--- a/mcp/multi_keyword_search_server.py
+++ b/mcp/multi_keyword_search_server.py
@@ -143,7 +143,7 @@ def search_count(patterns: List[Dict[str, Any]], file_paths: List[str],
             "content": [
                 {
                     "type": "text",
-                    "text": f"Error: Specified files not found in project directory {project_data_dir}"
+                    "text": f"Error: Specified files not found in project directory {get_allowed_directory()}"
                 }
             ]
         }
@@ -328,7 +328,7 @@ def search(patterns: List[Dict[str, Any]], file_paths: List[str],
             "content": [
                 {
                     "type": "text",
-                    "text": f"Error: Specified files not found in project directory {project_data_dir}"
+                    "text": f"Error: Specified files not found in project directory {get_allowed_directory()}"
                 }
             ]
         }
@@ -565,7 +565,7 @@ def regex_grep(patterns: Union[str, List[str]], file_paths: List[str], context_l
             "content": [
                 {
                     "type": "text",
-                    "text": f"Error: Specified files not found in project directory {project_data_dir}"
+                    "text": f"Error: Specified files not found in project directory {get_allowed_directory()}"
                 }
             ]
         }
@@ -713,7 +713,7 @@ def regex_grep_count(patterns: Union[str, List[str]], file_paths: List[str],
             "content": [
                 {
                     "type": "text",
-                    "text": f"Error: Specified files not found in project directory {project_data_dir}"
+                    "text": f"Error: Specified files not found in project directory {get_allowed_directory()}"
                 }
             ]
         }
diff --git a/task_queue/integration_tasks.py b/task_queue/integration_tasks.py
index 8032882..3104701 100644
--- a/task_queue/integration_tasks.py
+++ b/task_queue/integration_tasks.py
@@ -47,7 +47,7 @@ def process_files_async(
         )
 
         # 确保项目目录存在
-        project_dir = os.path.join("projects", unique_id)
+        project_dir = os.path.join("projects", "data", unique_id)
         if not os.path.exists(project_dir):
             os.makedirs(project_dir, exist_ok=True)
 
@@ -102,7 +102,7 @@ def process_files_async(
         result_files = []
         for key in processed_files_by_key.keys():
             # 添加对应的dataset document.txt路径
-            document_path = os.path.join("projects", unique_id, "dataset", key, "document.txt")
+            document_path = os.path.join("projects", "data", unique_id, "dataset", key, "document.txt")
             if os.path.exists(document_path):
                 result_files.append(document_path)
 
@@ -172,7 +172,7 @@ def cleanup_project_async(
     try:
         print(f"开始异步清理项目,项目ID: {unique_id}")
 
-        project_dir = os.path.join("projects", unique_id)
+        project_dir = os.path.join("projects", "data", unique_id)
         removed_items = []
 
         if remove_all and os.path.exists(project_dir):
diff --git a/utils/data_merger.py b/utils/data_merger.py
index c304405..1a1409f 100644
--- a/utils/data_merger.py
+++ b/utils/data_merger.py
@@ -20,8 +20,8 @@ except ImportError:
 
 def merge_documents_by_group(unique_id: str, group_name: str) -> Dict:
     """Merge all document.txt files in a group into a single document."""
-    processed_group_dir = os.path.join("projects", unique_id, "processed", group_name)
-    dataset_group_dir = os.path.join("projects", unique_id, "dataset", group_name)
+    processed_group_dir = os.path.join("projects", "data", unique_id, "processed", group_name)
+    dataset_group_dir = os.path.join("projects", "data", unique_id, "dataset", group_name)
     os.makedirs(dataset_group_dir, exist_ok=True)
 
     merged_document_path = os.path.join(dataset_group_dir, "document.txt")
@@ -91,8 +91,8 @@ def merge_documents_by_group(unique_id: str, group_name: str) -> Dict:
 
 def merge_paginations_by_group(unique_id: str, group_name: str) -> Dict:
     """Merge all pagination.txt files in a group."""
-    processed_group_dir = os.path.join("projects", unique_id, "processed", group_name)
-    dataset_group_dir = os.path.join("projects", unique_id, "dataset", group_name)
+    processed_group_dir = os.path.join("projects", "data", unique_id, "processed", group_name)
+    dataset_group_dir = os.path.join("projects", "data", unique_id, "dataset", group_name)
     os.makedirs(dataset_group_dir, exist_ok=True)
 
     merged_pagination_path = os.path.join(dataset_group_dir, "pagination.txt")
@@ -161,8 +161,8 @@ def merge_paginations_by_group(unique_id: str, group_name: str) -> Dict:
 
 def merge_embeddings_by_group(unique_id: str, group_name: str) -> Dict:
     """Merge all embedding.pkl files in a group."""
-    processed_group_dir = os.path.join("projects", unique_id, "processed", group_name)
-    dataset_group_dir = os.path.join("projects", unique_id, "dataset", group_name)
+    processed_group_dir = os.path.join("projects", "data", unique_id, "processed", group_name)
+    dataset_group_dir = os.path.join("projects", "data", unique_id, "dataset", group_name)
     os.makedirs(dataset_group_dir, exist_ok=True)
 
     merged_embedding_path = os.path.join(dataset_group_dir, "embedding.pkl")
@@ -296,7 +296,7 @@ def merge_all_data_by_group(unique_id: str, group_name: str) -> Dict:
 
 def get_group_merge_status(unique_id: str, group_name: str) -> Dict:
     """Get the status of merged data for a group."""
-    dataset_group_dir = os.path.join("projects", unique_id, "dataset", group_name)
+    dataset_group_dir = os.path.join("projects", "data", unique_id, "dataset", group_name)
 
     status = {
         "group_name": group_name,
@@ -340,7 +340,7 @@ def get_group_merge_status(unique_id: str, group_name: str) -> Dict:
 
 def cleanup_dataset_group(unique_id: str, group_name: str) -> bool:
     """Clean up merged dataset files for a group."""
-    dataset_group_dir = os.path.join("projects", unique_id, "dataset", group_name)
+    dataset_group_dir = os.path.join("projects", "data", unique_id, "dataset", group_name)
 
     try:
         if os.path.exists(dataset_group_dir):
diff --git a/utils/dataset_manager.py b/utils/dataset_manager.py
index cbef1a5..253ef42 100644
--- a/utils/dataset_manager.py
+++ b/utils/dataset_manager.py
@@ -63,7 +63,7 @@ async def download_dataset_files(unique_id: str, files: Dict[str, List[str]]) ->
             filename = os.path.basename(file_path)
 
             # Get local file path
-            local_path = os.path.join("projects", unique_id, "files", group_name, filename)
+            local_path = os.path.join("projects", "data", unique_id, "files", group_name, filename)
 
             # Skip if file doesn't exist (might be remote file that failed to download)
             if not os.path.exists(local_path) and not file_path.startswith(('http://', 'https://')):
@@ -144,7 +144,7 @@ async def save_processing_log(
         }
     }
 
-    log_file_path = os.path.join("projects", unique_id, "processing_log.json")
+    log_file_path = os.path.join("projects", "data", unique_id, "processing_log.json")
     try:
         with open(log_file_path, 'w', encoding='utf-8') as f:
             json.dump(log_data, f, ensure_ascii=False, indent=2)
@@ -155,7 +155,7 @@ async def save_processing_log(
 
 def generate_dataset_structure(unique_id: str) -> str:
     """Generate a string representation of the dataset structure"""
-    project_dir = os.path.join("projects", unique_id)
+    project_dir = os.path.join("projects", "data", unique_id)
     structure = []
 
     def add_directory_contents(dir_path: str, prefix: str = ""):
@@ -198,7 +198,7 @@ def generate_dataset_structure(unique_id: str) -> str:
 
 def get_processing_status(unique_id: str) -> Dict:
     """Get comprehensive processing status for a project."""
-    project_dir = os.path.join("projects", unique_id)
+    project_dir = os.path.join("projects", "data", unique_id)
 
     if not os.path.exists(project_dir):
         return {
@@ -261,7 +261,7 @@ def get_processing_status(unique_id: str) -> Dict:
 def remove_dataset_directory(unique_id: str, filename_without_ext: str):
     """Remove a specific dataset directory (deprecated - use new structure)"""
     # This function is kept for compatibility but delegates to new structure
-    dataset_path = os.path.join("projects", unique_id, "processed", filename_without_ext)
+    dataset_path = os.path.join("projects", "data", unique_id, "processed", filename_without_ext)
     if os.path.exists(dataset_path):
         import shutil
         shutil.rmtree(dataset_path)
@@ -270,13 +270,13 @@ def remove_dataset_directory(unique_id: str, filename_without_ext: str):
 def remove_dataset_directory_by_key(unique_id: str, key: str):
     """Remove dataset directory by key (group name)"""
     # Remove files directory
-    files_group_path = os.path.join("projects", unique_id, "files", key)
+    files_group_path = os.path.join("projects", "data", unique_id, "files", key)
     if os.path.exists(files_group_path):
         import shutil
         shutil.rmtree(files_group_path)
 
     # Remove processed directory
-    processed_group_path = os.path.join("projects", unique_id, "processed", key)
+    processed_group_path = os.path.join("projects", "data", unique_id, "processed", key)
     if os.path.exists(processed_group_path):
         import shutil
         shutil.rmtree(processed_group_path)
diff --git a/utils/file_manager.py b/utils/file_manager.py
index a38d813..ca4274d 100644
--- a/utils/file_manager.py
+++ b/utils/file_manager.py
@@ -13,7 +13,7 @@ from pathlib import Path
 def get_existing_files(unique_id: str) -> Dict[str, Set[str]]:
     """Get existing files organized by group."""
     existing_files = {}
-    files_dir = os.path.join("projects", unique_id, "files")
+    files_dir = os.path.join("projects", "data", unique_id, "files")
 
     if not os.path.exists(files_dir):
         return existing_files
@@ -83,7 +83,7 @@ def sync_files_to_group(unique_id: str, files: Dict[str, List[str]]) -> Tuple[Di
     Returns:
         Tuple of (synced_files, failed_files)
     """
-    project_dir = os.path.join("projects", unique_id)
+    project_dir = os.path.join("projects", "data", unique_id)
     files_dir = os.path.join(project_dir, "files")
 
     # Create files directory
@@ -164,7 +164,7 @@ def sync_files_to_group(unique_id: str, files: Dict[str, List[str]]) -> Tuple[Di
 def cleanup_orphaned_files(unique_id: str, changes: Dict) -> Dict[str, List[str]]:
     """Remove files and their processing results that are no longer needed."""
     removed_files = {}
-    project_dir = os.path.join("projects", unique_id)
+    project_dir = os.path.join("projects", "data", unique_id)
 
     # Handle individual file removals
     for group_name, removed_filenames in changes["removed"].items():
@@ -225,7 +225,7 @@ def cleanup_orphaned_files(unique_id: str, changes: Dict) -> Dict[str, List[str]
 
 def get_group_files_list(unique_id: str, group_name: str) -> List[str]:
     """Get list of files in a specific group."""
-    group_dir = os.path.join("projects", unique_id, "files", group_name)
+    group_dir = os.path.join("projects", "data", unique_id, "files", group_name)
 
     if not os.path.exists(group_dir):
         return []
@@ -241,7 +241,7 @@ def get_group_files_list(unique_id: str, group_name: str) -> List[str]:
 
 def ensure_directories(unique_id: str):
     """Ensure all necessary directories exist for a project."""
-    base_dir = os.path.join("projects", unique_id)
+    base_dir = os.path.join("projects", "data", unique_id)
     directories = [
         "files",
         "processed",
diff --git a/utils/file_utils.py b/utils/file_utils.py
index 3f6afe2..404c20f 100644
--- a/utils/file_utils.py
+++ b/utils/file_utils.py
@@ -103,7 +103,7 @@ def is_file_already_processed(target_file: Path, pagination_file: Path, embeddin
 
 def load_processed_files_log(unique_id: str) -> Dict[str, Dict]:
     """Load processed files log for a project"""
-    log_file = os.path.join("projects", unique_id, "processed_files.json")
+    log_file = os.path.join("projects", "data", unique_id, "processed_files.json")
     if os.path.exists(log_file):
         try:
             import json
@@ -116,7 +116,7 @@ def load_processed_files_log(unique_id: str) -> Dict[str, Dict]:
 
 def save_processed_files_log(unique_id: str, processed_log: Dict[str, Dict]):
     """Save processed files log for a project (legacy function)"""
-    log_file = os.path.join("projects", unique_id, "processed_files.json")
+    log_file = os.path.join("projects", "data", unique_id, "processed_files.json")
     try:
         os.makedirs(os.path.dirname(log_file), exist_ok=True)
         import json
@@ -128,7 +128,7 @@ def save_processed_files_log(unique_id: str, processed_log: Dict[str, Dict]):
 
 def get_processing_log(unique_id: str) -> Dict:
     """Get the comprehensive processing log for a project"""
-    log_file = os.path.join("projects", unique_id, "processing_log.json")
+    log_file = os.path.join("projects", "data", unique_id, "processing_log.json")
     if os.path.exists(log_file):
         try:
             import json
@@ -141,7 +141,7 @@ def get_processing_log(unique_id: str) -> Dict:
 
 def save_project_status(unique_id: str, status: Dict):
     """Save project processing status"""
-    status_file = os.path.join("projects", unique_id, "status.json")
+    status_file = os.path.join("projects", "data", unique_id, "status.json")
     try:
         os.makedirs(os.path.dirname(status_file), exist_ok=True)
         import json
@@ -153,7 +153,7 @@ def save_project_status(unique_id: str, status: Dict):
 
 def load_project_status(unique_id: str) -> Dict:
     """Load project processing status"""
-    status_file = os.path.join("projects", unique_id, "status.json")
+    status_file = os.path.join("projects", "data", unique_id, "status.json")
     if os.path.exists(status_file):
         try:
             import json
@@ -185,7 +185,7 @@ def get_file_metadata(file_path: str) -> Dict:
 
 def update_file_processing_status(unique_id: str, group_name: str, filename: str, status: Dict):
     """Update processing status for a specific file"""
-    status_file = os.path.join("projects", unique_id, "file_status.json")
+    status_file = os.path.join("projects", "data", unique_id, "file_status.json")
 
     try:
         # Load existing status
@@ -217,7 +217,7 @@ def update_file_processing_status(unique_id: str, group_name: str, filename: str
 
 def get_file_processing_status(unique_id: str, group_name: str = None, filename: str = None) -> Dict:
     """Get processing status for files"""
-    status_file = os.path.join("projects", unique_id, "file_status.json")
+    status_file = os.path.join("projects", "data", unique_id, "file_status.json")
 
     if not os.path.exists(status_file):
         return {}
@@ -261,7 +261,7 @@ def calculate_directory_size(directory_path: str) -> int:
 
 def get_project_statistics(unique_id: str) -> Dict:
     """Get comprehensive statistics for a project"""
-    project_dir = os.path.join("projects", unique_id)
+    project_dir = os.path.join("projects", "data", unique_id)
 
     if not os.path.exists(project_dir):
         return {"project_exists": False}
diff --git a/utils/organize_dataset_files.py b/utils/organize_dataset_files.py
index 946e269..542d36b 100644
--- a/utils/organize_dataset_files.py
+++ b/utils/organize_dataset_files.py
@@ -17,9 +17,9 @@ def is_file_already_processed(target_file: Path, pagination_file: Path, embeddin
     return False
 
 def organize_single_project_files(unique_id: str, skip_processed=True):
-    """Organize files for a single project from projects/{unique_id}/files to projects/{unique_id}/dataset/{file_name}/document.txt"""
+    """Organize files for a single project from projects/data/{unique_id}/files to projects/data/{unique_id}/dataset/{file_name}/document.txt"""
 
-    project_dir = Path("projects") / unique_id
+    project_dir = Path("projects") / "data" / unique_id
 
     if not project_dir.exists():
         print(f"Project directory not found: {project_dir}")
@@ -120,9 +120,9 @@ def organize_single_project_files(unique_id: str, skip_processed=True):
 
 
 def organize_dataset_files():
-    """Move files from projects/{unique_id}/files to projects/{unique_id}/dataset/{file_name}/document.txt"""
+    """Move files from projects/data/{unique_id}/files to projects/data/{unique_id}/dataset/{file_name}/document.txt"""
 
-    projects_dir = Path("projects")
+    projects_dir = Path("projects") / "data"
 
     if not projects_dir.exists():
         print("Projects directory not found")
diff --git a/utils/project_manager.py b/utils/project_manager.py
index 8aff292..b9443d8 100644
--- a/utils/project_manager.py
+++ b/utils/project_manager.py
@@ -113,7 +113,7 @@ def generate_directory_tree(project_dir: str, unique_id: str, max_depth: int = 3
 
 def generate_project_readme(unique_id: str) -> str:
     """Generate README.md content for a project"""
-    project_dir = os.path.join("projects", unique_id)
+    project_dir = os.path.join("projects", "data", unique_id)
     readme_content = f"""# Project: {unique_id}
 
 ## Project Overview
@@ -192,7 +192,7 @@ This project contains processed documents and their associated embeddings for se
 def save_project_readme(unique_id: str):
     """Save README.md for a project"""
     readme_content = generate_project_readme(unique_id)
-    readme_path = os.path.join("projects", unique_id, "README.md")
+    readme_path = os.path.join("projects", "data", unique_id, "README.md")
 
     try:
         os.makedirs(os.path.dirname(readme_path), exist_ok=True)
@@ -207,7 +207,7 @@ def save_project_readme(unique_id: str):
 
 def get_project_status(unique_id: str) -> Dict:
     """Get comprehensive status of a project"""
-    project_dir = os.path.join("projects", unique_id)
+    project_dir = os.path.join("projects", "data", unique_id)
     project_exists = os.path.exists(project_dir)
 
     if not project_exists:
@@ -259,7 +259,7 @@ def get_project_status(unique_id: str) -> Dict:
 
 def remove_project(unique_id: str) -> bool:
     """Remove entire project directory"""
-    project_dir = os.path.join("projects", unique_id)
+    project_dir = os.path.join("projects", "data", unique_id)
     try:
         if os.path.exists(project_dir):
             import shutil
@@ -326,7 +326,7 @@ def get_project_stats(unique_id: str) -> Dict:
 
     # Check embeddings files
     embedding_files = []
-    dataset_dir = os.path.join("projects", unique_id, "dataset")
+    dataset_dir = os.path.join("projects", "data", unique_id, "dataset")
     if os.path.exists(dataset_dir):
         for root, dirs, files in os.walk(dataset_dir):
             for file in files:
diff --git a/utils/single_file_processor.py b/utils/single_file_processor.py
index cd1d484..b68caf4 100644
--- a/utils/single_file_processor.py
+++ b/utils/single_file_processor.py
@@ -50,7 +50,7 @@ async def process_single_file(
     """
     # Create output directory for this file
     filename_stem = Path(filename).stem
-    output_dir = os.path.join("projects", unique_id, "processed", group_name, filename_stem)
+    output_dir = os.path.join("projects", "data", unique_id, "processed", group_name, filename_stem)
     os.makedirs(output_dir, exist_ok=True)
 
     result = {
@@ -280,7 +280,7 @@ async def generate_embeddings_for_file(document_path: str, embedding_path: str)
 def check_file_already_processed(unique_id: str, group_name: str, filename: str) -> bool:
     """Check if a file has already been processed."""
     filename_stem = Path(filename).stem
-    output_dir = os.path.join("projects", unique_id, "processed", group_name, filename_stem)
+    output_dir = os.path.join("projects", "data", unique_id, "processed", group_name, filename_stem)
 
     document_path = os.path.join(output_dir, "document.txt")
     pagination_path = os.path.join(output_dir, "pagination.txt")