#!/usr/bin/env python3
"""
Project management functions for handling projects, README generation, and status tracking.
"""

import os
import shutil
from datetime import datetime
from typing import Dict, List, Optional

from utils.file_utils import get_document_preview, load_processed_files_log


def get_content_from_messages(messages: List[dict]) -> str:
    """Extract content from messages list"""
    content = ""
    for message in messages:
        if message.get("role") == "user":
            content += message.get("content", "")
    return content


def generate_project_readme(unique_id: str) -> str:
    """Generate README.md content for a project"""
    project_dir = os.path.join("projects", unique_id)
    readme_content = f"""# Project: {unique_id}

## Project Overview

This project contains processed documents and their associated embeddings for semantic search.

## Dataset Structure

"""

    dataset_dir = os.path.join(project_dir, "dataset")
    if not os.path.exists(dataset_dir):
        readme_content += "No dataset files available.\n"
    else:
        # Get all document directories
        doc_dirs = []
        try:
            for item in sorted(os.listdir(dataset_dir)):
                item_path = os.path.join(dataset_dir, item)
                if os.path.isdir(item_path):
                    doc_dirs.append(item)
        except Exception as e:
            print(f"Error listing dataset directories: {str(e)}")

        if not doc_dirs:
            readme_content += "No document directories found.\n"
        else:
            for doc_dir in doc_dirs:
                doc_path = os.path.join(dataset_dir, doc_dir)
                document_file = os.path.join(doc_path, "document.txt")
                pagination_file = os.path.join(doc_path, "pagination.txt")
                embeddings_file = os.path.join(doc_path, "document_embeddings.pkl")

                readme_content += f"### {doc_dir}\n\n"
                readme_content += "**Files:**\n"
                readme_content += "- `document.txt`"
                if os.path.exists(document_file):
                    readme_content += " ✓"
                readme_content += "\n"

                readme_content += "- `pagination.txt`"
                if os.path.exists(pagination_file):
                    readme_content += " ✓"
                readme_content += "\n"

                readme_content += "- `document_embeddings.pkl`"
                if os.path.exists(embeddings_file):
                    readme_content += " ✓"
                readme_content += "\n\n"

                # Add document preview
                if os.path.exists(document_file):
                    readme_content += "**Content Preview (first 10 lines):**\n\n```\n"
                    preview = get_document_preview(document_file, 10)
                    readme_content += preview
                    readme_content += "\n```\n\n"
                else:
                    readme_content += "**Content Preview:** Not available\n\n"

    readme_content += f"""---
*Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""

    return readme_content


def save_project_readme(unique_id: str) -> Optional[str]:
    """Save README.md for a project; return its path, or None on failure"""
    readme_content = generate_project_readme(unique_id)
    readme_path = os.path.join("projects", unique_id, "README.md")

    try:
        os.makedirs(os.path.dirname(readme_path), exist_ok=True)
        with open(readme_path, 'w', encoding='utf-8') as f:
            f.write(readme_content)
        print(f"Generated README.md for project {unique_id}")
        return readme_path
    except Exception as e:
        print(f"Error generating README for project {unique_id}: {str(e)}")
        return None


def get_project_status(unique_id: str) -> Dict:
    """Get comprehensive status of a project"""
    project_dir = os.path.join("projects", unique_id)
    project_exists = os.path.exists(project_dir)

    if not project_exists:
        return {
            "unique_id": unique_id,
            "project_exists": False,
            "error": "Project not found"
        }

    # Get processed log
    processed_log = load_processed_files_log(unique_id)

    # Collect document.txt files
    document_files = []
    dataset_dir = os.path.join(project_dir, "dataset")
    if os.path.exists(dataset_dir):
        for root, dirs, files in os.walk(dataset_dir):
            for file in files:
                if file == "document.txt":
                    document_files.append(os.path.join(root, file))

    # Check system prompt and MCP settings
    system_prompt_file = os.path.join(project_dir, "system_prompt.txt")
    mcp_settings_file = os.path.join(project_dir, "mcp_settings.json")

    status = {
        "unique_id": unique_id,
        "project_exists": True,
        "project_path": project_dir,
        "processed_files_count": len(processed_log),
        "processed_files": processed_log,
        "document_files_count": len(document_files),
        "document_files": document_files,
        "has_system_prompt": os.path.exists(system_prompt_file),
        "has_mcp_settings": os.path.exists(mcp_settings_file),
        "readme_exists": os.path.exists(os.path.join(project_dir, "README.md")),
        "log_file_exists": os.path.exists(os.path.join(project_dir, "processed_files.json"))
    }

    # Add dataset structure
    try:
        from utils.dataset_manager import generate_dataset_structure
        status["dataset_structure"] = generate_dataset_structure(unique_id)
    except Exception as e:
        status["dataset_structure"] = f"Error generating structure: {str(e)}"

    return status


def remove_project(unique_id: str) -> bool:
    """Remove entire project directory"""
    project_dir = os.path.join("projects", unique_id)
    try:
        if os.path.exists(project_dir):
            shutil.rmtree(project_dir)
            print(f"Removed project directory: {project_dir}")
            return True
        else:
            print(f"Project directory not found: {project_dir}")
            return False
    except Exception as e:
        print(f"Error removing project {unique_id}: {str(e)}")
        return False


def list_projects() -> List[str]:
    """List all existing project IDs"""
    projects_dir = "projects"
    if not os.path.exists(projects_dir):
        return []

    try:
        return [item for item in os.listdir(projects_dir)
                if os.path.isdir(os.path.join(projects_dir, item))]
    except Exception as e:
        print(f"Error listing projects: {str(e)}")
        return []


def get_project_stats(unique_id: str) -> Dict:
    """Get statistics for a specific project"""
    status = get_project_status(unique_id)

    if not status["project_exists"]:
        return status

    stats = {
        "unique_id": unique_id,
        "total_processed_files": status["processed_files_count"],
        "total_document_files": status["document_files_count"],
        "has_system_prompt": status["has_system_prompt"],
        "has_mcp_settings": status["has_mcp_settings"],
        "has_readme": status["readme_exists"]
    }

    # Calculate file sizes
    total_size = 0
    document_sizes = []

    for doc_file in status["document_files"]:
        try:
            size = os.path.getsize(doc_file)
            document_sizes.append({
                "file": doc_file,
                "size": size,
                "size_mb": round(size / (1024 * 1024), 2)
            })
            total_size += size
        except Exception:
            pass

    stats["total_document_size"] = total_size
    stats["total_document_size_mb"] = round(total_size / (1024 * 1024), 2)
    stats["document_files_detail"] = document_sizes

    # Check embeddings files
    embedding_files = []
    dataset_dir = os.path.join("projects", unique_id, "dataset")
    if os.path.exists(dataset_dir):
        for root, dirs, files in os.walk(dataset_dir):
            for file in files:
                if file == "document_embeddings.pkl":
                    file_path = os.path.join(root, file)
                    try:
                        size = os.path.getsize(file_path)
                        embedding_files.append({
                            "file": file_path,
                            "size": size,
                            "size_mb": round(size / (1024 * 1024), 2)
                        })
                    except Exception:
                        pass

    stats["embedding_files_count"] = len(embedding_files)
    stats["embedding_files_detail"] = embedding_files

    return stats
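

# Illustrative usage sketch (not part of the original module): shows how the
# helpers above fit together. It assumes the "projects/<unique_id>" layout these
# functions expect; the project ID "example_project" below is hypothetical.
if __name__ == "__main__":
    # Enumerate existing projects on disk.
    for project_id in list_projects():
        print(f"Found project: {project_id}")

    example_id = "example_project"  # hypothetical ID used only for this demo
    status = get_project_status(example_id)
    if status.get("project_exists"):
        # Regenerate the project's README.md and print a short size summary.
        save_project_readme(example_id)
        stats = get_project_stats(example_id)
        print(f"{example_id}: {stats['total_document_files']} documents, "
              f"{stats['total_document_size_mb']} MB total")
    else:
        print(f"No project named {example_id} found")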