qwen_agent/plans/new-api-payment-integration.md
2026-02-25 23:42:44 +08:00

20 KiB
Raw Blame History

New API 支付系统集成计划(简化版)

概述

将 New API 作为 qwen-client 和 qwen-agent 的支付后端。qwen-agent 只做代理转发,不维护配额和交易数据。

创建日期: 2026-02-25 更新日期: 2026-02-25 状态: 待审核 复杂度: 中


核心设计

设计原则

  1. New API 是数据源: 所有配额、交易、订单数据由 New API 维护
  2. qwen-agent 是代理层: 只转发请求,不存储支付相关数据
  3. Session 同步登录: 用户登录 qwen-agent 时,同步登录 New API 并存储 session
  4. Cookie 转发: 代理请求时转发用户的 session cookie 实现 New API 鉴权

架构图

┌─────────────┐      ┌─────────────────┐      ┌─────────────┐
│ qwen-client │ ───► │   qwen-agent    │ ───► │   New API   │
│   (前端)    │      │  (代理 + Session)│      │ (支付后端)  │
└─────────────┘      └─────────────────┘      └─────────────┘
                            │
                            ▼
                     存储用户对应的
                     New API Session Cookie

支付方式支持

支付方式 New API 接口 说明
易支付 /api/user/pay 通用支付方式(微信/支付宝等)
Stripe /api/user/stripe/pay 国际信用卡支付
Creem /api/user/creem/pay 新支付方式,支持套餐

数据库设计

只需新增一个字段

-- 在 agent_user 表添加 New API Session
ALTER TABLE agent_user ADD COLUMN new_api_session VARCHAR(500);
ALTER TABLE agent_user ADD COLUMN new_api_user_id INTEGER;

不需要新建表,所有配额、交易、订单数据都在 New API 中。


核心流程

1. 用户登录同步

用户登录 qwen-client
    │
    ▼
qwen-agent 验证本地用户
    │
    ▼
qwen-agent 用相同凭证调用 New API /api/user/login
    │
    ├─ 成功 → 存储 new_api_session 到 agent_user 表
    │
    └─ 失败 → 如果是用户不存在,先在 New API 注册再登录
    │
    ▼
返回 qwen-agent 的 token 给前端

2. 支付/充值流程

用户点击充值
    │
    ▼
qwen-agent 读取用户的 new_api_session
    │
    ▼
代理请求到 New API (带 session cookie)
    │
    ▼
返回支付链接给前端
    │
    ▼
用户完成支付 (New API 直接处理)
    │
    ▼
前端跳转到 New API 的用户中心查看结果

3. 配额检查

用户发起聊天请求
    │
    ▼
qwen-agent 调用 New API /api/user/self 获取实时配额
    │
    ├─ 配额不足 → 返回 402 错误 + New API 充值链接
    │
    └─ 配额充足 → 处理请求
    │
    ▼
(可选) 调用 New API 扣减配额

API 设计

qwen-agent 新增接口

端点 方法 描述 代理的 New API 接口
/api/v1/payment/info GET 获取充值配置(含套餐) /api/user/topup/info
/api/v1/payment/plans GET 获取订阅套餐列表 /api/subscription/plans
/api/v1/payment/create POST 创建支付订单(易支付) /api/user/pay
/api/v1/payment/stripe POST 创建 Stripe 支付 /api/user/stripe/pay
/api/v1/payment/creem POST 创建 Creem 支付(套餐) /api/user/creem/pay
/api/v1/payment/amount POST 计算实际支付金额 /api/user/amount
/api/v1/quota GET 查询用户配额 /api/user/self
/api/v1/topup/history GET 充值记录 /api/user/topup/self

所有接口都是代理转发,使用存储的 new_api_session。


New API 接口详情

1. 获取充值配置 /api/user/topup/info (GET)

鉴权: 需要登录Cookie

响应示例:

{
  "success": true,
  "message": "",
  "data": {
    "enable_online_topup": true,
    "enable_stripe_topup": true,
    "enable_creem_topup": true,
    "creem_products": [
      {
        "productId": "prod_xxx",
        "name": "基础套餐",
        "price": 9.99,
        "currency": "USD",
        "quota": 500000
      }
    ],
    "pay_methods": [
      {"name": "微信支付", "type": "wxpay", "color": "#07C160", "min_topup": "1"},
      {"name": "支付宝", "type": "alipay", "color": "#1677FF", "min_topup": "1"}
    ],
    "min_topup": 1,
    "stripe_min_topup": 1,
    "amount_options": [100, 500, 1000, 5000],
    "discount": {
      "100": 1.0,
      "500": 0.95,
      "1000": 0.9,
      "5000": 0.85
    }
  }
}

