qwen_agent/mcp/datetime_server.py
朱潮 425f3c5bb4 chore: replace Chinese comments and log messages with English
Convert all Chinese comments, docstrings, logger/print output,
HTTPException detail messages, and API response messages to English
across the entire codebase. Functional zh/ja localized strings
(e.g. prompt templates, timezone display names, date formats) are
preserved as-is.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-30 19:45:35 +08:00

273 lines
8.2 KiB
Python

#!/usr/bin/env python3
"""
MCP Server for date and time operations.
Provides functions to:
1. Get current date and time
2. Get current date
3. Get current time
4. Format date and time
"""
import json
import sys
import asyncio
from datetime import datetime, timezone
from typing import Any, Dict, List
from mcp_common import (
load_tools_from_json,
create_error_response,
create_success_response,
create_initialize_response,
create_ping_response,
create_tools_list_response,
handle_mcp_streaming
)
async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Route one MCP JSON-RPC request to its handler and return the response.

    Supported methods are ``initialize``, ``ping``, ``tools/list`` and
    ``tools/call``. For ``tools/call`` the tool named by ``params["name"]`` is
    invoked with ``params["arguments"]`` and the request id.

    Args:
        request: Decoded JSON-RPC request. The keys read here are ``method``,
            ``params`` and ``id``.

    Returns:
        A JSON-RPC response dict. Unknown methods and unknown tools yield an
        error with code -32601; any exception raised while handling the
        request yields an error with code -32603.
    """
    try:
        method = request.get("method")
        # "params": null is present-but-None, so a .get() default would not
        # apply; normalise it to an empty dict instead of failing later.
        params = request.get("params") or {}
        request_id = request.get("id")

        if method == "initialize":
            return create_initialize_response(request_id, "datetime-server")

        if method == "ping":
            return create_ping_response(request_id)

        if method == "tools/list":
            # Load tool definitions from the JSON file
            tools = load_tools_from_json("datetime_tools.json")
            return create_tools_list_response(request_id, tools)

        if method == "tools/call":
            tool_name = params.get("name")
            # Same null-tolerance as for params: tools with no required
            # arguments may be called with "arguments": null.
            arguments = params.get("arguments") or {}

            tool_handlers = {
                "get_current_datetime": get_current_datetime,
                "get_current_date": get_current_date,
                "get_current_time": get_current_time,
                "format_datetime": format_datetime,
            }
            # Only string names can match; the isinstance check also keeps an
            # unhashable name (e.g. a list) on the "Unknown tool" path.
            handler = tool_handlers.get(tool_name) if isinstance(tool_name, str) else None
            if handler is None:
                return create_error_response(
                    request_id,
                    -32601,
                    f"Unknown tool: {tool_name}"
                )
            return await handler(arguments, request_id)

        return create_error_response(
            request_id,
            -32601,
            f"Unknown method: {method}"
        )
    except Exception as e:
        # The failure may be that `request` is not a dict at all, so do not
        # assume .get() is available while building the error response.
        failed_id = request.get("id") if isinstance(request, dict) else None
        return create_error_response(
            failed_id,
            -32603,
            f"Internal error: {str(e)}"
        )
def _load_tzinfo(name: Any) -> Any:
    """Return a tzinfo object for timezone *name*, or None if unresolvable.

    pytz is tried first; if it is not installed or rejects the name, the
    standard-library ``zoneinfo`` module (Python 3.9+) is tried. Both imports
    are local so that neither is a hard dependency of this module.
    """
    try:
        import pytz
        return pytz.timezone(name)
    except Exception:
        # pytz is missing or does not know this name; try zoneinfo next.
        pass
    try:
        from zoneinfo import ZoneInfo
        return ZoneInfo(name)
    except Exception:
        # zoneinfo is unavailable, has no tz database, or rejects the name.
        return None


def _now_for_timezone(timezone_str: Any) -> tuple:
    """Return ``(now, tz_fields)`` for the requested timezone.

    ``now`` is the current datetime: timezone-aware for ``"UTC"`` and for any
    name that resolves to a tzinfo, naive host-local time for ``"local"``.

    ``tz_fields`` is the dict to merge into the tool result. It always holds
    ``"timezone"``, naming the timezone actually used. When the requested name
    could not be resolved the time falls back to host-local time, so
    ``"timezone"`` is ``"local"`` and ``"requested_timezone"`` carries the
    original value; this keeps the caller from mistaking local time for the
    requested zone.
    """
    if timezone_str == "UTC":
        return datetime.now(timezone.utc), {"timezone": timezone_str}
    if timezone_str == "local":
        # Deliberately naive so the default output format stays unchanged.
        return datetime.now(), {"timezone": timezone_str}

    tz = _load_tzinfo(timezone_str)
    if tz is not None:
        return datetime.now(tz), {"timezone": timezone_str}

    # Best-effort fallback, labelled honestly.
    return datetime.now(), {"timezone": "local", "requested_timezone": timezone_str}


def _text_content_response(request_id: Any, result: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap *result* as pretty-printed JSON text in a JSON-RPC success response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "result": {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps(result, ensure_ascii=False, indent=2)
                }
            ]
        }
    }


async def get_current_datetime(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
    """Get the current date and time.

    Args:
        arguments: Tool arguments. Optional key ``timezone``: ``"UTC"``,
            ``"local"`` (default), or a timezone name understood by pytz or
            zoneinfo. ``None`` is treated as no arguments.
        request_id: JSON-RPC request id echoed back in the response.

    Returns:
        A JSON-RPC response whose text content is a JSON object with
        ``datetime`` (ISO 8601), ``year``, ``month``, ``day``, ``hour``,
        ``minute``, ``second``, ``weekday`` and ``timezone``, plus
        ``requested_timezone`` if the requested zone could not be resolved.
        On failure, an error response with code -32603.
    """
    try:
        now, tz_fields = _now_for_timezone((arguments or {}).get("timezone", "local"))
        result = {
            "datetime": now.isoformat(),
            "year": now.year,
            "month": now.month,
            "day": now.day,
            "hour": now.hour,
            "minute": now.minute,
            "second": now.second,
            "weekday": now.weekday(),  # 0=Monday, 6=Sunday
            **tz_fields,
        }
        return _text_content_response(request_id, result)
    except Exception as e:
        return create_error_response(request_id, -32603, f"Failed to get date and time: {str(e)}")


async def get_current_date(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
    """Get the current date.

    Args:
        arguments: Tool arguments. Optional key ``timezone``: ``"UTC"``,
            ``"local"`` (default), or a timezone name understood by pytz or
            zoneinfo. ``None`` is treated as no arguments.
        request_id: JSON-RPC request id echoed back in the response.

    Returns:
        A JSON-RPC response whose text content is a JSON object with ``date``
        (ISO 8601), ``year``, ``month``, ``day``, ``weekday`` (0=Monday) and
        ``timezone``, plus ``requested_timezone`` if the requested zone could
        not be resolved. On failure, an error response with code -32603.
    """
    try:
        now, tz_fields = _now_for_timezone((arguments or {}).get("timezone", "local"))
        result = {
            "date": now.date().isoformat(),
            "year": now.year,
            "month": now.month,
            "day": now.day,
            "weekday": now.weekday(),  # 0=Monday, 6=Sunday
            **tz_fields,
        }
        return _text_content_response(request_id, result)
    except Exception as e:
        return create_error_response(request_id, -32603, f"Failed to get date: {str(e)}")


async def get_current_time(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
    """Get the current time.

    Args:
        arguments: Tool arguments. Optional key ``timezone``: ``"UTC"``,
            ``"local"`` (default), or a timezone name understood by pytz or
            zoneinfo. ``None`` is treated as no arguments.
        request_id: JSON-RPC request id echoed back in the response.

    Returns:
        A JSON-RPC response whose text content is a JSON object with ``time``
        (ISO 8601, without UTC offset), ``hour``, ``minute``, ``second`` and
        ``timezone``, plus ``requested_timezone`` if the requested zone could
        not be resolved. On failure, an error response with code -32603.
    """
    try:
        now, tz_fields = _now_for_timezone((arguments or {}).get("timezone", "local"))
        result = {
            "time": now.time().isoformat(),
            "hour": now.hour,
            "minute": now.minute,
            "second": now.second,
            **tz_fields,
        }
        return _text_content_response(request_id, result)
    except Exception as e:
        return create_error_response(request_id, -32603, f"Failed to get time: {str(e)}")


async def format_datetime(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
    """Format the current date and time with a strftime pattern.

    Args:
        arguments: Tool arguments. Optional keys: ``format`` (strftime
            pattern, default ``"%Y-%m-%d %H:%M:%S"``) and ``timezone``
            (``"UTC"``, ``"local"`` (default), or a timezone name understood
            by pytz or zoneinfo). ``None`` is treated as no arguments.
        request_id: JSON-RPC request id echoed back in the response.

    Returns:
        A JSON-RPC response whose text content is a JSON object with
        ``formatted_datetime``, ``format``, ``original_datetime`` (ISO 8601)
        and ``timezone``, plus ``requested_timezone`` if the requested zone
        could not be resolved. On failure (including an unusable format
        value), an error response with code -32603.
    """
    try:
        arguments = arguments or {}
        format_string = arguments.get("format", "%Y-%m-%d %H:%M:%S")
        now, tz_fields = _now_for_timezone(arguments.get("timezone", "local"))
        result = {
            "formatted_datetime": now.strftime(format_string),
            "format": format_string,
            "original_datetime": now.isoformat(),
            **tz_fields,
        }
        return _text_content_response(request_id, result)
    except Exception as e:
        return create_error_response(request_id, -32603, f"Failed to format date and time: {str(e)}")
if __name__ == "__main__":
    # Script entry point: hand the request handler to the shared MCP
    # streaming loop and run it until it completes.
    server_coroutine = handle_mcp_streaming(handle_request)
    asyncio.run(server_coroutine)