#!/usr/bin/env python3
"""
catalog-agent/utils/project_manager.py

Project management functions for handling projects, README generation, and status tracking.
"""
import os
import json
import shutil
from datetime import datetime
from typing import Dict, List, Optional
from pathlib import Path

from utils.file_utils import get_document_preview, load_processed_files_log


def get_content_from_messages(messages: List[dict]) -> str:
    """Extract content from messages list"""
    content = ""
    for message in messages:
        if message.get("role") == "user":
            content += message.get("content", "")
    return content
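
# Illustrative example (hypothetical messages): only user-role content is kept,
# so system and assistant messages are ignored.
#
#   messages = [
#       {"role": "system", "content": "You are a catalog assistant."},
#       {"role": "user", "content": "Index the quarterly reports."},
#       {"role": "assistant", "content": "Done."},
#       {"role": "user", "content": " Then build embeddings."},
#   ]
#   get_content_from_messages(messages)
#   # -> "Index the quarterly reports. Then build embeddings."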


def generate_project_readme(unique_id: str) -> str:
    """Generate README.md content for a project"""
    project_dir = os.path.join("projects", unique_id)
    readme_content = f"""# Project: {unique_id}

## Project Overview

This project contains processed documents and their associated embeddings for semantic search.

## Dataset Structure

"""
    dataset_dir = os.path.join(project_dir, "dataset")
    if not os.path.exists(dataset_dir):
        readme_content += "No dataset files available.\n"
    else:
        # Get all document directories
        doc_dirs = []
        try:
            for item in sorted(os.listdir(dataset_dir)):
                item_path = os.path.join(dataset_dir, item)
                if os.path.isdir(item_path):
                    doc_dirs.append(item)
        except Exception as e:
            print(f"Error listing dataset directories: {str(e)}")

        if not doc_dirs:
            readme_content += "No document directories found.\n"
        else:
            for doc_dir in doc_dirs:
                doc_path = os.path.join(dataset_dir, doc_dir)
                document_file = os.path.join(doc_path, "document.txt")
                pagination_file = os.path.join(doc_path, "pagination.txt")
                embeddings_file = os.path.join(doc_path, "document_embeddings.pkl")

                readme_content += f"### {doc_dir}\n\n"
                readme_content += "**Files:**\n"
                # Mark files that are actually present with a check mark
                readme_content += "- `document.txt`"
                if os.path.exists(document_file):
                    readme_content += " ✓"
                readme_content += "\n"
                readme_content += "- `pagination.txt`"
                if os.path.exists(pagination_file):
                    readme_content += " ✓"
                readme_content += "\n"
                readme_content += "- `document_embeddings.pkl`"
                if os.path.exists(embeddings_file):
                    readme_content += " ✓"
                readme_content += "\n\n"

                # Add document preview
                if os.path.exists(document_file):
                    readme_content += "**Content Preview (first 10 lines):**\n\n```\n"
                    preview = get_document_preview(document_file, 10)
                    readme_content += preview
                    readme_content += "\n```\n\n"
                else:
                    readme_content += "**Content Preview:** Not available\n\n"

    readme_content += f"""---

*Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""
    return readme_content
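
# Illustrative output shape for a hypothetical project "demo-project" with one
# document directory "report_2024" (actual values depend on what is on disk):
#
#   # Project: demo-project
#
#   ## Project Overview
#   ...
#
#   ## Dataset Structure
#
#   ### report_2024
#
#   **Files:**
#   - `document.txt` ✓
#   - `pagination.txt` ✓
#   - `document_embeddings.pkl` ✓
#
#   **Content Preview (first 10 lines):**
#   ...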


def save_project_readme(unique_id: str) -> Optional[str]:
    """Save README.md for a project; return its path, or None on failure"""
    readme_content = generate_project_readme(unique_id)
    readme_path = os.path.join("projects", unique_id, "README.md")
    try:
        os.makedirs(os.path.dirname(readme_path), exist_ok=True)
        with open(readme_path, 'w', encoding='utf-8') as f:
            f.write(readme_content)
        print(f"Generated README.md for project {unique_id}")
        return readme_path
    except Exception as e:
        print(f"Error generating README for project {unique_id}: {str(e)}")
        return None


def get_project_status(unique_id: str) -> Dict:
    """Get comprehensive status of a project"""
    project_dir = os.path.join("projects", unique_id)
    project_exists = os.path.exists(project_dir)
    if not project_exists:
        return {
            "unique_id": unique_id,
            "project_exists": False,
            "error": "Project not found"
        }

    # Get processed log
    processed_log = load_processed_files_log(unique_id)

    # Collect document.txt files
    document_files = []
    dataset_dir = os.path.join(project_dir, "dataset")
    if os.path.exists(dataset_dir):
        for root, dirs, files in os.walk(dataset_dir):
            for file in files:
                if file == "document.txt":
                    document_files.append(os.path.join(root, file))

    # Check system prompt and MCP settings
    system_prompt_file = os.path.join(project_dir, "system_prompt.txt")
    mcp_settings_file = os.path.join(project_dir, "mcp_settings.json")

    status = {
        "unique_id": unique_id,
        "project_exists": True,
        "project_path": project_dir,
        "processed_files_count": len(processed_log),
        "processed_files": processed_log,
        "document_files_count": len(document_files),
        "document_files": document_files,
        "has_system_prompt": os.path.exists(system_prompt_file),
        "has_mcp_settings": os.path.exists(mcp_settings_file),
        "readme_exists": os.path.exists(os.path.join(project_dir, "README.md")),
        "log_file_exists": os.path.exists(os.path.join(project_dir, "processed_files.json"))
    }

    # Add dataset structure
    try:
        from utils.dataset_manager import generate_dataset_structure
        status["dataset_structure"] = generate_dataset_structure(unique_id)
    except Exception as e:
        status["dataset_structure"] = f"Error generating structure: {str(e)}"
    return status


def remove_project(unique_id: str) -> bool:
    """Remove entire project directory"""
    project_dir = os.path.join("projects", unique_id)
    try:
        if os.path.exists(project_dir):
            shutil.rmtree(project_dir)
            print(f"Removed project directory: {project_dir}")
            return True
        else:
            print(f"Project directory not found: {project_dir}")
            return False
    except Exception as e:
        print(f"Error removing project {unique_id}: {str(e)}")
        return False


def list_projects() -> List[str]:
    """List all existing project IDs"""
    projects_dir = "projects"
    if not os.path.exists(projects_dir):
        return []
    try:
        return [item for item in os.listdir(projects_dir)
                if os.path.isdir(os.path.join(projects_dir, item))]
    except Exception as e:
        print(f"Error listing projects: {str(e)}")
        return []


def get_project_stats(unique_id: str) -> Dict:
    """Get statistics for a specific project"""
    status = get_project_status(unique_id)
    if not status["project_exists"]:
        return status

    stats = {
        "unique_id": unique_id,
        "total_processed_files": status["processed_files_count"],
        "total_document_files": status["document_files_count"],
        "has_system_prompt": status["has_system_prompt"],
        "has_mcp_settings": status["has_mcp_settings"],
        "has_readme": status["readme_exists"]
    }

    # Calculate file sizes
    total_size = 0
    document_sizes = []
    for doc_file in status["document_files"]:
        try:
            size = os.path.getsize(doc_file)
            document_sizes.append({
                "file": doc_file,
                "size": size,
                "size_mb": round(size / (1024 * 1024), 2)
            })
            total_size += size
        except Exception:
            pass
    stats["total_document_size"] = total_size
    stats["total_document_size_mb"] = round(total_size / (1024 * 1024), 2)
    stats["document_files_detail"] = document_sizes

    # Check embeddings files
    embedding_files = []
    dataset_dir = os.path.join("projects", unique_id, "dataset")
    if os.path.exists(dataset_dir):
        for root, dirs, files in os.walk(dataset_dir):
            for file in files:
                if file == "document_embeddings.pkl":
                    file_path = os.path.join(root, file)
                    try:
                        size = os.path.getsize(file_path)
                        embedding_files.append({
                            "file": file_path,
                            "size": size,
                            "size_mb": round(size / (1024 * 1024), 2)
                        })
                    except Exception:
                        pass
    stats["embedding_files_count"] = len(embedding_files)
    stats["embedding_files_detail"] = embedding_files
    return stats
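

if __name__ == "__main__":
    # Minimal command-line sketch, assuming the module is run from the directory
    # that contains "projects/" and that the utils package is importable.
    # Pass a project ID to print its stats and regenerate its README; with no
    # arguments, just list the known projects.
    import sys
    from pprint import pprint

    if len(sys.argv) < 2:
        print("Projects:", list_projects())
    else:
        project_id = sys.argv[1]
        pprint(get_project_stats(project_id))
        save_project_readme(project_id)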