Merge branch 'feature/moshui20260330-schedule-job' into bot_manager

This commit is contained in:
朱潮 2026-03-31 11:20:12 +08:00
commit b4cf5face0

View File

@ -9,6 +9,8 @@ import asyncio
import logging import logging
import time import time
import yaml import yaml
import aiohttp
import json
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
@ -136,7 +138,7 @@ class ScheduleExecutor:
logger.info(f"Executing scheduled task: {task_id} ({task.get('name', '')}) for bot={bot_id} user={user_id}") logger.info(f"Executing scheduled task: {task_id} ({task.get('name', '')}) for bot={bot_id} user={user_id}")
# 调用 agent # 调用 agent
response_text = await self._call_agent(bot_id, user_id, task) response_text = await self._call_agent_v2(bot_id, user_id, task)
# 写入日志 # 写入日志
duration_ms = int((time.time() - start_time) * 1000) duration_ms = int((time.time() - start_time) * 1000)
@ -156,42 +158,35 @@ class ScheduleExecutor:
finally: finally:
self._executing_tasks.discard(task_id) self._executing_tasks.discard(task_id)
async def _call_agent(self, bot_id: str, user_id: str, task: dict) -> str: async def _call_agent_v2(self, bot_id: str, user_id: str, task: dict) -> str:
"""构建 AgentConfig 并调用 agent""" """通过 HTTP 调用 /api/v2/chat/completions 接口"""
from routes.chat import create_agent_and_generate_response from utils.fastapi_utils import generate_v2_auth_token
from utils.fastapi_utils import fetch_bot_config, create_project_directory
from agent.agent_config import AgentConfig
bot_config = await fetch_bot_config(bot_id) url = f"http://127.0.0.1:8001/api/v2/chat/completions"
project_dir = create_project_directory( auth_token = generate_v2_auth_token(bot_id)
bot_config.get("dataset_ids", []),
bot_id,
bot_config.get("skills")
)
messages = [{"role": "user", "content": task["message"]}] payload = {
"messages": [{"role": "user", "content": task["message"]}],
"stream": False,
"bot_id": bot_id,
"tool_response": False,
"session_id": f"schedule_{task['id']}",
"user_identifier": user_id,
}
config = AgentConfig( headers = {
bot_id=bot_id, "Content-Type": "application/json",
api_key=bot_config.get("api_key"), "Authorization": f"Bearer {auth_token}",
model_name=bot_config.get("model", "qwen3-next"), }
model_server=bot_config.get("model_server", ""),
language=bot_config.get("language", "ja"),
system_prompt=bot_config.get("system_prompt"),
mcp_settings=bot_config.get("mcp_settings", []),
user_identifier=user_id,
session_id=f"schedule_{task['id']}",
project_dir=project_dir,
stream=False,
tool_response=False,
messages=messages,
dataset_ids=bot_config.get("dataset_ids", []),
enable_memori=bot_config.get("enable_memory", False),
shell_env=bot_config.get("shell_env") or {},
)
result = await create_agent_and_generate_response(config) async with aiohttp.ClientSession() as session:
return result.choices[0]["message"]["content"] async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=300)) as resp:
if resp.status != 200:
body = await resp.text()
raise RuntimeError(f"API returned {resp.status}: {body}")
data = await resp.json()
return data["choices"][0]["message"]["content"]
def _update_task_after_execution(self, task_id: str, tasks_file: Path): def _update_task_after_execution(self, task_id: str, tasks_file: Path):
"""执行后更新 tasks.yaml""" """执行后更新 tasks.yaml"""