Local-Voice/test_streaming.py
2025-09-20 20:13:55 +08:00

108 lines
3.2 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试流式响应解析的脚本
"""
import json
import requests
import os
def test_streaming_response():
"""测试流式响应解析"""
# 检查API密钥
api_key = os.environ.get("ARK_API_KEY")
if not api_key:
print("❌ 请设置 ARK_API_KEY 环境变量")
return
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
data = {
"messages": [
{
"content": "你是一个智能助手,请回答问题。",
"role": "system"
},
{
"content": "你好,请简单介绍一下自己",
"role": "user"
}
],
"model": "doubao-1-5-pro-32k-250115",
"stream": True
}
print("🚀 开始测试流式响应...")
try:
response = requests.post(
"https://ark.cn-beijing.volces.com/api/v3/chat/completions",
headers=headers,
json=data,
stream=True,
timeout=30
)
print(f"📊 响应状态: {response.status_code}")
if response.status_code != 200:
print(f"❌ 请求失败: {response.text}")
return
print("🔍 开始解析流式响应...")
accumulated_text = ""
line_count = 0
for line in response.iter_lines(decode_unicode=True):
line_count += 1
if not line or not line.strip():
continue
# 预处理
line = line.strip()
print(f"\n--- 第{line_count}行 ---")
print(f"原始内容: {repr(line)}")
if line.startswith("data: "):
data_str = line[6:] # 移除 "data: " 前缀
print(f"处理后: {repr(data_str)}")
if data_str == "[DONE]":
print("✅ 流结束")
break
try:
chunk_data = json.loads(data_str)
print(f"✅ JSON解析成功: {chunk_data}")
if "choices" in chunk_data and len(chunk_data["choices"]) > 0:
delta = chunk_data["choices"][0].get("delta", {})
content = delta.get("content", "")
if content:
accumulated_text += content
print(f"💬 累计内容: {accumulated_text}")
except json.JSONDecodeError as e:
print(f"❌ JSON解析失败: {e}")
print(f"🔍 问题数据: {repr(data_str)}")
except Exception as e:
print(f"❌ 其他错误: {e}")
print(f"\n✅ 测试完成,总共处理了 {line_count}")
print(f"📝 最终内容: {accumulated_text}")
except Exception as e:
print(f"❌ 测试失败: {e}")
if __name__ == "__main__":
test_streaming_response()