273 lines
8.2 KiB
Python
273 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
MCP Server for date and time operations.
|
||
Provides functions to:
|
||
1. Get current date and time
|
||
2. Get current date
|
||
3. Get current time
|
||
4. Format date and time
|
||
"""
|
||
|
||
import json
|
||
import sys
|
||
import asyncio
|
||
from datetime import datetime, timezone
|
||
from typing import Any, Dict, List
|
||
from mcp_common import (
|
||
load_tools_from_json,
|
||
create_error_response,
|
||
create_success_response,
|
||
create_initialize_response,
|
||
create_ping_response,
|
||
create_tools_list_response,
|
||
handle_mcp_streaming
|
||
)
|
||
|
||
|
||
async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""Handle MCP request"""
|
||
try:
|
||
method = request.get("method")
|
||
params = request.get("params", {})
|
||
request_id = request.get("id")
|
||
|
||
if method == "initialize":
|
||
return create_initialize_response(request_id, "datetime-server")
|
||
|
||
elif method == "ping":
|
||
return create_ping_response(request_id)
|
||
|
||
elif method == "tools/list":
|
||
# 从 JSON 文件加载工具定义
|
||
tools = load_tools_from_json("datetime_tools.json")
|
||
return create_tools_list_response(request_id, tools)
|
||
|
||
elif method == "tools/call":
|
||
tool_name = params.get("name")
|
||
arguments = params.get("arguments", {})
|
||
|
||
if tool_name == "get_current_datetime":
|
||
return await get_current_datetime(arguments, request_id)
|
||
|
||
elif tool_name == "get_current_date":
|
||
return await get_current_date(arguments, request_id)
|
||
|
||
elif tool_name == "get_current_time":
|
||
return await get_current_time(arguments, request_id)
|
||
|
||
elif tool_name == "format_datetime":
|
||
return await format_datetime(arguments, request_id)
|
||
|
||
else:
|
||
return create_error_response(
|
||
request_id,
|
||
-32601,
|
||
f"Unknown tool: {tool_name}"
|
||
)
|
||
|
||
else:
|
||
return create_error_response(
|
||
request_id,
|
||
-32601,
|
||
f"Unknown method: {method}"
|
||
)
|
||
|
||
except Exception as e:
|
||
return create_error_response(
|
||
request.get("id"),
|
||
-32603,
|
||
f"Internal error: {str(e)}"
|
||
)
|
||
|
||
|
||
async def get_current_datetime(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
|
||
"""获取当前日期时间"""
|
||
try:
|
||
timezone_str = arguments.get("timezone", "local")
|
||
|
||
if timezone_str == "UTC":
|
||
now = datetime.now(timezone.utc)
|
||
elif timezone_str == "local":
|
||
now = datetime.now()
|
||
else:
|
||
# 支持常见的时区名称
|
||
try:
|
||
import pytz
|
||
tz = pytz.timezone(timezone_str)
|
||
now = datetime.now(tz)
|
||
except ImportError:
|
||
# 如果没有pytz库,回退到本地时间
|
||
now = datetime.now()
|
||
except Exception:
|
||
now = datetime.now()
|
||
|
||
result = {
|
||
"datetime": now.isoformat(),
|
||
"year": now.year,
|
||
"month": now.month,
|
||
"day": now.day,
|
||
"hour": now.hour,
|
||
"minute": now.minute,
|
||
"second": now.second,
|
||
"weekday": now.weekday(), # 0=Monday, 6=Sunday
|
||
"timezone": timezone_str
|
||
}
|
||
|
||
# 将结果包装在 content 字段中
|
||
return {
|
||
"jsonrpc": "2.0",
|
||
"id": request_id,
|
||
"result": {
|
||
"content": [
|
||
{
|
||
"type": "text",
|
||
"text": json.dumps(result, ensure_ascii=False, indent=2)
|
||
}
|
||
]
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
return create_error_response(request_id, -32603, f"获取日期时间失败: {str(e)}")
|
||
|
||
|
||
async def get_current_date(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
|
||
"""获取当前日期"""
|
||
try:
|
||
timezone_str = arguments.get("timezone", "local")
|
||
|
||
if timezone_str == "UTC":
|
||
now = datetime.now(timezone.utc)
|
||
elif timezone_str == "local":
|
||
now = datetime.now()
|
||
else:
|
||
try:
|
||
import pytz
|
||
tz = pytz.timezone(timezone_str)
|
||
now = datetime.now(tz)
|
||
except ImportError:
|
||
now = datetime.now()
|
||
except Exception:
|
||
now = datetime.now()
|
||
|
||
result = {
|
||
"date": now.date().isoformat(),
|
||
"year": now.year,
|
||
"month": now.month,
|
||
"day": now.day,
|
||
"weekday": now.weekday(),
|
||
"timezone": timezone_str
|
||
}
|
||
|
||
# 将结果包装在 content 字段中
|
||
return {
|
||
"jsonrpc": "2.0",
|
||
"id": request_id,
|
||
"result": {
|
||
"content": [
|
||
{
|
||
"type": "text",
|
||
"text": json.dumps(result, ensure_ascii=False, indent=2)
|
||
}
|
||
]
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
return create_error_response(request_id, -32603, f"获取日期失败: {str(e)}")
|
||
|
||
|
||
async def get_current_time(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
|
||
"""获取当前时间"""
|
||
try:
|
||
timezone_str = arguments.get("timezone", "local")
|
||
|
||
if timezone_str == "UTC":
|
||
now = datetime.now(timezone.utc)
|
||
elif timezone_str == "local":
|
||
now = datetime.now()
|
||
else:
|
||
try:
|
||
import pytz
|
||
tz = pytz.timezone(timezone_str)
|
||
now = datetime.now(tz)
|
||
except ImportError:
|
||
now = datetime.now()
|
||
except Exception:
|
||
now = datetime.now()
|
||
|
||
result = {
|
||
"time": now.time().isoformat(),
|
||
"hour": now.hour,
|
||
"minute": now.minute,
|
||
"second": now.second,
|
||
"timezone": timezone_str
|
||
}
|
||
|
||
# 将结果包装在 content 字段中
|
||
return {
|
||
"jsonrpc": "2.0",
|
||
"id": request_id,
|
||
"result": {
|
||
"content": [
|
||
{
|
||
"type": "text",
|
||
"text": json.dumps(result, ensure_ascii=False, indent=2)
|
||
}
|
||
]
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
return create_error_response(request_id, -32603, f"获取时间失败: {str(e)}")
|
||
|
||
|
||
async def format_datetime(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
|
||
"""格式化日期时间"""
|
||
try:
|
||
format_string = arguments.get("format", "%Y-%m-%d %H:%M:%S")
|
||
timezone_str = arguments.get("timezone", "local")
|
||
|
||
if timezone_str == "UTC":
|
||
now = datetime.now(timezone.utc)
|
||
elif timezone_str == "local":
|
||
now = datetime.now()
|
||
else:
|
||
try:
|
||
import pytz
|
||
tz = pytz.timezone(timezone_str)
|
||
now = datetime.now(tz)
|
||
except ImportError:
|
||
now = datetime.now()
|
||
except Exception:
|
||
now = datetime.now()
|
||
|
||
formatted_datetime = now.strftime(format_string)
|
||
|
||
result = {
|
||
"formatted_datetime": formatted_datetime,
|
||
"format": format_string,
|
||
"original_datetime": now.isoformat(),
|
||
"timezone": timezone_str
|
||
}
|
||
|
||
# 将结果包装在 content 字段中
|
||
return {
|
||
"jsonrpc": "2.0",
|
||
"id": request_id,
|
||
"result": {
|
||
"content": [
|
||
{
|
||
"type": "text",
|
||
"text": json.dumps(result, ensure_ascii=False, indent=2)
|
||
}
|
||
]
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
return create_error_response(request_id, -32603, f"格式化日期时间失败: {str(e)}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(handle_mcp_streaming(handle_request)) |