197 lines
6.2 KiB
Python
197 lines
6.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MCP UI Server - provides interactive UI rendering tools.
|
|
Uses mcp-ui-server SDK to create standard UIResource objects.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import sys
|
|
from typing import Any, Dict
|
|
|
|
from mcp_ui_server import create_ui_resource, UIMetadataKey
|
|
|
|
from mcp_common import (
|
|
create_error_response,
|
|
create_initialize_response,
|
|
create_ping_response,
|
|
create_tools_list_response,
|
|
load_tools_from_json,
|
|
handle_mcp_streaming,
|
|
)
|
|
|
|
|
|
# Value stored under "__type__" in the ask_user payload so it can be told apart
# from ordinary text results.
ASK_USER_MARKER = "__ask_user__"


def _normalize_question(entry: Any) -> Dict[str, Any]:
    """Return one question as a dict with "question", "options" and "multi_select"."""
    if isinstance(entry, str):
        # A bare string is taken as the question text, with no predefined options.
        entry = {"question": entry}
    elif not isinstance(entry, dict):
        raise TypeError(
            "each question must be a dict or a string, "
            f"got {type(entry).__name__}"
        )
    return {
        "question": entry.get("question", ""),
        "options": entry.get("options", []),
        "multi_select": entry.get("multi_select", False),
    }


def ask_user(questions: list) -> Dict[str, Any]:
    """Create an ask_user response.

    Args:
        questions: List of dicts, each with "question", "options", and
            "multi_select". Missing keys default to "", [] and False.
            A plain string entry is accepted as shorthand for
            {"question": <string>}, and a single dict or string is accepted
            in place of a one-element list.

    Returns:
        A tool result holding one text content block. Its text is a JSON
        object carrying a "__type__" marker and the normalized "questions",
        so the backend can detect it and emit it as a special delta.ask_user
        event at the end of the stream.

    Raises:
        TypeError: If an entry is neither a dict nor a string.
    """
    # Accept a single question given without the enclosing list. Without this,
    # iterating a dict would walk its keys and iterating a string its characters.
    if isinstance(questions, (dict, str)):
        questions = [questions]

    payload = {
        "__type__": ASK_USER_MARKER,
        "questions": [_normalize_question(entry) for entry in questions],
    }
    return {
        "content": [
            # ensure_ascii=False keeps non-ASCII question text readable as-is.
            {"type": "text", "text": json.dumps(payload, ensure_ascii=False)}
        ]
    }
|
|
|
|
|
|
def render_ui(
    title: str,
    html_content: str = "",
    url: str = "",
    width: str = "100%",
    height: str = "400px",
) -> Dict[str, Any]:
    """Create a UI resource and serialize it as JSON text for passthrough.

    Supports two modes:
    - externalUrl: provide url to embed an external webpage (wins when both
      url and html_content are given).
    - rawHtml: provide html_content to render custom HTML.

    Args:
        title: Widget title; used only to derive the resource URI slug.
        html_content: HTML string rendered in rawHtml mode.
        url: Address embedded in externalUrl mode.
        width: Preferred frame width, passed through as metadata.
        height: Preferred frame height, passed through as metadata.

    Returns:
        On success, a tool result whose single type:"text" content block holds
        the UIResource serialized as a JSON string. On any failure, a tool
        result with an "Error creating UI resource: ..." text block and
        "isError": True, so callers can tell a failure from a rendered widget.
    """
    try:
        # Collapse every whitespace run into a single hyphen and cap the length.
        # The fallback keeps the URI path non-empty for blank titles.
        uri_slug = "-".join(title.split()).lower()[:50] or "widget"

        if url:
            content = {"type": "externalUrl", "iframeUrl": url}
        else:
            content = {"type": "rawHtml", "htmlString": html_content}

        ui_resource = create_ui_resource(
            {
                "uri": f"ui://mcp-ui-skill/{uri_slug}",
                "content": content,
                "encoding": "text",
                "uiMetadata": {
                    UIMetadataKey.PREFERRED_FRAME_SIZE: [width, height],
                },
            }
        )

        # ensure_ascii=False keeps non-ASCII HTML/text readable in the output.
        resource_json = json.dumps(
            ui_resource.model_dump(mode="json"), ensure_ascii=False
        )
        return {"content": [{"type": "text", "text": resource_json}]}

    except Exception as e:
        # Report the failure inside the tool result rather than raising, and
        # flag it so it is not mistaken for a successfully rendered resource.
        return {
            "content": [
                {"type": "text", "text": f"Error creating UI resource: {e}"}
            ],
            "isError": True,
        }