字段说明:

  • enable_online_topup: 是否启用易支付
  • enable_stripe_topup: 是否启用 Stripe 支付
  • enable_creem_topup: 是否启用 Creem 支付
  • creem_products: Creem 套餐列表(这是套餐列表的数据来源
  • pay_methods: 支持的支付方式列表
  • min_topup: 最小充值额度
  • amount_options: 预设充值额度选项
  • discount: 折扣配置(额度 -> 折扣率)

2. 获取订阅套餐列表 /api/subscription/plans (GET)

鉴权: 需要登录Cookie

响应示例:

{
  "success": true,
  "data": [
    {
      "plan": {
        "id": 1,
        "title": "月度订阅",
        "subtitle": "适合轻度用户",
        "price_amount": 9.99,
        "currency": "USD",
        "duration_unit": "month",
        "duration_value": 1,
        "total_amount": 500000,
        "enabled": true,
        "sort_order": 100,
        "stripe_price_id": "price_xxx",
        "creem_product_id": "prod_xxx",
        "max_purchase_per_user": 0,
        "upgrade_group": "",
        "quota_reset_period": "monthly"
      }
    }
  ]
}

字段说明:

  • title: 套餐名称
  • price_amount: 价格
  • duration_unit: 周期单位month/year/custom
  • duration_value: 周期数值
  • total_amount: 包含的配额
  • stripe_price_id: Stripe 价格 ID
  • creem_product_id: Creem 产品 ID

3. 创建易支付订单 /api/user/pay (POST)

请求:

{
  "amount": 1000,
  "payment_method": "wxpay"
}

响应:

{
  "message": "success",
  "data": {...},
  "url": "https://pay.example.com/submit.php?..."
}

4. 创建 Creem 支付 /api/user/creem/pay (POST)

请求:

{
  "product_id": "prod_xxx",
  "payment_method": "creem"
}

响应:

{
  "message": "success",
  "data": {
    "checkout_url": "https://checkout.creem.io/...",
    "order_id": "ref_xxx"
  }
}

5. 获取用户信息(含配额)/api/user/self (GET)

鉴权: 需要登录Cookie

响应示例:

{
  "success": true,
  "data": {
    "id": 1,
    "username": "user",
    "email": "user@example.com",
    "quota": 500000,
    "used_quota": 10000,
    "group": "default"
  }
}

6. 用户登录 /api/user/login (POST)

请求:

{
  "username": "user",
  "password": "password"
}

响应:

  • 返回 Set-Cookie header 包含 session
  • 用户信息 JSON

实施步骤

Step 1: 添加环境变量配置

utils/settings.py 添加 New API 配置:

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # ... 现有配置 ...

    # New API 配置
    NEW_API_BASE_URL: str = "https://new-api.example.com"
    NEW_API_TIMEOUT: int = 30

    class Config:
        env_file = ".env"

Step 2: 创建 New API 代理工具类

创建 utils/new_api_proxy.py

import httpx
from typing import Optional, Any
from utils.settings import Settings

settings = Settings()

class NewAPIProxy:
    """New API 代理工具类"""

    def __init__(self):
        self.base_url = settings.NEW_API_BASE_URL
        self.timeout = settings.NEW_API_TIMEOUT

    async def login(self, username: str, password: str) -> Optional[str]:
        """登录 New API 并返回 session cookie"""
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self.base_url}/api/user/login",
                json={"username": username, "password": password},
                timeout=self.timeout
            )
            if resp.status_code == 200 and resp.json().get("success"):
                # 提取 session cookie
                cookies = resp.cookies.get("session")
                return cookies
            return None

    async def register(self, username: str, password: str, email: str) -> bool:
        """在 New API 注册用户"""
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self.base_url}/api/user/register",
                json={"username": username, "password": password, "email": email},
                timeout=self.timeout
            )
            return resp.status_code == 200

    async def proxy_get(
        self,
        endpoint: str,
        session_cookie: str
    ) -> dict:
        """代理 GET 请求到 New API"""
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                f"{self.base_url}{endpoint}",
                cookies={"session": session_cookie},
                timeout=self.timeout
            )
            return resp.json()

    async def proxy_post(
        self,
        endpoint: str,
        data: dict,
        session_cookie: str
    ) -> dict:
        """代理 POST 请求到 New API"""
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self.base_url}{endpoint}",
                json=data,
                cookies={"session": session_cookie},
                timeout=self.timeout
            )
            return resp.json()

    async def get_user_info(self, session_cookie: str) -> Optional[dict]:
        """获取用户信息(含配额)"""
        try:
            result = await self.proxy_get("/api/user/self", session_cookie)
            if result.get("success"):
                return result.get("data")
        except:
            pass
        return None

    async def check_quota(self, session_cookie: str) -> tuple[bool, int]:
        """检查用户配额"""
        user_info = await self.get_user_info(session_cookie)
        if user_info:
            quota = user_info.get("quota", 0)
            return quota > 0, quota
        # 降级策略:无法获取配额时允许使用
        return True, -1

