qwen_agent/skills/weather/mcp.py
2026-03-24 00:40:19 +08:00

124 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
天气查询MCP服务器
提供中国天气网天气预报查询功能
"""
import asyncio
import json
import sys
import os
from typing import Any, Dict
# Directory containing this file; both import roots below are resolved from it.
_HERE = os.path.dirname(os.path.abspath(__file__))

# Put the sibling "scripts" directory on sys.path so the `weather` module can be imported.
SCRIPTS_DIR = os.path.join(_HERE, "scripts")
if SCRIPTS_DIR not in sys.path:
    sys.path.insert(0, SCRIPTS_DIR)

# Put the "mcp" directory (two levels up) on sys.path so `mcp_common` can be imported.
MCP_DIR = os.path.abspath(os.path.join(_HERE, "..", "..", "mcp"))
if MCP_DIR not in sys.path:
    sys.path.insert(0, MCP_DIR)
from mcp_common import (
create_error_response,
create_initialize_response,
create_ping_response,
create_tools_list_response,
handle_mcp_streaming,
)
from weather import query_weather, find_city_code
# MCP tool definitions.

# JSON Schema for the arguments of the `weather_query` tool: `city` is
# required, `days` is optional and declared as one of 1/3/7/15/40 (default 7).
_WEATHER_QUERY_INPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "city": {
            "type": "string",
            "description": "城市名称,如:杭州、北京、上海、深圳",
        },
        "days": {
            "type": "integer",
            "description": "查询天数可选1/3/7/15/40默认7天",
            "enum": [1, 3, 7, 15, 40],
            "default": 7,
        },
    },
    "required": ["city"],
}

# Tool list handed to `create_tools_list_response` for "tools/list" requests.
WEATHER_TOOLS = [
    {
        "name": "weather_query",
        "description": "查询指定城市的天气预报支持1/3/7/15/40天预报。返回格式化的天气文本包含日期、天气状况、温度、风向等信息。",
        "inputSchema": _WEATHER_QUERY_INPUT_SCHEMA,
    },
]
def _text_result(text: str, is_error: bool = False) -> Dict[str, Any]:
    """Wrap *text* in a tool-call result holding a single text content item.

    The ``isError`` key is only added for failures, so successful results keep
    exactly the shape they had before.
    """
    result: Dict[str, Any] = {"content": [{"type": "text", "text": text}]}
    if is_error:
        result["isError"] = True
    return result


def handle_weather_query(city: str, days: int = 7) -> Dict[str, Any]:
    """Look up the forecast for *city* and wrap it as a tool-call result.

    Args:
        city: City name to look up.
        days: Number of forecast days; forwarded unchanged to ``query_weather``.

    Returns:
        A dict whose ``content`` list holds one text item: the text returned
        by ``query_weather`` on success, or an explanatory message otherwise.
        Failure results additionally carry ``"isError": True`` so a client can
        tell them apart from a real forecast. Exceptions raised by the lookup
        helpers are caught and reported in the result instead of propagating.
    """
    # A blank or non-string name can never identify a city; answer directly
    # rather than handing it to the lookup helpers.
    if not isinstance(city, str) or not city.strip():
        return _text_result(f'未找到城市 "{city}"', is_error=True)

    try:
        # The city code itself is not needed here; the lookup only confirms
        # that the city is known before querying.
        if not find_city_code(city):
            return _text_result(f'未找到城市 "{city}"', is_error=True)
        forecast = query_weather(city, days)
    except Exception as e:
        # Boundary of the tool call: report the failure to the client instead
        # of letting it escape.
        return _text_result(f"查询天气预报出错: {e}", is_error=True)

    if forecast:
        return _text_result(forecast)
    return _text_result(f"无法获取 {city} 的天气预报", is_error=True)
def _parse_days(raw: Any) -> int:
    """Return *raw* as an integer day count.

    ``None`` (key absent or JSON null) yields the default of 7. Integral
    floats and numeric strings are accepted because clients do not always
    send a strict JSON integer.

    Raises:
        ValueError: If *raw* cannot be interpreted as an integer.
    """
    if raw is None:
        return 7
    # bool is a subclass of int; True/False is never a meaningful day count.
    if isinstance(raw, bool):
        raise ValueError("days must be an integer")
    if isinstance(raw, int):
        return raw
    if isinstance(raw, float) and raw.is_integer():
        return int(raw)
    if isinstance(raw, str):
        return int(raw.strip())  # raises ValueError for non-numeric text
    raise ValueError("days must be an integer")


async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch one MCP JSON-RPC request and return the response dict.

    Handles ``initialize``, ``ping``, ``tools/list`` and ``tools/call`` (for
    the ``weather_query`` tool). Anything else yields an error response.

    Args:
        request: The decoded request; ``method``, ``params`` and ``id`` are read.

    Returns:
        The response built by the ``mcp_common`` helpers, or for a successful
        tool call a ``{"jsonrpc": "2.0", "id": ..., "result": ...}`` dict.
        Errors use -32600 (request is not an object), -32601 (unknown
        method/tool), -32602 (bad parameters) and -32603 (unexpected failure).
    """
    # Reject non-object requests up front: every later step, including the
    # error path, needs to read keys from the request.
    if not isinstance(request, dict):
        return create_error_response(None, -32600, "Invalid request: expected a JSON object")

    # Resolved before the try so the except handler never touches `request`.
    request_id = request.get("id")
    try:
        method = request.get("method")

        if method == "initialize":
            return create_initialize_response(request_id, "weather-query")
        if method == "ping":
            return create_ping_response(request_id)
        if method == "tools/list":
            return create_tools_list_response(request_id, WEATHER_TOOLS)
        if method != "tools/call":
            return create_error_response(request_id, -32601, f"Unknown method: {method}")

        # `or {}` also covers an explicit JSON null, not just a missing key.
        params = request.get("params") or {}
        if not isinstance(params, dict):
            return create_error_response(request_id, -32602, "Invalid params: expected an object")

        tool_name = params.get("name")
        if tool_name != "weather_query":
            # NOTE(review): the MCP tools spec appears to report unknown tools
            # with -32602; -32601 is kept so existing clients see the same code.
            return create_error_response(request_id, -32601, f"Unknown tool: {tool_name}")

        arguments = params.get("arguments") or {}
        if not isinstance(arguments, dict):
            return create_error_response(request_id, -32602, "Invalid arguments: expected an object")

        city = arguments.get("city", "")
        if not city:
            return create_error_response(request_id, -32602, "Missing required parameter: city")
        if not isinstance(city, str):
            return create_error_response(request_id, -32602, "Invalid parameter: city must be a string")

        try:
            days = _parse_days(arguments.get("days"))
        except ValueError:
            return create_error_response(request_id, -32602, "Invalid parameter: days must be an integer")

        result = handle_weather_query(city, days)
        return {"jsonrpc": "2.0", "id": request_id, "result": result}
    except Exception as e:
        # Last-resort boundary: turn any unexpected failure into a JSON-RPC error.
        return create_error_response(request_id, -32603, f"Internal error: {e}")
async def main():
    """Main entry point: pass ``handle_request`` to ``handle_mcp_streaming`` and await it."""
    await handle_mcp_streaming(handle_request)


if __name__ == "__main__":
    # Start the server only when executed as a script, not when imported.
    asyncio.run(main())