#!/usr/bin/env python3
"""
Project management functions for handling projects, README generation, and status tracking.
"""

import os
from datetime import datetime
from typing import Dict, List

from utils.file_utils import get_document_preview, load_processed_files_log


def get_content_from_messages(messages: List[dict]) -> str:
    """Extract and concatenate the content of all user messages"""
    content = ""
    for message in messages:
        if message.get("role") == "user":
            content += message.get("content", "")
    return content


def generate_directory_tree(project_dir: str, unique_id: str, max_depth: int = 3) -> str:
    """Generate dataset directory tree structure for the project"""
    def _build_tree(path: str, prefix: str = "", is_last: bool = True, depth: int = 0) -> List[str]:
        if depth > max_depth:
            return []

        lines = []
        try:
            entries = sorted(os.listdir(path))
            # Separate directories and files
            dirs = [e for e in entries if os.path.isdir(os.path.join(path, e)) and not e.startswith('.')]
            files = [e for e in entries if os.path.isfile(os.path.join(path, e)) and not e.startswith('.')]

            entries = dirs + files

            for i, entry in enumerate(entries):
                entry_path = os.path.join(path, entry)
                is_dir = os.path.isdir(entry_path)
                is_last_entry = i == len(entries) - 1

                # Choose the appropriate tree symbols
                if is_last_entry:
                    connector = "└── "
                    new_prefix = prefix + "    "
                else:
                    connector = "├── "
                    new_prefix = prefix + "│   "

                # Add entry line
                line = prefix + connector + entry
                if is_dir:
                    line += "/"
                lines.append(line)

                # Recursively add subdirectories
                if is_dir and depth < max_depth:
                    sub_lines = _build_tree(entry_path, new_prefix, is_last_entry, depth + 1)
                    lines.extend(sub_lines)

        except PermissionError:
            lines.append(prefix + "└── [Permission Denied]")
        except Exception as e:
            lines.append(prefix + "└── [Error: " + str(e) + "]")

        return lines

    # Start building tree from dataset directory
    dataset_dir = os.path.join(project_dir, "dataset")
    tree_lines = []

    if not os.path.exists(dataset_dir):
        return "└── [No dataset directory found]"

    try:
        entries = sorted(os.listdir(dataset_dir))
        dirs = [e for e in entries if os.path.isdir(os.path.join(dataset_dir, e)) and not e.startswith('.')]
        files = [e for e in entries if os.path.isfile(os.path.join(dataset_dir, e)) and not e.startswith('.')]

        entries = dirs + files

        if not entries:
            tree_lines.append("└── [Empty dataset directory]")
        else:
            for i, entry in enumerate(entries):
                entry_path = os.path.join(dataset_dir, entry)
                is_dir = os.path.isdir(entry_path)
                is_last_entry = i == len(entries) - 1

                if is_last_entry:
                    connector = "└── "
                    prefix = "    "
                else:
                    connector = "├── "
                    prefix = "│   "

                line = connector + entry
                if is_dir:
                    line += "/"
                tree_lines.append(line)

                # Recursively add subdirectories
                if is_dir:
                    sub_lines = _build_tree(entry_path, prefix, is_last_entry, 1)
                    tree_lines.extend(sub_lines)

    except Exception as e:
        tree_lines.append(f"└── [Error generating tree: {str(e)}]")

    return "\n".join(tree_lines)


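# Illustrative output of generate_directory_tree() for a hypothetical project
# whose dataset/ directory holds two document folders (the entry names are
# examples only; the real tree depends on the project's contents):
#
# ├── doc_001/
# │   ├── document.txt
# │   ├── document_embeddings.pkl
# │   └── pagination.txt
# └── doc_002/
#     └── document.txt

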
def generate_project_readme(unique_id: str) -> str:
    """Generate README.md content for a project"""
    project_dir = os.path.join("projects", unique_id)
    readme_content = f"""# Project: {unique_id}

## Project Overview

This project contains processed documents and their associated embeddings for semantic search.

## Directory Structure

"""

    # Generate directory tree
    readme_content += "```\n"
    readme_content += generate_directory_tree(project_dir, unique_id)
    readme_content += "\n```\n\n"

    readme_content += """## Dataset Structure

"""

    dataset_dir = os.path.join(project_dir, "dataset")
    if not os.path.exists(dataset_dir):
        readme_content += "No dataset files available.\n"
    else:
        # Get all document directories
        doc_dirs = []
        try:
            for item in sorted(os.listdir(dataset_dir)):
                item_path = os.path.join(dataset_dir, item)
                if os.path.isdir(item_path):
                    doc_dirs.append(item)
        except Exception as e:
            print(f"Error listing dataset directories: {str(e)}")

        if not doc_dirs:
            readme_content += "No document directories found.\n"
        else:
            for doc_dir in doc_dirs:
                doc_path = os.path.join(dataset_dir, doc_dir)
                document_file = os.path.join(doc_path, "document.txt")
                pagination_file = os.path.join(doc_path, "pagination.txt")
                embeddings_file = os.path.join(doc_path, "document_embeddings.pkl")

                readme_content += f"### {doc_dir}\n\n"
                readme_content += "**Files:**\n"
                readme_content += "- `document.txt`"
                if os.path.exists(document_file):
                    readme_content += " ✓"
                readme_content += "\n"

                readme_content += "- `pagination.txt`"
                if os.path.exists(pagination_file):
                    readme_content += " ✓"
                readme_content += "\n"

                readme_content += "- `document_embeddings.pkl`"
                if os.path.exists(embeddings_file):
                    readme_content += " ✓"
                readme_content += "\n\n"

                # Add document preview
                if os.path.exists(document_file):
                    readme_content += "**Content Preview (first 10 lines):**\n\n```\n"
                    preview = get_document_preview(document_file, 10)
                    readme_content += preview
                    readme_content += "\n```\n\n"
                else:
                    readme_content += "**Content Preview:** Not available\n\n"

    readme_content += f"""---
*Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""

    return readme_content


def save_project_readme(unique_id: str):
    """Save README.md for a project"""
    readme_content = generate_project_readme(unique_id)
    readme_path = os.path.join("projects", unique_id, "README.md")

    try:
        os.makedirs(os.path.dirname(readme_path), exist_ok=True)
        with open(readme_path, 'w', encoding='utf-8') as f:
            f.write(readme_content)
        print(f"Generated README.md for project {unique_id}")
        return readme_path
    except Exception as e:
        print(f"Error generating README for project {unique_id}: {str(e)}")
        return None


def get_project_status(unique_id: str) -> Dict:
    """Get comprehensive status of a project"""
    project_dir = os.path.join("projects", unique_id)
    project_exists = os.path.exists(project_dir)

    if not project_exists:
        return {
            "unique_id": unique_id,
            "project_exists": False,
            "error": "Project not found"
        }

    # Get processed log
    processed_log = load_processed_files_log(unique_id)

    # Collect document.txt files
    document_files = []
    dataset_dir = os.path.join(project_dir, "dataset")
    if os.path.exists(dataset_dir):
        for root, dirs, files in os.walk(dataset_dir):
            for file in files:
                if file == "document.txt":
                    document_files.append(os.path.join(root, file))

    # Check system prompt and MCP settings
    system_prompt_file = os.path.join(project_dir, "system_prompt.txt")
    mcp_settings_file = os.path.join(project_dir, "mcp_settings.json")

    status = {
        "unique_id": unique_id,
        "project_exists": True,
        "project_path": project_dir,
        "processed_files_count": len(processed_log),
        "processed_files": processed_log,
        "document_files_count": len(document_files),
        "document_files": document_files,
        "has_system_prompt": os.path.exists(system_prompt_file),
        "has_mcp_settings": os.path.exists(mcp_settings_file),
        "readme_exists": os.path.exists(os.path.join(project_dir, "README.md")),
        "log_file_exists": os.path.exists(os.path.join(project_dir, "processed_files.json"))
    }

    # Add dataset structure
    try:
        from utils.dataset_manager import generate_dataset_structure
        status["dataset_structure"] = generate_dataset_structure(unique_id)
    except Exception as e:
        status["dataset_structure"] = f"Error generating structure: {str(e)}"

    return status


def remove_project(unique_id: str) -> bool:
    """Remove entire project directory"""
    project_dir = os.path.join("projects", unique_id)
    try:
        if os.path.exists(project_dir):
            import shutil
            shutil.rmtree(project_dir)
            print(f"Removed project directory: {project_dir}")
            return True
        else:
            print(f"Project directory not found: {project_dir}")
            return False
    except Exception as e:
        print(f"Error removing project {unique_id}: {str(e)}")
        return False


def list_projects() -> List[str]:
    """List all existing project IDs"""
    projects_dir = "projects"
    if not os.path.exists(projects_dir):
        return []

    try:
        return [item for item in os.listdir(projects_dir)
                if os.path.isdir(os.path.join(projects_dir, item))]
    except Exception as e:
        print(f"Error listing projects: {str(e)}")
        return []


def get_project_stats(unique_id: str) -> Dict:
    """Get statistics for a specific project"""
    status = get_project_status(unique_id)

    if not status["project_exists"]:
        return status

    stats = {
        "unique_id": unique_id,
        "total_processed_files": status["processed_files_count"],
        "total_document_files": status["document_files_count"],
        "has_system_prompt": status["has_system_prompt"],
        "has_mcp_settings": status["has_mcp_settings"],
        "has_readme": status["readme_exists"]
    }

    # Calculate file sizes
    total_size = 0
    document_sizes = []

    for doc_file in status["document_files"]:
        try:
            size = os.path.getsize(doc_file)
            document_sizes.append({
                "file": doc_file,
                "size": size,
                "size_mb": round(size / (1024 * 1024), 2)
            })
            total_size += size
        except Exception:
            pass

    stats["total_document_size"] = total_size
    stats["total_document_size_mb"] = round(total_size / (1024 * 1024), 2)
    stats["document_files_detail"] = document_sizes

    # Check embeddings files
    embedding_files = []
    dataset_dir = os.path.join("projects", unique_id, "dataset")
    if os.path.exists(dataset_dir):
        for root, dirs, files in os.walk(dataset_dir):
            for file in files:
                if file == "document_embeddings.pkl":
                    file_path = os.path.join(root, file)
                    try:
                        size = os.path.getsize(file_path)
                        embedding_files.append({
                            "file": file_path,
                            "size": size,
                            "size_mb": round(size / (1024 * 1024), 2)
                        })
                    except Exception:
                        pass

    stats["embedding_files_count"] = len(embedding_files)
    stats["embedding_files_detail"] = embedding_files

    return stats
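

# A minimal, illustrative usage sketch (not part of the module's API). The
# project ID "demo-project" is a hypothetical example; point it at an existing
# directory under projects/ to exercise the helpers above.
if __name__ == "__main__":
    demo_id = "demo-project"

    # List known projects, then inspect one of them.
    print("Projects:", list_projects())

    status = get_project_status(demo_id)
    if status.get("project_exists"):
        # Regenerate the README and print a few summary statistics.
        save_project_readme(demo_id)
        stats = get_project_stats(demo_id)
        print(f"Documents: {stats['total_document_files']}, "
              f"total size: {stats['total_document_size_mb']} MB")
    else:
        print(f"Project {demo_id} not found")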