#!/usr/bin/env python3
"""
Weather query MCP server.

Provides weather forecast lookups backed by the China Weather Network (中国天气网).
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import sys
|
||
import os
|
||
from typing import Any, Dict
|
||
|
||
# Make the sibling "scripts" directory importable so the weather module resolves.
_HERE = os.path.dirname(os.path.abspath(__file__))
SCRIPTS_DIR = os.path.join(_HERE, "scripts")
if SCRIPTS_DIR not in sys.path:
    sys.path.insert(0, SCRIPTS_DIR)

# Make the "mcp" directory (two levels above this file) importable for mcp_common.
MCP_DIR = os.path.abspath(os.path.join(_HERE, os.pardir, os.pardir, "mcp"))
if MCP_DIR not in sys.path:
    sys.path.insert(0, MCP_DIR)
|
||
|
||
from mcp_common import (
|
||
create_error_response,
|
||
create_initialize_response,
|
||
create_ping_response,
|
||
create_tools_list_response,
|
||
handle_mcp_streaming,
|
||
)
|
||
|
||
from weather import query_weather, find_city_code
|
||
|
||
|
||
# MCP tool definitions advertised in the tools/list response.
WEATHER_TOOLS = [
    {
        "name": "weather_query",
        "description": "查询指定城市的天气预报,支持1/3/7/15/40天预报。返回格式化的天气文本,包含日期、天气状况、温度、风向等信息。",
        "inputSchema": {
            "type": "object",
            "properties": {
                # City name to look up (free-form string).
                "city": {
                    "type": "string",
                    "description": "城市名称,如:杭州、北京、上海、深圳",
                },
                # Forecast length in days; restricted to the listed values.
                "days": {
                    "type": "integer",
                    "description": "查询天数,可选1/3/7/15/40,默认7天",
                    "enum": [1, 3, 7, 15, 40],
                    "default": 7,
                },
            },
            "required": ["city"],
        },
    },
]
|
||
|
||
|
||
def handle_weather_query(city: str, days: int = 7) -> Dict[str, Any]:
    """Run a weather forecast lookup and wrap the outcome as an MCP tool result.

    Args:
        city: City name, passed to ``find_city_code`` and ``query_weather``.
        days: Number of forecast days, passed through to ``query_weather``.

    Returns:
        A dict with a ``content`` list holding one text item. On success the
        text is whatever ``query_weather`` returned. On failure (unknown city,
        empty forecast, or any exception) the text is an error message and the
        dict also carries ``"isError": True`` so clients can distinguish a
        failed tool call from a successful one.
    """
    try:
        if not find_city_code(city):
            return _weather_text_result(f"未找到城市 \"{city}\"", is_error=True)

        forecast = query_weather(city, days)
        if not forecast:
            return _weather_text_result(f"无法获取 {city} 的天气预报", is_error=True)

        return _weather_text_result(forecast)

    except Exception as e:
        # Boundary handler: a lookup failure must become a tool-level error
        # result, not an exception that escapes to the request dispatcher.
        return _weather_text_result(f"查询天气预报出错: {str(e)}", is_error=True)


def _weather_text_result(text: Any, is_error: bool = False) -> Dict[str, Any]:
    """Build a single-text-item tool result, flagging it with isError on failure."""
    result: Dict[str, Any] = {"content": [{"type": "text", "text": text}]}
    if is_error:
        result["isError"] = True
    return result
|
||
|
||
|
||
async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch one MCP JSON-RPC request to the matching handler.

    Supported methods are ``initialize``, ``ping``, ``tools/list`` and
    ``tools/call`` (for the ``weather_query`` tool only).

    Args:
        request: Decoded JSON-RPC request; ``method``, ``params`` and ``id``
            are read from it.

    Returns:
        The response dict built by the matching ``create_*`` helper, or a
        JSON-RPC result envelope wrapping the ``weather_query`` tool result.
        Error responses use code -32602 for a missing ``city`` argument,
        -32601 for an unknown tool or method, and -32603 for any unexpected
        exception raised while handling the request.
    """
    try:
        method = request.get("method")
        # A client may send an explicit JSON null for "params"; treat it like
        # an omitted object instead of failing on None.get(...).
        params = request.get("params") or {}
        request_id = request.get("id")

        if method == "initialize":
            return create_initialize_response(request_id, "weather-query")

        if method == "ping":
            return create_ping_response(request_id)

        if method == "tools/list":
            return create_tools_list_response(request_id, WEATHER_TOOLS)

        if method == "tools/call":
            tool_name = params.get("name")
            # Same null-tolerance as for "params" above.
            arguments = params.get("arguments") or {}

            if tool_name != "weather_query":
                return create_error_response(request_id, -32601, f"Unknown tool: {tool_name}")

            city = arguments.get("city", "")
            days = arguments.get("days")
            if days is None:
                # Missing or null "days" falls back to the schema default.
                days = 7
            if not city:
                return create_error_response(request_id, -32602, "Missing required parameter: city")

            result = handle_weather_query(city, days)
            return {"jsonrpc": "2.0", "id": request_id, "result": result}

        return create_error_response(request_id, -32601, f"Unknown method: {method}")

    except Exception as e:
        # If the request itself is not a dict, request.get would raise again
        # here and mask the original error; fall back to a null id.
        failed_id = request.get("id") if isinstance(request, dict) else None
        return create_error_response(failed_id, -32603, f"Internal error: {str(e)}")
|
||
|
||
|
||
async def main():
    """Entry coroutine: pass handle_request to handle_mcp_streaming and await it."""
    await handle_mcp_streaming(handle_request)
|
||
|
||
|
||
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
|