# 全局实例
new_api_proxy = NewAPIProxy()

Step 3: 修改登录逻辑

修改 routes/bot_manager.py 中的登录函数:

from utils.new_api_proxy import new_api_proxy

async def login_user(username: str, password: str):
    # 1. 验证本地用户
    user = await verify_local_user(username, password)

    # 2. 同步登录 New API
    new_api_session = await new_api_proxy.login(username, password)

    # 3. 如果登录失败,尝试注册
    if not new_api_session:
        await new_api_proxy.register(username, password, user.email)
        new_api_session = await new_api_proxy.login(username, password)

    # 4. 存储 session
    if new_api_session:
        await db.execute(
            "UPDATE agent_user SET new_api_session = $1 WHERE id = $2",
            new_api_session, user.id
        )

    # 5. 返回本地 token
    return generate_local_token(user)

Step 4: 创建支付代理路由

创建 routes/payment.py

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from typing import Optional
from utils.new_api_proxy import new_api_proxy
from utils.auth import get_current_user

router = APIRouter(prefix="/api/v1", tags=["payment"])

class CreatePaymentRequest(BaseModel):
    amount: int
    payment_method: str

class CreemPaymentRequest(BaseModel):
    product_id: str
    payment_method: str = "creem"

class AmountRequest(BaseModel):
    amount: int

@router.get("/payment/info")
async def get_payment_info(user = Depends(get_current_user)):
    """获取充值配置(含套餐列表)"""
    if not user.new_api_session:
        raise HTTPException(status_code=401, detail="未同步 New API session")
    return await new_api_proxy.proxy_get("/api/user/topup/info", user.new_api_session)

@router.get("/payment/plans")
async def get_subscription_plans(user = Depends(get_current_user)):
    """获取订阅套餐列表"""
    if not user.new_api_session:
        raise HTTPException(status_code=401, detail="未同步 New API session")
    return await new_api_proxy.proxy_get("/api/subscription/plans", user.new_api_session)

@router.post("/payment/create")
async def create_payment(req: CreatePaymentRequest, user = Depends(get_current_user)):
    """创建支付订单(易支付)"""
    if not user.new_api_session:
        raise HTTPException(status_code=401, detail="未同步 New API session")
    return await new_api_proxy.proxy_post(
        "/api/user/pay",
        {"amount": req.amount, "payment_method": req.payment_method},
        user.new_api_session
    )

@router.post("/payment/stripe")
async def create_stripe_payment(req: AmountRequest, user = Depends(get_current_user)):
    """创建 Stripe 支付"""
    if not user.new_api_session:
        raise HTTPException(status_code=401, detail="未同步 New API session")
    return await new_api_proxy.proxy_post(
        "/api/user/stripe/pay",
        {"amount": req.amount},
        user.new_api_session
    )

@router.post("/payment/creem")
async def create_creem_payment(req: CreemPaymentRequest, user = Depends(get_current_user)):
    """创建 Creem 支付(套餐)"""
    if not user.new_api_session:
        raise HTTPException(status_code=401, detail="未同步 New API session")
    return await new_api_proxy.proxy_post(
        "/api/user/creem/pay",
        {"product_id": req.product_id, "payment_method": req.payment_method},
        user.new_api_session
    )

@router.post("/payment/amount")
async def get_payment_amount(req: AmountRequest, user = Depends(get_current_user)):
    """计算实际支付金额"""
    if not user.new_api_session:
        raise HTTPException(status_code=401, detail="未同步 New API session")
    return await new_api_proxy.proxy_post(
        "/api/user/amount",
        {"amount": req.amount},
        user.new_api_session
    )

@router.get("/quota")
async def get_quota(user = Depends(get_current_user)):
    """获取用户配额"""
    if not user.new_api_session:
        raise HTTPException(status_code=401, detail="未同步 New API session")
    user_info = await new_api_proxy.get_user_info(user.new_api_session)
    if user_info:
        return {
            "quota": user_info.get("quota", 0),
            "used_quota": user_info.get("used_quota", 0)
        }
    raise HTTPException(status_code=500, detail="获取配额失败")

@router.get("/topup/history")
async def get_topup_history(user = Depends(get_current_user)):
    """获取充值记录"""
    if not user.new_api_session:
        raise HTTPException(status_code=401, detail="未同步 New API session")
    return await new_api_proxy.proxy_get("/api/user/topup/self", user.new_api_session)

Step 5: 在 Chat 路由添加配额检查

routes/chat.py 添加配额检查中间件:

from utils.new_api_proxy import new_api_proxy

