qwen_agent/skills/auto-daily-summary/scripts/summary_core.py
2026-04-16 10:23:54 +08:00

229 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from collections import Counter
from typing import Any
# Maximum number of ranked sentences fed into a summary, keyed by the
# requested length preset (must be one of VALID_LENGTHS).
SUMMARY_LENGTH_LIMITS = {
    "tldr": 2,
    "short": 4,
    "standard": 6,
    "detailed": 10,
}
# Regexes that mark a sentence as an action item; group(1) captures the task
# text.  Mixed English/Chinese prefixes ("TODO"/"action", "待办" = to-do,
# "需要" = need to, "跟进" = follow up).
ACTION_PREFIX_PATTERNS = [
    r"^(?:TODO|待办|action)[:]?\s*(.+)$",
    r"^(?:需要|跟进)\s*(.+)$",
]
# Keywords used to classify a risk sentence by impact level.  Levels are
# checked in insertion order, so "high" wins when a sentence matches several.
RISK_KEYWORDS = {
    "high": ["阻塞", "blocker", "故障", "失败", "严重", "不可用"],
    "medium": ["风险", "延迟", "异常", "超时", "报错"],
    "low": ["提醒", "注意", "观察", "待确认"],
}
# Optional payload fields that must be booleans when present (see
# validate_payload).
BOOL_FIELDS = ["extract_actions", "extract_risks"]
# Accepted values for data.style / data.length in validate_payload.
VALID_STYLES = {"daily_report", "digest", "meeting_digest", "executive"}
VALID_LENGTHS = {"tldr", "short", "standard", "detailed"}
def _clean_text(text: str) -> str:
return re.sub(r"\s+", " ", (text or "").strip())
def _normalize_key(text: str) -> str:
    """Build a dedup key: lowercase, keeping only word chars and CJK ideographs."""
    lowered = _clean_text(text).lower()
    # Keep \w and CJK runs; everything else (punctuation, spaces) is dropped.
    return "".join(re.findall(r"[\w\u4e00-\u9fff]+", lowered))
def _split_sentences(text: str) -> list[str]:
    """Split on CJK/ASCII sentence punctuation or newlines, dropping empty parts."""
    pieces = re.split(r"[。!?;.!?;]+|\n+", text)
    cleaned = (_clean_text(piece) for piece in pieces)
    return [piece for piece in cleaned if piece]
def _sentence_tokens(sentence: str) -> list[str]:
return re.findall(r"[A-Za-z0-9_-]+|[\u4e00-\u9fff]{2,}", sentence.lower())
def _top_sentences(texts: list[str], limit: int) -> list[str]:
    """Return up to ``limit`` deduplicated sentences ranked by token frequency.

    Each sentence is scored by summing the corpus-wide frequency of its
    tokens; on equal scores, earlier sentences win. Sentences that share a
    normalized key are emitted only once.

    Fix: the original tokenized every sentence twice (once to build the
    Counter, once to score); tokens are now computed once and reused.
    """
    sentences: list[str] = []
    for text in texts:
        sentences.extend(_split_sentences(text))
    if not sentences:
        return []
    # Tokenize once; reuse for both frequency counting and scoring.
    tokenized = [_sentence_tokens(sentence) for sentence in sentences]
    token_counter = Counter()
    for tokens in tokenized:
        token_counter.update(tokens)
    scored: list[tuple[int, int, str]] = []
    for index, (sentence, tokens) in enumerate(zip(sentences, tokenized)):
        score = sum(token_counter[token] for token in tokens)
        # -index so that, under reverse=True, equal scores keep the
        # earlier sentence first.
        scored.append((score, -index, sentence))
    ranked = [sentence for _, _, sentence in sorted(scored, reverse=True)]
    unique_ranked: list[str] = []
    seen: set[str] = set()
    for sentence in ranked:
        key = _normalize_key(sentence)
        if key and key not in seen:
            seen.add(key)
            unique_ranked.append(sentence)
            if len(unique_ranked) >= limit:
                break
    return unique_ranked
def _trim_fragment(text: str, max_length: int = 80) -> str:
    """Return the first clause of *text*, cleaned and capped at *max_length* chars."""
    parts = re.split(r"[,。;;!?]", text, maxsplit=1)
    head = _clean_text(parts[0])
    return head[:max_length].strip()
def _extract_actions(texts: list[str]) -> list[dict[str, Any]]:
    """Harvest action items from sentences matching ACTION_PREFIX_PATTERNS.

    Returns at most 10 unique items as dicts with task/owner/due_at/blocker
    keys; only "task" is filled in, the others are placeholders.
    """
    collected: list[dict[str, Any]] = []
    for text in texts:
        for sentence in _split_sentences(text):
            for pattern in ACTION_PREFIX_PATTERNS:
                hit = re.match(pattern, sentence, re.IGNORECASE)
                if hit is None:
                    continue
                task = _trim_fragment(hit.group(1))
                if len(task) < 2:
                    # Too short to be a meaningful task; try the next pattern.
                    continue
                collected.append(
                    {"task": task, "owner": None, "due_at": None, "blocker": None}
                )
                break
    unique: list[dict[str, Any]] = []
    seen_keys: set[str] = set()
    for item in collected:
        key = _normalize_key(item["task"])
        if key and key not in seen_keys:
            seen_keys.add(key)
            unique.append(item)
    return unique[:10]
