add mcp-ui
This commit is contained in:
parent
5dfe2eba28
commit
11e2ce2c17
11
skills/developing/mcp-ui/.claude-plugin/plugin.json
Normal file
11
skills/developing/mcp-ui/.claude-plugin/plugin.json
Normal file
@ -0,0 +1,11 @@
|
||||
{
|
||||
"name": "mcp-ui",
|
||||
"description": "Provides interactive UI components through MCP tool responses.",
|
||||
"mcpServers": {
|
||||
"mcp_ui": {
|
||||
"transport": "stdio",
|
||||
"command": "python",
|
||||
"args": ["./ui_render_server.py", "{bot_id}"]
|
||||
}
|
||||
}
|
||||
}
|
||||
252
skills/developing/mcp-ui/mcp_common.py
Normal file
252
skills/developing/mcp-ui/mcp_common.py
Normal file
@ -0,0 +1,252 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Shared utility functions for the MCP server.
|
||||
Provides common functionality for path handling, file validation, and request processing.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import asyncio
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
import re
|
||||
|
||||
def get_allowed_directory():
|
||||
"""Get the directory that is allowed to be accessed."""
|
||||
# Prefer dataset_dir passed through command-line arguments.
|
||||
if len(sys.argv) > 1:
|
||||
dataset_dir = sys.argv[1]
|
||||
return os.path.abspath(dataset_dir)
|
||||
|
||||
# Read the project data directory from the environment variable.
|
||||
project_dir = os.getenv("PROJECT_DATA_DIR", "./projects/data")
|
||||
return os.path.abspath(project_dir)
|
||||
|
||||
|
||||
def resolve_file_path(file_path: str, default_subfolder: str = "default") -> str:
|
||||
"""
|
||||
Resolve a file path, supporting both folder/document.txt and document.txt formats.
|
||||
|
||||
Args:
|
||||
file_path: Input file path.
|
||||
default_subfolder: Default subfolder name to use when only a filename is provided.
|
||||
|
||||
Returns:
|
||||
The resolved full file path.
|
||||
"""
|
||||
# If the path contains a folder separator, use it directly.
|
||||
if '/' in file_path or '\\' in file_path:
|
||||
clean_path = file_path.replace('\\', '/')
|
||||
|
||||
# Remove the projects/ prefix if it exists.
|
||||
if clean_path.startswith('projects/'):
|
||||
clean_path = clean_path[9:] # Remove the 'projects/' prefix.
|
||||
elif clean_path.startswith('./projects/'):
|
||||
clean_path = clean_path[11:] # Remove the './projects/' prefix.
|
||||
else:
|
||||
# If only a filename is provided, add the default subfolder.
|
||||
clean_path = f"{default_subfolder}/{file_path}"
|
||||
|
||||
# Get the allowed directory.
|
||||
project_data_dir = get_allowed_directory()
|
||||
|
||||
# Try to locate the file directly under the project directory.
|
||||
full_path = os.path.join(project_data_dir, clean_path.lstrip('./'))
|
||||
if os.path.exists(full_path):
|
||||
return full_path
|
||||
|
||||
# If the direct path does not exist, try a recursive search.
|
||||
found = find_file_in_project(clean_path, project_data_dir)
|
||||
if found:
|
||||
return found
|
||||
|
||||
# If this is a bare filename and it was not found under the default subfolder,
|
||||
# try looking in the project root.
|
||||
if '/' not in file_path and '\\' not in file_path:
|
||||
root_path = os.path.join(project_data_dir, file_path)
|
||||
if os.path.exists(root_path):
|
||||
return root_path
|
||||
|
||||
raise FileNotFoundError(f"File not found: {file_path} (searched in {project_data_dir})")
|
||||
|
||||
|
||||
def find_file_in_project(filename: str, project_dir: str) -> Optional[str]:
|
||||
"""Recursively search for a file inside the project directory."""
|
||||
# If filename includes a path, only search within the specified path.
|
||||
if '/' in filename:
|
||||
parts = filename.split('/')
|
||||
target_file = parts[-1]
|
||||
search_dir = os.path.join(project_dir, *parts[:-1])
|
||||
|
||||
if os.path.exists(search_dir):
|
||||
target_path = os.path.join(search_dir, target_file)
|
||||
if os.path.exists(target_path):
|
||||
return target_path
|
||||
else:
|
||||
# For a bare filename, recursively search the whole project directory.
|
||||
for root, dirs, files in os.walk(project_dir):
|
||||
if filename in files:
|
||||
return os.path.join(root, filename)
|
||||
return None
|
||||
|
||||
|
||||
def load_tools_from_json(tools_file_name: str) -> List[Dict[str, Any]]:
|
||||
"""Load tool definitions from a JSON file."""
|
||||
try:
|
||||
tools_file = os.path.join(os.path.dirname(__file__), tools_file_name)
|
||||
if os.path.exists(tools_file):
|
||||
with open(tools_file, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
else:
|
||||
# If the JSON file does not exist, use the default definitions.
|
||||
return []
|
||||
except Exception as e:
|
||||
print(f"Warning: Unable to load tool definition JSON file: {str(e)}")
|
||||
return []
|
||||
|
||||
|
||||
def create_error_response(request_id: Any, code: int, message: str) -> Dict[str, Any]:
|
||||
"""Create a standardized error response."""
|
||||
return {
|
||||
"jsonrpc": "2.0",
|
||||
"id": request_id,
|
||||
"error": {
|
||||
"code": code,
|
||||
"message": message
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def create_success_response(request_id: Any, result: Any) -> Dict[str, Any]:
|
||||
"""Create a standardized success response."""
|
||||
return {
|
||||
"jsonrpc": "2.0",
|
||||
"id": request_id,
|
||||
"result": result
|
||||
}
|
||||
|
||||
|
||||
def create_initialize_response(request_id: Any, server_name: str, server_version: str = "1.0.0") -> Dict[str, Any]:
|
||||
"""Create a standardized initialize response."""
|
||||
return {
|
||||
"jsonrpc": "2.0",
|
||||
"id": request_id,
|
||||
"result": {
|
||||
"protocolVersion": "2024-11-05",
|
||||
"capabilities": {
|
||||
"tools": {}
|
||||
},
|
||||
"serverInfo": {
|
||||
"name": server_name,
|
||||
"version": server_version
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def create_ping_response(request_id: Any) -> Dict[str, Any]:
|
||||
"""Create a standardized ping response."""
|
||||
return {
|
||||
"jsonrpc": "2.0",
|
||||
"id": request_id,
|
||||
"result": {
|
||||
"pong": True
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def create_tools_list_response(request_id: Any, tools: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||||
"""Create a standardized tools/list response."""
|
||||
return {
|
||||
"jsonrpc": "2.0",
|
||||
"id": request_id,
|
||||
"result": {
|
||||
"tools": tools
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def is_regex_pattern(pattern: str) -> bool:
|
||||
"""Check whether a string should be treated as a regular expression pattern."""
|
||||
# Check the /pattern/ format.
|
||||
if pattern.startswith('/') and pattern.endswith('/') and len(pattern) > 2:
|
||||
return True
|
||||
|
||||
# Check the r"pattern" or r'pattern' format.
|
||||
if pattern.startswith(('r"', "r'")) and pattern.endswith(('"', "'")) and len(pattern) > 3:
|
||||
return True
|
||||
|
||||
# Check whether it contains regex metacharacters.
|
||||
regex_chars = {'*', '+', '?', '|', '(', ')', '[', ']', '{', '}', '^', '$', '\\', '.'}
|
||||
return any(char in pattern for char in regex_chars)
|
||||
|
||||
|
||||
def compile_pattern(pattern: str) -> Union[re.Pattern, str, None]:
|
||||
"""Compile a regex pattern, or return the original string if it is not regex."""
|
||||
if not is_regex_pattern(pattern):
|
||||
return pattern
|
||||
|
||||
try:
|
||||
# Handle the /pattern/ format.
|
||||
if pattern.startswith('/') and pattern.endswith('/'):
|
||||
regex_body = pattern[1:-1]
|
||||
return re.compile(regex_body)
|
||||
|
||||
# Handle the r"pattern" or r'pattern' format.
|
||||
if pattern.startswith(('r"', "r'")) and pattern.endswith(('"', "'")):
|
||||
regex_body = pattern[2:-1]
|
||||
return re.compile(regex_body)
|
||||
|
||||
# Directly compile strings that contain regex metacharacters.
|
||||
return re.compile(pattern)
|
||||
except re.error as e:
|
||||
# If compilation fails, return None to indicate an invalid regex.
|
||||
print(f"Warning: Regular expression '{pattern}' compilation failed: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def handle_mcp_streaming(request_handler):
|
||||
"""Handle the standard main loop for MCP requests."""
|
||||
try:
|
||||
while True:
|
||||
# Read from stdin
|
||||
line = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline)
|
||||
if not line:
|
||||
break
|
||||
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
try:
|
||||
request = json.loads(line)
|
||||
response = await request_handler(request)
|
||||
|
||||
# Write to stdout
|
||||
sys.stdout.write(json.dumps(response, ensure_ascii=False) + "\n")
|
||||
sys.stdout.flush()
|
||||
|
||||
except json.JSONDecodeError:
|
||||
error_response = {
|
||||
"jsonrpc": "2.0",
|
||||
"error": {
|
||||
"code": -32700,
|
||||
"message": "Parse error"
|
||||
}
|
||||
}
|
||||
sys.stdout.write(json.dumps(error_response, ensure_ascii=False) + "\n")
|
||||
sys.stdout.flush()
|
||||
|
||||
except Exception as e:
|
||||
error_response = {
|
||||
"jsonrpc": "2.0",
|
||||
"error": {
|
||||
"code": -32603,
|
||||
"message": f"Internal error: {str(e)}"
|
||||
}
|
||||
}
|
||||
sys.stdout.write(json.dumps(error_response, ensure_ascii=False) + "\n")
|
||||
sys.stdout.flush()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
30
skills/developing/mcp-ui/mcp_ui_tools.json
Normal file
30
skills/developing/mcp-ui/mcp_ui_tools.json
Normal file
@ -0,0 +1,30 @@
|
||||
[
|
||||
{
|
||||
"name": "render_ui",
|
||||
"description": "Render an interactive HTML UI widget in the chat. Use this tool when the user asks for interactive content, visualizations, forms, or dynamic displays that benefit from rich HTML rendering rather than plain text.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"title": {
|
||||
"type": "string",
|
||||
"description": "A descriptive title for the UI widget"
|
||||
},
|
||||
"html_content": {
|
||||
"type": "string",
|
||||
"description": "Complete HTML content to render. Can include inline CSS and JavaScript within <style> and <script> tags."
|
||||
},
|
||||
"width": {
|
||||
"type": "string",
|
||||
"description": "CSS width for the iframe. Default: '100%'",
|
||||
"default": "100%"
|
||||
},
|
||||
"height": {
|
||||
"type": "string",
|
||||
"description": "CSS height for the iframe. Default: '400px'",
|
||||
"default": "400px"
|
||||
}
|
||||
},
|
||||
"required": ["title", "html_content"]
|
||||
}
|
||||
}
|
||||
]
|
||||
140
skills/developing/mcp-ui/ui_render_server.py
Normal file
140
skills/developing/mcp-ui/ui_render_server.py
Normal file
@ -0,0 +1,140 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
MCP UI Server - provides interactive UI rendering tools.
|
||||
Uses mcp-ui-server SDK to create standard UIResource objects.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import sys
|
||||
from typing import Any, Dict
|
||||
|
||||
from mcp_ui_server import create_ui_resource, UIMetadataKey
|
||||
|
||||
from mcp_common import (
|
||||
create_error_response,
|
||||
create_initialize_response,
|
||||
create_ping_response,
|
||||
create_tools_list_response,
|
||||
load_tools_from_json,
|
||||
handle_mcp_streaming,
|
||||
)
|
||||
|
||||
|
||||
def render_ui(
|
||||
title: str, html_content: str, width: str = "100%", height: str = "400px"
|
||||
) -> Dict[str, Any]:
|
||||
"""Create a UI resource and serialize it as JSON text for passthrough.
|
||||
|
||||
The UIResource is serialized as a JSON string inside a type:"text" content block.
|
||||
This is necessary because langchain_mcp_adapters strips metadata (uri, mimeType)
|
||||
from EmbeddedResource objects. By wrapping it as text, the full resource JSON
|
||||
passes through to the frontend for detection and rendering.
|
||||
"""
|
||||
try:
|
||||
uri_slug = title.replace(" ", "-").lower()[:50]
|
||||
ui_resource = create_ui_resource(
|
||||
{
|
||||
"uri": f"ui://mcp-ui-skill/{uri_slug}",
|
||||
"content": {"type": "rawHtml", "htmlString": html_content},
|
||||
"encoding": "text",
|
||||
"uiMetadata": {
|
||||
UIMetadataKey.PREFERRED_FRAME_SIZE: [width, height],
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
# Serialize the full UIResource as JSON string for passthrough
|
||||
resource_json = json.dumps(
|
||||
ui_resource.model_dump(mode="json"), ensure_ascii=False
|
||||
)
|
||||
|
||||
return {"content": [{"type": "text", "text": resource_json}]}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
"content": [
|
||||
{"type": "text", "text": f"Error creating UI resource: {str(e)}"}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Handle an MCP request."""
|
||||
try:
|
||||
method = request.get("method")
|
||||
params = request.get("params", {})
|
||||
request_id = request.get("id")
|
||||
|
||||
if method == "initialize":
|
||||
return create_initialize_response(request_id, "mcp-ui")
|
||||
|
||||
elif method == "ping":
|
||||
return create_ping_response(request_id)
|
||||
|
||||
elif method == "tools/list":
|
||||
tools = load_tools_from_json("mcp_ui_tools.json")
|
||||
if not tools:
|
||||
tools = [
|
||||
{
|
||||
"name": "render_ui",
|
||||
"description": "Render an interactive HTML UI widget in the chat.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"title": {
|
||||
"type": "string",
|
||||
"description": "A descriptive title for the UI widget",
|
||||
},
|
||||
"html_content": {
|
||||
"type": "string",
|
||||
"description": "Complete HTML content to render.",
|
||||
},
|
||||
},
|
||||
"required": ["title", "html_content"],
|
||||
},
|
||||
}
|
||||
]
|
||||
return create_tools_list_response(request_id, tools)
|
||||
|
||||
elif method == "tools/call":
|
||||
tool_name = params.get("name")
|
||||
arguments = params.get("arguments", {})
|
||||
|
||||
if tool_name == "render_ui":
|
||||
title = arguments.get("title", "UI Widget")
|
||||
html_content = arguments.get("html_content", "")
|
||||
width = arguments.get("width", "100%")
|
||||
height = arguments.get("height", "400px")
|
||||
|
||||
if not html_content:
|
||||
return create_error_response(
|
||||
request_id, -32602, "Missing required parameter: html_content"
|
||||
)
|
||||
|
||||
result = render_ui(title, html_content, width, height)
|
||||
return {"jsonrpc": "2.0", "id": request_id, "result": result}
|
||||
|
||||
else:
|
||||
return create_error_response(
|
||||
request_id, -32601, f"Unknown tool: {tool_name}"
|
||||
)
|
||||
|
||||
else:
|
||||
return create_error_response(
|
||||
request_id, -32601, f"Unknown method: {method}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
return create_error_response(
|
||||
request.get("id"), -32603, f"Internal error: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
async def main():
|
||||
"""Main entry point."""
|
||||
await handle_mcp_streaming(handle_request)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Loading…
Reference in New Issue
Block a user