#!/usr/bin/env python3 """ MCP Server for date and time operations. Provides functions to: 1. Get current date and time 2. Get current date 3. Get current time 4. Format date and time """ import json import sys import asyncio from datetime import datetime, timezone from typing import Any, Dict, List from mcp_common import ( load_tools_from_json, create_error_response, create_success_response, create_initialize_response, create_ping_response, create_tools_list_response, handle_mcp_streaming ) async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]: """Handle MCP request""" try: method = request.get("method") params = request.get("params", {}) request_id = request.get("id") if method == "initialize": return create_initialize_response(request_id, "datetime-server") elif method == "ping": return create_ping_response(request_id) elif method == "tools/list": # Load tool definitions from the JSON file tools = load_tools_from_json("datetime_tools.json") return create_tools_list_response(request_id, tools) elif method == "tools/call": tool_name = params.get("name") arguments = params.get("arguments", {}) if tool_name == "get_current_datetime": return await get_current_datetime(arguments, request_id) elif tool_name == "get_current_date": return await get_current_date(arguments, request_id) elif tool_name == "get_current_time": return await get_current_time(arguments, request_id) elif tool_name == "format_datetime": return await format_datetime(arguments, request_id) else: return create_error_response( request_id, -32601, f"Unknown tool: {tool_name}" ) else: return create_error_response( request_id, -32601, f"Unknown method: {method}" ) except Exception as e: return create_error_response( request.get("id"), -32603, f"Internal error: {str(e)}" ) async def get_current_datetime(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """Get the current date and time.""" try: timezone_str = arguments.get("timezone", "local") if timezone_str == "UTC": now = datetime.now(timezone.utc) elif timezone_str == "local": now = datetime.now() else: # Support common timezone names try: import pytz tz = pytz.timezone(timezone_str) now = datetime.now(tz) except ImportError: # Fall back to local time if pytz is unavailable now = datetime.now() except Exception: now = datetime.now() result = { "datetime": now.isoformat(), "year": now.year, "month": now.month, "day": now.day, "hour": now.hour, "minute": now.minute, "second": now.second, "weekday": now.weekday(), # 0=Monday, 6=Sunday "timezone": timezone_str } # Wrap the result in the content field return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": json.dumps(result, ensure_ascii=False, indent=2) } ] } } except Exception as e: return create_error_response(request_id, -32603, f"Failed to get date and time: {str(e)}") async def get_current_date(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """Get the current date.""" try: timezone_str = arguments.get("timezone", "local") if timezone_str == "UTC": now = datetime.now(timezone.utc) elif timezone_str == "local": now = datetime.now() else: try: import pytz tz = pytz.timezone(timezone_str) now = datetime.now(tz) except ImportError: now = datetime.now() except Exception: now = datetime.now() result = { "date": now.date().isoformat(), "year": now.year, "month": now.month, "day": now.day, "weekday": now.weekday(), "timezone": timezone_str } # Wrap the result in the content field return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": json.dumps(result, ensure_ascii=False, indent=2) } ] } } except Exception as e: return create_error_response(request_id, -32603, f"Failed to get date: {str(e)}") async def get_current_time(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """Get the current time.""" try: timezone_str = arguments.get("timezone", "local") if timezone_str == "UTC": now = datetime.now(timezone.utc) elif timezone_str == "local": now = datetime.now() else: try: import pytz tz = pytz.timezone(timezone_str) now = datetime.now(tz) except ImportError: now = datetime.now() except Exception: now = datetime.now() result = { "time": now.time().isoformat(), "hour": now.hour, "minute": now.minute, "second": now.second, "timezone": timezone_str } # Wrap the result in the content field return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": json.dumps(result, ensure_ascii=False, indent=2) } ] } } except Exception as e: return create_error_response(request_id, -32603, f"Failed to get time: {str(e)}") async def format_datetime(arguments: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """Format date and time.""" try: format_string = arguments.get("format", "%Y-%m-%d %H:%M:%S") timezone_str = arguments.get("timezone", "local") if timezone_str == "UTC": now = datetime.now(timezone.utc) elif timezone_str == "local": now = datetime.now() else: try: import pytz tz = pytz.timezone(timezone_str) now = datetime.now(tz) except ImportError: now = datetime.now() except Exception: now = datetime.now() formatted_datetime = now.strftime(format_string) result = { "formatted_datetime": formatted_datetime, "format": format_string, "original_datetime": now.isoformat(), "timezone": timezone_str } # Wrap the result in the content field return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": json.dumps(result, ensure_ascii=False, indent=2) } ] } } except Exception as e: return create_error_response(request_id, -32603, f"Failed to format date and time: {str(e)}") if __name__ == "__main__": asyncio.run(handle_mcp_streaming(handle_request))