# Source: qwen_agent/utils/new_api_proxy.py
# Snapshot: 2026-02-26 00:35:42 +08:00 (392 lines, 13 KiB, Python)
#
# NOTE: The code viewer this file was copied from reported that it contains
# Unicode characters that might be confused with other characters. If that
# is intentional, the warning can be safely ignored.

"""
New API 代理工具类
用于与 New API 支付后端进行通信
"""
import logging
from typing import Optional, Dict, Any
import httpx
from utils.settings import NEW_API_BASE_URL, NEW_API_TIMEOUT
# Module-level logger. Note that it uses the fixed logger name 'app' rather
# than this module's __name__ (presumably the application-wide logger that is
# configured elsewhere -- confirm before renaming).
logger = logging.getLogger('app')
class NewAPIProxy:
    """Async client that handles all communication with the New API payment backend.

    Each public method sends one HTTP request (``get_or_create_token`` may
    send up to three) and returns the decoded JSON object as a ``dict``.
    Transport failures and bodies that are not a JSON object are never
    raised; they are reported as ``{"success": False, "message": ...}``.
    """

    def __init__(self):
        """Read the backend base URL and request timeout from the project settings."""
        # Endpoints always start with "/", so drop trailing slashes here to
        # avoid producing "//" when the two are concatenated.
        self.base_url = NEW_API_BASE_URL.rstrip('/')
        self.timeout = NEW_API_TIMEOUT

    def _get_headers(
        self,
        cookies: Optional[Dict[str, str]] = None,
        new_api_user_id: Optional[int] = None
    ) -> Dict[str, str]:
        """Build the request headers.

        Args:
            cookies: Accepted for signature compatibility; not used here.
            new_api_user_id: When truthy, sent as the ``New-Api-User`` header.

        Returns:
            The header mapping for a JSON request.
        """
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        # Add the New-Api-User header (the user ID). A falsy ID (None or 0)
        # is treated as "no user" and the header is omitted.
        if new_api_user_id:
            headers["New-Api-User"] = str(new_api_user_id)
        return headers

    @staticmethod
    def _decode_json(response: Any) -> Optional[Dict[str, Any]]:
        """Return the response body as a dict, or ``None`` if it is not a JSON object.

        Callers index the result with ``.get``/``[]``, so a body that decodes
        to a list, string, number or ``null`` is treated as invalid as well.
        """
        try:
            payload = response.json()
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            return None
        return payload if isinstance(payload, dict) else None

    @staticmethod
    def _invalid_response(response: Any) -> Dict[str, Any]:
        """Build the failure dict returned when the body is not a JSON object."""
        return {
            "success": False,
            # Truncate the body so an HTML error page does not flood the message.
            "message": f"Invalid response: {response.text[:500]}",
            "status_code": response.status_code
        }

    async def _send(
        self,
        method: str,
        endpoint: str,
        cookies: Optional[Dict[str, str]] = None,
        data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        new_api_user_id: Optional[int] = None
    ) -> tuple[Any, Optional[Dict[str, Any]]]:
        """Send one HTTP request and translate transport errors.

        Returns:
            ``(response, None)`` when a response was received, or
            ``(None, error_dict)`` on timeout / request error.
        """
        url = f"{self.base_url}{endpoint}"
        headers = self._get_headers(cookies, new_api_user_id)
        # Cookies are set on the client rather than on the request: httpx
        # deprecates per-request ``cookies=``. The client is single-use, so
        # the cookies that get sent are the same.
        async with httpx.AsyncClient(timeout=self.timeout, cookies=cookies) as client:
            try:
                response = await client.request(
                    method=method,
                    url=url,
                    headers=headers,
                    # An empty dict is sent as "no body", not as "{}".
                    json=data if data else None,
                    params=params
                )
            except httpx.TimeoutException:
                logger.error("New API request timeout: %s", url)
                return None, {"success": False, "message": "Request timeout"}
            except httpx.RequestError as e:
                logger.error("New API request error: %s", e)
                return None, {"success": False, "message": str(e)}
        return response, None

    async def _request(
        self,
        method: str,
        endpoint: str,
        cookies: Optional[Dict[str, str]] = None,
        data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        new_api_user_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Send a request to New API and return the decoded JSON object.

        If the response sets cookies they are attached to the result under
        the ``"_cookies"`` key. Failures are returned as
        ``{"success": False, "message": ...}`` (plus ``"status_code"`` when
        the body could not be decoded as a JSON object).
        """
        response, error = await self._send(
            method, endpoint, cookies, data, params, new_api_user_id
        )
        if error is not None:
            return error
        result = self._decode_json(response)
        if result is None:
            return self._invalid_response(response)
        if response.cookies:
            result["_cookies"] = dict(response.cookies)
        return result

    async def _request_with_cookies(
        self,
        method: str,
        endpoint: str,
        cookies: Optional[Dict[str, str]] = None,
        data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        new_api_user_id: Optional[int] = None
    ) -> tuple[Dict[str, Any], Dict[str, str]]:
        """Send a request to New API and return ``(result, response_cookies)``.

        ``response_cookies`` is an empty dict when the response set none or
        when the request failed at the transport level.
        """
        response, error = await self._send(
            method, endpoint, cookies, data, params, new_api_user_id
        )
        if error is not None:
            return error, {}
        new_cookies = dict(response.cookies) if response.cookies else {}
        result = self._decode_json(response)
        if result is None:
            return self._invalid_response(response), new_cookies
        return result, new_cookies

    # ==================== Authentication ====================

    async def login(self, username: str, password: str) -> Dict[str, Any]:
        """Log a user in to New API.

        On success with cookies, the result additionally carries
        ``"session"`` (the ``session`` or ``SESSION`` cookie value) and
        ``"_cookies"`` (all cookies set by the response).
        """
        result, cookies = await self._request_with_cookies(
            "POST",
            "/api/user/login",
            data={"username": username, "password": password}
        )
        if result.get("success") and cookies:
            result["session"] = cookies.get("session") or cookies.get("SESSION")
            result["_cookies"] = cookies
        return result

    async def register(self, username: str, password: str, email: Optional[str] = None) -> Dict[str, Any]:
        """Register a user with New API; ``email`` is only sent when provided."""
        data = {"username": username, "password": password}
        if email:
            data["email"] = email
        return await self._request("POST", "/api/user/register", data=data)

    async def get_user_info(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
        """Fetch the current user's information."""
        return await self._request("GET", "/api/user/self", cookies=cookies, new_api_user_id=new_api_user_id)

    # ==================== Top-up / plans ====================

    async def get_topup_info(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
        """Fetch top-up information, including the ``creem_products`` plan list."""
        return await self._request("GET", "/api/user/topup/info", cookies=cookies, new_api_user_id=new_api_user_id)

    async def get_subscription_plans(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
        """Fetch the list of subscription plans."""
        return await self._request("GET", "/api/subscription/plans", cookies=cookies, new_api_user_id=new_api_user_id)

    # ==================== Payments ====================

    async def create_topup_order(
        self,
        cookies: Dict[str, str],
        amount: int,
        payment_method: str,
        new_api_user_id: int
    ) -> Dict[str, Any]:
        """Create a top-up order (Epay)."""
        return await self._request(
            "POST",
            "/api/user/pay",
            cookies=cookies,
            data={"amount": amount, "payment_method": payment_method},
            new_api_user_id=new_api_user_id
        )

    async def create_stripe_order(
        self,
        cookies: Dict[str, str],
        amount: int,
        new_api_user_id: int
    ) -> Dict[str, Any]:
        """Create a Stripe payment order."""
        return await self._request(
            "POST",
            "/api/user/stripe/pay",
            cookies=cookies,
            data={"amount": amount},
            new_api_user_id=new_api_user_id
        )

    async def create_creem_order(
        self,
        cookies: Dict[str, str],
        product_id: str,
        new_api_user_id: int
    ) -> Dict[str, Any]:
        """Create a Creem payment order."""
        return await self._request(
            "POST",
            "/api/user/creem/pay",
            cookies=cookies,
            data={"product_id": product_id},
            new_api_user_id=new_api_user_id
        )

    async def get_order_status(
        self,
        cookies: Dict[str, str],
        order_id: str,
        new_api_user_id: int
    ) -> Dict[str, Any]:
        """Query the status of a single order."""
        from urllib.parse import quote

        # Percent-encode the ID so characters such as "/", "?" or "#" cannot
        # change which backend path is requested.
        safe_order_id = quote(str(order_id), safe="")
        return await self._request(
            "GET",
            f"/api/user/order/{safe_order_id}",
            cookies=cookies,
            new_api_user_id=new_api_user_id
        )

    async def get_order_list(
        self,
        cookies: Dict[str, str],
        page: int,
        page_size: int,
        new_api_user_id: int
    ) -> Dict[str, Any]:
        """Fetch the order list (top-up records), paginated."""
        return await self._request(
            "GET",
            "/api/user/topup/self",
            cookies=cookies,
            # This endpoint takes the page size as "size" (the token list
            # endpoint uses "page_size").
            params={"p": page, "size": page_size},
            new_api_user_id=new_api_user_id
        )

    # ==================== Subscriptions ====================

    async def subscribe_plan(
        self,
        cookies: Dict[str, str],
        plan_id: int,
        new_api_user_id: int,
        payment_method: str = "alipay"
    ) -> Dict[str, Any]:
        """Subscribe to a plan (paid through Epay)."""
        return await self._request(
            "POST",
            "/api/subscription/epay/pay",
            cookies=cookies,
            data={"plan_id": plan_id, "payment_method": payment_method},
            new_api_user_id=new_api_user_id
        )

    async def get_subscription_status(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
        """Fetch the current user's subscription status."""
        return await self._request(
            "GET",
            "/api/subscription/self",
            cookies=cookies,
            new_api_user_id=new_api_user_id
        )

    async def cancel_subscription(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
        """Cancel the current user's subscription."""
        return await self._request(
            "POST",
            "/api/subscription/cancel",
            cookies=cookies,
            new_api_user_id=new_api_user_id
        )

    # ==================== Quota / balance ====================

    async def get_quota(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
        """Fetch the user's quota information.

        This requests the same ``/api/user/self`` endpoint as
        ``get_user_info``; the quota is read from the user information.
        """
        return await self._request(
            "GET",
            "/api/user/self",
            cookies=cookies,
            new_api_user_id=new_api_user_id
        )

    # ==================== Models ====================

    async def get_user_models(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
        """Fetch the list of models available to the user."""
        return await self._request(
            "GET",
            "/api/user/models",
            cookies=cookies,
            new_api_user_id=new_api_user_id
        )

    # ==================== Tokens ====================

    async def get_tokens(
        self,
        cookies: Dict[str, str],
        new_api_user_id: int,
        page: int = 1,
        page_size: int = 100
    ) -> Dict[str, Any]:
        """Fetch one page of the user's tokens."""
        return await self._request(
            "GET",
            "/api/token/",
            cookies=cookies,
            params={"p": page, "page_size": page_size},
            new_api_user_id=new_api_user_id
        )

    async def create_token(
        self,
        cookies: Dict[str, str],
        new_api_user_id: int,
        name: str = "API Token",
        remain_quota: Optional[int] = None,
        expired_time: Optional[int] = None,
        unlimited_quota: bool = True
    ) -> Dict[str, Any]:
        """Create a token.

        ``remain_quota`` and ``expired_time`` are only included in the
        request body when they are not ``None``.
        """
        data = {
            "name": name,
            "unlimited_quota": unlimited_quota
        }
        if remain_quota is not None:
            data["remain_quota"] = remain_quota
        if expired_time is not None:
            data["expired_time"] = expired_time
        return await self._request(
            "POST",
            "/api/token/",
            cookies=cookies,
            data=data,
            new_api_user_id=new_api_user_id
        )

    @staticmethod
    def _token_items(tokens_result: Dict[str, Any]) -> list:
        """Extract the token list from a ``get_tokens`` result.

        Accepts the paginated shape
        ``{'page': 1, 'page_size': 100, 'total': 4, 'items': [...]}`` and
        also a bare list under ``"data"`` (presumably what backends without
        pagination return -- confirm against the deployed version). Returns
        an empty list for failed results or any other shape.
        """
        if not tokens_result.get("success"):
            return []
        data = tokens_result.get("data")
        items = data.get("items", []) if isinstance(data, dict) else data
        return items if isinstance(items, list) else []

    async def get_or_create_token(
        self,
        cookies: Dict[str, str],
        new_api_user_id: int,
        token_name: str = "Bot Manager Token"
    ) -> Dict[str, Any]:
        """Return an existing token, creating one first if the user has none.

        Returns:
            ``{"success": True, "data": <token>, "message": ...}`` when a
            token is available; otherwise the raw ``create_token`` result.
        """
        # Try the existing tokens first. The first list entry is returned
        # as-is; its status and expiry are not checked here.
        existing = self._token_items(await self.get_tokens(cookies, new_api_user_id))
        if existing:
            return {
                "success": True,
                "data": existing[0],
                "message": "使用现有令牌"
            }
        # No token yet: create a new one.
        create_result = await self.create_token(
            cookies,
            new_api_user_id,
            name=token_name
        )
        if create_result.get("success"):
            # Re-fetch the list after creating and return its first entry.
            refreshed = self._token_items(await self.get_tokens(cookies, new_api_user_id))
            if refreshed:
                return {
                    "success": True,
                    "data": refreshed[0],
                    "message": "已创建新令牌"
                }
        return create_result
# Module-wide singleton; stays None until the first get_new_api_proxy() call.
_new_api_proxy: Optional[NewAPIProxy] = None


def get_new_api_proxy() -> NewAPIProxy:
    """Return the shared NewAPIProxy instance, creating it on first use."""
    global _new_api_proxy
    proxy = _new_api_proxy
    if proxy is None:
        # First access: build the instance and remember it for later calls.
        proxy = NewAPIProxy()
        _new_api_proxy = proxy
    return proxy