qwen_agent/skills/onprem/kfs-answer/scripts/detail.py
2026-04-18 23:21:10 +08:00

242 lines
8.8 KiB
Python

"""Return full schema + sample data for specified file:sheet pairs.
Usage: python3 detail.py <file_id1:sheet_id1>,<file_id2:sheet_id2>,...
Output: Per sheet — columns with type/stats/description + sample rows (from knowledge.md body).
datasets directory: ./datasets/ (gbase-agent-service) or ./dataset/ (catalog-agent), auto-detected at runtime.
dataset_ids are discovered automatically from subdirectories under datasets directory.
"""
import os
import re
import sys
import yaml
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _session import get_session_dir
# Project root is three directory levels above the folder holding this script
# (scripts/ → kfs-answer/ → skills/ → project root).
# NOTE(review): this assumes skills/ sits directly under the project root —
# confirm for installs that nest the skill one level deeper.
_PROJECT_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir)
)
# Prefer "datasets/" when it exists; otherwise fall back to "dataset/".
_ds = os.path.join(_PROJECT_ROOT, "datasets")
if os.path.isdir(_ds):
    DATASETS_DIR = _ds
else:
    DATASETS_DIR = os.path.join(_PROJECT_ROOT, "dataset")
def _discover_datasets():
    """Return the sorted subdirectory names of DATASETS_DIR (each one is a dataset_id).

    Returns an empty list when DATASETS_DIR itself is not a directory.
    """
    if os.path.isdir(DATASETS_DIR):
        names = sorted(os.listdir(DATASETS_DIR))
        # Keep directories only; plain files next to the datasets are ignored.
        return [
            name for name in names
            if os.path.isdir(os.path.join(DATASETS_DIR, name))
        ]
    return []
def load_file_ref_map():
    """Read file_refs.txt from the session dir and return a {file_id: "F<n>"} dict.

    Only lines shaped like ``F<digits>=<hex-and-dash id>(`` are used; all other
    lines are skipped. A missing file yields an empty dict.
    """
    refs_path = os.path.join(get_session_dir(), "file_refs.txt")
    if not os.path.isfile(refs_path):
        return {}
    line_re = re.compile(r"^(F\d+)=([0-9a-f-]+)\(")
    with open(refs_path, "r", encoding="utf-8") as fh:
        hits = (line_re.match(raw.strip()) for raw in fh)
        # group(2) is the file id, group(1) the F-code; later lines win on duplicates.
        return {hit.group(2): hit.group(1) for hit in hits if hit}
def find_file_dir(dataset_ids, file_id):
    """Return the first existing DATASETS_DIR/<dataset_id>/<file_id> directory, or None.

    Datasets are probed in the order given by ``dataset_ids``.
    """
    candidates = (
        os.path.join(DATASETS_DIR, ds_id, file_id) for ds_id in dataset_ids
    )
    return next((path for path in candidates if os.path.isdir(path)), None)
def load_knowledge(file_dir):
    """Parse <file_dir>/knowledge.md → (meta dict, body text).

    The file must start with a YAML front matter block delimited by ``---``
    lines; everything after the second ``---`` is the body (stripped).

    Returns:
        (meta, body) on success. (None, None) when the file is missing, has no
        complete front matter, the front matter is not valid YAML, or it does
        not parse to a mapping.
    """
    km_path = os.path.join(file_dir, "knowledge.md")
    if not os.path.isfile(km_path):
        return None, None
    with open(km_path, "r", encoding="utf-8") as f:
        content = f.read()
    if not content.startswith("---"):
        return None, None
    # maxsplit=2 → ["", front matter, body]; later "---" stay inside the body.
    parts = content.split("---", 2)
    if len(parts) < 3:
        return None, None
    try:
        meta = yaml.safe_load(parts[1])
    except yaml.YAMLError:
        # Malformed front matter is reported as "invalid" by the caller
        # instead of aborting the whole run with a traceback.
        return None, None
    if not isinstance(meta, dict):
        # Empty or scalar/list front matter cannot be used as sheet metadata.
        return None, None
    return meta, parts[2].strip()