def _extract_risks(texts: list[str]) -> list[dict[str, Any]]:
    """Pull risk fragments out of sentences containing RISK_KEYWORDS.

    Each sentence contributes at most one risk, tagged with the first impact
    level (high, then medium, then low) whose keywords it contains. The
    fragment is a context window around the matched keyword, capped at 120
    chars. At most 10 unique risks are returned.
    """
    found: list[dict[str, Any]] = []
    for text in texts:
        for sentence in _split_sentences(text):
            lowered = sentence.lower()
            for impact, keywords in RISK_KEYWORDS.items():
                matched = next((kw for kw in keywords if kw.lower() in lowered), None)
                if matched is None:
                    continue
                pos = lowered.find(matched.lower())
                # Window: up to 18 chars before and 30 after the keyword.
                start = max(0, pos - 18)
                end = min(len(sentence), pos + len(matched) + 30)
                fragment = _clean_text(sentence[start:end])[:120]
                if len(fragment) < 2:
                    continue
                found.append({"risk": fragment, "impact": impact, "mitigation": None})
                break
    unique: list[dict[str, Any]] = []
    seen_keys: set[str] = set()
    for item in found:
        key = _normalize_key(item["risk"])
        if key and key not in seen_keys:
            seen_keys.add(key)
            unique.append(item)
    return unique[:10]
def _build_summary_line(sentences: list[str]) -> str:
if not sentences:
return "暂无可提炼的关键信息。"
selected = sentences[:2]
return "".join(selected)
def build_summary(payload: dict[str, Any]) -> dict[str, Any]:
    """Build a structured summary from ``payload["data"]``.

    Ranks sentences across all source contents, uses the top ones for a
    one-line summary plus detail sections, optionally extracts action items
    and risks, and renders a markdown report.

    Returns a dict with keys: summary, sections, action_items, risk_items,
    markdown, schedule_payload, style.

    Fix: each source's content was cleaned twice in the comprehension
    (filter and value); it is now cleaned once via an assignment expression.
    """
    data = payload.get("data", {})
    sources = data.get("sources", [])
    # NOTE(review): assumes each source is a dict with a str "content" —
    # validate_payload enforces this for validated inputs; confirm callers
    # validate before building.
    texts = [
        cleaned
        for source in sources
        if (cleaned := _clean_text(source.get("content", "")))
    ]
    length = data.get("length", "standard")
    style = data.get("style", "daily_report")
    limit = SUMMARY_LENGTH_LIMITS.get(length, SUMMARY_LENGTH_LIMITS["standard"])
    top_sentences = _top_sentences(texts, limit)
    summary_line = _build_summary_line(top_sentences)
    # The first two sentences feed the summary line; the rest become details.
    summary_keys = {_normalize_key(sentence) for sentence in top_sentences[:2]}
    detail_sentences = [
        sentence
        for sentence in top_sentences
        if _normalize_key(sentence) not in summary_keys
    ]
    sections: list[dict[str, Any]] = []
    if detail_sentences:
        if len(detail_sentences) == 1:
            sections = [{"title": "Key Updates", "bullets": detail_sentences}]
        else:
            midpoint = max(1, len(detail_sentences) // 2)
            sections = [
                {"title": "Key Updates", "bullets": detail_sentences[:midpoint]},
                {"title": "Notable Details", "bullets": detail_sentences[midpoint:]},
            ]
    action_items = _extract_actions(texts) if data.get("extract_actions") else []
    risk_items = _extract_risks(texts) if data.get("extract_risks") else []
    markdown_lines = ["# Summary", "", "## Summary", f"- {summary_line}"]
    for section in sections:
        if not section["bullets"]:
            continue
        markdown_lines.extend(["", f"## {section['title']}"])
        markdown_lines.extend(f"- {bullet}" for bullet in section["bullets"])
    if action_items:
        markdown_lines.extend(["", "## Action Items"])
        markdown_lines.extend(f"- {item['task']}" for item in action_items)
    if risk_items:
        markdown_lines.extend(["", "## Risks"])
        markdown_lines.extend(
            f"- [{item['impact']}] {item['risk']}" for item in risk_items
        )
    schedule_payload = {
        "suggested_name": "Daily Summary",
        "message": "[Scheduled Task Triggered] 请立即汇总最新内容并输出结构化摘要,如有行动项和风险请一并列出,然后选择合适的通知方式发送给用户。",
    }
    return {
        "summary": summary_line,
        "sections": sections,
        "action_items": action_items,
        "risk_items": risk_items,
        "markdown": "\n".join(markdown_lines),
        "schedule_payload": schedule_payload,
        "style": style,
    }
def _validate_source(source: Any, index: int) -> list[str]:
    """Validate one entry of data.sources; return a list of error messages."""
    if not isinstance(source, dict):
        return [f"data.sources[{index}] must be an object"]
    content = _clean_text(str(source.get("content", "")))
    if content:
        return []
    return [f"data.sources[{index}].content is required"]
def validate_payload(payload: dict[str, Any]) -> list[str]:
    """Validate the request payload; return every error found (empty = valid)."""
    data = payload.get("data")
    if not isinstance(data, dict):
        return ["data must be an object"]
    errors: list[str] = []
    sources = data.get("sources")
    if isinstance(sources, list) and sources:
        for index, source in enumerate(sources):
            errors.extend(_validate_source(source, index))
    else:
        errors.append("data.sources must be a non-empty array")
    objective = data.get("objective")
    if not (isinstance(objective, str) and objective.strip()):
        errors.append("data.objective is required")
    style = data.get("style")
    if style is not None and style not in VALID_STYLES:
        errors.append(f"data.style must be one of {sorted(VALID_STYLES)}")
    length = data.get("length")
    if length is not None and length not in VALID_LENGTHS:
        errors.append(f"data.length must be one of {sorted(VALID_LENGTHS)}")
    for field in BOOL_FIELDS:
        value = data.get(field)
        if value is not None and not isinstance(value, bool):
            errors.append(f"data.{field} must be a boolean")
    return errors