qwen_agent/mcp/datetime_server.py

273 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
MCP Server for date and time operations.
Provides functions to:
1. Get current date and time
2. Get current date
3. Get current time
4. Format date and time
"""
import json
import sys
import asyncio
from datetime import datetime, timezone
from typing import Any, Dict, List
from mcp_common import (
load_tools_from_json,
create_error_response,
create_success_response,
create_initialize_response,
create_ping_response,
create_tools_list_response,
handle_mcp_streaming
)
async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle MCP request"""
try:
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
if method == "initialize":
return create_initialize_response(request_id, "datetime-server")
elif method == "ping":
return create_ping_response(request_id)
elif method == "tools/list":
# 从 JSON 文件加载工具定义
tools = load_tools_from_json("datetime_tools.json")
return create_tools_list_response(request_id, tools)
elif method == "tools/call":
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name == "get_current_datetime":
return await get_current_datetime(arguments, request_id)
elif tool_name == "get_current_date":
return await get_current_date(arguments, request_id)
elif tool_name == "get_current_time":
return await get_current_time(arguments, request_id)
elif tool_name == "format_datetime":
return await format_datetime(arguments, request_id)
else:
return create_error_response(
request_id,
-32601,
f"Unknown tool: {tool_name}"
)
else:
return create_error_response(
request_id,
-32601,
f"Unknown method: {method}"
)
except Exception as e:
return create_error_response(
request.get("id"),
-32603,
f"Internal error: {str(e)}"
)
async def get_current_datetime(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
"""获取当前日期时间"""
try:
timezone_str = arguments.get("timezone", "local")
if timezone_str == "UTC":
now = datetime.now(timezone.utc)
elif timezone_str == "local":
now = datetime.now()
else:
# 支持常见的时区名称
try:
import pytz
tz = pytz.timezone(timezone_str)
now = datetime.now(tz)
except ImportError:
# 如果没有pytz库回退到本地时间
now = datetime.now()
except Exception:
now = datetime.now()
result = {
"datetime": now.isoformat(),
"year": now.year,
"month": now.month,
"day": now.day,
"hour": now.hour,
"minute": now.minute,
"second": now.second,
"weekday": now.weekday(), # 0=Monday, 6=Sunday
"timezone": timezone_str
}
# 将结果包装在 content 字段中
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": json.dumps(result, ensure_ascii=False, indent=2)
}
]
}
}
except Exception as e:
return create_error_response(request_id, -32603, f"获取日期时间失败: {str(e)}")
async def get_current_date(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
"""获取当前日期"""
try:
timezone_str = arguments.get("timezone", "local")
if timezone_str == "UTC":
now = datetime.now(timezone.utc)
elif timezone_str == "local":
now = datetime.now()
else:
try:
import pytz
tz = pytz.timezone(timezone_str)
now = datetime.now(tz)
except ImportError:
now = datetime.now()
except Exception:
now = datetime.now()
result = {
"date": now.date().isoformat(),
"year": now.year,
"month": now.month,
"day": now.day,
"weekday": now.weekday(),
"timezone": timezone_str
}
# 将结果包装在 content 字段中
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": json.dumps(result, ensure_ascii=False, indent=2)
}
]
}
}
except Exception as e:
return create_error_response(request_id, -32603, f"获取日期失败: {str(e)}")
async def get_current_time(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
"""获取当前时间"""
try:
timezone_str = arguments.get("timezone", "local")
if timezone_str == "UTC":
now = datetime.now(timezone.utc)
elif timezone_str == "local":
now = datetime.now()
else:
try:
import pytz
tz = pytz.timezone(timezone_str)
now = datetime.now(tz)
except ImportError:
now = datetime.now()
except Exception:
now = datetime.now()
result = {
"time": now.time().isoformat(),
"hour": now.hour,
"minute": now.minute,
"second": now.second,
"timezone": timezone_str
}
# 将结果包装在 content 字段中
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": json.dumps(result, ensure_ascii=False, indent=2)
}
]
}
}
except Exception as e:
return create_error_response(request_id, -32603, f"获取时间失败: {str(e)}")
async def format_datetime(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
"""格式化日期时间"""
try:
format_string = arguments.get("format", "%Y-%m-%d %H:%M:%S")
timezone_str = arguments.get("timezone", "local")
if timezone_str == "UTC":
now = datetime.now(timezone.utc)
elif timezone_str == "local":
now = datetime.now()
else:
try:
import pytz
tz = pytz.timezone(timezone_str)
now = datetime.now(tz)
except ImportError:
now = datetime.now()
except Exception:
now = datetime.now()
formatted_datetime = now.strftime(format_string)
result = {
"formatted_datetime": formatted_datetime,
"format": format_string,
"original_datetime": now.isoformat(),
"timezone": timezone_str
}
# 将结果包装在 content 字段中
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": json.dumps(result, ensure_ascii=False, indent=2)
}
]
}
}
except Exception as e:
return create_error_response(request_id, -32603, f"格式化日期时间失败: {str(e)}")
if __name__ == "__main__":
asyncio.run(handle_mcp_streaming(handle_request))