def extract_sheet_body(body, sheet_id):
    """Return the stripped text that follows the ``<!-- sheet_xxx [...] -->`` marker of ``sheet_id``.

    The section runs up to the next sheet marker (or the end of ``body``).
    When ``body`` has no sheet markers at all, the whole stripped body is
    returned; when markers exist but none matches, the result is "".
    """
    marker_re = re.compile(r"<!--\s*(sheet_\w+)(?:\s+[^>]*)?\s*-->")
    hits = list(marker_re.finditer(body))
    if not hits:
        # Fallback: a single-sheet body without markers is returned as a whole.
        return body.strip()
    for pos, hit in enumerate(hits):
        if hit.group(1) != sheet_id:
            continue
        following = pos + 1
        stop = hits[following].start() if following < len(hits) else len(body)
        return body[hit.end():stop].strip()
    return ""
def extract_sheet_src(body, sheet_id):
    """Return the __src value of the ``<!-- <sheet_id> __src="..." -->`` marker, or "" if absent."""
    # sheet_id is escaped so it is matched literally; the mandatory whitespace
    # after it keeps e.g. "sheet_1" from matching a "sheet_10" marker.
    pattern = r'<!--\s*' + re.escape(sheet_id) + r'\s+__src="([^"]*)"'
    found = re.search(pattern, body)
    if found is None:
        return ""
    return found.group(1)
def format_columns(columns):
    """Format column metadata dicts as a compact schema listing, one line per column.

    Each line has the shape `` <name> [<type>] (<stats>) - <description>``; the
    stats group and the description are omitted when the column has none.
    Missing ``name``/``type`` default to "?" and "text".

    Args:
        columns: iterable of column dicts (``None`` is treated as empty).

    Returns:
        The lines joined with newlines ("" when there are no columns).
    """
    # (key in the column dict, label used in the output), in display order.
    scalar_stats = (
        ("distinct", "distinct"),
        ("null_rate", "null"),
        ("avg_length", "avg_len"),
        ("range", "range"),
        ("mean", "mean"),
    )
    # (key, max number of items shown when the value is a list).
    list_stats = (("sample", 5), ("values", 8), ("topics", 5))

    lines = []
    for col in columns or ():
        name = col.get("name", "?")
        ctype = col.get("type", "text")
        desc = col.get("description", "")

        stats = [f"{label}={col[key]}" for key, label in scalar_stats if key in col]
        for key, limit in list_stats:
            if key not in col:
                continue
            value = col[key]
            if isinstance(value, list):
                value = ",".join(str(item) for item in value[:limit])
            stats.append(f"{key}=[{value}]")

        stats_str = f" ({', '.join(stats)})" if stats else ""
        # Separate the description from the type/stats token; without the
        # delimiter it ran straight into the preceding "]" or ")".
        desc_str = f" - {desc}" if desc else ""
        lines.append(f" {name} [{ctype}]{stats_str}{desc_str}")
    return "\n".join(lines)