|
|
|
|
|
|
def _fallback_tools() -> list:
    """Return the built-in tool list used when mcp_ui_tools.json yields no tools.

    It lists the tools and arguments that the tools/call handling accepts.
    A new list is built on every call.
    """
    return [
        {
            "name": "render_ui",
            "description": "Render an interactive UI widget in the chat.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "A descriptive title for the UI widget",
                    },
                    "html_content": {
                        "type": "string",
                        "description": "Complete HTML content to render. Use this OR url.",
                    },
                    "url": {
                        "type": "string",
                        "description": "External URL to embed. Use this OR html_content.",
                    },
                    "width": {
                        "type": "string",
                        "description": "Preferred frame width. Defaults to 100%.",
                    },
                    "height": {
                        "type": "string",
                        "description": "Preferred frame height. Defaults to 400px.",
                    },
                },
                "required": ["title"],
            },
        },
        {
            "name": "ask_user",
            "description": "Ask the user one or more questions with selectable options.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "questions": {
                        "type": "array",
                        "description": "Questions to present to the user.",
                        "items": {
                            "type": "object",
                            "properties": {
                                "question": {
                                    "type": "string",
                                    "description": "The question text.",
                                },
                                "options": {
                                    "type": "array",
                                    "description": "Options the user can choose from.",
                                },
                                "multi_select": {
                                    "type": "boolean",
                                    "description": "Whether several options may be selected.",
                                },
                            },
                            "required": ["question"],
                        },
                    },
                },
                "required": ["questions"],
            },
        },
    ]


def _call_tool(request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
    """Run the tool named in a tools/call request and wrap its result."""
    tool_name = params.get("name")
    # `or` (not a .get default) so an explicit JSON null falls back too.
    arguments = params.get("arguments") or {}

    if tool_name == "render_ui":
        html_content = arguments.get("html_content") or ""
        url = arguments.get("url") or ""
        if not html_content and not url:
            return create_error_response(
                request_id, -32602, "Missing required parameter: html_content or url"
            )
        result = render_ui(
            arguments.get("title") or "UI Widget",
            html_content,
            url,
            arguments.get("width") or "100%",
            arguments.get("height") or "400px",
        )
        return {"jsonrpc": "2.0", "id": request_id, "result": result}

    if tool_name == "ask_user":
        questions = arguments.get("questions") or []
        if not questions:
            return create_error_response(
                request_id, -32602, "Missing required parameter: questions"
            )
        result = ask_user(questions)
        return {"jsonrpc": "2.0", "id": request_id, "result": result}

    return create_error_response(request_id, -32601, f"Unknown tool: {tool_name}")


async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Handle an MCP request.

    Dispatches on the request's "method":
    - "initialize" and "ping" return the corresponding mcp_common responses.
    - "tools/list" returns the tools loaded from mcp_ui_tools.json, or the
      built-in fallback list when that yields nothing.
    - "tools/call" runs render_ui or ask_user with the supplied arguments.

    Args:
        request: Decoded JSON-RPC request with "method" and optionally
            "params" and "id".

    Returns:
        A JSON-RPC response dict. Error responses use code -32601 for an
        unknown method or tool, -32602 for missing tool arguments, and -32603
        for any exception raised while handling the request.
    """
    # Resolved before the try block so the error path below can never fail on
    # a malformed (non-dict) request.
    request_id = request.get("id") if isinstance(request, dict) else None

    try:
        method = request.get("method")
        # `or` (not a .get default) so an explicit JSON null falls back too.
        params = request.get("params") or {}

        if method == "initialize":
            return create_initialize_response(request_id, "mcp-ui")

        if method == "ping":
            return create_ping_response(request_id)

        if method == "tools/list":
            tools = load_tools_from_json("mcp_ui_tools.json") or _fallback_tools()
            return create_tools_list_response(request_id, tools)

        if method == "tools/call":
            return _call_tool(request_id, params)

        return create_error_response(
            request_id, -32601, f"Unknown method: {method}"
        )

    except Exception as e:
        # Top-level boundary: turn any failure into a JSON-RPC internal error
        # instead of letting it escape the handler.
        return create_error_response(
            request_id, -32603, f"Internal error: {e}"
        )
|
|
|
|
|
|
async def main():
    """Start the server by passing handle_request to handle_mcp_streaming."""
    await handle_mcp_streaming(handle_request)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the async entry point only when this file is executed as a script.
    asyncio.run(main())
|