async def check_quota_before_chat(user) -> tuple[bool, int]:
    """聊天前检查配额"""
    if not user.new_api_session:
        # 没有 session允许通过可能是非支付用户
        return True, -1

    try:
        has_quota, quota = await new_api_proxy.check_quota(user.new_api_session)
        return has_quota, quota
    except Exception as e:
        # 降级策略New API 不可用时允许使用
        logger.warning(f"配额检查失败: {e}")
        return True, -1

# 在 chat 处理函数中调用
async def handle_chat(request, user):
    has_quota, quota = await check_quota_before_chat(user)
    if not has_quota:
        raise HTTPException(
            status_code=402,
            detail={
                "error": "quota_exceeded",
                "message": "配额不足,请充值",
                "topup_url": f"{settings.NEW_API_BASE_URL}/console/topup"
            }
        )
    # ... 继续处理聊天请求 ...

Step 6: 注册路由

fastapi_app.py 中注册新路由:

from routes.payment import router as payment_router

app.include_router(payment_router)

Step 7: 数据库迁移

-- 在 agent_user 表添加 New API Session
ALTER TABLE agent_user ADD COLUMN new_api_session VARCHAR(500);
ALTER TABLE agent_user ADD COLUMN new_api_user_id INTEGER;

环境变量

# .env
NEW_API_BASE_URL=https://new-api.example.com
NEW_API_TIMEOUT=30

前端集成指南

1. 充值页面流程

// 1. 获取充值配置
const paymentInfo = await fetch('/api/v1/payment/info');

// 2. 根据返回的支付方式展示选项
const { pay_methods, creem_products, amount_options, discount } = paymentInfo.data;

// 3. 用户选择套餐或输入金额
// - 如果选择 Creem 套餐:调用 /api/v1/payment/creem
// - 如果选择自定义金额:调用 /api/v1/payment/create

// 4. 获取支付链接并跳转
const result = await fetch('/api/v1/payment/creem', {
  method: 'POST',
  body: JSON.stringify({ product_id: 'prod_xxx' })
});

// 5. 跳转到支付页面
window.location.href = result.data.checkout_url;

2. 显示配额余额

// 获取用户配额
const quota = await fetch('/api/v1/quota');

// 显示格式化后的配额
// quota.quota - 当前可用配额
// quota.used_quota - 已使用配额

3. 配额不足处理

// 当聊天返回 402 错误时
if (error.status === 402) {
  const { topup_url } = error.detail;
  // 显示充值提示
  // 提供"去充值"按钮,跳转到 topup_url
}

用户体验流程

充值流程(易支付)

1. 用户点击"充值"按钮
2. 前端调用 GET /api/v1/payment/info 获取配置
3. 展示支付方式和预设金额选项
4. 用户选择金额和支付方式
5. 前端调用 POST /api/v1/payment/create
6. 获取支付链接,跳转到支付页面
7. 用户完成支付
8. 支付成功后跳转回前端

充值流程Creem 套餐)

1. 用户点击"购买套餐"
2. 前端调用 GET /api/v1/payment/info 获取 creem_products
3. 展示套餐列表(名称、价格、配额)
4. 用户选择套餐
5. 前端调用 POST /api/v1/payment/creem
6. 获取 checkout_url跳转到 Creem 支付页面
7. 用户完成支付
8. 自动跳转回前端

配额检查

1. 用户发送聊天消息
2. qwen-agent 检查配额
3. 如果配额充足:正常处理
4. 如果配额不足:返回 402 + 充值链接
5. 前端显示"配额不足,请充值"提示

总结

简化方案的核心是 不重复存储数据,让 New API 作为唯一的支付数据源。qwen-agent 只负责:

  1. 用户登录时同步 New API session
  2. 代理转发支付相关请求
  3. 实时查询配额进行检查

套餐列表获取方式

套餐列表有两个来源:

  1. Creem 产品列表(推荐)

    • 接口:/api/user/topup/infocreem_products 字段
    • 适用于一次性购买配额包
  2. 订阅套餐列表

    • 接口:/api/subscription/plans
    • 适用于周期性订阅

与原方案对比

项目 原方案 简化方案
新建数据库表 4 个 0 个
新增字段 多个 2 个
本地配额管理 需要 不需要
Webhook 处理 需要 不需要
交易记录 本地存储 New API 管理
实施周期 2-3 周 3-5 天

文件变更清单

文件 操作 说明
utils/settings.py 修改 添加 New API 环境变量
utils/new_api_proxy.py 新建 New API 代理工具类
routes/payment.py 新建 支付代理路由
routes/bot_manager.py 修改 登录逻辑添加 session 同步
routes/chat.py 修改 添加配额检查
fastapi_app.py 修改 注册新路由
数据库迁移 执行 添加 new_api_session 字段

依赖添加

poetry add httpx