From d7dfd8810edc5d0ca008c405823b1f198d7207f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Wed, 25 Feb 2026 23:42:44 +0800 Subject: [PATCH] update payment --- fastapi_app.py | 3 +- plans/new-api-payment-integration.md | 756 +++++++++++++++++++++++++++ routes/bot_manager.py | 256 ++++++++- routes/payment.py | 381 ++++++++++++++ utils/fastapi_utils.py | 54 +- utils/new_api_proxy.py | 387 ++++++++++++++ utils/settings.py | 13 + 7 files changed, 1802 insertions(+), 48 deletions(-) create mode 100644 plans/new-api-payment-integration.md create mode 100644 routes/payment.py create mode 100644 utils/new_api_proxy.py diff --git a/fastapi_app.py b/fastapi_app.py index 41842d5..176c40e 100644 --- a/fastapi_app.py +++ b/fastapi_app.py @@ -81,7 +81,7 @@ from utils.log_util.logger import init_with_fastapi logger = logging.getLogger('app') # Import route modules -from routes import chat, files, projects, system, skill_manager, database, bot_manager, knowledge_base +from routes import chat, files, projects, system, skill_manager, database, bot_manager, knowledge_base, payment @asynccontextmanager @@ -200,6 +200,7 @@ app.include_router(system.router) app.include_router(skill_manager.router) app.include_router(database.router) app.include_router(bot_manager.router) +app.include_router(payment.router) # 注册文件管理API路由 app.include_router(file_manager_router) diff --git a/plans/new-api-payment-integration.md b/plans/new-api-payment-integration.md new file mode 100644 index 0000000..7e39b88 --- /dev/null +++ b/plans/new-api-payment-integration.md @@ -0,0 +1,756 @@ +# New API 支付系统集成计划(简化版) + +## 概述 + +将 New API 作为 qwen-client 和 qwen-agent 的支付后端。qwen-agent 只做代理转发,不维护配额和交易数据。 + +**创建日期**: 2026-02-25 +**更新日期**: 2026-02-25 +**状态**: 待审核 +**复杂度**: 中 + +--- + +## 核心设计 + +### 设计原则 + +1. **New API 是数据源**: 所有配额、交易、订单数据由 New API 维护 +2. **qwen-agent 是代理层**: 只转发请求,不存储支付相关数据 +3. **Session 同步登录**: 用户登录 qwen-agent 时,同步登录 New API 并存储 session +4. **Cookie 转发**: 代理请求时转发用户的 session cookie 实现 New API 鉴权 + +### 架构图 + +``` +┌─────────────┐ ┌─────────────────┐ ┌─────────────┐ +│ qwen-client │ ───► │ qwen-agent │ ───► │ New API │ +│ (前端) │ │ (代理 + Session)│ │ (支付后端) │ +└─────────────┘ └─────────────────┘ └─────────────┘ + │ + ▼ + 存储用户对应的 + New API Session Cookie +``` + +### 支付方式支持 + +| 支付方式 | New API 接口 | 说明 | +|---------|-------------|------| +| 易支付 | `/api/user/pay` | 通用支付方式(微信/支付宝等) | +| Stripe | `/api/user/stripe/pay` | 国际信用卡支付 | +| Creem | `/api/user/creem/pay` | 新支付方式,支持套餐 | + +--- + +## 数据库设计 + +### 只需新增一个字段 + +```sql +-- 在 agent_user 表添加 New API Session +ALTER TABLE agent_user ADD COLUMN new_api_session VARCHAR(500); +ALTER TABLE agent_user ADD COLUMN new_api_user_id INTEGER; +``` + +**不需要新建表**,所有配额、交易、订单数据都在 New API 中。 + +--- + +## 核心流程 + +### 1. 用户登录同步 + +``` +用户登录 qwen-client + │ + ▼ +qwen-agent 验证本地用户 + │ + ▼ +qwen-agent 用相同凭证调用 New API /api/user/login + │ + ├─ 成功 → 存储 new_api_session 到 agent_user 表 + │ + └─ 失败 → 如果是用户不存在,先在 New API 注册再登录 + │ + ▼ +返回 qwen-agent 的 token 给前端 +``` + +### 2. 支付/充值流程 + +``` +用户点击充值 + │ + ▼ +qwen-agent 读取用户的 new_api_session + │ + ▼ +代理请求到 New API (带 session cookie) + │ + ▼ +返回支付链接给前端 + │ + ▼ +用户完成支付 (New API 直接处理) + │ + ▼ +前端跳转到 New API 的用户中心查看结果 +``` + +### 3. 配额检查 + +``` +用户发起聊天请求 + │ + ▼ +qwen-agent 调用 New API /api/user/self 获取实时配额 + │ + ├─ 配额不足 → 返回 402 错误 + New API 充值链接 + │ + └─ 配额充足 → 处理请求 + │ + ▼ +(可选) 调用 New API 扣减配额 +``` + +--- + +## API 设计 + +### qwen-agent 新增接口 + +| 端点 | 方法 | 描述 | 代理的 New API 接口 | +|------|------|------|---------------------| +| `/api/v1/payment/info` | GET | 获取充值配置(含套餐) | `/api/user/topup/info` | +| `/api/v1/payment/plans` | GET | 获取订阅套餐列表 | `/api/subscription/plans` | +| `/api/v1/payment/create` | POST | 创建支付订单(易支付) | `/api/user/pay` | +| `/api/v1/payment/stripe` | POST | 创建 Stripe 支付 | `/api/user/stripe/pay` | +| `/api/v1/payment/creem` | POST | 创建 Creem 支付(套餐) | `/api/user/creem/pay` | +| `/api/v1/payment/amount` | POST | 计算实际支付金额 | `/api/user/amount` | +| `/api/v1/quota` | GET | 查询用户配额 | `/api/user/self` | +| `/api/v1/topup/history` | GET | 充值记录 | `/api/user/topup/self` | + +所有接口都是代理转发,使用存储的 new_api_session。 + +--- + +## New API 接口详情 + +### 1. 获取充值配置 `/api/user/topup/info` (GET) + +**鉴权**: 需要登录(Cookie) + +**响应示例**: +```json +{ + "success": true, + "message": "", + "data": { + "enable_online_topup": true, + "enable_stripe_topup": true, + "enable_creem_topup": true, + "creem_products": [ + { + "productId": "prod_xxx", + "name": "基础套餐", + "price": 9.99, + "currency": "USD", + "quota": 500000 + } + ], + "pay_methods": [ + {"name": "微信支付", "type": "wxpay", "color": "#07C160", "min_topup": "1"}, + {"name": "支付宝", "type": "alipay", "color": "#1677FF", "min_topup": "1"} + ], + "min_topup": 1, + "stripe_min_topup": 1, + "amount_options": [100, 500, 1000, 5000], + "discount": { + "100": 1.0, + "500": 0.95, + "1000": 0.9, + "5000": 0.85 + } + } +} +``` + +**字段说明**: +- `enable_online_topup`: 是否启用易支付 +- `enable_stripe_topup`: 是否启用 Stripe 支付 +- `enable_creem_topup`: 是否启用 Creem 支付 +- `creem_products`: Creem 套餐列表(**这是套餐列表的数据来源**) +- `pay_methods`: 支持的支付方式列表 +- `min_topup`: 最小充值额度 +- `amount_options`: 预设充值额度选项 +- `discount`: 折扣配置(额度 -> 折扣率) + +### 2. 获取订阅套餐列表 `/api/subscription/plans` (GET) + +**鉴权**: 需要登录(Cookie) + +**响应示例**: +```json +{ + "success": true, + "data": [ + { + "plan": { + "id": 1, + "title": "月度订阅", + "subtitle": "适合轻度用户", + "price_amount": 9.99, + "currency": "USD", + "duration_unit": "month", + "duration_value": 1, + "total_amount": 500000, + "enabled": true, + "sort_order": 100, + "stripe_price_id": "price_xxx", + "creem_product_id": "prod_xxx", + "max_purchase_per_user": 0, + "upgrade_group": "", + "quota_reset_period": "monthly" + } + } + ] +} +``` + +**字段说明**: +- `title`: 套餐名称 +- `price_amount`: 价格 +- `duration_unit`: 周期单位(month/year/custom) +- `duration_value`: 周期数值 +- `total_amount`: 包含的配额 +- `stripe_price_id`: Stripe 价格 ID +- `creem_product_id`: Creem 产品 ID + +### 3. 创建易支付订单 `/api/user/pay` (POST) + +**请求**: +```json +{ + "amount": 1000, + "payment_method": "wxpay" +} +``` + +**响应**: +```json +{ + "message": "success", + "data": {...}, + "url": "https://pay.example.com/submit.php?..." +} +``` + +### 4. 创建 Creem 支付 `/api/user/creem/pay` (POST) + +**请求**: +```json +{ + "product_id": "prod_xxx", + "payment_method": "creem" +} +``` + +**响应**: +```json +{ + "message": "success", + "data": { + "checkout_url": "https://checkout.creem.io/...", + "order_id": "ref_xxx" + } +} +``` + +### 5. 获取用户信息(含配额)`/api/user/self` (GET) + +**鉴权**: 需要登录(Cookie) + +**响应示例**: +```json +{ + "success": true, + "data": { + "id": 1, + "username": "user", + "email": "user@example.com", + "quota": 500000, + "used_quota": 10000, + "group": "default" + } +} +``` + +### 6. 用户登录 `/api/user/login` (POST) + +**请求**: +```json +{ + "username": "user", + "password": "password" +} +``` + +**响应**: +- 返回 Set-Cookie header 包含 session +- 用户信息 JSON + +--- + +## 实施步骤 + +### Step 1: 添加环境变量配置 + +在 `utils/settings.py` 添加 New API 配置: + +```python +from pydantic_settings import BaseSettings + +class Settings(BaseSettings): + # ... 现有配置 ... + + # New API 配置 + NEW_API_BASE_URL: str = "https://new-api.example.com" + NEW_API_TIMEOUT: int = 30 + + class Config: + env_file = ".env" +``` + +### Step 2: 创建 New API 代理工具类 + +创建 `utils/new_api_proxy.py`: + +```python +import httpx +from typing import Optional, Any +from utils.settings import Settings + +settings = Settings() + +class NewAPIProxy: + """New API 代理工具类""" + + def __init__(self): + self.base_url = settings.NEW_API_BASE_URL + self.timeout = settings.NEW_API_TIMEOUT + + async def login(self, username: str, password: str) -> Optional[str]: + """登录 New API 并返回 session cookie""" + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{self.base_url}/api/user/login", + json={"username": username, "password": password}, + timeout=self.timeout + ) + if resp.status_code == 200 and resp.json().get("success"): + # 提取 session cookie + cookies = resp.cookies.get("session") + return cookies + return None + + async def register(self, username: str, password: str, email: str) -> bool: + """在 New API 注册用户""" + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{self.base_url}/api/user/register", + json={"username": username, "password": password, "email": email}, + timeout=self.timeout + ) + return resp.status_code == 200 + + async def proxy_get( + self, + endpoint: str, + session_cookie: str + ) -> dict: + """代理 GET 请求到 New API""" + async with httpx.AsyncClient() as client: + resp = await client.get( + f"{self.base_url}{endpoint}", + cookies={"session": session_cookie}, + timeout=self.timeout + ) + return resp.json() + + async def proxy_post( + self, + endpoint: str, + data: dict, + session_cookie: str + ) -> dict: + """代理 POST 请求到 New API""" + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{self.base_url}{endpoint}", + json=data, + cookies={"session": session_cookie}, + timeout=self.timeout + ) + return resp.json() + + async def get_user_info(self, session_cookie: str) -> Optional[dict]: + """获取用户信息(含配额)""" + try: + result = await self.proxy_get("/api/user/self", session_cookie) + if result.get("success"): + return result.get("data") + except: + pass + return None + + async def check_quota(self, session_cookie: str) -> tuple[bool, int]: + """检查用户配额""" + user_info = await self.get_user_info(session_cookie) + if user_info: + quota = user_info.get("quota", 0) + return quota > 0, quota + # 降级策略:无法获取配额时允许使用 + return True, -1 + +# 全局实例 +new_api_proxy = NewAPIProxy() +``` + +### Step 3: 修改登录逻辑 + +修改 `routes/bot_manager.py` 中的登录函数: + +```python +from utils.new_api_proxy import new_api_proxy + +async def login_user(username: str, password: str): + # 1. 验证本地用户 + user = await verify_local_user(username, password) + + # 2. 同步登录 New API + new_api_session = await new_api_proxy.login(username, password) + + # 3. 如果登录失败,尝试注册 + if not new_api_session: + await new_api_proxy.register(username, password, user.email) + new_api_session = await new_api_proxy.login(username, password) + + # 4. 存储 session + if new_api_session: + await db.execute( + "UPDATE agent_user SET new_api_session = $1 WHERE id = $2", + new_api_session, user.id + ) + + # 5. 返回本地 token + return generate_local_token(user) +``` + +### Step 4: 创建支付代理路由 + +创建 `routes/payment.py`: + +```python +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +from typing import Optional +from utils.new_api_proxy import new_api_proxy +from utils.auth import get_current_user + +router = APIRouter(prefix="/api/v1", tags=["payment"]) + +class CreatePaymentRequest(BaseModel): + amount: int + payment_method: str + +class CreemPaymentRequest(BaseModel): + product_id: str + payment_method: str = "creem" + +class AmountRequest(BaseModel): + amount: int + +@router.get("/payment/info") +async def get_payment_info(user = Depends(get_current_user)): + """获取充值配置(含套餐列表)""" + if not user.new_api_session: + raise HTTPException(status_code=401, detail="未同步 New API session") + return await new_api_proxy.proxy_get("/api/user/topup/info", user.new_api_session) + +@router.get("/payment/plans") +async def get_subscription_plans(user = Depends(get_current_user)): + """获取订阅套餐列表""" + if not user.new_api_session: + raise HTTPException(status_code=401, detail="未同步 New API session") + return await new_api_proxy.proxy_get("/api/subscription/plans", user.new_api_session) + +@router.post("/payment/create") +async def create_payment(req: CreatePaymentRequest, user = Depends(get_current_user)): + """创建支付订单(易支付)""" + if not user.new_api_session: + raise HTTPException(status_code=401, detail="未同步 New API session") + return await new_api_proxy.proxy_post( + "/api/user/pay", + {"amount": req.amount, "payment_method": req.payment_method}, + user.new_api_session + ) + +@router.post("/payment/stripe") +async def create_stripe_payment(req: AmountRequest, user = Depends(get_current_user)): + """创建 Stripe 支付""" + if not user.new_api_session: + raise HTTPException(status_code=401, detail="未同步 New API session") + return await new_api_proxy.proxy_post( + "/api/user/stripe/pay", + {"amount": req.amount}, + user.new_api_session + ) + +@router.post("/payment/creem") +async def create_creem_payment(req: CreemPaymentRequest, user = Depends(get_current_user)): + """创建 Creem 支付(套餐)""" + if not user.new_api_session: + raise HTTPException(status_code=401, detail="未同步 New API session") + return await new_api_proxy.proxy_post( + "/api/user/creem/pay", + {"product_id": req.product_id, "payment_method": req.payment_method}, + user.new_api_session + ) + +@router.post("/payment/amount") +async def get_payment_amount(req: AmountRequest, user = Depends(get_current_user)): + """计算实际支付金额""" + if not user.new_api_session: + raise HTTPException(status_code=401, detail="未同步 New API session") + return await new_api_proxy.proxy_post( + "/api/user/amount", + {"amount": req.amount}, + user.new_api_session + ) + +@router.get("/quota") +async def get_quota(user = Depends(get_current_user)): + """获取用户配额""" + if not user.new_api_session: + raise HTTPException(status_code=401, detail="未同步 New API session") + user_info = await new_api_proxy.get_user_info(user.new_api_session) + if user_info: + return { + "quota": user_info.get("quota", 0), + "used_quota": user_info.get("used_quota", 0) + } + raise HTTPException(status_code=500, detail="获取配额失败") + +@router.get("/topup/history") +async def get_topup_history(user = Depends(get_current_user)): + """获取充值记录""" + if not user.new_api_session: + raise HTTPException(status_code=401, detail="未同步 New API session") + return await new_api_proxy.proxy_get("/api/user/topup/self", user.new_api_session) +``` + +### Step 5: 在 Chat 路由添加配额检查 + +在 `routes/chat.py` 添加配额检查中间件: + +```python +from utils.new_api_proxy import new_api_proxy + +async def check_quota_before_chat(user) -> tuple[bool, int]: + """聊天前检查配额""" + if not user.new_api_session: + # 没有 session,允许通过(可能是非支付用户) + return True, -1 + + try: + has_quota, quota = await new_api_proxy.check_quota(user.new_api_session) + return has_quota, quota + except Exception as e: + # 降级策略:New API 不可用时允许使用 + logger.warning(f"配额检查失败: {e}") + return True, -1 + +# 在 chat 处理函数中调用 +async def handle_chat(request, user): + has_quota, quota = await check_quota_before_chat(user) + if not has_quota: + raise HTTPException( + status_code=402, + detail={ + "error": "quota_exceeded", + "message": "配额不足,请充值", + "topup_url": f"{settings.NEW_API_BASE_URL}/console/topup" + } + ) + # ... 继续处理聊天请求 ... +``` + +### Step 6: 注册路由 + +在 `fastapi_app.py` 中注册新路由: + +```python +from routes.payment import router as payment_router + +app.include_router(payment_router) +``` + +### Step 7: 数据库迁移 + +```sql +-- 在 agent_user 表添加 New API Session +ALTER TABLE agent_user ADD COLUMN new_api_session VARCHAR(500); +ALTER TABLE agent_user ADD COLUMN new_api_user_id INTEGER; +``` + +--- + +## 环境变量 + +```bash +# .env +NEW_API_BASE_URL=https://new-api.example.com +NEW_API_TIMEOUT=30 +``` + +--- + +## 前端集成指南 + +### 1. 充值页面流程 + +```typescript +// 1. 获取充值配置 +const paymentInfo = await fetch('/api/v1/payment/info'); + +// 2. 根据返回的支付方式展示选项 +const { pay_methods, creem_products, amount_options, discount } = paymentInfo.data; + +// 3. 用户选择套餐或输入金额 +// - 如果选择 Creem 套餐:调用 /api/v1/payment/creem +// - 如果选择自定义金额:调用 /api/v1/payment/create + +// 4. 获取支付链接并跳转 +const result = await fetch('/api/v1/payment/creem', { + method: 'POST', + body: JSON.stringify({ product_id: 'prod_xxx' }) +}); + +// 5. 跳转到支付页面 +window.location.href = result.data.checkout_url; +``` + +### 2. 显示配额余额 + +```typescript +// 获取用户配额 +const quota = await fetch('/api/v1/quota'); + +// 显示格式化后的配额 +// quota.quota - 当前可用配额 +// quota.used_quota - 已使用配额 +``` + +### 3. 配额不足处理 + +```typescript +// 当聊天返回 402 错误时 +if (error.status === 402) { + const { topup_url } = error.detail; + // 显示充值提示 + // 提供"去充值"按钮,跳转到 topup_url +} +``` + +--- + +## 用户体验流程 + +### 充值流程(易支付) + +``` +1. 用户点击"充值"按钮 +2. 前端调用 GET /api/v1/payment/info 获取配置 +3. 展示支付方式和预设金额选项 +4. 用户选择金额和支付方式 +5. 前端调用 POST /api/v1/payment/create +6. 获取支付链接,跳转到支付页面 +7. 用户完成支付 +8. 支付成功后跳转回前端 +``` + +### 充值流程(Creem 套餐) + +``` +1. 用户点击"购买套餐" +2. 前端调用 GET /api/v1/payment/info 获取 creem_products +3. 展示套餐列表(名称、价格、配额) +4. 用户选择套餐 +5. 前端调用 POST /api/v1/payment/creem +6. 获取 checkout_url,跳转到 Creem 支付页面 +7. 用户完成支付 +8. 自动跳转回前端 +``` + +### 配额检查 + +``` +1. 用户发送聊天消息 +2. qwen-agent 检查配额 +3. 如果配额充足:正常处理 +4. 如果配额不足:返回 402 + 充值链接 +5. 前端显示"配额不足,请充值"提示 +``` + +--- + +## 总结 + +简化方案的核心是 **不重复存储数据**,让 New API 作为唯一的支付数据源。qwen-agent 只负责: + +1. 用户登录时同步 New API session +2. 代理转发支付相关请求 +3. 实时查询配额进行检查 + +### 套餐列表获取方式 + +套餐列表有两个来源: + +1. **Creem 产品列表**(推荐) + - 接口:`/api/user/topup/info` → `creem_products` 字段 + - 适用于一次性购买配额包 + +2. **订阅套餐列表** + - 接口:`/api/subscription/plans` + - 适用于周期性订阅 + +### 与原方案对比 + +| 项目 | 原方案 | 简化方案 | +|------|--------|----------| +| 新建数据库表 | 4 个 | 0 个 | +| 新增字段 | 多个 | 2 个 | +| 本地配额管理 | 需要 | 不需要 | +| Webhook 处理 | 需要 | 不需要 | +| 交易记录 | 本地存储 | New API 管理 | +| 实施周期 | 2-3 周 | 3-5 天 | + +### 文件变更清单 + +| 文件 | 操作 | 说明 | +|------|------|------| +| `utils/settings.py` | 修改 | 添加 New API 环境变量 | +| `utils/new_api_proxy.py` | 新建 | New API 代理工具类 | +| `routes/payment.py` | 新建 | 支付代理路由 | +| `routes/bot_manager.py` | 修改 | 登录逻辑添加 session 同步 | +| `routes/chat.py` | 修改 | 添加配额检查 | +| `fastapi_app.py` | 修改 | 注册新路由 | +| 数据库迁移 | 执行 | 添加 new_api_session 字段 | + +### 依赖添加 + +```bash +poetry add httpx +``` diff --git a/routes/bot_manager.py b/routes/bot_manager.py index 4c38dc6..4adecfd 100644 --- a/routes/bot_manager.py +++ b/routes/bot_manager.py @@ -16,6 +16,7 @@ from pydantic import BaseModel from agent.db_pool_manager import get_db_pool_manager from utils.fastapi_utils import extract_api_key_from_auth +from utils.new_api_proxy import get_new_api_proxy logger = logging.getLogger('app') @@ -633,6 +634,51 @@ async def migrate_bot_owner_and_shares(): """) logger.info("is_admin column added successfully") + # 3. 添加 new_api_session 字段 + await cursor.execute(""" + SELECT column_name + FROM information_schema.columns + WHERE table_name = 'agent_user' AND column_name = 'new_api_session' + """) + has_session_column = await cursor.fetchone() + if not has_session_column: + logger.info("Adding new_api_session column to agent_user table") + await cursor.execute(""" + ALTER TABLE agent_user + ADD COLUMN new_api_session VARCHAR(500) + """) + logger.info("new_api_session column added successfully") + + # 检查并添加 new_api_user_id 字段 + await cursor.execute(""" + SELECT column_name + FROM information_schema.columns + WHERE table_name = 'agent_user' AND column_name = 'new_api_user_id' + """) + has_user_id_column = await cursor.fetchone() + if not has_user_id_column: + logger.info("Adding new_api_user_id column to agent_user table") + await cursor.execute(""" + ALTER TABLE agent_user + ADD COLUMN new_api_user_id INTEGER + """) + logger.info("new_api_user_id column added successfully") + + # 检查并添加 new_api_token 字段(用于存储 API Key) + await cursor.execute(""" + SELECT column_name + FROM information_schema.columns + WHERE table_name = 'agent_user' AND column_name = 'new_api_token' + """) + has_token_column = await cursor.fetchone() + if not has_token_column: + logger.info("Adding new_api_token column to agent_user table") + await cursor.execute(""" + ALTER TABLE agent_user + ADD COLUMN new_api_token VARCHAR(255) + """) + logger.info("new_api_token column added successfully") + # 2. 创建 bot_shares 表 await cursor.execute(""" SELECT EXISTS ( @@ -1593,23 +1639,20 @@ async def get_bot_settings(bot_uuid: str, authorization: Optional[str] = Header( is_owner = (str(owner_id) == str(user_id)) # 获取关联的模型信息 + # 注意:model_id 现在来自 New API,格式为 "Provider/ModelName" + # 不再从本地 agent_models 表查询 model_info = None model_id = settings.get('model_id') if model_id: - await cursor.execute(""" - SELECT id, name, provider, model, server, api_key - FROM agent_models WHERE id = %s - """, (model_id,)) - model_row = await cursor.fetchone() - if model_row: - model_info = ModelInfo( - id=str(model_row[0]), - name=model_row[1], - provider=model_row[2], - model=model_row[3], - server=model_row[4], - api_key=mask_api_key(model_row[5]) - ) + # 直接使用 model_id 作为模型信息 + model_info = ModelInfo( + id=model_id, + name=model_id, + provider=model_id.split('/')[0] if '/' in model_id else 'new-api', + model=model_id.split('/')[-1] if '/' in model_id else model_id, + server=None, + api_key=None + ) # 处理 dataset_ids:将字符串转换为数组 dataset_ids = settings.get('dataset_ids') @@ -1681,13 +1724,7 @@ async def update_bot_settings( # 处理 model_id:将空字符串转换为 None model_id_value = request.model_id.strip() if request.model_id else None - # 验证 model_id 是否存在 - if model_id_value: - async with pool.connection() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT id FROM agent_models WHERE id = %s", (model_id_value,)) - if not await cursor.fetchone(): - raise HTTPException(status_code=400, detail=f"Model with id '{request.model_id}' not found") + # 注意:model_id 现在来自 New API,不再在本地 agent_models 表中验证 # 构建 JSONB 更新对象 update_json = {} @@ -2256,6 +2293,44 @@ async def user_register(request: UserRegisterRequest): """, (request.username, request.email, password_hash)) user_id, created_at = await cursor.fetchone() + # 5. 同步注册到 New API(静默失败) + new_api_session = None + new_api_user_id = None + try: + proxy = get_new_api_proxy() + # 使用假邮箱或提供的邮箱注册 + register_email = request.email or f"{request.username}@fake.local" + register_result = await proxy.register(request.username, request.password, register_email) + logger.info(f"New API register result for {request.username}: success={register_result.get('success')}, message={register_result.get('message', 'N/A')}") + + if register_result.get("success"): + # 注册成功后立即登录获取 session 和 user_id + login_result = await proxy.login(request.username, request.password) + if login_result.get("success"): + new_api_session = login_result.get("session") + new_api_user_id = login_result.get("data", {}).get("id") + await cursor.execute(""" + UPDATE agent_user + SET new_api_session = %s, new_api_user_id = %s + WHERE id = %s + """, (new_api_session, new_api_user_id, user_id)) + logger.info(f"New API session and user_id stored for user {request.username}") + else: + # 如果注册失败(可能用户已存在),尝试直接登录 + logger.warning(f"New API register failed, trying login: {register_result.get('message')}") + login_result = await proxy.login(request.username, request.password) + if login_result.get("success"): + new_api_session = login_result.get("session") + new_api_user_id = login_result.get("data", {}).get("id") + await cursor.execute(""" + UPDATE agent_user + SET new_api_session = %s, new_api_user_id = %s + WHERE id = %s + """, (new_api_session, new_api_user_id, user_id)) + logger.info(f"New API session and user_id stored for user {request.username} via login") + except Exception as e: + logger.warning(f"New API sync failed for user {request.username}: {e}") + # 6. 生成 token token = secrets.token_urlsafe(32) expires_at = datetime.now() + timedelta(hours=TOKEN_EXPIRE_HOURS) @@ -2337,6 +2412,69 @@ async def user_login(request: UserLoginRequest): VALUES (%s, %s, %s) """, (user_id, token, expires_at)) + # 5. 尝试同步 New API session 和令牌(静默失败) + new_api_session = None + new_api_user_id = None + new_api_token = None + try: + proxy = get_new_api_proxy() + # 使用 username 登录 New API + logger.info(f"Attempting New API login for user {username}") + login_result = await proxy.login(request.username, request.password) + logger.info(f"New API login result: success={login_result.get('success')}, message={login_result.get('message', 'N/A')}") + + if login_result.get("success"): + logger.info(f"New API login successful for user {username}") + # 存储 session 和 user_id + new_api_session = login_result.get("session") + new_api_user_id = login_result.get("data", {}).get("id") + if new_api_session or new_api_user_id: + # 获取或创建令牌 + cookies = {"session": new_api_session} if new_api_session else {} + token_result = await proxy.get_or_create_token(cookies, new_api_user_id) + if token_result.get("success") and token_result.get("data"): + new_api_token = token_result["data"].get("key") + logger.info(f"Got New API token for user {username}") + + await cursor.execute(""" + UPDATE agent_user + SET new_api_session = %s, new_api_user_id = %s, new_api_token = %s + WHERE id = %s + """, (new_api_session, new_api_user_id, new_api_token, user_id)) + logger.info(f"Stored New API session, user_id and token for user {username}") + else: + logger.warning(f"New API login succeeded but no session/user_id. Response: {login_result}") + else: + # 登录失败,尝试先注册再登录(可能是用户在 New API 不存在) + logger.info(f"New API login failed, trying to register user {username}") + register_email = email or f"{request.username}@fake.local" + register_result = await proxy.register(request.username, request.password, register_email) + logger.info(f"New API register result: success={register_result.get('success')}, message={register_result.get('message', 'N/A')}") + + if register_result.get("success"): + # 注册成功后再次登录 + login_result = await proxy.login(request.username, request.password) + if login_result.get("success"): + new_api_session = login_result.get("session") + new_api_user_id = login_result.get("data", {}).get("id") + # 获取或创建令牌 + cookies = {"session": new_api_session} if new_api_session else {} + token_result = await proxy.get_or_create_token(cookies, new_api_user_id) + if token_result.get("success") and token_result.get("data"): + new_api_token = token_result["data"].get("key") + logger.info(f"Got New API token for user {username} after registration") + await cursor.execute(""" + UPDATE agent_user + SET new_api_session = %s, new_api_user_id = %s, new_api_token = %s + WHERE id = %s + """, (new_api_session, new_api_user_id, new_api_token, user_id)) + logger.info(f"New API session, user_id and token stored for user {username} after registration") + else: + logger.warning(f"New API register also failed for user {username}: {register_result.get('message')}") + except Exception as e: + # 静默失败,不影响本地登录 + logger.warning(f"New API login sync failed for user {username}: {e}") + await conn.commit() return UserLoginResponse( @@ -3560,3 +3698,79 @@ async def sync_bot_from_source( message="Bot synced from source successfully" ) + +# ============== New API 模型和令牌管理 API ============== + +class NewAPIModelResponse(BaseModel): + """New API 模型响应""" + id: str + object: str = "model" + created: Optional[int] = None + owned_by: str = "system" + + +@router.get("/api/v1/newapi/models", response_model=List[NewAPIModelResponse]) +async def get_newapi_models(authorization: Optional[str] = Header(None)): + """ + 获取用户可用的模型列表(从 New API) + + Args: + authorization: Bearer token + + Returns: + List[NewAPIModelResponse]: 模型列表 + """ + user_valid, user_id, _ = await verify_user_auth(authorization) + if not user_valid: + raise HTTPException( + status_code=401, + detail="Unauthorized" + ) + + pool = get_db_pool_manager().pool + + async with pool.connection() as conn: + async with conn.cursor() as cursor: + # 获取用户的 new_api_session 和 new_api_user_id + await cursor.execute(""" + SELECT new_api_session, new_api_user_id + FROM agent_user + WHERE id = %s + """, (user_id,)) + row = await cursor.fetchone() + + if not row or not row[0] or not row[1]: + raise HTTPException( + status_code=400, + detail="New API session not found. Please log in again." + ) + + new_api_session = row[0] + new_api_user_id = row[1] + + # 调用 New API 获取模型列表 + proxy = get_new_api_proxy() + cookies = {"session": new_api_session} + + result = await proxy.get_user_models(cookies, new_api_user_id) + + if not result.get("success"): + raise HTTPException( + status_code=500, + detail=result.get("message", "Failed to fetch models from New API") + ) + + # 解析模型列表 + models = [] + data = result.get("data", []) + if isinstance(data, list): + for model in data: + models.append(NewAPIModelResponse( + id=model if isinstance(model, str) else model.get("id", ""), + object="model", + created=None, + owned_by="system" + )) + + return models + diff --git a/routes/payment.py b/routes/payment.py new file mode 100644 index 0000000..8bf788a --- /dev/null +++ b/routes/payment.py @@ -0,0 +1,381 @@ +""" +支付代理 API 路由 +代理前端请求到 New API 支付后端 +""" +import logging +from typing import Optional, List +from fastapi import APIRouter, HTTPException, Header, Cookie +from pydantic import BaseModel + +from utils.new_api_proxy import get_new_api_proxy +from agent.db_pool_manager import get_db_pool_manager +from utils.fastapi_utils import extract_api_key_from_auth + +logger = logging.getLogger('app') + +router = APIRouter() + + +# ============== Pydantic Models ============== + +class TopupRequest(BaseModel): + """充值请求""" + amount: int # 充值金额 + payment_method: str = "alipay" # 支付方式: alipay, wxpay + + +class StripeTopupRequest(BaseModel): + """Stripe 充值请求""" + amount: int + + +class CreemTopupRequest(BaseModel): + """Creem 充值请求""" + product_id: str + + +class SubscribeRequest(BaseModel): + """订阅请求""" + plan_id: int + payment_method: str = "alipay" # 支付方式: alipay, wxpay + + +class PaymentInfoResponse(BaseModel): + """支付信息响应""" + success: bool + message: str = "" + data: Optional[dict] = None + + +class OrderListResponse(BaseModel): + """订单列表响应""" + success: bool + message: str = "" + data: Optional[dict] = None + + +# ============== 辅助函数 ============== + +async def verify_user_and_get_session(authorization: Optional[str]) -> tuple[str, Optional[dict], Optional[int]]: + """ + 验证用户并获取 New API session cookies 和 user_id + + Args: + authorization: Authorization header + + Returns: + tuple[str, Optional[dict], Optional[int]]: (user_id, cookies_dict, new_api_user_id) + + Raises: + HTTPException: 认证失败 + """ + token = extract_api_key_from_auth(authorization) + if not token: + raise HTTPException(status_code=401, detail="Authorization required") + + pool = get_db_pool_manager().pool + + async with pool.connection() as conn: + async with conn.cursor() as cursor: + # 验证 token 并获取用户信息 + await cursor.execute(""" + SELECT u.id, u.new_api_session, u.new_api_user_id + FROM agent_user_tokens t + JOIN agent_user u ON t.user_id = u.id + WHERE t.token = %s + AND t.expires_at > NOW() + AND u.is_active = TRUE + """, (token,)) + row = await cursor.fetchone() + + if not row: + raise HTTPException(status_code=401, detail="Invalid or expired token") + + user_id, new_api_session, new_api_user_id = row + + # 如果有 session,构建 cookies + cookies = None + if new_api_session: + cookies = {"session": new_api_session} + + return str(user_id), cookies, new_api_user_id + + +# ============== 套餐/产品相关 API ============== + +@router.get("/api/v1/payment/packages") +async def get_payment_packages(authorization: Optional[str] = Header(None)): + """ + 获取可购买的套餐列表 + """ + user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization) + + # 如果没有 New API session 或 user_id,返回需要绑定的提示 + if not cookies or not new_api_user_id: + return { + "success": False, + "need_bind": True, + "message": "请先绑定支付账户" + } + + proxy = get_new_api_proxy() + result = await proxy.get_topup_info(cookies, new_api_user_id) + + return result + + +@router.get("/api/v1/payment/subscription-plans") +async def get_subscription_plans(authorization: Optional[str] = Header(None)): + """ + 获取订阅计划列表 + """ + user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization) + + if not cookies or not new_api_user_id: + return { + "success": False, + "need_bind": True, + "message": "请先绑定支付账户" + } + + proxy = get_new_api_proxy() + result = await proxy.get_subscription_plans(cookies, new_api_user_id) + + return result + + +# ============== 支付相关 API ============== + +@router.post("/api/v1/payment/topup") +async def create_topup_order( + request: TopupRequest, + authorization: Optional[str] = Header(None) +): + """ + 创建充值订单(易支付) + """ + user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization) + + if not cookies or not new_api_user_id: + return { + "success": False, + "need_bind": True, + "message": "请先绑定支付账户" + } + + proxy = get_new_api_proxy() + result = await proxy.create_topup_order( + cookies=cookies, + amount=request.amount, + payment_method=request.payment_method, + new_api_user_id=new_api_user_id + ) + + return result + + +@router.post("/api/v1/payment/stripe") +async def create_stripe_order( + request: StripeTopupRequest, + authorization: Optional[str] = Header(None) +): + """ + 创建 Stripe 支付订单 + """ + user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization) + + if not cookies or not new_api_user_id: + return { + "success": False, + "need_bind": True, + "message": "请先绑定支付账户" + } + + proxy = get_new_api_proxy() + result = await proxy.create_stripe_order( + cookies=cookies, + amount=request.amount, + new_api_user_id=new_api_user_id + ) + + return result + + +@router.post("/api/v1/payment/creem") +async def create_creem_order( + request: CreemTopupRequest, + authorization: Optional[str] = Header(None) +): + """ + 创建 Creem 支付订单 + """ + user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization) + + if not cookies or not new_api_user_id: + return { + "success": False, + "need_bind": True, + "message": "请先绑定支付账户" + } + + proxy = get_new_api_proxy() + result = await proxy.create_creem_order( + cookies=cookies, + product_id=request.product_id, + new_api_user_id=new_api_user_id + ) + + return result + + +# ============== 订单相关 API ============== + +@router.get("/api/v1/payment/orders") +async def get_order_list( + page: int = 1, + page_size: int = 20, + authorization: Optional[str] = Header(None) +): + """ + 获取订单列表 + """ + user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization) + + if not cookies or not new_api_user_id: + return { + "success": False, + "need_bind": True, + "message": "请先绑定支付账户" + } + + proxy = get_new_api_proxy() + result = await proxy.get_order_list( + cookies=cookies, + page=page, + page_size=page_size, + new_api_user_id=new_api_user_id + ) + + return result + + +@router.get("/api/v1/payment/orders/{order_id}") +async def get_order_status( + order_id: str, + authorization: Optional[str] = Header(None) +): + """ + 查询订单状态 + """ + user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization) + + if not cookies or not new_api_user_id: + return { + "success": False, + "need_bind": True, + "message": "请先绑定支付账户" + } + + proxy = get_new_api_proxy() + result = await proxy.get_order_status( + cookies=cookies, + order_id=order_id, + new_api_user_id=new_api_user_id + ) + + return result + + +# ============== 订阅相关 API ============== + +@router.post("/api/v1/payment/subscribe") +async def subscribe_plan( + request: SubscribeRequest, + authorization: Optional[str] = Header(None) +): + """ + 订阅计划(易支付) + """ + user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization) + + if not cookies or not new_api_user_id: + return { + "success": False, + "need_bind": True, + "message": "请先绑定支付账户" + } + + proxy = get_new_api_proxy() + result = await proxy.subscribe_plan( + cookies=cookies, + plan_id=request.plan_id, + payment_method=request.payment_method, + new_api_user_id=new_api_user_id + ) + + return result + + +@router.get("/api/v1/payment/subscription/status") +async def get_subscription_status(authorization: Optional[str] = Header(None)): + """ + 获取订阅状态 + """ + user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization) + + if not cookies or not new_api_user_id: + return { + "success": False, + "need_bind": True, + "message": "请先绑定支付账户" + } + + proxy = get_new_api_proxy() + result = await proxy.get_subscription_status(cookies, new_api_user_id) + + return result + + +@router.post("/api/v1/payment/subscription/cancel") +async def cancel_subscription(authorization: Optional[str] = Header(None)): + """ + 取消订阅 + + 注意:New API 不支持用户自行取消订阅,需要联系管理员 + """ + user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization) + + if not cookies or not new_api_user_id: + return { + "success": False, + "need_bind": True, + "message": "请先绑定支付账户" + } + + # New API 不支持用户自行取消订阅,需要管理员操作 + return { + "success": False, + "message": "如需取消订阅,请联系管理员处理" + } + + +# ============== 额度相关 API ============== + +@router.get("/api/v1/payment/quota") +async def get_quota(authorization: Optional[str] = Header(None)): + """ + 获取用户额度信息 + """ + user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization) + + if not cookies or not new_api_user_id: + return { + "success": False, + "need_bind": True, + "message": "请先绑定支付账户" + } + + proxy = get_new_api_proxy() + result = await proxy.get_quota(cookies, new_api_user_id) + + return result diff --git a/utils/fastapi_utils.py b/utils/fastapi_utils.py index c1a71fc..09530ca 100644 --- a/utils/fastapi_utils.py +++ b/utils/fastapi_utils.py @@ -454,15 +454,16 @@ async def fetch_bot_config_from_db(bot_user_id: str) -> Dict[str, Any]: """ try: from agent.db_pool_manager import get_db_pool_manager + from utils.settings import NEW_API_BASE_URL pool = get_db_pool_manager().pool async with pool.connection() as conn: async with conn.cursor() as cursor: - # 从 agent_bots 表获取 bot 信息和 settings + # 从 agent_bots 表获取 bot 信息和 settings,同时获取 owner_id await cursor.execute( """ - SELECT id, name, settings + SELECT id, name, settings, owner_id FROM agent_bots WHERE id = %s """, (bot_user_id,) @@ -478,6 +479,7 @@ async def fetch_bot_config_from_db(bot_user_id: str) -> Dict[str, Any]: bot_uuid = bot_row[0] bot_name = bot_row[1] settings_json = bot_row[2] + owner_id = bot_row[3] # 解析 settings JSONB 字段 if settings_json: @@ -492,14 +494,32 @@ async def fetch_bot_config_from_db(bot_user_id: str) -> Dict[str, Any]: else: settings_data = {} - # 获取 model_id + # 获取 model_id(来自 New API,格式为 "Provider/ModelName") model_id = settings_data.get("model_id", "") - # 构建 config 字典,使用默认值填充缺失的字段 + # 获取 bot owner 的 new_api_token 作为 api_key + api_key = "" + if owner_id: + await cursor.execute( + """ + SELECT new_api_token + FROM agent_user WHERE id = %s + """, + (owner_id,) + ) + user_row = await cursor.fetchone() + if user_row and user_row[0]: + api_key = user_row[0] + + # 构建 config 字典 + # model_id 格式为 "Provider/ModelName",需要拆分 + model_name = model_id + model_server = NEW_API_BASE_URL.rstrip('/') + "/v1" if NEW_API_BASE_URL else "" + config = { - "model": "qwen3-next", - "api_key": "", - "model_server": "", + "model": model_name, + "api_key": api_key, + "model_server": model_server, "language": settings_data.get("language", "zh"), "dataset_ids": settings_data.get("dataset_ids", []), "system_prompt": settings_data.get("system_prompt", ""), @@ -512,25 +532,6 @@ async def fetch_bot_config_from_db(bot_user_id: str) -> Dict[str, Any]: "suggestions": settings_data.get("suggestions", []) } - # 根据 model_id 查询模型信息 - if model_id: - await cursor.execute( - """ - SELECT model, server, api_key - FROM agent_models WHERE id = %s - """, - (model_id,) - ) - model_row = await cursor.fetchone() - if model_row: - config['model'] = model_row[0] - config['model_server'] = model_row[1] - config['api_key'] = model_row[2] - else: - logger.warning(f"Model with id {model_id} not found, using defaults") - else: - logger.warning(f"No model_id set for bot {bot_user_id}, using defaults") - # 处理 dataset_ids dataset_ids = config['dataset_ids'] if dataset_ids: @@ -603,6 +604,7 @@ async def fetch_bot_config_from_db(bot_user_id: str) -> Dict[str, Any]: else: config["mcp_settings"] = [] + logger.info(f"Fetched bot config for {bot_user_id}: model={config['model']}, api_key={'*' + config['api_key'][-4:] if config['api_key'] else 'N/A'}") return config except HTTPException: diff --git a/utils/new_api_proxy.py b/utils/new_api_proxy.py new file mode 100644 index 0000000..750b835 --- /dev/null +++ b/utils/new_api_proxy.py @@ -0,0 +1,387 @@ +""" +New API 代理工具类 +用于与 New API 支付后端进行通信 +""" +import logging +from typing import Optional, Dict, Any +import httpx + +from utils.settings import NEW_API_BASE_URL, NEW_API_TIMEOUT + +logger = logging.getLogger('app') + + +class NewAPIProxy: + """New API 代理类,处理与支付后端的所有通信""" + + def __init__(self): + self.base_url = NEW_API_BASE_URL.rstrip('/') + self.timeout = NEW_API_TIMEOUT + + def _get_headers( + self, + cookies: Optional[Dict[str, str]] = None, + new_api_user_id: Optional[int] = None + ) -> Dict[str, str]: + """获取请求头""" + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + # 添加 New-Api-User header(用户 ID) + if new_api_user_id: + headers["New-Api-User"] = str(new_api_user_id) + return headers + + async def _request( + self, + method: str, + endpoint: str, + cookies: Optional[Dict[str, str]] = None, + data: Optional[Dict[str, Any]] = None, + params: Optional[Dict[str, Any]] = None, + new_api_user_id: Optional[int] = None + ) -> Dict[str, Any]: + """ + 发送请求到 New API + """ + url = f"{self.base_url}{endpoint}" + headers = self._get_headers(cookies, new_api_user_id) + + async with httpx.AsyncClient(timeout=self.timeout) as client: + try: + response = await client.request( + method=method, + url=url, + headers=headers, + cookies=cookies, + json=data if data else None, + params=params + ) + + try: + result = response.json() + if response.cookies: + result["_cookies"] = dict(response.cookies) + return result + except Exception: + return { + "success": False, + "message": f"Invalid response: {response.text[:500]}", + "status_code": response.status_code + } + + except httpx.TimeoutException: + logger.error(f"New API request timeout: {url}") + return {"success": False, "message": "Request timeout"} + except httpx.RequestError as e: + logger.error(f"New API request error: {e}") + return {"success": False, "message": str(e)} + + async def _request_with_cookies( + self, + method: str, + endpoint: str, + cookies: Optional[Dict[str, str]] = None, + data: Optional[Dict[str, Any]] = None, + params: Optional[Dict[str, Any]] = None, + new_api_user_id: Optional[int] = None + ) -> tuple[Dict[str, Any], Dict[str, str]]: + """发送请求到 New API,并返回响应和 cookies""" + url = f"{self.base_url}{endpoint}" + headers = self._get_headers(cookies, new_api_user_id) + + async with httpx.AsyncClient(timeout=self.timeout) as client: + try: + response = await client.request( + method=method, + url=url, + headers=headers, + cookies=cookies, + json=data if data else None, + params=params + ) + + new_cookies = dict(response.cookies) if response.cookies else {} + + try: + result = response.json() + return result, new_cookies + except Exception: + return { + "success": False, + "message": f"Invalid response: {response.text[:500]}", + "status_code": response.status_code + }, new_cookies + + except httpx.TimeoutException: + logger.error(f"New API request timeout: {url}") + return {"success": False, "message": "Request timeout"}, {} + except httpx.RequestError as e: + logger.error(f"New API request error: {e}") + return {"success": False, "message": str(e)}, {} + + # ==================== 用户认证相关 ==================== + + async def login(self, username: str, password: str) -> Dict[str, Any]: + """用户登录到 New API""" + result, cookies = await self._request_with_cookies( + "POST", + "/api/user/login", + data={"username": username, "password": password} + ) + + if result.get("success") and cookies: + result["session"] = cookies.get("session") or cookies.get("SESSION") + result["_cookies"] = cookies + + return result + + async def register(self, username: str, password: str, email: Optional[str] = None) -> Dict[str, Any]: + """在 New API 注册用户""" + data = {"username": username, "password": password} + if email: + data["email"] = email + return await self._request("POST", "/api/user/register", data=data) + + async def get_user_info(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]: + """获取用户信息""" + return await self._request("GET", "/api/user/self", cookies=cookies, new_api_user_id=new_api_user_id) + + # ==================== 充值/套餐相关 ==================== + + async def get_topup_info(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]: + """获取充值信息,包含 creem_products 套餐列表""" + return await self._request("GET", "/api/user/topup/info", cookies=cookies, new_api_user_id=new_api_user_id) + + async def get_subscription_plans(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]: + """获取订阅计划列表""" + return await self._request("GET", "/api/subscription/plans", cookies=cookies, new_api_user_id=new_api_user_id) + + # ==================== 支付相关 ==================== + + async def create_topup_order( + self, + cookies: Dict[str, str], + amount: int, + payment_method: str, + new_api_user_id: int + ) -> Dict[str, Any]: + """创建充值订单(易支付)""" + return await self._request( + "POST", + "/api/user/pay", + cookies=cookies, + data={"amount": amount, "payment_method": payment_method}, + new_api_user_id=new_api_user_id + ) + + async def create_stripe_order( + self, + cookies: Dict[str, str], + amount: int, + new_api_user_id: int + ) -> Dict[str, Any]: + """创建 Stripe 支付订单""" + return await self._request( + "POST", + "/api/user/stripe/pay", + cookies=cookies, + data={"amount": amount}, + new_api_user_id=new_api_user_id + ) + + async def create_creem_order( + self, + cookies: Dict[str, str], + product_id: str, + new_api_user_id: int + ) -> Dict[str, Any]: + """创建 Creem 支付订单""" + return await self._request( + "POST", + "/api/user/creem/pay", + cookies=cookies, + data={"product_id": product_id}, + new_api_user_id=new_api_user_id + ) + + async def get_order_status( + self, + cookies: Dict[str, str], + order_id: str, + new_api_user_id: int + ) -> Dict[str, Any]: + """查询订单状态""" + return await self._request( + "GET", + f"/api/user/order/{order_id}", + cookies=cookies, + new_api_user_id=new_api_user_id + ) + + async def get_order_list( + self, + cookies: Dict[str, str], + page: int, + page_size: int, + new_api_user_id: int + ) -> Dict[str, Any]: + """获取订单列表(充值记录)""" + return await self._request( + "GET", + "/api/user/topup/self", + cookies=cookies, + params={"p": page, "size": page_size}, + new_api_user_id=new_api_user_id + ) + + # ==================== 订阅相关 ==================== + + async def subscribe_plan( + self, + cookies: Dict[str, str], + plan_id: int, + new_api_user_id: int, + payment_method: str = "alipay" + ) -> Dict[str, Any]: + """订阅计划(使用易支付)""" + return await self._request( + "POST", + "/api/subscription/epay/pay", + cookies=cookies, + data={"plan_id": plan_id, "payment_method": payment_method}, + new_api_user_id=new_api_user_id + ) + + async def get_subscription_status(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]: + """获取订阅状态""" + return await self._request( + "GET", + "/api/subscription/self", + cookies=cookies, + new_api_user_id=new_api_user_id + ) + + async def cancel_subscription(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]: + """取消订阅""" + return await self._request( + "POST", + "/api/subscription/cancel", + cookies=cookies, + new_api_user_id=new_api_user_id + ) + + # ==================== 额度/余额相关 ==================== + + async def get_quota(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]: + """获取用户额度信息(从用户信息中获取)""" + return await self._request( + "GET", + "/api/user/self", + cookies=cookies, + new_api_user_id=new_api_user_id + ) + + # ==================== 模型相关 ==================== + + async def get_user_models(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]: + """获取用户可用模型列表""" + return await self._request( + "GET", + "/api/user/models", + cookies=cookies, + new_api_user_id=new_api_user_id + ) + + # ==================== 令牌相关 ==================== + + async def get_tokens( + self, + cookies: Dict[str, str], + new_api_user_id: int, + page: int = 1, + page_size: int = 100 + ) -> Dict[str, Any]: + """获取令牌列表""" + return await self._request( + "GET", + "/api/token/", + cookies=cookies, + params={"p": page, "page_size": page_size}, + new_api_user_id=new_api_user_id + ) + + async def create_token( + self, + cookies: Dict[str, str], + new_api_user_id: int, + name: str = "API Token", + remain_quota: Optional[int] = None, + expired_time: Optional[int] = None, + unlimited_quota: bool = True + ) -> Dict[str, Any]: + """创建令牌""" + data = { + "name": name, + "unlimited_quota": unlimited_quota + } + if remain_quota is not None: + data["remain_quota"] = remain_quota + if expired_time is not None: + data["expired_time"] = expired_time + return await self._request( + "POST", + "/api/token/", + cookies=cookies, + data=data, + new_api_user_id=new_api_user_id + ) + + async def get_or_create_token( + self, + cookies: Dict[str, str], + new_api_user_id: int, + token_name: str = "Bot Manager Token" + ) -> Dict[str, Any]: + """获取或创建令牌(如果没有则创建一个)""" + # 先尝试获取现有令牌 + tokens_result = await self.get_tokens(cookies, new_api_user_id) + + if tokens_result.get("success") and tokens_result.get("data"): + tokens = tokens_result["data"] + if isinstance(tokens, list) and len(tokens) > 0: + # 返回第一个可用的令牌 + return { + "success": True, + "data": tokens[0], + "message": "使用现有令牌" + } + + # 没有令牌,创建一个新的 + create_result = await self.create_token( + cookies, + new_api_user_id, + name=token_name + ) + + if create_result.get("success"): + return { + "success": True, + "data": create_result.get("data"), + "message": "已创建新令牌" + } + + return create_result + + +# 全局单例 +_new_api_proxy: Optional[NewAPIProxy] = None + + +def get_new_api_proxy() -> NewAPIProxy: + """获取 New API 代理单例""" + global _new_api_proxy + if _new_api_proxy is None: + _new_api_proxy = NewAPIProxy() + return _new_api_proxy diff --git a/utils/settings.py b/utils/settings.py index 96e2dcd..3423b4f 100644 --- a/utils/settings.py +++ b/utils/settings.py @@ -100,3 +100,16 @@ RAGFLOW_ALLOWED_EXTENSIONS = os.getenv( # 性能配置 RAGFLOW_CONNECTION_TIMEOUT = int(os.getenv("RAGFLOW_CONNECTION_TIMEOUT", "30")) # 30秒 RAGFLOW_MAX_CONCURRENT_UPLOADS = int(os.getenv("RAGFLOW_MAX_CONCURRENT_UPLOADS", "5")) + +# ============================================================ +# New API Payment Configuration +# ============================================================ + +# New API 基础 URL(支付后端) +NEW_API_BASE_URL = os.getenv("NEW_API_BASE_URL", "http://116.62.16.218:3000") + +# New API 请求超时(秒) +NEW_API_TIMEOUT = int(os.getenv("NEW_API_TIMEOUT", "30")) + +# New API 管理员密钥(用于同步用户等管理操作,可选) +NEW_API_ADMIN_KEY = os.getenv("NEW_API_ADMIN_KEY", "")