242 lines
8.8 KiB
Python
242 lines
8.8 KiB
Python
"""Return full schema + sample data for specified file:sheet pairs.
|
|
|
|
Usage: python3 detail.py <file_id1:sheet_id1>,<file_id2:sheet_id2>,... (a bare <file_id> without ":sheet_id" shows every sheet of that file)
|
|
|
|
Output: Per sheet — columns with type/stats/description + sample rows (from knowledge.md body).
|
|
|
|
datasets directory: ./datasets/ (gbase-agent-service) or ./dataset/ (catalog-agent), auto-detected at runtime.
|
|
dataset_ids are discovered automatically from the subdirectories of the datasets directory.
|
|
"""
|
|
import os
|
|
import re
|
|
import sys
|
|
|
|
import yaml
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from _session import get_session_dir
|
|
|
|
# The project root sits three levels above this script's directory:
# scripts/ -> kfs-answer/ -> skills/ -> project root.
_PROJECT_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir)
)

# Prefer <root>/datasets when it exists; otherwise fall back to <root>/dataset.
_ds = os.path.join(_PROJECT_ROOT, "datasets")
if os.path.isdir(_ds):
    DATASETS_DIR = _ds
else:
    DATASETS_DIR = os.path.join(_PROJECT_ROOT, "dataset")
|
|
|
|
|
|
def _discover_datasets():
    """Return the sorted names of the directories located directly in DATASETS_DIR.

    Each directory name is treated as a dataset_id. An empty list is returned
    when DATASETS_DIR itself is not a directory.
    """
    if not os.path.isdir(DATASETS_DIR):
        return []
    dataset_ids = []
    for entry in sorted(os.listdir(DATASETS_DIR)):
        # Plain files living next to the dataset directories are skipped.
        if os.path.isdir(os.path.join(DATASETS_DIR, entry)):
            dataset_ids.append(entry)
    return dataset_ids
|
|
|
|
|
|
def load_file_ref_map():
    """Build a {file_id: "F<n>"} dict from file_refs.txt in the session directory.

    Only lines that begin with ``F<digits>=<id>(`` contribute, where <id> is
    made of lowercase hex digits and dashes; every other line is ignored. When
    a file_id appears more than once, the last line wins. An empty dict is
    returned when file_refs.txt does not exist.
    """
    path = os.path.join(get_session_dir(), "file_refs.txt")
    if not os.path.isfile(path):
        return {}
    line_re = re.compile(r"^(F\d+)=([0-9a-f-]+)\(")
    with open(path, "r", encoding="utf-8") as handle:
        hits = (line_re.match(raw.strip()) for raw in handle)
        # group(1) is the F-code, group(2) the file_id it stands for.
        return {hit.group(2): hit.group(1) for hit in hits if hit}
|
|
|
|
|
|
def find_file_dir(dataset_ids, file_id):
    """Return the first existing ``DATASETS_DIR/<dataset_id>/<file_id>`` directory.

    Datasets are probed in the order given. None is returned when no dataset
    contains a directory named file_id.
    """
    candidates = (
        os.path.join(DATASETS_DIR, ds_id, file_id) for ds_id in dataset_ids
    )
    return next((path for path in candidates if os.path.isdir(path)), None)
|
|
|
|
|
|
def load_knowledge(file_dir):
    """Parse ``<file_dir>/knowledge.md`` into (meta dict, body text).

    The file must open with a YAML front-matter block whose opening and
    closing delimiters are lines consisting solely of ``---`` (trailing
    blanks allowed). A ``---`` that appears inside a line of the front matter,
    for example a markdown table rule in a description, is therefore not
    mistaken for the closing delimiter.

    Args:
        file_dir: Directory expected to contain knowledge.md.

    Returns:
        (meta, body): meta is the mapping parsed from the front matter and
        body is the stripped text after the closing delimiter.
        (None, None) when the file is missing, has no complete front-matter
        block, holds YAML that cannot be parsed, or holds YAML that is not a
        mapping.
    """
    km_path = os.path.join(file_dir, "knowledge.md")
    if not os.path.isfile(km_path):
        return None, None
    with open(km_path, "r", encoding="utf-8") as f:
        content = f.read()

    # A UTF-8 byte-order mark would otherwise hide the opening delimiter.
    content = content.lstrip("\ufeff")

    # Opening "---" line, lazily captured YAML, closing "---" line, then the rest.
    front = re.match(
        r"---[ \t]*\r?\n(.*?)^---[ \t]*\r?$(.*)",
        content,
        re.DOTALL | re.MULTILINE,
    )
    if front is None:
        return None, None

    try:
        meta = yaml.safe_load(front.group(1))
    except yaml.YAMLError:
        # Report malformed YAML as "invalid" instead of aborting the whole run.
        return None, None
    if not isinstance(meta, dict):
        # Empty or non-mapping front matter cannot be queried with .get().
        return None, None
    return meta, front.group(2).strip()