def main():
    """Print schema, stats and body excerpts for the entries given in sys.argv[1].

    sys.argv[1] is a comma-separated list of ``file_id:sheet_id`` entries; an
    entry without ``:sheet_id`` selects every sheet of that file. Per-file
    problems (unknown file, unusable knowledge.md, unknown sheet) are reported
    in the output and the run continues with the next entry.

    Raises:
        SystemExit: with code 2 and a usage message on stderr when no argument
            was supplied.
    """
    # Validate the argument before touching the filesystem so that a missing
    # argument yields a usage message instead of an IndexError traceback.
    if len(sys.argv) < 2:
        print(
            "Usage: python3 detail.py <file_id1:sheet_id1>,<file_id2:sheet_id2>,...",
            file=sys.stderr,
        )
        sys.exit(2)

    # Auto-discover datasets from ./dataset/ or ./datasets/ subdirectories
    dataset_ids = _discover_datasets()
    # file_id → F{n} codes, used to rewrite "F0S<k>" source tags
    f_ref_map = load_file_ref_map()

    # Group the requested sheet ids by file_id (insertion order is kept).
    # A sheet id of None means "all sheets of this file".
    file_sheets = {}
    for entry in sys.argv[1].split(","):
        entry = entry.strip()
        if not entry:
            continue
        fid, sep, sid = entry.partition(":")
        file_sheets.setdefault(fid.strip(), []).append(sid.strip() if sep else None)

    rule = "=" * 60
    for fid, sheet_ids in file_sheets.items():
        file_dir = find_file_dir(dataset_ids, fid)
        if not file_dir:
            print(rule)
            print(f"file_id: {fid}")
            print(" ERROR: not found")
            continue

        meta, body = load_knowledge(file_dir)
        # A front matter that is empty or not a mapping is as unusable as a
        # missing file, so both are reported the same way.
        if not meta or not isinstance(meta, dict):
            print(rule)
            print(f"file_id: {fid}")
            print(" ERROR: knowledge.md not found or invalid")
            continue

        source_name = meta.get("source_name", "unknown")
        print(rule)
        print(f"file_id: {fid}")
        print(f"source: {source_name}")
        # Check for knowledge.db
        db_path = os.path.join(file_dir, "knowledge.db")
        if os.path.isfile(db_path):
            print(f"db_path: {db_path}")

        # "or []" also covers an explicit "sheets: null" in the front matter.
        sheets_meta = {s["id"]: s for s in meta.get("sheets") or []}
        f_code = f_ref_map.get(fid, "")
        for sid in sheet_ids:
            if sid is None:
                # Show all sheets
                target_sheets = list(sheets_meta.values())
            elif sid in sheets_meta:
                target_sheets = [sheets_meta[sid]]
            else:
                print(f"\n sheet {sid}: NOT FOUND in metadata")
                continue
            for sheet in target_sheets:
                _print_sheet(sheet, body, f_code)

    print(f"\n{rule}")
    print(f"Done. Showed {len(file_sheets)} files.")


def _print_sheet(sheet, body, f_code):
    """Print one sheet's header, description, schema/block info and body excerpt."""
    sheet_id = sheet["id"]
    sname = sheet.get("name", "?")
    stype = sheet.get("type", "?")
    print(f"\n --- {sheet_id}: {sname} [{stype}]")

    # Description, falling back to the first block title, then to the sheet
    # name unless that name starts with "sheet_".
    sheet_desc = str(sheet.get("description") or "").strip()
    block_titles = sheet.get("block_titles") or []
    if sheet_desc:
        print(f" description: {sheet_desc[:200]}")
    elif block_titles:
        print(f" description (fallback from block_titles): {str(block_titles[0])[:200]}")
    elif sname and not str(sname).startswith("sheet_"):
        print(f" description (fallback from sheet name): {sname}")

    if stype == "db":
        row_count = sheet.get("row_count", "?")
        db_table = sheet.get("db_table", sheet_id)
        print(f" table: {db_table}, rows: {row_count}")
        columns = sheet.get("columns", [])
        if columns:
            print(f" columns ({len(columns)}):")
            print(format_columns(columns))
    else:
        block_count = sheet.get("block_count", "?")
        print(f" blocks: {block_count}")

    # Show body section (notes + sample for db, content for markdown)
    if not body:
        return
    section = extract_sheet_body(body, sheet_id)
    if not section:
        return
    src_tag = extract_sheet_src(body, sheet_id)
    # Rewrite the "F0S<k>" prefix of the tag to this file's own F-code.
    if src_tag and f_code and "F0" in src_tag:
        src_tag = src_tag.replace("F0S", f"{f_code}S")
    # Truncate to ~2000 chars
    if len(section) > 2000:
        section = section[:2000] + "\n ... [truncated]"
    if src_tag:
        print(f' __src="{src_tag}"')
    print(" content:")
    for line in section.split("\n"):
        print(f" {line}")
if __name__ == "__main__":
main()