388 lines
13 KiB
Python
388 lines
13 KiB
Python
"""
|
||
New API 代理工具类
|
||
用于与 New API 支付后端进行通信
|
||
"""
|
||
import logging
|
||
from typing import Optional, Dict, Any
|
||
import httpx
|
||
|
||
from utils.settings import NEW_API_BASE_URL, NEW_API_TIMEOUT
|
||
|
||
logger = logging.getLogger('app')
|
||
|
||
|
||
class NewAPIProxy:
|
||
"""New API 代理类,处理与支付后端的所有通信"""
|
||
|
||
def __init__(self):
|
||
self.base_url = NEW_API_BASE_URL.rstrip('/')
|
||
self.timeout = NEW_API_TIMEOUT
|
||
|
||
def _get_headers(
|
||
self,
|
||
cookies: Optional[Dict[str, str]] = None,
|
||
new_api_user_id: Optional[int] = None
|
||
) -> Dict[str, str]:
|
||
"""获取请求头"""
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"Accept": "application/json",
|
||
}
|
||
# 添加 New-Api-User header(用户 ID)
|
||
if new_api_user_id:
|
||
headers["New-Api-User"] = str(new_api_user_id)
|
||
return headers
|
||
|
||
async def _request(
|
||
self,
|
||
method: str,
|
||
endpoint: str,
|
||
cookies: Optional[Dict[str, str]] = None,
|
||
data: Optional[Dict[str, Any]] = None,
|
||
params: Optional[Dict[str, Any]] = None,
|
||
new_api_user_id: Optional[int] = None
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
发送请求到 New API
|
||
"""
|
||
url = f"{self.base_url}{endpoint}"
|
||
headers = self._get_headers(cookies, new_api_user_id)
|
||
|
||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||
try:
|
||
response = await client.request(
|
||
method=method,
|
||
url=url,
|
||
headers=headers,
|
||
cookies=cookies,
|
||
json=data if data else None,
|
||
params=params
|
||
)
|
||
|
||
try:
|
||
result = response.json()
|
||
if response.cookies:
|
||
result["_cookies"] = dict(response.cookies)
|
||
return result
|
||
except Exception:
|
||
return {
|
||
"success": False,
|
||
"message": f"Invalid response: {response.text[:500]}",
|
||
"status_code": response.status_code
|
||
}
|
||
|
||
except httpx.TimeoutException:
|
||
logger.error(f"New API request timeout: {url}")
|
||
return {"success": False, "message": "Request timeout"}
|
||
except httpx.RequestError as e:
|
||
logger.error(f"New API request error: {e}")
|
||
return {"success": False, "message": str(e)}
|
||
|
||
async def _request_with_cookies(
|
||
self,
|
||
method: str,
|
||
endpoint: str,
|
||
cookies: Optional[Dict[str, str]] = None,
|
||
data: Optional[Dict[str, Any]] = None,
|
||
params: Optional[Dict[str, Any]] = None,
|
||
new_api_user_id: Optional[int] = None
|
||
) -> tuple[Dict[str, Any], Dict[str, str]]:
|
||
"""发送请求到 New API,并返回响应和 cookies"""
|
||
url = f"{self.base_url}{endpoint}"
|
||
headers = self._get_headers(cookies, new_api_user_id)
|
||
|
||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||
try:
|
||
response = await client.request(
|
||
method=method,
|
||
url=url,
|
||
headers=headers,
|
||
cookies=cookies,
|
||
json=data if data else None,
|
||
params=params
|
||
)
|
||
|
||
new_cookies = dict(response.cookies) if response.cookies else {}
|
||
|
||
try:
|
||
result = response.json()
|
||
return result, new_cookies
|
||
except Exception:
|
||
return {
|
||
"success": False,
|
||
"message": f"Invalid response: {response.text[:500]}",
|
||
"status_code": response.status_code
|
||
}, new_cookies
|
||
|
||
except httpx.TimeoutException:
|
||
logger.error(f"New API request timeout: {url}")
|
||
return {"success": False, "message": "Request timeout"}, {}
|
||
except httpx.RequestError as e:
|
||
logger.error(f"New API request error: {e}")
|
||
return {"success": False, "message": str(e)}, {}
|
||
|
||
# ==================== 用户认证相关 ====================
|
||
|
||
async def login(self, username: str, password: str) -> Dict[str, Any]:
|
||
"""用户登录到 New API"""
|
||
result, cookies = await self._request_with_cookies(
|
||
"POST",
|
||
"/api/user/login",
|
||
data={"username": username, "password": password}
|
||
)
|
||
|
||
if result.get("success") and cookies:
|
||
result["session"] = cookies.get("session") or cookies.get("SESSION")
|
||
result["_cookies"] = cookies
|
||
|
||
return result
|
||
|
||
async def register(self, username: str, password: str, email: Optional[str] = None) -> Dict[str, Any]:
|
||
"""在 New API 注册用户"""
|
||
data = {"username": username, "password": password}
|
||
if email:
|
||
data["email"] = email
|
||
return await self._request("POST", "/api/user/register", data=data)
|
||
|
||
async def get_user_info(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
|
||
"""获取用户信息"""
|
||
return await self._request("GET", "/api/user/self", cookies=cookies, new_api_user_id=new_api_user_id)
|
||
|
||
# ==================== 充值/套餐相关 ====================
|
||
|
||
async def get_topup_info(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
|
||
"""获取充值信息,包含 creem_products 套餐列表"""
|
||
return await self._request("GET", "/api/user/topup/info", cookies=cookies, new_api_user_id=new_api_user_id)
|
||
|
||
async def get_subscription_plans(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
|
||
"""获取订阅计划列表"""
|
||
return await self._request("GET", "/api/subscription/plans", cookies=cookies, new_api_user_id=new_api_user_id)
|
||
|
||
# ==================== 支付相关 ====================
|
||
|
||
async def create_topup_order(
|
||
self,
|
||
cookies: Dict[str, str],
|
||
amount: int,
|
||
payment_method: str,
|
||
new_api_user_id: int
|
||
) -> Dict[str, Any]:
|
||
"""创建充值订单(易支付)"""
|
||
return await self._request(
|
||
"POST",
|
||
"/api/user/pay",
|
||
cookies=cookies,
|
||
data={"amount": amount, "payment_method": payment_method},
|
||
new_api_user_id=new_api_user_id
|
||
)
|
||
|
||
async def create_stripe_order(
|
||
self,
|
||
cookies: Dict[str, str],
|
||
amount: int,
|
||
new_api_user_id: int
|
||
) -> Dict[str, Any]:
|
||
"""创建 Stripe 支付订单"""
|
||
return await self._request(
|
||
"POST",
|
||
"/api/user/stripe/pay",
|
||
cookies=cookies,
|
||
data={"amount": amount},
|
||
new_api_user_id=new_api_user_id
|
||
)
|
||
|
||
async def create_creem_order(
|
||
self,
|
||
cookies: Dict[str, str],
|
||
product_id: str,
|
||
new_api_user_id: int
|
||
) -> Dict[str, Any]:
|
||
"""创建 Creem 支付订单"""
|
||
return await self._request(
|
||
"POST",
|
||
"/api/user/creem/pay",
|
||
cookies=cookies,
|
||
data={"product_id": product_id},
|
||
new_api_user_id=new_api_user_id
|
||
)
|
||
|
||
async def get_order_status(
|
||
self,
|
||
cookies: Dict[str, str],
|
||
order_id: str,
|
||
new_api_user_id: int
|
||
) -> Dict[str, Any]:
|
||
"""查询订单状态"""
|
||
return await self._request(
|
||
"GET",
|
||
f"/api/user/order/{order_id}",
|
||
cookies=cookies,
|
||
new_api_user_id=new_api_user_id
|
||
)
|
||
|
||
async def get_order_list(
|
||
self,
|
||
cookies: Dict[str, str],
|
||
page: int,
|
||
page_size: int,
|
||
new_api_user_id: int
|
||
) -> Dict[str, Any]:
|
||
"""获取订单列表(充值记录)"""
|
||
return await self._request(
|
||
"GET",
|
||
"/api/user/topup/self",
|
||
cookies=cookies,
|
||
params={"p": page, "size": page_size},
|
||
new_api_user_id=new_api_user_id
|
||
)
|
||
|
||
# ==================== 订阅相关 ====================
|
||
|
||
async def subscribe_plan(
|
||
self,
|
||
cookies: Dict[str, str],
|
||
plan_id: int,
|
||
new_api_user_id: int,
|
||
payment_method: str = "alipay"
|
||
) -> Dict[str, Any]:
|
||
"""订阅计划(使用易支付)"""
|
||
return await self._request(
|
||
"POST",
|
||
"/api/subscription/epay/pay",
|
||
cookies=cookies,
|
||
data={"plan_id": plan_id, "payment_method": payment_method},
|
||
new_api_user_id=new_api_user_id
|
||
)
|
||
|
||
async def get_subscription_status(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
|
||
"""获取订阅状态"""
|
||
return await self._request(
|
||
"GET",
|
||
"/api/subscription/self",
|
||
cookies=cookies,
|
||
new_api_user_id=new_api_user_id
|
||
)
|
||
|
||
async def cancel_subscription(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
|
||
"""取消订阅"""
|
||
return await self._request(
|
||
"POST",
|
||
"/api/subscription/cancel",
|
||
cookies=cookies,
|
||
new_api_user_id=new_api_user_id
|
||
)
|
||
|
||
# ==================== 额度/余额相关 ====================
|
||
|
||
async def get_quota(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
|
||
"""获取用户额度信息(从用户信息中获取)"""
|
||
return await self._request(
|
||
"GET",
|
||
"/api/user/self",
|
||
cookies=cookies,
|
||
new_api_user_id=new_api_user_id
|
||
)
|
||
|
||
# ==================== 模型相关 ====================
|
||
|
||
async def get_user_models(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
|
||
"""获取用户可用模型列表"""
|
||
return await self._request(
|
||
"GET",
|
||
"/api/user/models",
|
||
cookies=cookies,
|
||
new_api_user_id=new_api_user_id
|
||
)
|
||
|
||
# ==================== 令牌相关 ====================
|
||
|
||
async def get_tokens(
|
||
self,
|
||
cookies: Dict[str, str],
|
||
new_api_user_id: int,
|
||
page: int = 1,
|
||
page_size: int = 100
|
||
) -> Dict[str, Any]:
|
||
"""获取令牌列表"""
|
||
return await self._request(
|
||
"GET",
|
||
"/api/token/",
|
||
cookies=cookies,
|
||
params={"p": page, "page_size": page_size},
|
||
new_api_user_id=new_api_user_id
|
||
)
|
||
|
||
async def create_token(
|
||
self,
|
||
cookies: Dict[str, str],
|
||
new_api_user_id: int,
|
||
name: str = "API Token",
|
||
remain_quota: Optional[int] = None,
|
||
expired_time: Optional[int] = None,
|
||
unlimited_quota: bool = True
|
||
) -> Dict[str, Any]:
|
||
"""创建令牌"""
|
||
data = {
|
||
"name": name,
|
||
"unlimited_quota": unlimited_quota
|
||
}
|
||
if remain_quota is not None:
|
||
data["remain_quota"] = remain_quota
|
||
if expired_time is not None:
|
||
data["expired_time"] = expired_time
|
||
return await self._request(
|
||
"POST",
|
||
"/api/token/",
|
||
cookies=cookies,
|
||
data=data,
|
||
new_api_user_id=new_api_user_id
|
||
)
|
||
|
||
async def get_or_create_token(
|
||
self,
|
||
cookies: Dict[str, str],
|
||
new_api_user_id: int,
|
||
token_name: str = "Bot Manager Token"
|
||
) -> Dict[str, Any]:
|
||
"""获取或创建令牌(如果没有则创建一个)"""
|
||
# 先尝试获取现有令牌
|
||
tokens_result = await self.get_tokens(cookies, new_api_user_id)
|
||
|
||
if tokens_result.get("success") and tokens_result.get("data"):
|
||
tokens = tokens_result["data"]
|
||
if isinstance(tokens, list) and len(tokens) > 0:
|
||
# 返回第一个可用的令牌
|
||
return {
|
||
"success": True,
|
||
"data": tokens[0],
|
||
"message": "使用现有令牌"
|
||
}
|
||
|
||
# 没有令牌,创建一个新的
|
||
create_result = await self.create_token(
|
||
cookies,
|
||
new_api_user_id,
|
||
name=token_name
|
||
)
|
||
|
||
if create_result.get("success"):
|
||
return {
|
||
"success": True,
|
||
"data": create_result.get("data"),
|
||
"message": "已创建新令牌"
|
||
}
|
||
|
||
return create_result
|
||
|
||
|
||
# 全局单例
|
||
_new_api_proxy: Optional[NewAPIProxy] = None
|
||
|
||
|
||
def get_new_api_proxy() -> NewAPIProxy:
|
||
"""获取 New API 代理单例"""
|
||
global _new_api_proxy
|
||
if _new_api_proxy is None:
|
||
_new_api_proxy = NewAPIProxy()
|
||
return _new_api_proxy
|