|
|
|
|
|
|
def extract_sheet_body(body, sheet_id):
    """Return the stripped text that follows the ``<!-- sheet_id ... -->`` marker.

    A section runs from its marker to the next sheet marker or to the end of
    body. If several markers carry sheet_id, the first one wins. When body
    holds no sheet markers at all, the whole stripped body is returned; when
    markers exist but none matches sheet_id, the result is "".
    """
    # Splitting on a pattern with one capture group yields
    # [preamble, id_1, section_1, id_2, section_2, ...].
    pieces = re.split(r"<!--\s*(sheet_\w+)(?:\s+[^>]*)?\s*-->", body)
    if len(pieces) == 1:
        # No markers: treat the body as belonging to a single sheet.
        return body.strip()
    for marker_id, section in zip(pieces[1::2], pieces[2::2]):
        if marker_id == sheet_id:
            return section.strip()
    return ""
|
|
|
|
|
|
def extract_sheet_src(body, sheet_id):
    """Look up the __src attribute on the ``<!-- sheet_id __src="..." -->`` marker.

    Returns the quoted value from the first matching marker, or "" when no
    marker for sheet_id carries __src directly after the sheet id.
    """
    marker_re = re.compile(rf'<!--\s*{re.escape(sheet_id)}\s+__src="([^"]*)"')
    found = marker_re.search(body)
    if found is None:
        return ""
    return found.group(1)
|
|
|
|
|
|
def format_columns(columns):
    """Render one compact schema line per column and join the lines with newlines.

    Each line has the form `` name [type] (stat=..., ...) — description``.
    The parenthesised stats and the description are omitted when the column
    has none. A missing name shows as "?" and a missing type as "text".
    """
    # (key in the column dict, label used in the output), in display order.
    scalar_stats = (
        ("distinct", "distinct"),
        ("null_rate", "null"),
        ("avg_length", "avg_len"),
        ("range", "range"),
        ("mean", "mean"),
    )
    # (key, maximum number of list items shown) for the list-valued stats.
    list_stats = (("sample", 5), ("values", 8), ("topics", 5))

    rendered = []
    for column in columns:
        parts = [
            f"{label}={column[key]}"
            for key, label in scalar_stats
            if key in column
        ]
        for key, limit in list_stats:
            if key not in column:
                continue
            value = column[key]
            if isinstance(value, list):
                value = ",".join(map(str, value[:limit]))
            parts.append(f"{key}=[{value}]")

        suffix = ""
        if parts:
            suffix += f" ({', '.join(parts)})"
        description = column.get("description", "")
        if description:
            suffix += f" — {description}"
        rendered.append(
            f" {column.get('name', '?')} [{column.get('type', 'text')}]{suffix}"
        )
    return "\n".join(rendered)
