Compare commits

...

2 Commits

Author SHA1 Message Date
朱潮
98f23fa346 update payment 2026-02-26 00:35:42 +08:00
朱潮
d7dfd8810e update payment 2026-02-25 23:42:44 +08:00
10 changed files with 1817 additions and 1433 deletions

View File

@ -60,23 +60,7 @@ CREATE TABLE IF NOT EXISTS agent_admin_tokens (
CREATE INDEX IF NOT EXISTS idx_agent_admin_tokens_token ON agent_admin_tokens(token);
CREATE INDEX IF NOT EXISTS idx_agent_admin_tokens_expires ON agent_admin_tokens(expires_at);
-- 5. 创建 agent_models 表
CREATE TABLE IF NOT EXISTS agent_models (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
provider VARCHAR(100) NOT NULL,
model VARCHAR(255) NOT NULL,
server VARCHAR(500),
api_key VARCHAR(500),
is_default BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- agent_models 索引
CREATE INDEX IF NOT EXISTS idx_agent_models_is_default ON agent_models(is_default);
-- 6. 创建 agent_mcp_servers 表
-- 6. 创建 agent_mcp_servers 表 (agent_models 表已废弃,模型管理已迁移到 New API)
CREATE TABLE IF NOT EXISTS agent_mcp_servers (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
bot_id UUID REFERENCES agent_bots(id) ON DELETE CASCADE,

View File

@ -81,7 +81,7 @@ from utils.log_util.logger import init_with_fastapi
logger = logging.getLogger('app')
# Import route modules
from routes import chat, files, projects, system, skill_manager, database, bot_manager, knowledge_base
from routes import chat, files, projects, system, skill_manager, database, bot_manager, knowledge_base, payment
@asynccontextmanager
@ -200,6 +200,7 @@ app.include_router(system.router)
app.include_router(skill_manager.router)
app.include_router(database.router)
app.include_router(bot_manager.router)
app.include_router(payment.router)
# 注册文件管理API路由
app.include_router(file_manager_router)

View File

@ -0,0 +1,756 @@
# New API 支付系统集成计划(简化版)
## 概述
将 New API 作为 qwen-client 和 qwen-agent 的支付后端。qwen-agent 只做代理转发,不维护配额和交易数据。
**创建日期**: 2026-02-25
**更新日期**: 2026-02-25
**状态**: 待审核
**复杂度**: 中
---
## 核心设计
### 设计原则
1. **New API 是数据源**: 所有配额、交易、订单数据由 New API 维护
2. **qwen-agent 是代理层**: 只转发请求,不存储支付相关数据
3. **Session 同步登录**: 用户登录 qwen-agent 时,同步登录 New API 并存储 session
4. **Cookie 转发**: 代理请求时转发用户的 session cookie 实现 New API 鉴权
### 架构图
```
┌─────────────┐ ┌─────────────────┐ ┌─────────────┐
│ qwen-client │ ───► │ qwen-agent │ ───► │ New API │
│ (前端) │ │ (代理 + Session)│ │ (支付后端) │
└─────────────┘ └─────────────────┘ └─────────────┘
存储用户对应的
New API Session Cookie
```
### 支付方式支持
| 支付方式 | New API 接口 | 说明 |
|---------|-------------|------|
| 易支付 | `/api/user/pay` | 通用支付方式(微信/支付宝等) |
| Stripe | `/api/user/stripe/pay` | 国际信用卡支付 |
| Creem | `/api/user/creem/pay` | 新支付方式,支持套餐 |
---
## 数据库设计
### 只需新增一个字段
```sql
-- 在 agent_user 表添加 New API Session
ALTER TABLE agent_user ADD COLUMN new_api_session VARCHAR(500);
ALTER TABLE agent_user ADD COLUMN new_api_user_id INTEGER;
```
**不需要新建表**,所有配额、交易、订单数据都在 New API 中。
---
## 核心流程
### 1. 用户登录同步
```
用户登录 qwen-client
qwen-agent 验证本地用户
qwen-agent 用相同凭证调用 New API /api/user/login
├─ 成功 → 存储 new_api_session 到 agent_user 表
└─ 失败 → 如果是用户不存在,先在 New API 注册再登录
返回 qwen-agent 的 token 给前端
```
### 2. 支付/充值流程
```
用户点击充值
qwen-agent 读取用户的 new_api_session
代理请求到 New API (带 session cookie)
返回支付链接给前端
用户完成支付 (New API 直接处理)
前端跳转到 New API 的用户中心查看结果
```
### 3. 配额检查
```
用户发起聊天请求
qwen-agent 调用 New API /api/user/self 获取实时配额
├─ 配额不足 → 返回 402 错误 + New API 充值链接
└─ 配额充足 → 处理请求
(可选) 调用 New API 扣减配额
```
---
## API 设计
### qwen-agent 新增接口
| 端点 | 方法 | 描述 | 代理的 New API 接口 |
|------|------|------|---------------------|
| `/api/v1/payment/info` | GET | 获取充值配置(含套餐) | `/api/user/topup/info` |
| `/api/v1/payment/plans` | GET | 获取订阅套餐列表 | `/api/subscription/plans` |
| `/api/v1/payment/create` | POST | 创建支付订单(易支付) | `/api/user/pay` |
| `/api/v1/payment/stripe` | POST | 创建 Stripe 支付 | `/api/user/stripe/pay` |
| `/api/v1/payment/creem` | POST | 创建 Creem 支付(套餐) | `/api/user/creem/pay` |
| `/api/v1/payment/amount` | POST | 计算实际支付金额 | `/api/user/amount` |
| `/api/v1/quota` | GET | 查询用户配额 | `/api/user/self` |
| `/api/v1/topup/history` | GET | 充值记录 | `/api/user/topup/self` |
所有接口都是代理转发,使用存储的 new_api_session。
---
## New API 接口详情
### 1. 获取充值配置 `/api/user/topup/info` (GET)
**鉴权**: 需要登录Cookie
**响应示例**:
```json
{
"success": true,
"message": "",
"data": {
"enable_online_topup": true,
"enable_stripe_topup": true,
"enable_creem_topup": true,
"creem_products": [
{
"productId": "prod_xxx",
"name": "基础套餐",
"price": 9.99,
"currency": "USD",
"quota": 500000
}
],
"pay_methods": [
{"name": "微信支付", "type": "wxpay", "color": "#07C160", "min_topup": "1"},
{"name": "支付宝", "type": "alipay", "color": "#1677FF", "min_topup": "1"}
],
"min_topup": 1,
"stripe_min_topup": 1,
"amount_options": [100, 500, 1000, 5000],
"discount": {
"100": 1.0,
"500": 0.95,
"1000": 0.9,
"5000": 0.85
}
}
}
```
**字段说明**:
- `enable_online_topup`: 是否启用易支付
- `enable_stripe_topup`: 是否启用 Stripe 支付
- `enable_creem_topup`: 是否启用 Creem 支付
- `creem_products`: Creem 套餐列表(**这是套餐列表的数据来源**
- `pay_methods`: 支持的支付方式列表
- `min_topup`: 最小充值额度
- `amount_options`: 预设充值额度选项
- `discount`: 折扣配置(额度 -> 折扣率)
### 2. 获取订阅套餐列表 `/api/subscription/plans` (GET)
**鉴权**: 需要登录Cookie
**响应示例**:
```json
{
"success": true,
"data": [
{
"plan": {
"id": 1,
"title": "月度订阅",
"subtitle": "适合轻度用户",
"price_amount": 9.99,
"currency": "USD",
"duration_unit": "month",
"duration_value": 1,
"total_amount": 500000,
"enabled": true,
"sort_order": 100,
"stripe_price_id": "price_xxx",
"creem_product_id": "prod_xxx",
"max_purchase_per_user": 0,
"upgrade_group": "",
"quota_reset_period": "monthly"
}
}
]
}
```
**字段说明**:
- `title`: 套餐名称
- `price_amount`: 价格
- `duration_unit`: 周期单位month/year/custom
- `duration_value`: 周期数值
- `total_amount`: 包含的配额
- `stripe_price_id`: Stripe 价格 ID
- `creem_product_id`: Creem 产品 ID
### 3. 创建易支付订单 `/api/user/pay` (POST)
**请求**:
```json
{
"amount": 1000,
"payment_method": "wxpay"
}
```
**响应**:
```json
{
"message": "success",
"data": {...},
"url": "https://pay.example.com/submit.php?..."
}
```
### 4. 创建 Creem 支付 `/api/user/creem/pay` (POST)
**请求**:
```json
{
"product_id": "prod_xxx",
"payment_method": "creem"
}
```
**响应**:
```json
{
"message": "success",
"data": {
"checkout_url": "https://checkout.creem.io/...",
"order_id": "ref_xxx"
}
}
```
### 5. 获取用户信息(含配额)`/api/user/self` (GET)
**鉴权**: 需要登录Cookie
**响应示例**:
```json
{
"success": true,
"data": {
"id": 1,
"username": "user",
"email": "user@example.com",
"quota": 500000,
"used_quota": 10000,
"group": "default"
}
}
```
### 6. 用户登录 `/api/user/login` (POST)
**请求**:
```json
{
"username": "user",
"password": "password"
}
```
**响应**:
- 返回 Set-Cookie header 包含 session
- 用户信息 JSON
---
## 实施步骤
### Step 1: 添加环境变量配置
`utils/settings.py` 添加 New API 配置:
```python
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# ... 现有配置 ...
# New API 配置
NEW_API_BASE_URL: str = "https://new-api.example.com"
NEW_API_TIMEOUT: int = 30
class Config:
env_file = ".env"
```
### Step 2: 创建 New API 代理工具类
创建 `utils/new_api_proxy.py`
```python
import httpx
from typing import Optional, Any
from utils.settings import Settings
settings = Settings()
class NewAPIProxy:
"""New API 代理工具类"""
def __init__(self):
self.base_url = settings.NEW_API_BASE_URL
self.timeout = settings.NEW_API_TIMEOUT
async def login(self, username: str, password: str) -> Optional[str]:
"""登录 New API 并返回 session cookie"""
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{self.base_url}/api/user/login",
json={"username": username, "password": password},
timeout=self.timeout
)
if resp.status_code == 200 and resp.json().get("success"):
# 提取 session cookie
cookies = resp.cookies.get("session")
return cookies
return None
async def register(self, username: str, password: str, email: str) -> bool:
"""在 New API 注册用户"""
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{self.base_url}/api/user/register",
json={"username": username, "password": password, "email": email},
timeout=self.timeout
)
return resp.status_code == 200
async def proxy_get(
self,
endpoint: str,
session_cookie: str
) -> dict:
"""代理 GET 请求到 New API"""
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{self.base_url}{endpoint}",
cookies={"session": session_cookie},
timeout=self.timeout
)
return resp.json()
async def proxy_post(
self,
endpoint: str,
data: dict,
session_cookie: str
) -> dict:
"""代理 POST 请求到 New API"""
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{self.base_url}{endpoint}",
json=data,
cookies={"session": session_cookie},
timeout=self.timeout
)
return resp.json()
async def get_user_info(self, session_cookie: str) -> Optional[dict]:
"""获取用户信息(含配额)"""
try:
result = await self.proxy_get("/api/user/self", session_cookie)
if result.get("success"):
return result.get("data")
except:
pass
return None
async def check_quota(self, session_cookie: str) -> tuple[bool, int]:
"""检查用户配额"""
user_info = await self.get_user_info(session_cookie)
if user_info:
quota = user_info.get("quota", 0)
return quota > 0, quota
# 降级策略:无法获取配额时允许使用
return True, -1
# 全局实例
new_api_proxy = NewAPIProxy()
```
### Step 3: 修改登录逻辑
修改 `routes/bot_manager.py` 中的登录函数:
```python
from utils.new_api_proxy import new_api_proxy
async def login_user(username: str, password: str):
# 1. 验证本地用户
user = await verify_local_user(username, password)
# 2. 同步登录 New API
new_api_session = await new_api_proxy.login(username, password)
# 3. 如果登录失败,尝试注册
if not new_api_session:
await new_api_proxy.register(username, password, user.email)
new_api_session = await new_api_proxy.login(username, password)
# 4. 存储 session
if new_api_session:
await db.execute(
"UPDATE agent_user SET new_api_session = $1 WHERE id = $2",
new_api_session, user.id
)
# 5. 返回本地 token
return generate_local_token(user)
```
### Step 4: 创建支付代理路由
创建 `routes/payment.py`
```python
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from typing import Optional
from utils.new_api_proxy import new_api_proxy
from utils.auth import get_current_user
router = APIRouter(prefix="/api/v1", tags=["payment"])
class CreatePaymentRequest(BaseModel):
amount: int
payment_method: str
class CreemPaymentRequest(BaseModel):
product_id: str
payment_method: str = "creem"
class AmountRequest(BaseModel):
amount: int
@router.get("/payment/info")
async def get_payment_info(user = Depends(get_current_user)):
"""获取充值配置(含套餐列表)"""
if not user.new_api_session:
raise HTTPException(status_code=401, detail="未同步 New API session")
return await new_api_proxy.proxy_get("/api/user/topup/info", user.new_api_session)
@router.get("/payment/plans")
async def get_subscription_plans(user = Depends(get_current_user)):
"""获取订阅套餐列表"""
if not user.new_api_session:
raise HTTPException(status_code=401, detail="未同步 New API session")
return await new_api_proxy.proxy_get("/api/subscription/plans", user.new_api_session)
@router.post("/payment/create")
async def create_payment(req: CreatePaymentRequest, user = Depends(get_current_user)):
"""创建支付订单(易支付)"""
if not user.new_api_session:
raise HTTPException(status_code=401, detail="未同步 New API session")
return await new_api_proxy.proxy_post(
"/api/user/pay",
{"amount": req.amount, "payment_method": req.payment_method},
user.new_api_session
)
@router.post("/payment/stripe")
async def create_stripe_payment(req: AmountRequest, user = Depends(get_current_user)):
"""创建 Stripe 支付"""
if not user.new_api_session:
raise HTTPException(status_code=401, detail="未同步 New API session")
return await new_api_proxy.proxy_post(
"/api/user/stripe/pay",
{"amount": req.amount},
user.new_api_session
)
@router.post("/payment/creem")
async def create_creem_payment(req: CreemPaymentRequest, user = Depends(get_current_user)):
"""创建 Creem 支付(套餐)"""
if not user.new_api_session:
raise HTTPException(status_code=401, detail="未同步 New API session")
return await new_api_proxy.proxy_post(
"/api/user/creem/pay",
{"product_id": req.product_id, "payment_method": req.payment_method},
user.new_api_session
)
@router.post("/payment/amount")
async def get_payment_amount(req: AmountRequest, user = Depends(get_current_user)):
"""计算实际支付金额"""
if not user.new_api_session:
raise HTTPException(status_code=401, detail="未同步 New API session")
return await new_api_proxy.proxy_post(
"/api/user/amount",
{"amount": req.amount},
user.new_api_session
)
@router.get("/quota")
async def get_quota(user = Depends(get_current_user)):
"""获取用户配额"""
if not user.new_api_session:
raise HTTPException(status_code=401, detail="未同步 New API session")
user_info = await new_api_proxy.get_user_info(user.new_api_session)
if user_info:
return {
"quota": user_info.get("quota", 0),
"used_quota": user_info.get("used_quota", 0)
}
raise HTTPException(status_code=500, detail="获取配额失败")
@router.get("/topup/history")
async def get_topup_history(user = Depends(get_current_user)):
"""获取充值记录"""
if not user.new_api_session:
raise HTTPException(status_code=401, detail="未同步 New API session")
return await new_api_proxy.proxy_get("/api/user/topup/self", user.new_api_session)
```
### Step 5: 在 Chat 路由添加配额检查
`routes/chat.py` 添加配额检查中间件:
```python
from utils.new_api_proxy import new_api_proxy
async def check_quota_before_chat(user) -> tuple[bool, int]:
"""聊天前检查配额"""
if not user.new_api_session:
# 没有 session允许通过可能是非支付用户
return True, -1
try:
has_quota, quota = await new_api_proxy.check_quota(user.new_api_session)
return has_quota, quota
except Exception as e:
# 降级策略New API 不可用时允许使用
logger.warning(f"配额检查失败: {e}")
return True, -1
# 在 chat 处理函数中调用
async def handle_chat(request, user):
has_quota, quota = await check_quota_before_chat(user)
if not has_quota:
raise HTTPException(
status_code=402,
detail={
"error": "quota_exceeded",
"message": "配额不足,请充值",
"topup_url": f"{settings.NEW_API_BASE_URL}/console/topup"
}
)
# ... 继续处理聊天请求 ...
```
### Step 6: 注册路由
`fastapi_app.py` 中注册新路由:
```python
from routes.payment import router as payment_router
app.include_router(payment_router)
```
### Step 7: 数据库迁移
```sql
-- 在 agent_user 表添加 New API Session
ALTER TABLE agent_user ADD COLUMN new_api_session VARCHAR(500);
ALTER TABLE agent_user ADD COLUMN new_api_user_id INTEGER;
```
---
## 环境变量
```bash
# .env
NEW_API_BASE_URL=https://new-api.example.com
NEW_API_TIMEOUT=30
```
---
## 前端集成指南
### 1. 充值页面流程
```typescript
// 1. 获取充值配置
const paymentInfo = await fetch('/api/v1/payment/info');
// 2. 根据返回的支付方式展示选项
const { pay_methods, creem_products, amount_options, discount } = paymentInfo.data;
// 3. 用户选择套餐或输入金额
// - 如果选择 Creem 套餐:调用 /api/v1/payment/creem
// - 如果选择自定义金额:调用 /api/v1/payment/create
// 4. 获取支付链接并跳转
const result = await fetch('/api/v1/payment/creem', {
method: 'POST',
body: JSON.stringify({ product_id: 'prod_xxx' })
});
// 5. 跳转到支付页面
window.location.href = result.data.checkout_url;
```
### 2. 显示配额余额
```typescript
// 获取用户配额
const quota = await fetch('/api/v1/quota');
// 显示格式化后的配额
// quota.quota - 当前可用配额
// quota.used_quota - 已使用配额
```
### 3. 配额不足处理
```typescript
// 当聊天返回 402 错误时
if (error.status === 402) {
const { topup_url } = error.detail;
// 显示充值提示
// 提供"去充值"按钮,跳转到 topup_url
}
```
---
## 用户体验流程
### 充值流程(易支付)
```
1. 用户点击"充值"按钮
2. 前端调用 GET /api/v1/payment/info 获取配置
3. 展示支付方式和预设金额选项
4. 用户选择金额和支付方式
5. 前端调用 POST /api/v1/payment/create
6. 获取支付链接,跳转到支付页面
7. 用户完成支付
8. 支付成功后跳转回前端
```
### 充值流程Creem 套餐)
```
1. 用户点击"购买套餐"
2. 前端调用 GET /api/v1/payment/info 获取 creem_products
3. 展示套餐列表(名称、价格、配额)
4. 用户选择套餐
5. 前端调用 POST /api/v1/payment/creem
6. 获取 checkout_url跳转到 Creem 支付页面
7. 用户完成支付
8. 自动跳转回前端
```
### 配额检查
```
1. 用户发送聊天消息
2. qwen-agent 检查配额
3. 如果配额充足:正常处理
4. 如果配额不足:返回 402 + 充值链接
5. 前端显示"配额不足,请充值"提示
```
---
## 总结
简化方案的核心是 **不重复存储数据**,让 New API 作为唯一的支付数据源。qwen-agent 只负责:
1. 用户登录时同步 New API session
2. 代理转发支付相关请求
3. 实时查询配额进行检查
### 套餐列表获取方式
套餐列表有两个来源:
1. **Creem 产品列表**(推荐)
- 接口:`/api/user/topup/info` → `creem_products` 字段
- 适用于一次性购买配额包
2. **订阅套餐列表**
- 接口:`/api/subscription/plans`
- 适用于周期性订阅
### 与原方案对比
| 项目 | 原方案 | 简化方案 |
|------|--------|----------|
| 新建数据库表 | 4 个 | 0 个 |
| 新增字段 | 多个 | 2 个 |
| 本地配额管理 | 需要 | 不需要 |
| Webhook 处理 | 需要 | 不需要 |
| 交易记录 | 本地存储 | New API 管理 |
| 实施周期 | 2-3 周 | 3-5 天 |
### 文件变更清单
| 文件 | 操作 | 说明 |
|------|------|------|
| `utils/settings.py` | 修改 | 添加 New API 环境变量 |
| `utils/new_api_proxy.py` | 新建 | New API 代理工具类 |
| `routes/payment.py` | 新建 | 支付代理路由 |
| `routes/bot_manager.py` | 修改 | 登录逻辑添加 session 同步 |
| `routes/chat.py` | 修改 | 添加配额检查 |
| `fastapi_app.py` | 修改 | 注册新路由 |
| 数据库迁移 | 执行 | 添加 new_api_session 字段 |
### 依赖添加
```bash
poetry add httpx
```

View File

@ -1832,13 +1832,7 @@
<div class="settings-tab-content active" data-tab="basic">
<div class="settings-grid">
<div class="settings-group">
<div style="display: flex; align-items: center; justify-content: space-between; margin-bottom: 8px;">
<label class="settings-label" for="model-select" style="margin-bottom: 0;">模型</label>
<a href="model-manager.html" style="font-size: 12px; color: var(--primary); text-decoration: none; display: flex; align-items: center; gap: 4px;">
<i data-lucide="settings" style="width: 12px; height: 12px;"></i>
管理模型
</a>
</div>
<label class="settings-label" for="model-select">模型</label>
<select id="model-select" class="settings-select">
<option value="">选择模型...</option>
<!-- 动态生成 -->

File diff suppressed because it is too large Load Diff

View File

@ -16,6 +16,7 @@ from pydantic import BaseModel
from agent.db_pool_manager import get_db_pool_manager
from utils.fastapi_utils import extract_api_key_from_auth
from utils.new_api_proxy import get_new_api_proxy
logger = logging.getLogger('app')
@ -361,38 +362,7 @@ class UserSearchResponse(BaseModel):
email: Optional[str] = None
# --- 模型相关 ---
class ModelCreate(BaseModel):
"""创建模型请求"""
name: str
provider: str
model: str
server: Optional[str] = None
api_key: Optional[str] = None
is_default: bool = False
class ModelUpdate(BaseModel):
"""更新模型请求"""
name: Optional[str] = None
provider: Optional[str] = None
model: Optional[str] = None
server: Optional[str] = None
api_key: Optional[str] = None
is_default: Optional[bool] = None
class ModelResponse(BaseModel):
"""模型响应"""
id: str
name: str
provider: str
model: str
server: Optional[str]
api_key: Optional[str] # 掩码显示
is_default: bool
created_at: str
updated_at: str
# --- 模型相关(已废弃,模型管理已迁移到 New API---
# --- Bot 相关 ---
@ -633,6 +603,51 @@ async def migrate_bot_owner_and_shares():
""")
logger.info("is_admin column added successfully")
# 3. 添加 new_api_session 字段
await cursor.execute("""
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'agent_user' AND column_name = 'new_api_session'
""")
has_session_column = await cursor.fetchone()
if not has_session_column:
logger.info("Adding new_api_session column to agent_user table")
await cursor.execute("""
ALTER TABLE agent_user
ADD COLUMN new_api_session VARCHAR(500)
""")
logger.info("new_api_session column added successfully")
# 检查并添加 new_api_user_id 字段
await cursor.execute("""
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'agent_user' AND column_name = 'new_api_user_id'
""")
has_user_id_column = await cursor.fetchone()
if not has_user_id_column:
logger.info("Adding new_api_user_id column to agent_user table")
await cursor.execute("""
ALTER TABLE agent_user
ADD COLUMN new_api_user_id INTEGER
""")
logger.info("new_api_user_id column added successfully")
# 检查并添加 new_api_token 字段(用于存储 API Key
await cursor.execute("""
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'agent_user' AND column_name = 'new_api_token'
""")
has_token_column = await cursor.fetchone()
if not has_token_column:
logger.info("Adding new_api_token column to agent_user table")
await cursor.execute("""
ALTER TABLE agent_user
ADD COLUMN new_api_token VARCHAR(255)
""")
logger.info("new_api_token column added successfully")
# 2. 创建 bot_shares 表
await cursor.execute("""
SELECT EXISTS (
@ -955,22 +970,7 @@ async def init_bot_manager_tables():
"CREATE INDEX IF NOT EXISTS idx_agent_admin_tokens_token ON agent_admin_tokens(token)",
"CREATE INDEX IF NOT EXISTS idx_agent_admin_tokens_expires ON agent_admin_tokens(expires_at)",
# models 表
"""
CREATE TABLE IF NOT EXISTS agent_models (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
provider VARCHAR(100) NOT NULL,
model VARCHAR(255) NOT NULL,
server VARCHAR(500),
api_key VARCHAR(500),
is_default BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
)
""",
# models 索引
"CREATE INDEX IF NOT EXISTS idx_agent_models_is_default ON agent_models(is_default)",
# agent_models 表已废弃,模型管理已迁移到 New API
# bots 表(合并 settings 为 JSONB 字段)
"""
@ -1048,250 +1048,14 @@ def datetime_to_str(dt: datetime) -> str:
return dt.isoformat() if dt else ""
# ============== 模型管理 API ==============
@router.get("/api/v1/models", response_model=List[ModelResponse])
async def get_models(authorization: Optional[str] = Header(None)):
"""
获取所有模型配置
Args:
authorization: Bearer token
Returns:
List[ModelResponse]: 模型列表
"""
verify_auth(authorization)
pool = get_db_pool_manager().pool
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute("""
SELECT id, name, provider, model, server, api_key, is_default, created_at, updated_at
FROM agent_models
ORDER BY is_default DESC, created_at DESC
""")
rows = await cursor.fetchall()
return [
ModelResponse(
id=str(row[0]),
name=row[1],
provider=row[2],
model=row[3],
server=row[4],
api_key=mask_api_key(row[5]),
is_default=row[6],
created_at=datetime_to_str(row[7]),
updated_at=datetime_to_str(row[8])
)
for row in rows
]
@router.post("/api/v1/models", response_model=ModelResponse)
async def create_model(request: ModelCreate, authorization: Optional[str] = Header(None)):
"""
创建新模型
Args:
request: 模型创建请求
authorization: Bearer token
Returns:
ModelResponse: 创建的模型信息
"""
verify_auth(authorization)
pool = get_db_pool_manager().pool
# 如果设置为默认,需要先取消其他默认模型
if request.is_default:
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute("UPDATE agent_models SET is_default = FALSE WHERE is_default = TRUE")
await conn.commit()
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute("""
INSERT INTO agent_models (name, provider, model, server, api_key, is_default)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id, created_at, updated_at
""", (
request.name,
request.provider,
request.model,
request.server,
request.api_key,
request.is_default
))
row = await cursor.fetchone()
await conn.commit()
return ModelResponse(
id=str(row[0]),
name=request.name,
provider=request.provider,
model=request.model,
server=request.server,
api_key=mask_api_key(request.api_key),
is_default=request.is_default,
created_at=datetime_to_str(row[1]),
updated_at=datetime_to_str(row[2])
)
@router.put("/api/v1/models/{model_id}", response_model=ModelResponse)
async def update_model(
model_id: str,
request: ModelUpdate,
authorization: Optional[str] = Header(None)
):
"""
更新模型
Args:
model_id: 模型 ID
request: 模型更新请求
authorization: Bearer token
Returns:
ModelResponse: 更新后的模型信息
"""
verify_auth(authorization)
pool = get_db_pool_manager().pool
# 构建更新字段
update_fields = []
values = []
if request.name is not None:
update_fields.append("name = %s")
values.append(request.name)
if request.provider is not None:
update_fields.append("provider = %s")
values.append(request.provider)
if request.model is not None:
update_fields.append("model = %s")
values.append(request.model)
if request.server is not None:
update_fields.append("server = %s")
values.append(request.server)
if request.api_key is not None:
update_fields.append("api_key = %s")
values.append(request.api_key)
if request.is_default is not None:
update_fields.append("is_default = %s")
values.append(request.is_default)
if not update_fields:
raise HTTPException(status_code=400, detail="No fields to update")
update_fields.append("updated_at = NOW()")
values.append(model_id)
# 如果设置为默认,需要先取消其他默认模型
if request.is_default is True:
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute("UPDATE agent_models SET is_default = FALSE WHERE is_default = TRUE")
await conn.commit()
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute(f"""
UPDATE agent_models
SET {', '.join(update_fields)}
WHERE id = %s
RETURNING id, name, provider, model, server, api_key, is_default, created_at, updated_at
""", values)
row = await cursor.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Model not found")
await conn.commit()
return ModelResponse(
id=str(row[0]),
name=row[1],
provider=row[2],
model=row[3],
server=row[4],
api_key=mask_api_key(row[5]),
is_default=row[6],
created_at=datetime_to_str(row[7]),
updated_at=datetime_to_str(row[8])
)
@router.delete("/api/v1/models/{model_id}", response_model=SuccessResponse)
async def delete_model(model_id: str, authorization: Optional[str] = Header(None)):
"""
删除模型
Args:
model_id: 模型 ID
authorization: Bearer token
Returns:
SuccessResponse: 删除结果
"""
verify_auth(authorization)
pool = get_db_pool_manager().pool
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute("DELETE FROM agent_models WHERE id = %s RETURNING id", (model_id,))
row = await cursor.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Model not found")
await conn.commit()
return SuccessResponse(success=True, message="Model deleted successfully")
@router.patch("/api/v1/models/{model_id}/default", response_model=SuccessResponse)
async def set_default_model(model_id: str, authorization: Optional[str] = Header(None)):
"""
设置默认模型
Args:
model_id: 模型 ID
authorization: Bearer token
Returns:
SuccessResponse: 设置结果
"""
verify_auth(authorization)
pool = get_db_pool_manager().pool
# 首先检查模型是否存在
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT id FROM agent_models WHERE id = %s", (model_id,))
row = await cursor.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Model not found")
# 取消所有默认设置
await cursor.execute("UPDATE agent_models SET is_default = FALSE WHERE is_default = TRUE")
# 设置新的默认模型
await cursor.execute("UPDATE agent_models SET is_default = TRUE WHERE id = %s", (model_id,))
await conn.commit()
return SuccessResponse(success=True, message="Default model updated successfully")
# ============== 模型管理 API已废弃已迁移到 New API=============
# 以下模型管理接口已被移除,现在使用 /api/v1/newapi/models 从 New API 获取模型列表
# - GET /api/v1/models
# - POST /api/v1/models
# - PUT /api/v1/models/{model_id}
# - DELETE /api/v1/models/{model_id}
# - PATCH /api/v1/models/{model_id}/default
# ============== Bot 管理 API ==============
@ -1593,23 +1357,20 @@ async def get_bot_settings(bot_uuid: str, authorization: Optional[str] = Header(
is_owner = (str(owner_id) == str(user_id))
# 获取关联的模型信息
# 注意model_id 现在来自 New API格式为 "Provider/ModelName"
# 不再从本地 agent_models 表查询
model_info = None
model_id = settings.get('model_id')
if model_id:
await cursor.execute("""
SELECT id, name, provider, model, server, api_key
FROM agent_models WHERE id = %s
""", (model_id,))
model_row = await cursor.fetchone()
if model_row:
model_info = ModelInfo(
id=str(model_row[0]),
name=model_row[1],
provider=model_row[2],
model=model_row[3],
server=model_row[4],
api_key=mask_api_key(model_row[5])
)
# 直接使用 model_id 作为模型信息
model_info = ModelInfo(
id=model_id,
name=model_id,
provider=model_id.split('/')[0] if '/' in model_id else 'new-api',
model=model_id.split('/')[-1] if '/' in model_id else model_id,
server=None,
api_key=None
)
# 处理 dataset_ids将字符串转换为数组
dataset_ids = settings.get('dataset_ids')
@ -1681,13 +1442,7 @@ async def update_bot_settings(
# 处理 model_id将空字符串转换为 None
model_id_value = request.model_id.strip() if request.model_id else None
# 验证 model_id 是否存在
if model_id_value:
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT id FROM agent_models WHERE id = %s", (model_id_value,))
if not await cursor.fetchone():
raise HTTPException(status_code=400, detail=f"Model with id '{request.model_id}' not found")
# 注意model_id 现在来自 New API不再在本地 agent_models 表中验证
# 构建 JSONB 更新对象
update_json = {}
@ -2256,6 +2011,44 @@ async def user_register(request: UserRegisterRequest):
""", (request.username, request.email, password_hash))
user_id, created_at = await cursor.fetchone()
# 5. 同步注册到 New API静默失败
new_api_session = None
new_api_user_id = None
try:
proxy = get_new_api_proxy()
# 使用假邮箱或提供的邮箱注册
register_email = request.email or f"{request.username}@fake.local"
register_result = await proxy.register(request.username, request.password, register_email)
logger.info(f"New API register result for {request.username}: success={register_result.get('success')}, message={register_result.get('message', 'N/A')}")
if register_result.get("success"):
# 注册成功后立即登录获取 session 和 user_id
login_result = await proxy.login(request.username, request.password)
if login_result.get("success"):
new_api_session = login_result.get("session")
new_api_user_id = login_result.get("data", {}).get("id")
await cursor.execute("""
UPDATE agent_user
SET new_api_session = %s, new_api_user_id = %s
WHERE id = %s
""", (new_api_session, new_api_user_id, user_id))
logger.info(f"New API session and user_id stored for user {request.username}")
else:
# 如果注册失败(可能用户已存在),尝试直接登录
logger.warning(f"New API register failed, trying login: {register_result.get('message')}")
login_result = await proxy.login(request.username, request.password)
if login_result.get("success"):
new_api_session = login_result.get("session")
new_api_user_id = login_result.get("data", {}).get("id")
await cursor.execute("""
UPDATE agent_user
SET new_api_session = %s, new_api_user_id = %s
WHERE id = %s
""", (new_api_session, new_api_user_id, user_id))
logger.info(f"New API session and user_id stored for user {request.username} via login")
except Exception as e:
logger.warning(f"New API sync failed for user {request.username}: {e}")
# 6. 生成 token
token = secrets.token_urlsafe(32)
expires_at = datetime.now() + timedelta(hours=TOKEN_EXPIRE_HOURS)
@ -2337,6 +2130,69 @@ async def user_login(request: UserLoginRequest):
VALUES (%s, %s, %s)
""", (user_id, token, expires_at))
# 5. 尝试同步 New API session 和令牌(静默失败)
new_api_session = None
new_api_user_id = None
new_api_token = None
try:
proxy = get_new_api_proxy()
# 使用 username 登录 New API
logger.info(f"Attempting New API login for user {username}")
login_result = await proxy.login(request.username, request.password)
logger.info(f"New API login result: success={login_result.get('success')}, message={login_result.get('message', 'N/A')}")
if login_result.get("success"):
logger.info(f"New API login successful for user {username}")
# 存储 session 和 user_id
new_api_session = login_result.get("session")
new_api_user_id = login_result.get("data", {}).get("id")
if new_api_session or new_api_user_id:
# 获取或创建令牌
cookies = {"session": new_api_session} if new_api_session else {}
token_result = await proxy.get_or_create_token(cookies, new_api_user_id)
if token_result.get("success") and token_result.get("data"):
new_api_token = token_result["data"].get("key")
logger.info(f"Got New API token for user {username}")
await cursor.execute("""
UPDATE agent_user
SET new_api_session = %s, new_api_user_id = %s, new_api_token = %s
WHERE id = %s
""", (new_api_session, new_api_user_id, new_api_token, user_id))
logger.info(f"Stored New API session, user_id and token for user {username} token: {new_api_token}")
else:
logger.warning(f"New API login succeeded but no session/user_id. Response: {login_result}")
else:
# 登录失败,尝试先注册再登录(可能是用户在 New API 不存在)
logger.info(f"New API login failed, trying to register user {username}")
register_email = email or f"{request.username}@fake.local"
register_result = await proxy.register(request.username, request.password, register_email)
logger.info(f"New API register result: success={register_result.get('success')}, message={register_result.get('message', 'N/A')}")
if register_result.get("success"):
# 注册成功后再次登录
login_result = await proxy.login(request.username, request.password)
if login_result.get("success"):
new_api_session = login_result.get("session")
new_api_user_id = login_result.get("data", {}).get("id")
# 获取或创建令牌
cookies = {"session": new_api_session} if new_api_session else {}
token_result = await proxy.get_or_create_token(cookies, new_api_user_id)
if token_result.get("success") and token_result.get("data"):
new_api_token = token_result["data"].get("key")
logger.info(f"Got New API token for user {username} after registration")
await cursor.execute("""
UPDATE agent_user
SET new_api_session = %s, new_api_user_id = %s, new_api_token = %s
WHERE id = %s
""", (new_api_session, new_api_user_id, new_api_token, user_id))
logger.info(f"New API session, user_id and token stored for user {username} after registration")
else:
logger.warning(f"New API register also failed for user {username}: {register_result.get('message')}")
except Exception as e:
# 静默失败,不影响本地登录
logger.warning(f"New API login sync failed for user {username}: {e}")
await conn.commit()
return UserLoginResponse(
@ -3560,3 +3416,79 @@ async def sync_bot_from_source(
message="Bot synced from source successfully"
)
# ============== New API 模型和令牌管理 API ==============
class NewAPIModelResponse(BaseModel):
"""New API 模型响应"""
id: str
object: str = "model"
created: Optional[int] = None
owned_by: str = "system"
@router.get("/api/v1/newapi/models", response_model=List[NewAPIModelResponse])
async def get_newapi_models(authorization: Optional[str] = Header(None)):
"""
获取用户可用的模型列表 New API
Args:
authorization: Bearer token
Returns:
List[NewAPIModelResponse]: 模型列表
"""
user_valid, user_id, _ = await verify_user_auth(authorization)
if not user_valid:
raise HTTPException(
status_code=401,
detail="Unauthorized"
)
pool = get_db_pool_manager().pool
async with pool.connection() as conn:
async with conn.cursor() as cursor:
# 获取用户的 new_api_session 和 new_api_user_id
await cursor.execute("""
SELECT new_api_session, new_api_user_id
FROM agent_user
WHERE id = %s
""", (user_id,))
row = await cursor.fetchone()
if not row or not row[0] or not row[1]:
raise HTTPException(
status_code=400,
detail="New API session not found. Please log in again."
)
new_api_session = row[0]
new_api_user_id = row[1]
# 调用 New API 获取模型列表
proxy = get_new_api_proxy()
cookies = {"session": new_api_session}
result = await proxy.get_user_models(cookies, new_api_user_id)
if not result.get("success"):
raise HTTPException(
status_code=500,
detail=result.get("message", "Failed to fetch models from New API")
)
# 解析模型列表
models = []
data = result.get("data", [])
if isinstance(data, list):
for model in data:
models.append(NewAPIModelResponse(
id=model if isinstance(model, str) else model.get("id", ""),
object="model",
created=None,
owned_by="system"
))
return models

381
routes/payment.py Normal file
View File

@ -0,0 +1,381 @@
"""
支付代理 API 路由
代理前端请求到 New API 支付后端
"""
import logging
from typing import Optional, List
from fastapi import APIRouter, HTTPException, Header, Cookie
from pydantic import BaseModel
from utils.new_api_proxy import get_new_api_proxy
from agent.db_pool_manager import get_db_pool_manager
from utils.fastapi_utils import extract_api_key_from_auth
logger = logging.getLogger('app')
router = APIRouter()
# ============== Pydantic Models ==============
class TopupRequest(BaseModel):
"""充值请求"""
amount: int # 充值金额
payment_method: str = "alipay" # 支付方式: alipay, wxpay
class StripeTopupRequest(BaseModel):
"""Stripe 充值请求"""
amount: int
class CreemTopupRequest(BaseModel):
"""Creem 充值请求"""
product_id: str
class SubscribeRequest(BaseModel):
"""订阅请求"""
plan_id: int
payment_method: str = "alipay" # 支付方式: alipay, wxpay
class PaymentInfoResponse(BaseModel):
"""支付信息响应"""
success: bool
message: str = ""
data: Optional[dict] = None
class OrderListResponse(BaseModel):
"""订单列表响应"""
success: bool
message: str = ""
data: Optional[dict] = None
# ============== 辅助函数 ==============
async def verify_user_and_get_session(authorization: Optional[str]) -> tuple[str, Optional[dict], Optional[int]]:
"""
验证用户并获取 New API session cookies user_id
Args:
authorization: Authorization header
Returns:
tuple[str, Optional[dict], Optional[int]]: (user_id, cookies_dict, new_api_user_id)
Raises:
HTTPException: 认证失败
"""
token = extract_api_key_from_auth(authorization)
if not token:
raise HTTPException(status_code=401, detail="Authorization required")
pool = get_db_pool_manager().pool
async with pool.connection() as conn:
async with conn.cursor() as cursor:
# 验证 token 并获取用户信息
await cursor.execute("""
SELECT u.id, u.new_api_session, u.new_api_user_id
FROM agent_user_tokens t
JOIN agent_user u ON t.user_id = u.id
WHERE t.token = %s
AND t.expires_at > NOW()
AND u.is_active = TRUE
""", (token,))
row = await cursor.fetchone()
if not row:
raise HTTPException(status_code=401, detail="Invalid or expired token")
user_id, new_api_session, new_api_user_id = row
# 如果有 session构建 cookies
cookies = None
if new_api_session:
cookies = {"session": new_api_session}
return str(user_id), cookies, new_api_user_id
# ============== 套餐/产品相关 API ==============
@router.get("/api/v1/payment/packages")
async def get_payment_packages(authorization: Optional[str] = Header(None)):
"""
获取可购买的套餐列表
"""
user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization)
# 如果没有 New API session 或 user_id返回需要绑定的提示
if not cookies or not new_api_user_id:
return {
"success": False,
"need_bind": True,
"message": "请先绑定支付账户"
}
proxy = get_new_api_proxy()
result = await proxy.get_topup_info(cookies, new_api_user_id)
return result
@router.get("/api/v1/payment/subscription-plans")
async def get_subscription_plans(authorization: Optional[str] = Header(None)):
"""
获取订阅计划列表
"""
user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization)
if not cookies or not new_api_user_id:
return {
"success": False,
"need_bind": True,
"message": "请先绑定支付账户"
}
proxy = get_new_api_proxy()
result = await proxy.get_subscription_plans(cookies, new_api_user_id)
return result
# ============== 支付相关 API ==============
@router.post("/api/v1/payment/topup")
async def create_topup_order(
request: TopupRequest,
authorization: Optional[str] = Header(None)
):
"""
创建充值订单易支付
"""
user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization)
if not cookies or not new_api_user_id:
return {
"success": False,
"need_bind": True,
"message": "请先绑定支付账户"
}
proxy = get_new_api_proxy()
result = await proxy.create_topup_order(
cookies=cookies,
amount=request.amount,
payment_method=request.payment_method,
new_api_user_id=new_api_user_id
)
return result
@router.post("/api/v1/payment/stripe")
async def create_stripe_order(
request: StripeTopupRequest,
authorization: Optional[str] = Header(None)
):
"""
创建 Stripe 支付订单
"""
user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization)
if not cookies or not new_api_user_id:
return {
"success": False,
"need_bind": True,
"message": "请先绑定支付账户"
}
proxy = get_new_api_proxy()
result = await proxy.create_stripe_order(
cookies=cookies,
amount=request.amount,
new_api_user_id=new_api_user_id
)
return result
@router.post("/api/v1/payment/creem")
async def create_creem_order(
request: CreemTopupRequest,
authorization: Optional[str] = Header(None)
):
"""
创建 Creem 支付订单
"""
user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization)
if not cookies or not new_api_user_id:
return {
"success": False,
"need_bind": True,
"message": "请先绑定支付账户"
}
proxy = get_new_api_proxy()
result = await proxy.create_creem_order(
cookies=cookies,
product_id=request.product_id,
new_api_user_id=new_api_user_id
)
return result
# ============== 订单相关 API ==============
@router.get("/api/v1/payment/orders")
async def get_order_list(
page: int = 1,
page_size: int = 20,
authorization: Optional[str] = Header(None)
):
"""
获取订单列表
"""
user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization)
if not cookies or not new_api_user_id:
return {
"success": False,
"need_bind": True,
"message": "请先绑定支付账户"
}
proxy = get_new_api_proxy()
result = await proxy.get_order_list(
cookies=cookies,
page=page,
page_size=page_size,
new_api_user_id=new_api_user_id
)
return result
@router.get("/api/v1/payment/orders/{order_id}")
async def get_order_status(
order_id: str,
authorization: Optional[str] = Header(None)
):
"""
查询订单状态
"""
user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization)
if not cookies or not new_api_user_id:
return {
"success": False,
"need_bind": True,
"message": "请先绑定支付账户"
}
proxy = get_new_api_proxy()
result = await proxy.get_order_status(
cookies=cookies,
order_id=order_id,
new_api_user_id=new_api_user_id
)
return result
# ============== 订阅相关 API ==============
@router.post("/api/v1/payment/subscribe")
async def subscribe_plan(
request: SubscribeRequest,
authorization: Optional[str] = Header(None)
):
"""
订阅计划易支付
"""
user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization)
if not cookies or not new_api_user_id:
return {
"success": False,
"need_bind": True,
"message": "请先绑定支付账户"
}
proxy = get_new_api_proxy()
result = await proxy.subscribe_plan(
cookies=cookies,
plan_id=request.plan_id,
payment_method=request.payment_method,
new_api_user_id=new_api_user_id
)
return result
@router.get("/api/v1/payment/subscription/status")
async def get_subscription_status(authorization: Optional[str] = Header(None)):
"""
获取订阅状态
"""
user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization)
if not cookies or not new_api_user_id:
return {
"success": False,
"need_bind": True,
"message": "请先绑定支付账户"
}
proxy = get_new_api_proxy()
result = await proxy.get_subscription_status(cookies, new_api_user_id)
return result
@router.post("/api/v1/payment/subscription/cancel")
async def cancel_subscription(authorization: Optional[str] = Header(None)):
"""
取消订阅
注意New API 不支持用户自行取消订阅需要联系管理员
"""
user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization)
if not cookies or not new_api_user_id:
return {
"success": False,
"need_bind": True,
"message": "请先绑定支付账户"
}
# New API 不支持用户自行取消订阅,需要管理员操作
return {
"success": False,
"message": "如需取消订阅,请联系管理员处理"
}
# ============== 额度相关 API ==============
@router.get("/api/v1/payment/quota")
async def get_quota(authorization: Optional[str] = Header(None)):
"""
获取用户额度信息
"""
user_id, cookies, new_api_user_id = await verify_user_and_get_session(authorization)
if not cookies or not new_api_user_id:
return {
"success": False,
"need_bind": True,
"message": "请先绑定支付账户"
}
proxy = get_new_api_proxy()
result = await proxy.get_quota(cookies, new_api_user_id)
return result

View File

@ -454,15 +454,16 @@ async def fetch_bot_config_from_db(bot_user_id: str) -> Dict[str, Any]:
"""
try:
from agent.db_pool_manager import get_db_pool_manager
from utils.settings import NEW_API_BASE_URL
pool = get_db_pool_manager().pool
async with pool.connection() as conn:
async with conn.cursor() as cursor:
# 从 agent_bots 表获取 bot 信息和 settings
# 从 agent_bots 表获取 bot 信息和 settings,同时获取 owner_id
await cursor.execute(
"""
SELECT id, name, settings
SELECT id, name, settings, owner_id
FROM agent_bots WHERE id = %s
""",
(bot_user_id,)
@ -478,6 +479,7 @@ async def fetch_bot_config_from_db(bot_user_id: str) -> Dict[str, Any]:
bot_uuid = bot_row[0]
bot_name = bot_row[1]
settings_json = bot_row[2]
owner_id = bot_row[3]
# 解析 settings JSONB 字段
if settings_json:
@ -492,14 +494,32 @@ async def fetch_bot_config_from_db(bot_user_id: str) -> Dict[str, Any]:
else:
settings_data = {}
# 获取 model_id
# 获取 model_id(来自 New API格式为 "Provider/ModelName"
model_id = settings_data.get("model_id", "")
# 构建 config 字典,使用默认值填充缺失的字段
# 获取 bot owner 的 new_api_token 作为 api_key
api_key = ""
if owner_id:
await cursor.execute(
"""
SELECT new_api_token
FROM agent_user WHERE id = %s
""",
(owner_id,)
)
user_row = await cursor.fetchone()
if user_row and user_row[0]:
api_key = user_row[0]
# 构建 config 字典
# model_id 格式为 "Provider/ModelName",需要拆分
model_name = model_id
model_server = NEW_API_BASE_URL.rstrip('/') + "/v1" if NEW_API_BASE_URL else ""
config = {
"model": "qwen3-next",
"api_key": "",
"model_server": "",
"model": model_name,
"api_key": api_key,
"model_server": model_server,
"language": settings_data.get("language", "zh"),
"dataset_ids": settings_data.get("dataset_ids", []),
"system_prompt": settings_data.get("system_prompt", ""),
@ -512,25 +532,6 @@ async def fetch_bot_config_from_db(bot_user_id: str) -> Dict[str, Any]:
"suggestions": settings_data.get("suggestions", [])
}
# 根据 model_id 查询模型信息
if model_id:
await cursor.execute(
"""
SELECT model, server, api_key
FROM agent_models WHERE id = %s
""",
(model_id,)
)
model_row = await cursor.fetchone()
if model_row:
config['model'] = model_row[0]
config['model_server'] = model_row[1]
config['api_key'] = model_row[2]
else:
logger.warning(f"Model with id {model_id} not found, using defaults")
else:
logger.warning(f"No model_id set for bot {bot_user_id}, using defaults")
# 处理 dataset_ids
dataset_ids = config['dataset_ids']
if dataset_ids:
@ -603,6 +604,7 @@ async def fetch_bot_config_from_db(bot_user_id: str) -> Dict[str, Any]:
else:
config["mcp_settings"] = []
logger.info(f"Fetched bot config for {bot_user_id}: model={config['model']}, api_key={'*' + config['api_key'][-4:] if config['api_key'] else 'N/A'}")
return config
except HTTPException:

391
utils/new_api_proxy.py Normal file
View File

@ -0,0 +1,391 @@
"""
New API 代理工具类
用于与 New API 支付后端进行通信
"""
import logging
from typing import Optional, Dict, Any
import httpx
from utils.settings import NEW_API_BASE_URL, NEW_API_TIMEOUT
logger = logging.getLogger('app')
class NewAPIProxy:
"""New API 代理类,处理与支付后端的所有通信"""
def __init__(self):
self.base_url = NEW_API_BASE_URL.rstrip('/')
self.timeout = NEW_API_TIMEOUT
def _get_headers(
self,
cookies: Optional[Dict[str, str]] = None,
new_api_user_id: Optional[int] = None
) -> Dict[str, str]:
"""获取请求头"""
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
}
# 添加 New-Api-User header用户 ID
if new_api_user_id:
headers["New-Api-User"] = str(new_api_user_id)
return headers
async def _request(
self,
method: str,
endpoint: str,
cookies: Optional[Dict[str, str]] = None,
data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
new_api_user_id: Optional[int] = None
) -> Dict[str, Any]:
"""
发送请求到 New API
"""
url = f"{self.base_url}{endpoint}"
headers = self._get_headers(cookies, new_api_user_id)
async with httpx.AsyncClient(timeout=self.timeout) as client:
try:
response = await client.request(
method=method,
url=url,
headers=headers,
cookies=cookies,
json=data if data else None,
params=params
)
try:
result = response.json()
if response.cookies:
result["_cookies"] = dict(response.cookies)
return result
except Exception:
return {
"success": False,
"message": f"Invalid response: {response.text[:500]}",
"status_code": response.status_code
}
except httpx.TimeoutException:
logger.error(f"New API request timeout: {url}")
return {"success": False, "message": "Request timeout"}
except httpx.RequestError as e:
logger.error(f"New API request error: {e}")
return {"success": False, "message": str(e)}
async def _request_with_cookies(
self,
method: str,
endpoint: str,
cookies: Optional[Dict[str, str]] = None,
data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
new_api_user_id: Optional[int] = None
) -> tuple[Dict[str, Any], Dict[str, str]]:
"""发送请求到 New API并返回响应和 cookies"""
url = f"{self.base_url}{endpoint}"
headers = self._get_headers(cookies, new_api_user_id)
async with httpx.AsyncClient(timeout=self.timeout) as client:
try:
response = await client.request(
method=method,
url=url,
headers=headers,
cookies=cookies,
json=data if data else None,
params=params
)
new_cookies = dict(response.cookies) if response.cookies else {}
try:
result = response.json()
return result, new_cookies
except Exception:
return {
"success": False,
"message": f"Invalid response: {response.text[:500]}",
"status_code": response.status_code
}, new_cookies
except httpx.TimeoutException:
logger.error(f"New API request timeout: {url}")
return {"success": False, "message": "Request timeout"}, {}
except httpx.RequestError as e:
logger.error(f"New API request error: {e}")
return {"success": False, "message": str(e)}, {}
# ==================== 用户认证相关 ====================
async def login(self, username: str, password: str) -> Dict[str, Any]:
"""用户登录到 New API"""
result, cookies = await self._request_with_cookies(
"POST",
"/api/user/login",
data={"username": username, "password": password}
)
if result.get("success") and cookies:
result["session"] = cookies.get("session") or cookies.get("SESSION")
result["_cookies"] = cookies
return result
async def register(self, username: str, password: str, email: Optional[str] = None) -> Dict[str, Any]:
"""在 New API 注册用户"""
data = {"username": username, "password": password}
if email:
data["email"] = email
return await self._request("POST", "/api/user/register", data=data)
async def get_user_info(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
"""获取用户信息"""
return await self._request("GET", "/api/user/self", cookies=cookies, new_api_user_id=new_api_user_id)
# ==================== 充值/套餐相关 ====================
async def get_topup_info(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
"""获取充值信息,包含 creem_products 套餐列表"""
return await self._request("GET", "/api/user/topup/info", cookies=cookies, new_api_user_id=new_api_user_id)
async def get_subscription_plans(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
"""获取订阅计划列表"""
return await self._request("GET", "/api/subscription/plans", cookies=cookies, new_api_user_id=new_api_user_id)
# ==================== 支付相关 ====================
async def create_topup_order(
self,
cookies: Dict[str, str],
amount: int,
payment_method: str,
new_api_user_id: int
) -> Dict[str, Any]:
"""创建充值订单(易支付)"""
return await self._request(
"POST",
"/api/user/pay",
cookies=cookies,
data={"amount": amount, "payment_method": payment_method},
new_api_user_id=new_api_user_id
)
async def create_stripe_order(
self,
cookies: Dict[str, str],
amount: int,
new_api_user_id: int
) -> Dict[str, Any]:
"""创建 Stripe 支付订单"""
return await self._request(
"POST",
"/api/user/stripe/pay",
cookies=cookies,
data={"amount": amount},
new_api_user_id=new_api_user_id
)
async def create_creem_order(
self,
cookies: Dict[str, str],
product_id: str,
new_api_user_id: int
) -> Dict[str, Any]:
"""创建 Creem 支付订单"""
return await self._request(
"POST",
"/api/user/creem/pay",
cookies=cookies,
data={"product_id": product_id},
new_api_user_id=new_api_user_id
)
async def get_order_status(
self,
cookies: Dict[str, str],
order_id: str,
new_api_user_id: int
) -> Dict[str, Any]:
"""查询订单状态"""
return await self._request(
"GET",
f"/api/user/order/{order_id}",
cookies=cookies,
new_api_user_id=new_api_user_id
)
async def get_order_list(
self,
cookies: Dict[str, str],
page: int,
page_size: int,
new_api_user_id: int
) -> Dict[str, Any]:
"""获取订单列表(充值记录)"""
return await self._request(
"GET",
"/api/user/topup/self",
cookies=cookies,
params={"p": page, "size": page_size},
new_api_user_id=new_api_user_id
)
# ==================== 订阅相关 ====================
async def subscribe_plan(
self,
cookies: Dict[str, str],
plan_id: int,
new_api_user_id: int,
payment_method: str = "alipay"
) -> Dict[str, Any]:
"""订阅计划(使用易支付)"""
return await self._request(
"POST",
"/api/subscription/epay/pay",
cookies=cookies,
data={"plan_id": plan_id, "payment_method": payment_method},
new_api_user_id=new_api_user_id
)
async def get_subscription_status(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
"""获取订阅状态"""
return await self._request(
"GET",
"/api/subscription/self",
cookies=cookies,
new_api_user_id=new_api_user_id
)
async def cancel_subscription(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
"""取消订阅"""
return await self._request(
"POST",
"/api/subscription/cancel",
cookies=cookies,
new_api_user_id=new_api_user_id
)
# ==================== 额度/余额相关 ====================
async def get_quota(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
"""获取用户额度信息(从用户信息中获取)"""
return await self._request(
"GET",
"/api/user/self",
cookies=cookies,
new_api_user_id=new_api_user_id
)
# ==================== 模型相关 ====================
async def get_user_models(self, cookies: Dict[str, str], new_api_user_id: int) -> Dict[str, Any]:
"""获取用户可用模型列表"""
return await self._request(
"GET",
"/api/user/models",
cookies=cookies,
new_api_user_id=new_api_user_id
)
# ==================== 令牌相关 ====================
async def get_tokens(
self,
cookies: Dict[str, str],
new_api_user_id: int,
page: int = 1,
page_size: int = 100
) -> Dict[str, Any]:
"""获取令牌列表"""
return await self._request(
"GET",
"/api/token/",
cookies=cookies,
params={"p": page, "page_size": page_size},
new_api_user_id=new_api_user_id
)
async def create_token(
self,
cookies: Dict[str, str],
new_api_user_id: int,
name: str = "API Token",
remain_quota: Optional[int] = None,
expired_time: Optional[int] = None,
unlimited_quota: bool = True
) -> Dict[str, Any]:
"""创建令牌"""
data = {
"name": name,
"unlimited_quota": unlimited_quota
}
if remain_quota is not None:
data["remain_quota"] = remain_quota
if expired_time is not None:
data["expired_time"] = expired_time
return await self._request(
"POST",
"/api/token/",
cookies=cookies,
data=data,
new_api_user_id=new_api_user_id
)
async def get_or_create_token(
self,
cookies: Dict[str, str],
new_api_user_id: int,
token_name: str = "Bot Manager Token"
) -> Dict[str, Any]:
"""获取或创建令牌(如果没有则创建一个)"""
# 先尝试获取现有令牌
tokens_result = await self.get_tokens(cookies, new_api_user_id)
if tokens_result.get("success") and tokens_result.get("data"):
# data 格式: {'page': 1, 'page_size': 100, 'total': 4, 'items': [...]}
tokens = tokens_result["data"].get("items", [])
if isinstance(tokens, list) and len(tokens) > 0:
# 返回第一个可用的令牌
return {
"success": True,
"data": tokens[0],
"message": "使用现有令牌"
}
# 没有令牌,创建一个新的
create_result = await self.create_token(
cookies,
new_api_user_id,
name=token_name
)
if create_result.get("success"):
# 创建成功后重新获取令牌列表
tokens_result = await self.get_tokens(cookies, new_api_user_id)
if tokens_result.get("success") and tokens_result.get("data"):
tokens = tokens_result["data"].get("items", [])
if isinstance(tokens, list) and len(tokens) > 0:
return {
"success": True,
"data": tokens[0],
"message": "已创建新令牌"
}
return create_result
# 全局单例
_new_api_proxy: Optional[NewAPIProxy] = None
def get_new_api_proxy() -> NewAPIProxy:
"""获取 New API 代理单例"""
global _new_api_proxy
if _new_api_proxy is None:
_new_api_proxy = NewAPIProxy()
return _new_api_proxy

View File

@ -100,3 +100,16 @@ RAGFLOW_ALLOWED_EXTENSIONS = os.getenv(
# 性能配置
RAGFLOW_CONNECTION_TIMEOUT = int(os.getenv("RAGFLOW_CONNECTION_TIMEOUT", "30")) # 30秒
RAGFLOW_MAX_CONCURRENT_UPLOADS = int(os.getenv("RAGFLOW_MAX_CONCURRENT_UPLOADS", "5"))
# ============================================================
# New API Payment Configuration
# ============================================================
# New API 基础 URL支付后端
NEW_API_BASE_URL = os.getenv("NEW_API_BASE_URL", "http://116.62.16.218:3000")
# New API 请求超时(秒)
NEW_API_TIMEOUT = int(os.getenv("NEW_API_TIMEOUT", "30"))
# New API 管理员密钥(用于同步用户等管理操作,可选)
NEW_API_ADMIN_KEY = os.getenv("NEW_API_ADMIN_KEY", "")