qwen_agent/skills/common/web2md/tools/convert.py
2026-06-23 10:12:55 +08:00

684 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Web to Markdown Converter Tool
优先级方法非Python优先
1. Jina Reader API - 零安装一行URL转换
2. Firecrawl API - 需要API密钥
3. Python实现 - 以上方法失败时的回退
arXiv 特殊处理:
- 输入: https://arxiv.org/abs/2601.04500v1
- 转换为: https://arxiv.org/html/2601.04500v1
- 优先 Jina Reader失败则 Python 下载 PDF
微信公众号特殊处理:
- 优先 WeSpy
- 失败则回退 Playwright
- 最后回退 Python 方法
用法:
python convert.py <url> [--pure-text] [--output <file>]
示例:
python convert.py https://www.breezedeus.com/article/ai-agent-context-engineering
python convert.py https://arxiv.org/abs/2601.04500v1 --output paper.md
python convert.py https://x.com/user/status/123 --pure-text
"""
import sys
import argparse
import logging
from pathlib import Path
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from bs4 import BeautifulSoup
from markdownify import markdownify as md
import tempfile
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import re
import os
logger = logging.getLogger(__name__)
class WebToMarkdown:
"""网页转 Markdown 转换器(并行优先级方法)"""
# 超时常量(秒)
TIMEOUT_HEAD = 3
TIMEOUT_JINA = 8
TIMEOUT_FIRECRAWL = 10
TIMEOUT_REQUESTS = 15
TIMEOUT_PLAYWRIGHT = 15000 # 毫秒
def __init__(self):
self.session = requests.Session()
self.session.headers.update(
{"User-Agent": "Mozilla/5.0 (compatible; DocAI-Converter/1.0)"}
)
# 配置重试策略:仅针对 429/5xx最多 2 次,指数退避
retry = Retry(
total=2,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "POST"],
)
adapter = HTTPAdapter(max_retries=retry)
self.session.mount("https://", adapter)
self.session.mount("http://", adapter)
# 从环境变量获取 Firecrawl API 密钥
self.firecrawl_api_key = os.environ.get("FIRECRAWL_API_KEY")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.session.close()
def convert(self, url, pure_text=False, use_python=False):
"""转换 URL 到 Markdown并行优先级方法
并行发起 Jina Reader / Firecrawl / Python取最快成功的结果。
微信公众号和 --use-python 模式走直连路径。
Args:
url: 网页 URL
pure_text: 是否返回纯文本(无格式)
use_python: 强制使用Python方法
Returns:
str: Markdown 或纯文本内容
"""
url = url.strip()
# URL 校验
parsed = urlparse(url)
if parsed.scheme not in ("http", "https") or not parsed.netloc:
raise ValueError(f"无效的 URL: {url}")
# arXiv 特殊处理:转换为 HTML URL
if self._is_arxiv(url):
url = self._convert_arxiv_to_html(url)
# 微信公众号:优先使用 WeSpy失败则回退到 Playwright / Python
if self._is_wechat(url):
result = self._try_wespy(url, pure_text)
if result:
return result
result = self._try_playwright(url, pure_text)
if result:
return result
return self._python_convert(url, pure_text)
# 推特 X.com 特殊处理如果URL是twitter/x.com转换为fxtwitter/fixupx以获取元数据渲染的内容
if self._is_twitter(url):
url = self._convert_twitter_to_proxy(url)
# 强制 Python 模式
if use_python:
if self._is_arxiv(url):
return self._handle_arxiv(url, pure_text)
return self._python_convert(url, pure_text)
# 并行发起多种方法,取最快成功的
result = self._parallel_convert(url, pure_text)
if result:
return result
# 所有并行方法都失败arXiv 尝试 PDF 回退
if self._is_arxiv(url):
return self._handle_arxiv(url, pure_text)
return None
def _parallel_convert(self, url, pure_text):
"""并行尝试多种方法,返回最快成功的结果"""
futures = {}
with ThreadPoolExecutor(max_workers=4) as executor:
futures[executor.submit(self._try_jina_reader, url, pure_text)] = "jina"
if self.firecrawl_api_key:
futures[executor.submit(self._try_firecrawl, url, pure_text)] = (
"firecrawl"
)
futures[executor.submit(self._python_convert, url, pure_text)] = "python"
futures[executor.submit(self._try_playwright, url, pure_text)] = (
"playwright"
)
for future in as_completed(futures):
try:
result = future.result()
except Exception:
continue
if result:
for f in futures:
f.cancel()
return result
return None
def _try_jina_reader(self, url, pure_text):
"""尝试使用 Jina Reader API
用法: https://r.jina.ai/https://www.breezedeus.com/article/ai-agent-context-engineering
"""
jina_base_urls = ["https://r.jinaai.cn", "https://r.jina.ai"]
try:
for jina_base_url in jina_base_urls:
jina_url = f"{jina_base_url}/{url}"
try:
response = self.session.get(jina_url, timeout=self.TIMEOUT_JINA)
response.raise_for_status()
content = response.text
if content and len(content.strip()) > 50: # 验证有内容
if pure_text:
return content
# Jina 已经返回不错的 Markdown稍作清理即可
return self._clean_jina_markdown(content)
except Exception as e:
logger.warning("Jina Reader 失败 (%s): %s", jina_base_url, e)
except Exception as e:
logger.warning("Jina Reader 失败: %s", e)
return None
def _try_firecrawl(self, url, pure_text):
"""尝试使用 Firecrawl API"""
if not self.firecrawl_api_key:
logger.info("Firecrawl API 密钥未设置 (FIRECRAWL_API_KEY)")
return None
try:
response = self.session.post(
"https://api.firecrawl.dev/v0/scrape",
headers={"Authorization": f"Bearer {self.firecrawl_api_key}"},
json={"url": url, "formats": ["markdown"]},
timeout=self.TIMEOUT_FIRECRAWL,
)
if response.status_code == 200:
data = response.json()
if data.get("success") and data.get("data", {}).get("markdown"):
markdown = data["data"]["markdown"]
if pure_text:
# 从 Markdown 提取纯文本
return re.sub(r"[\*\#\`\[\]\(\)]", "", markdown)
return markdown
else:
logger.warning("Firecrawl 错误: %s", response.status_code)
except Exception as e:
logger.warning("Firecrawl 失败: %s", e)
return None
def _try_playwright(self, url, pure_text):
"""尝试使用 Playwright 获取动态页面"""
try:
content = self._get_with_playwright(url)
if not content or len(content.strip()) < 50:
return None
if pure_text:
return self._to_plain_text(content)
return self._to_markdown(content)
except Exception as e:
logger.warning("Playwright 失败: %s", e)
return None
def _try_wespy(self, url, pure_text):
"""尝试使用 WeSpy 获取微信公众号内容"""
try:
from wespy import ArticleFetcher
except ImportError as e:
logger.warning("WeSpy 未安装: %s", e)
return None
try:
fetcher = ArticleFetcher()
with tempfile.TemporaryDirectory() as output_dir:
article_info = fetcher.fetch_article(
url=url,
output_dir=output_dir,
save_markdown=True,
save_html=False,
save_json=False,
)
if not article_info:
return None
markdown = self._read_wespy_markdown(output_dir, article_info)
if not markdown:
return None
if pure_text:
return self._markdown_to_plain_text(markdown)
return markdown.strip()
except Exception as e:
logger.warning("WeSpy 失败: %s", e)
return None
def _python_convert(self, url, pure_text):
"""Python实现回退方法"""
# 自动检测是否需要浏览器
use_browser = self._needs_browser(url)
if use_browser:
content = self._get_with_playwright(url)
is_pdf = False
else:
content, is_pdf = self._get_with_requests(url)
if is_pdf:
return self._process_pdf(content, pure_text)
# HTML 转换
if pure_text:
return self._to_plain_text(content)
else:
return self._to_markdown(content)
def _handle_arxiv(self, url, pure_text):
"""arXiv Python回退方法从HTML URL转为PDF下载"""
try:
pdf_url = self._convert_arxiv_to_pdf(url)
logger.info("arXiv Python回退: 下载PDF %s", pdf_url)
pdf_content, _ = self._get_with_requests(pdf_url)
return self._process_pdf(pdf_content, pure_text)
except Exception as e:
logger.error("arXiv PDF失败: %s", e)
return None
def _clean_jina_markdown(self, markdown):
"""清理 Jina Reader 返回的 Markdown"""
# 移除多余的空行
markdown = re.sub(r"\n{3,}", "\n\n", markdown)
# 移除行尾空格
markdown = re.sub(r" +\n", "\n", markdown)
return markdown.strip()
def _markdown_to_plain_text(self, markdown):
"""从 Markdown 提取纯文本"""
text = re.sub(r"!\[[^\]]*\]\([^\)]*\)", "", markdown)
text = re.sub(r"\[([^\]]+)\]\([^\)]*\)", r"\1", text)
text = re.sub(r"^#{1,6}\s*", "", text, flags=re.MULTILINE)
text = re.sub(r"^[>*-]\s*", "", text, flags=re.MULTILINE)
text = re.sub(r"[`*_~]", "", text)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
def _read_wespy_markdown(self, output_dir, article_info):
"""从 WeSpy 输出目录或返回值读取 Markdown"""
if isinstance(article_info, dict):
for key in ("markdown", "markdown_content", "content"):
value = article_info.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
markdown_files = sorted(Path(output_dir).rglob("*.md"))
if not markdown_files:
return None
return markdown_files[0].read_text(encoding="utf-8").strip()
def _is_twitter(self, url):
"""检查是否是推特/X URL"""
parsed = urlparse(url)
netloc = parsed.netloc.lower()
return "twitter.com" in netloc or "x.com" in netloc
def _convert_twitter_to_proxy(self, url):
"""转换 Twitter/X URL 到支持元数据预览的 fxtwitter 或 fixupx"""
parsed = urlparse(url)
netloc = parsed.netloc.lower()
# 将 twitter.com 替换为 fxtwitter.com, x.com 替换为 fixupx.com
if "twitter.com" in netloc:
new_netloc = netloc.replace("twitter.com", "fxtwitter.com")
elif "x.com" in netloc:
new_netloc = netloc.replace("x.com", "fixupx.com")
else:
return url
return url.replace(netloc, new_netloc)
def _is_arxiv(self, url):
"""检测是否为 arXiv 链接"""
return "arxiv.org" in url and (
"/abs/" in url or "/pdf/" in url or "/html/" in url
)
def _is_wechat(self, url):
"""检测是否为微信公众号链接"""
return "weixin.qq.com" in url
def _convert_arxiv_to_html(self, url):
"""转换 arXiv 链接为 HTML URL"""
if "/html/" in url:
return url
if "/pdf/" in url:
paper_id = url.split("/pdf/")[-1].split("?")[0].replace(".pdf", "")
return f"https://arxiv.org/html/{paper_id}"
paper_id = url.split("/abs/")[-1].split("?")[0]
return f"https://arxiv.org/html/{paper_id}"
def _convert_arxiv_to_pdf(self, url):
"""转换 arXiv 链接为 PDF URL"""
if "/pdf/" in url:
return url
if "/html/" in url:
paper_id = url.split("/html/")[-1].split("?")[0]
return f"https://arxiv.org/pdf/{paper_id}.pdf"
paper_id = url.split("/abs/")[-1].split("?")[0]
return f"https://arxiv.org/pdf/{paper_id}.pdf"
def _is_known_dynamic_site(self, url):
"""检测是否为已知的动态网站(纯函数,无网络调用)"""
parsed = urlparse(url)
domain = parsed.netloc.lower()
dynamic_domains = [
"x.com",
"twitter.com",
"medium.com",
"substack.com",
"github.com",
"reddit.com",
]
for dynamic in dynamic_domains:
if domain.endswith(dynamic):
return True
if "weixin.qq.com" in domain:
return False
return None # 未知,需要探测
def _probe_for_spa(self, url):
"""通过 HEAD 请求探测是否为 SPA有网络调用"""
try:
response = self.session.head(
url, timeout=self.TIMEOUT_HEAD, allow_redirects=True
)
content_type = response.headers.get("content-type", "").lower()
if "application/json" in content_type:
return True
server = response.headers.get("server", "").lower()
if any(s in server for s in ["nextjs", "vercel", "vite"]):
return True
except Exception:
return True
return False
def _needs_browser(self, url):
"""自动检测是否需要浏览器渲染"""
known = self._is_known_dynamic_site(url)
if known is not None:
return known
return self._probe_for_spa(url)
def _get_with_requests(self, url):
"""使用 requests 获取静态页面或 PDF
Returns:
tuple: (content, is_pdf) - content 为 bytes(PDF) 或 str(HTML)
"""
response = self.session.get(url, timeout=self.TIMEOUT_REQUESTS)
response.raise_for_status()
content_type = response.headers.get("content-type", "").lower()
is_pdf = "application/pdf" in content_type or url.lower().endswith(".pdf")
if is_pdf:
return response.content, True
return response.text, False
def _process_pdf(self, pdf_content, pure_text=False):
"""处理 PDF 内容,返回 Markdown 或纯文本(只打开一次文档)"""
try:
import fitz # PyMuPDF
except ImportError:
raise ImportError("PDF 处理需要 PyMuPDF。\n" "请运行: pip install pymupdf")
try:
doc = fitz.open(stream=pdf_content, filetype="pdf")
# 提取全文
text = ""
for page_num, page in enumerate(doc):
page_text = page.get_text()
if page_text.strip():
text += f"--- Page {page_num + 1} ---\n\n"
text += page_text + "\n\n"
text = text.strip()
if pure_text:
return text
# 提取标题
title = None
metadata = doc.metadata
if metadata and metadata.get("title"):
title = metadata["title"]
elif doc.page_count > 0:
first_page = doc[0]
lines = [
line.strip()
for line in first_page.get_text().split("\n")
if line.strip()
]
if lines:
title = " ".join(lines[:2])
if title:
return f"# {title}\n\n{text}"
return text
except Exception as e:
raise Exception(f"PDF 处理失败: {e}")
def _get_with_playwright(self, url):
"""使用 Playwright 获取动态页面"""
try:
from playwright.sync_api import sync_playwright
except ImportError:
raise ImportError(
"Playwright 未安装。\n"
"请运行: pip install playwright && playwright install chromium"
)
with sync_playwright() as p:
browser = p.chromium.launch()
page = browser.new_page()
# 微信公众号使用移动 UA
if "weixin.qq.com" in url:
page.set_extra_http_headers(
{
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0) AppleWebKit/605.1.15"
}
)
try:
page.goto(
url, wait_until="networkidle", timeout=self.TIMEOUT_PLAYWRIGHT
)
page.wait_for_timeout(2000)
content = page.content()
finally:
browser.close()
return content
def _to_markdown(self, html):
"""HTML 转 Markdown"""
soup = BeautifulSoup(html, "html.parser")
# 提取标题(微信公众号等)
title = None
# 尝试多种标题来源
title_selectors = ["title", "h1#activity-name", ".rich_media_title", "h1"]
for selector in title_selectors:
title_elem = soup.select_one(selector)
if title_elem:
title = title_elem.get_text(strip=True)
if title:
break
# 查找正文内容(优先级)
content_elem = None
content_selectors = [
"#js_content", # 微信公众号
".rich_media_content", # 微信公众号
"#activity-detail", # 微信公众号
"article", # 标准文章
"main", # 标准主内容
".post-content", # 博客
".article-content", # 博客
]
for selector in content_selectors:
elem = soup.select_one(selector)
if elem and elem.get_text(strip=True):
content_elem = elem
break
# 如果没找到特定内容,使用 body
if not content_elem:
content_elem = soup.body or soup
# 移除噪音元素
for tag in content_elem(
["script", "style", "nav", "footer", "header", "iframe", "aside"]
):
tag.decompose()
# 移除广告和交互元素
for tag in content_elem.find_all(
class_=lambda x: x
and any(
w in x.lower()
for w in [
"ad",
"banner",
"cookie",
"consent",
"popup",
"modal",
"share",
"like",
"comment",
]
)
):
tag.decompose()
# 移除按钮和链接区域
for tag in content_elem.find_all(
class_=lambda x: x
and any(w in x.lower() for w in ["btn", "button", "share", "reward"])
):
tag.decompose()
# 移除空段落
for tag in content_elem.find_all("p"):
if not tag.get_text(strip=True):
tag.decompose()
# 构建最终内容
if title:
markdown = f"# {title}\n\n"
else:
markdown = ""
cleaned_html = str(content_elem)
markdown += md(cleaned_html, heading_style="ATX")
# 清理多余空白
markdown = re.sub(r"\n{3,}", "\n\n", markdown)
markdown = re.sub(r" +\n", "\n", markdown) # 行尾空格
return markdown.strip()
def _to_plain_text(self, html):
"""提取纯文本"""
soup = BeautifulSoup(html, "html.parser")
main = soup.find("main") or soup.find("article") or soup.body
if not main:
return soup.get_text(separator="\n\n", strip=True)
for tag in main(["script", "style", "nav", "footer", "header", "aside"]):
tag.decompose()
text = main.get_text(separator="\n\n", strip=True)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
def main():
"""命令行入口"""
logging.basicConfig(
level=logging.INFO,
format="%(levelname)s: %(message)s",
stream=sys.stderr,
)
parser = argparse.ArgumentParser(
description="将网页转换为 Markdown 格式优先使用非Python方法",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""优先级方法:
1. Jina Reader API (https://r.jina.ai/URL) - 零安装
2. Firecrawl API (需要 FIRECRAWL_API_KEY)
3. Python实现 (回退)
示例:
%(prog)s https://www.breezedeus.com/article/ai-agent-context-engineering
%(prog)s https://arxiv.org/abs/2601.04500v1 --output paper.md
%(prog)s https://x.com/user/status/123 --pure-text
%(prog)s https://www.breezedeus.com/article/ai-agent-context-engineering --use-python # 强制使用Python方法
""",
)
parser.add_argument("url", help="要转换的网页 URL")
parser.add_argument(
"--pure-text", action="store_true", help="输出纯文本(无 Markdown 格式)"
)
parser.add_argument(
"--use-python",
action="store_true",
help="强制使用Python方法跳过Jina/Firecrawl",
)
parser.add_argument("--output", "-o", help="输出到文件")
args = parser.parse_args()
try:
with WebToMarkdown() as converter:
result = converter.convert(
args.url, pure_text=args.pure_text, use_python=args.use_python
)
if result is None:
logger.error("转换失败:所有方法均不可用")
sys.exit(1)
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
f.write(result)
print(f"✓ 已保存到: {args.output}")
else:
print(result)
except Exception as e:
logger.error("错误: %s", e)
sys.exit(1)
if __name__ == "__main__":
main()