|
|
|
|
|
|
def main():
    """Print schema and sample content for the file:sheet pairs named in sys.argv[1].

    sys.argv[1] is a comma-separated list of ``file_id:sheet_id`` entries; an
    entry without ``:sheet_id`` selects every sheet of that file. For each file
    the knowledge.md metadata is loaded and each requested sheet is printed.
    Files or sheets that cannot be resolved are reported inline and skipped.

    Raises:
        SystemExit: with status 2, after printing a usage line to stderr, when
            no argument was supplied.
    """
    # Validate the command line before touching the filesystem.
    if len(sys.argv) < 2:
        print(
            "Usage: python3 detail.py <file_id1:sheet_id1>,<file_id2:sheet_id2>,...",
            file=sys.stderr,
        )
        sys.exit(2)

    file_sheets = _parse_entries(sys.argv[1])

    # Dataset ids are the subdirectory names of the datasets directory.
    dataset_ids = _discover_datasets()

    # file_id -> F{n} code, used to rewrite "F0S..." source tags per file.
    f_ref_map = load_file_ref_map()

    separator = "=" * 60
    for fid, sheet_ids in file_sheets.items():
        file_dir = find_file_dir(dataset_ids, fid)
        if not file_dir:
            print(separator)
            print(f"file_id: {fid}")
            print(" ERROR: not found")
            continue

        meta, body = load_knowledge(file_dir)
        # Front matter that is empty or not a mapping is treated as invalid.
        if not meta or not isinstance(meta, dict):
            print(separator)
            print(f"file_id: {fid}")
            print(" ERROR: knowledge.md not found or invalid")
            continue

        print(separator)
        print(f"file_id: {fid}")
        print(f"source: {meta.get('source_name', 'unknown')}")
        db_path = os.path.join(file_dir, "knowledge.db")
        if os.path.isfile(db_path):
            print(f"db_path: {db_path}")

        # "or []" also covers an explicit null value for "sheets" in the YAML.
        sheets_meta = {s["id"]: s for s in meta.get("sheets") or []}
        f_code = f_ref_map.get(fid, "")

        for sid in sheet_ids:
            if sid is None:
                # No sheet id given: show all sheets of the file.
                targets = list(sheets_meta.values())
            elif sid in sheets_meta:
                targets = [sheets_meta[sid]]
            else:
                print(f"\n sheet {sid}: NOT FOUND in metadata")
                continue

            for sheet in targets:
                _print_sheet(sheet, body, f_code)

    print(f"\n{separator}")
    print(f"Done. Showed {len(file_sheets)} files.")


def _parse_entries(arg):
    """Group a comma-separated "file_id[:sheet_id]" list into {file_id: [sheet_id or None, ...]}."""
    file_sheets = {}
    for entry in arg.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # Split on the first ":" only; a bare file_id is stored with None.
        fid, sep, sid = entry.partition(":")
        file_sheets.setdefault(fid.strip(), []).append(sid.strip() if sep else None)
    return file_sheets


def _print_sheet(sheet, body, f_code):
    """Print one sheet: header, description, schema or block count, then its body section."""
    sheet_id = sheet["id"]
    sname = sheet.get("name", "?")
    stype = sheet.get("type", "?")
    print(f"\n --- {sheet_id}: {sname} [{stype}]")

    # Description, falling back to the first block title, then to a
    # sheet name that is not just an auto-generated "sheet_..." id.
    sheet_desc = str(sheet.get("description") or "").strip()
    block_titles = sheet.get("block_titles") or []
    if sheet_desc:
        print(f" description: {sheet_desc[:200]}")
    elif block_titles:
        print(f" description (fallback from block_titles): {str(block_titles[0])[:200]}")
    elif sname and not str(sname).startswith("sheet_"):
        print(f" description (fallback from sheet name): {sname}")

    if stype == "db":
        row_count = sheet.get("row_count", "?")
        db_table = sheet.get("db_table", sheet_id)
        print(f" table: {db_table}, rows: {row_count}")
        # "or []" also covers an explicit null value for "columns".
        columns = sheet.get("columns") or []
        if columns:
            print(f" columns ({len(columns)}):")
            print(format_columns(columns))
    else:
        block_count = sheet.get("block_count", "?")
        print(f" blocks: {block_count}")

    # Body section: notes + sample rows for db sheets, content for markdown sheets.
    if not body:
        return
    section = extract_sheet_body(body, sheet_id)
    if not section:
        return

    src_tag = extract_sheet_src(body, sheet_id)
    if src_tag and f_code and "F0" in src_tag:
        # Replace the "F0" placeholder with this file's own F-code.
        src_tag = src_tag.replace("F0S", f"{f_code}S")

    # Keep the output bounded to roughly 2000 characters per sheet.
    if len(section) > 2000:
        section = section[:2000] + "\n ... [truncated]"
    if src_tag:
        print(f' __src="{src_tag}"')
    print(" content:")
    for line in section.split("\n"):
        print(f" {line}")
|
|
|
|
|
|
# Script entry point: run main() only when executed directly, not on import.
# main() returns None, so the process still exits with status 0 on success.
if __name__ == "__main__":
    sys.exit(main())
|