Merge branch 'developing' into staging

This commit is contained in:
朱潮 2026-05-28 09:55:22 +08:00
commit 091659d693
18 changed files with 1232 additions and 6 deletions

View File

@ -186,6 +186,9 @@ init_with_fastapi(app)
# Mount the public directory as static files
app.mount("/public", StaticFiles(directory="public"), name="static")
# Mount robot projects directory as static files (supports HTML/CSS/JS/images)
app.mount("/robots", StaticFiles(directory="projects/robot", html=True), name="robots")
# Add CORS middleware for frontend pages
app.add_middleware(
CORSMiddleware,

22
poetry.lock generated
View File

@ -2765,6 +2765,26 @@ cli = ["python-dotenv (>=1.0.0)", "typer (>=0.16.0)"]
rich = ["rich (>=13.9.4)"]
ws = ["websockets (>=15.0.1)"]
[[package]]
name = "mcp-ui-server"
version = "1.0.0"
description = "mcp-ui Server SDK for Python"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "mcp_ui_server-1.0.0-py3-none-any.whl", hash = "sha256:85f53b2e4300fbd175f1fbb7c40f2566b1f4a4ad03a1f33647867c82a3159dcc"},
{file = "mcp_ui_server-1.0.0.tar.gz", hash = "sha256:5ab8f17b93bf794966af7c35e9a575e4f21a9ba2bab3d316cfc107a15f88a3c9"},
]
[package.dependencies]
mcp = ">=1.0.0"
pydantic = ">=2.0.0"
typing-extensions = ">=4.0.0"
[package.extras]
dev = ["pyright (>=1.1.0)", "pytest (>=7.0.0)", "ruff (>=0.1.0)"]
[[package]]
name = "mdit-py-plugins"
version = "0.5.0"
@ -7132,4 +7152,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
[metadata]
lock-version = "2.1"
python-versions = ">=3.12,<3.15"
content-hash = "dc130664802ad1344adc341931036a343f9892934a41bbc15c48663d0146696b"
content-hash = "9c949ca9f49b62502571dadab242919fad5e90621f187998e6abdcbcbd448fe4"

View File

@ -42,6 +42,7 @@ dependencies = [
"daytona-sdk",
"langchain-daytona",
"langfuse (>=2.0.0,<4.0.0)",
"mcp-ui-server (>=1.0.0,<2.0.0)",
]
[tool.poetry.requires-plugins]

View File

@ -95,6 +95,7 @@ linkify-it-py==2.1.0 ; python_version >= "3.12" and python_version < "3.15"
markdown-it-py==4.0.0 ; python_version >= "3.12" and python_version < "3.15"
markdownify==1.2.2 ; python_version >= "3.12" and python_version < "3.15"
markupsafe==3.0.3 ; python_version >= "3.12" and python_version < "3.15"
mcp-ui-server==1.0.0 ; python_version >= "3.12" and python_version < "3.15"
mcp==1.12.4 ; python_version >= "3.12" and python_version < "3.15"
mdit-py-plugins==0.5.0 ; python_version >= "3.12" and python_version < "3.15"
mdurl==0.1.2 ; python_version >= "3.12" and python_version < "3.15"

View File

@ -92,6 +92,7 @@ async def enhanced_generate_stream_response(
logger.info(f"Starting agent stream response")
chunk_id = 0
message_tag = ""
current_tool_name = ""
last_answer_first_char_duration_ms = None
waiting_for_answer_first_char = False
agent, checkpointer, sandbox = await init_agent(config)
@ -108,10 +109,13 @@ async def enhanced_generate_stream_response(
if msg.tool_call_chunks:
message_tag = "TOOL_CALL"
waiting_for_answer_first_char = False
if config.tool_response:
for tool_call_chunk in msg.tool_call_chunks:
chunk_name = tool_call_chunk.get("name") if isinstance(tool_call_chunk, dict) else getattr(tool_call_chunk, "name", None)
chunk_args = tool_call_chunk.get("args") if isinstance(tool_call_chunk, dict) else getattr(tool_call_chunk, "args", None)
for tool_call_chunk in msg.tool_call_chunks:
chunk_name = tool_call_chunk.get("name") if isinstance(tool_call_chunk, dict) else getattr(tool_call_chunk, "name", None)
chunk_args = tool_call_chunk.get("args") if isinstance(tool_call_chunk, dict) else getattr(tool_call_chunk, "args", None)
if chunk_name:
current_tool_name = chunk_name
# Always output ask_user and render_ui tool calls even when tool_response is disabled
if config.tool_response or current_tool_name in ('ask_user', 'render_ui'):
if chunk_name:
new_content = f"[{message_tag}] {chunk_name}\n"
if chunk_args:
@ -140,7 +144,13 @@ async def enhanced_generate_stream_response(
elif isinstance(msg, ToolMessage) and msg.content:
message_tag = "TOOL_RESPONSE"
waiting_for_answer_first_char = False
if config.tool_response:
# Always output UIResource responses even when tool_response is disabled
is_ui_resource = (
msg.text
and msg.text.lstrip().startswith('{"')
and '"ui://' in msg.text
)
if config.tool_response or is_ui_resource:
new_content = f"[{message_tag}] {msg.name}\n{msg.text}\n"
# Collect full content

View File

@ -0,0 +1,19 @@
{
"name": "data-dashboard",
"description": "Renders data as an interactive dashboard card UI using the mcp-ui protocol.",
"hooks": {
"PrePrompt": [
{
"type": "command",
"command": "python hooks/pre_prompt.py"
}
]
},
"mcpServers": {
"data_dashboard": {
"transport": "stdio",
"command": "python",
"args": ["./dashboard_server.py", "{bot_id}"]
}
}
}

View File

@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""
Data Dashboard MCP Server - renders metric data as an interactive dashboard.
Returns UIResource via the mcp-ui protocol so the frontend renders it as an iframe.
"""
import asyncio
import json
from typing import Any, Dict, List
from mcp_ui_server import create_ui_resource, UIMetadataKey
from mcp_common import (
create_error_response,
create_initialize_response,
create_ping_response,
create_tools_list_response,
load_tools_from_json,
handle_mcp_streaming,
)
def _build_dashboard_html(title: str, metrics: List[Dict[str, str]]) -> str:
"""Build a self-contained HTML dashboard from metrics data."""
cards_html = ""
for m in metrics:
label = _esc(m.get("label", ""))
value = _esc(m.get("value", ""))
change = m.get("change", "")
change_type = m.get("change_type", "neutral")
change_html = ""
if change:
color = "#16a34a" if change_type == "up" else "#dc2626" if change_type == "down" else "#6b7280"
arrow = "&#9650;" if change_type == "up" else "&#9660;" if change_type == "down" else "&#8212;"
change_html = f'<span class="change" style="color:{color}"><span class="arrow">{arrow}</span> {_esc(change)}</span>'
cards_html += f"""
<div class="card">
<div class="card-label">{label}</div>
<div class="card-value">{value}</div>
{f'<div class="card-change">{change_html}</div>' if change_html else ''}
</div>"""
return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{_esc(title)}</title>
<style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: #f8fafc; padding: 24px; }}
h1 {{ font-size: 20px; font-weight: 600; color: #0f172a; margin-bottom: 20px; }}
.grid {{ display: grid; grid-template-columns: repeat(auto-fill, minmax(200px, 1fr)); gap: 16px; }}
.card {{ background: #fff; border: 1px solid #e2e8f0; border-radius: 12px; padding: 20px; transition: box-shadow 0.15s; }}
.card:hover {{ box-shadow: 0 4px 12px rgba(0,0,0,0.06); }}
.card-label {{ font-size: 13px; color: #64748b; margin-bottom: 8px; }}
.card-value {{ font-size: 28px; font-weight: 700; color: #0f172a; }}
.card-change {{ margin-top: 8px; font-size: 13px; font-weight: 500; }}
.arrow {{ font-size: 10px; }}
</style>
</head>
<body>
<h1>{_esc(title)}</h1>
<div class="grid">{cards_html}
</div>
</body>
</html>"""
def _esc(text: str) -> str:
"""Minimal HTML escaping."""
return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace('"', "&quot;")
def render_dashboard(title: str, metrics: List[Dict[str, str]]) -> Dict[str, Any]:
"""Create a UIResource dashboard and serialize it as JSON for TOOL_RESPONSE."""
try:
html = _build_dashboard_html(title, metrics)
uri_slug = title.replace(" ", "-").lower()[:50]
ui_resource = create_ui_resource(
{
"uri": f"ui://data-dashboard/{uri_slug}",
"content": {"type": "rawHtml", "htmlString": html},
"encoding": "text",
"uiMetadata": {
UIMetadataKey.PREFERRED_FRAME_SIZE: ["100%", "auto"],
},
}
)
resource_json = json.dumps(
ui_resource.model_dump(mode="json"), ensure_ascii=False
)
return {"content": [{"type": "text", "text": resource_json}]}
except Exception as e:
return {
"content": [{"type": "text", "text": f"Error creating dashboard: {str(e)}"}]
}
async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle an MCP request."""
try:
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
if method == "initialize":
return create_initialize_response(request_id, "data-dashboard")
elif method == "ping":
return create_ping_response(request_id)
elif method == "tools/list":
tools = load_tools_from_json("dashboard_tools.json")
if not tools:
tools = [
{
"name": "render_dashboard",
"description": "Render a data dashboard with metric cards.",
"inputSchema": {
"type": "object",
"properties": {
"title": {"type": "string"},
"metrics": {
"type": "array",
"items": {
"type": "object",
"properties": {
"label": {"type": "string"},
"value": {"type": "string"},
"change": {"type": "string"},
"change_type": {"type": "string", "enum": ["up", "down", "neutral"]},
},
"required": ["label", "value"],
},
},
},
"required": ["title", "metrics"],
},
}
]
return create_tools_list_response(request_id, tools)
elif method == "tools/call":
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name == "render_dashboard":
title = arguments.get("title", "Dashboard")
metrics = arguments.get("metrics", [])
if not metrics:
return create_error_response(
request_id, -32602, "Missing required parameter: metrics"
)
result = render_dashboard(title, metrics)
return {"jsonrpc": "2.0", "id": request_id, "result": result}
else:
return create_error_response(
request_id, -32601, f"Unknown tool: {tool_name}"
)
else:
return create_error_response(
request_id, -32601, f"Unknown method: {method}"
)
except Exception as e:
return create_error_response(
request.get("id"), -32603, f"Internal error: {str(e)}"
)
async def main():
"""Main entry point."""
await handle_mcp_streaming(handle_request)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,43 @@
[
{
"name": "render_dashboard",
"description": "Render an interactive data dashboard with metric cards. Pass an array of metrics, each with label, value, and optional change/change_type fields. The dashboard is rendered as an HTML card UI.",
"inputSchema": {
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "Dashboard title, e.g. 'Sales Overview'"
},
"metrics": {
"type": "array",
"description": "Array of metric objects to display as cards",
"items": {
"type": "object",
"properties": {
"label": {
"type": "string",
"description": "Metric name, e.g. 'Revenue'"
},
"value": {
"type": "string",
"description": "Metric value, e.g. '$12,345'"
},
"change": {
"type": "string",
"description": "Change indicator, e.g. '+12.5%' or '-3.2%'"
},
"change_type": {
"type": "string",
"enum": ["up", "down", "neutral"],
"description": "Direction of change"
}
},
"required": ["label", "value"]
}
}
},
"required": ["title", "metrics"]
}
}
]

View File

@ -0,0 +1,13 @@
## `render_dashboard` usage guide
### When to use render_dashboard:
- When the user wants to visualize data as metric cards (KPIs, stats, numbers)
- When summarizing numerical results (sales, traffic, performance metrics)
- When comparing multiple data points side by side
### How to call:
render_dashboard(title="Sales Overview", metrics=[
{"label": "Revenue", "value": "$12,345", "change": "+12.5%", "change_type": "up"},
{"label": "Users", "value": "1,234", "change": "-3.2%", "change_type": "down"},
{"label": "Conversion", "value": "4.5%", "change_type": "neutral"}
])

View File

@ -0,0 +1,8 @@
#!/usr/bin/env python3
"""PrePrompt hook - inject dashboard tool guide into system prompt."""
import os
guide_path = os.path.join(os.path.dirname(__file__), "dashboard_guide.md")
if os.path.exists(guide_path):
with open(guide_path, "r", encoding="utf-8") as f:
print(f.read())

View File

@ -0,0 +1,252 @@
#!/usr/bin/env python3
"""
Shared utility functions for the MCP server.
Provides common functionality for path handling, file validation, and request processing.
"""
import json
import os
import sys
import asyncio
from typing import Any, Dict, List, Optional, Union
import re
def get_allowed_directory():
"""Get the directory that is allowed to be accessed."""
# Prefer dataset_dir passed through command-line arguments.
if len(sys.argv) > 1:
dataset_dir = sys.argv[1]
return os.path.abspath(dataset_dir)
# Read the project data directory from the environment variable.
project_dir = os.getenv("PROJECT_DATA_DIR", "./projects/data")
return os.path.abspath(project_dir)
def resolve_file_path(file_path: str, default_subfolder: str = "default") -> str:
"""
Resolve a file path, supporting both folder/document.txt and document.txt formats.
Args:
file_path: Input file path.
default_subfolder: Default subfolder name to use when only a filename is provided.
Returns:
The resolved full file path.
"""
# If the path contains a folder separator, use it directly.
if '/' in file_path or '\\' in file_path:
clean_path = file_path.replace('\\', '/')
# Remove the projects/ prefix if it exists.
if clean_path.startswith('projects/'):
clean_path = clean_path[9:] # Remove the 'projects/' prefix.
elif clean_path.startswith('./projects/'):
clean_path = clean_path[11:] # Remove the './projects/' prefix.
else:
# If only a filename is provided, add the default subfolder.
clean_path = f"{default_subfolder}/{file_path}"
# Get the allowed directory.
project_data_dir = get_allowed_directory()
# Try to locate the file directly under the project directory.
full_path = os.path.join(project_data_dir, clean_path.lstrip('./'))
if os.path.exists(full_path):
return full_path
# If the direct path does not exist, try a recursive search.
found = find_file_in_project(clean_path, project_data_dir)
if found:
return found
# If this is a bare filename and it was not found under the default subfolder,
# try looking in the project root.
if '/' not in file_path and '\\' not in file_path:
root_path = os.path.join(project_data_dir, file_path)
if os.path.exists(root_path):
return root_path
raise FileNotFoundError(f"File not found: {file_path} (searched in {project_data_dir})")
def find_file_in_project(filename: str, project_dir: str) -> Optional[str]:
"""Recursively search for a file inside the project directory."""
# If filename includes a path, only search within the specified path.
if '/' in filename:
parts = filename.split('/')
target_file = parts[-1]
search_dir = os.path.join(project_dir, *parts[:-1])
if os.path.exists(search_dir):
target_path = os.path.join(search_dir, target_file)
if os.path.exists(target_path):
return target_path
else:
# For a bare filename, recursively search the whole project directory.
for root, dirs, files in os.walk(project_dir):
if filename in files:
return os.path.join(root, filename)
return None
def load_tools_from_json(tools_file_name: str) -> List[Dict[str, Any]]:
"""Load tool definitions from a JSON file."""
try:
tools_file = os.path.join(os.path.dirname(__file__), tools_file_name)
if os.path.exists(tools_file):
with open(tools_file, 'r', encoding='utf-8') as f:
return json.load(f)
else:
# If the JSON file does not exist, use the default definitions.
return []
except Exception as e:
print(f"Warning: Unable to load tool definition JSON file: {str(e)}")
return []
def create_error_response(request_id: Any, code: int, message: str) -> Dict[str, Any]:
"""Create a standardized error response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": code,
"message": message
}
}
def create_success_response(request_id: Any, result: Any) -> Dict[str, Any]:
"""Create a standardized success response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": result
}
def create_initialize_response(request_id: Any, server_name: str, server_version: str = "1.0.0") -> Dict[str, Any]:
"""Create a standardized initialize response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": server_name,
"version": server_version
}
}
}
def create_ping_response(request_id: Any) -> Dict[str, Any]:
"""Create a standardized ping response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"pong": True
}
}
def create_tools_list_response(request_id: Any, tools: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Create a standardized tools/list response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"tools": tools
}
}
def is_regex_pattern(pattern: str) -> bool:
"""Check whether a string should be treated as a regular expression pattern."""
# Check the /pattern/ format.
if pattern.startswith('/') and pattern.endswith('/') and len(pattern) > 2:
return True
# Check the r"pattern" or r'pattern' format.
if pattern.startswith(('r"', "r'")) and pattern.endswith(('"', "'")) and len(pattern) > 3:
return True
# Check whether it contains regex metacharacters.
regex_chars = {'*', '+', '?', '|', '(', ')', '[', ']', '{', '}', '^', '$', '\\', '.'}
return any(char in pattern for char in regex_chars)
def compile_pattern(pattern: str) -> Union[re.Pattern, str, None]:
"""Compile a regex pattern, or return the original string if it is not regex."""
if not is_regex_pattern(pattern):
return pattern
try:
# Handle the /pattern/ format.
if pattern.startswith('/') and pattern.endswith('/'):
regex_body = pattern[1:-1]
return re.compile(regex_body)
# Handle the r"pattern" or r'pattern' format.
if pattern.startswith(('r"', "r'")) and pattern.endswith(('"', "'")):
regex_body = pattern[2:-1]
return re.compile(regex_body)
# Directly compile strings that contain regex metacharacters.
return re.compile(pattern)
except re.error as e:
# If compilation fails, return None to indicate an invalid regex.
print(f"Warning: Regular expression '{pattern}' compilation failed: {e}")
return None
async def handle_mcp_streaming(request_handler):
"""Handle the standard main loop for MCP requests."""
try:
while True:
# Read from stdin
line = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline)
if not line:
break
line = line.strip()
if not line:
continue
try:
request = json.loads(line)
response = await request_handler(request)
# Write to stdout
sys.stdout.write(json.dumps(response, ensure_ascii=False) + "\n")
sys.stdout.flush()
except json.JSONDecodeError:
error_response = {
"jsonrpc": "2.0",
"error": {
"code": -32700,
"message": "Parse error"
}
}
sys.stdout.write(json.dumps(error_response, ensure_ascii=False) + "\n")
sys.stdout.flush()
except Exception as e:
error_response = {
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
sys.stdout.write(json.dumps(error_response, ensure_ascii=False) + "\n")
sys.stdout.flush()
except KeyboardInterrupt:
pass

View File

@ -0,0 +1,19 @@
{
"name": "mcp-ui",
"description": "Provides interactive UI components through MCP tool responses.",
"hooks": {
"PrePrompt": [
{
"type": "command",
"command": "python hooks/pre_prompt.py"
}
]
},
"mcpServers": {
"mcp_ui": {
"transport": "stdio",
"command": "python",
"args": ["./ui_render_server.py", "{bot_id}"]
}
}
}

View File

@ -0,0 +1,55 @@
## ask_user Tool Usage Guide
### When to call ask_user
You MUST call this tool in these cases:
1. When you need single-select or multi-select choices from the user.
2. When you have multiple questions to ask at once — you MUST batch them into a single ask_user call. Do NOT list multiple questions as plain text in your response.
The ONLY case where you do NOT need this tool is when there is exactly 1 open-ended question with no options to suggest — in that case, just ask directly in your response text.
### CRITICAL: Every question MUST have options
When calling ask_user, you MUST generate at least 2-3 suggested options for EVERY question — even for questions that seem open-ended. You should infer reasonable options based on context. The user can always type a custom answer if none of the suggestions fit.
Example: If the question is "What is the topic of the PPT?", do NOT leave options empty. Instead, suggest options like:
```json
{"question": "What is the topic of the PPT?", "options": ["Work report", "Product introduction", "Academic presentation", "Other"]}
```
Do NOT call ask_user with empty options arrays.
### How to format options
- Options MUST be placed in the `options` array field, NOT embedded in the question text.
- Keep the question text short and clean — just the question itself.
- Each question MUST have at least 2 options in the options array.
CORRECT example:
```json
{
"questions": [
{
"question": "Who is the audience?",
"options": ["Leadership", "Team", "Client"]
},
{
"question": "How long is the presentation?",
"options": ["5-10 minutes", "15-20 minutes", "30+ minutes"]
}
]
}
```
WRONG example (DO NOT do this):
```json
{
"questions": [
{
"question": "Who is the audience? A. Leadership B. Team C. Client",
"options": []
}
]
}
```
### Other rules
- Each question MUST be a separate item in the questions array.
- NEVER combine multiple questions into a single question string.

View File

@ -0,0 +1,18 @@
#!/usr/bin/env python3
"""
PrePrompt Hook - ask_user tool usage guide loader.
Outputs the ask_user usage guide to be injected into the system prompt.
"""
import sys
from pathlib import Path
def main():
guide_file = Path(__file__).parent / "ask_user_guide.md"
print(guide_file.read_text(encoding="utf-8"))
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -0,0 +1,252 @@
#!/usr/bin/env python3
"""
Shared utility functions for the MCP server.
Provides common functionality for path handling, file validation, and request processing.
"""
import json
import os
import sys
import asyncio
from typing import Any, Dict, List, Optional, Union
import re
def get_allowed_directory():
"""Get the directory that is allowed to be accessed."""
# Prefer dataset_dir passed through command-line arguments.
if len(sys.argv) > 1:
dataset_dir = sys.argv[1]
return os.path.abspath(dataset_dir)
# Read the project data directory from the environment variable.
project_dir = os.getenv("PROJECT_DATA_DIR", "./projects/data")
return os.path.abspath(project_dir)
def resolve_file_path(file_path: str, default_subfolder: str = "default") -> str:
"""
Resolve a file path, supporting both folder/document.txt and document.txt formats.
Args:
file_path: Input file path.
default_subfolder: Default subfolder name to use when only a filename is provided.
Returns:
The resolved full file path.
"""
# If the path contains a folder separator, use it directly.
if '/' in file_path or '\\' in file_path:
clean_path = file_path.replace('\\', '/')
# Remove the projects/ prefix if it exists.
if clean_path.startswith('projects/'):
clean_path = clean_path[9:] # Remove the 'projects/' prefix.
elif clean_path.startswith('./projects/'):
clean_path = clean_path[11:] # Remove the './projects/' prefix.
else:
# If only a filename is provided, add the default subfolder.
clean_path = f"{default_subfolder}/{file_path}"
# Get the allowed directory.
project_data_dir = get_allowed_directory()
# Try to locate the file directly under the project directory.
full_path = os.path.join(project_data_dir, clean_path.lstrip('./'))
if os.path.exists(full_path):
return full_path
# If the direct path does not exist, try a recursive search.
found = find_file_in_project(clean_path, project_data_dir)
if found:
return found
# If this is a bare filename and it was not found under the default subfolder,
# try looking in the project root.
if '/' not in file_path and '\\' not in file_path:
root_path = os.path.join(project_data_dir, file_path)
if os.path.exists(root_path):
return root_path
raise FileNotFoundError(f"File not found: {file_path} (searched in {project_data_dir})")
def find_file_in_project(filename: str, project_dir: str) -> Optional[str]:
"""Recursively search for a file inside the project directory."""
# If filename includes a path, only search within the specified path.
if '/' in filename:
parts = filename.split('/')
target_file = parts[-1]
search_dir = os.path.join(project_dir, *parts[:-1])
if os.path.exists(search_dir):
target_path = os.path.join(search_dir, target_file)
if os.path.exists(target_path):
return target_path
else:
# For a bare filename, recursively search the whole project directory.
for root, dirs, files in os.walk(project_dir):
if filename in files:
return os.path.join(root, filename)
return None
def load_tools_from_json(tools_file_name: str) -> List[Dict[str, Any]]:
"""Load tool definitions from a JSON file."""
try:
tools_file = os.path.join(os.path.dirname(__file__), tools_file_name)
if os.path.exists(tools_file):
with open(tools_file, 'r', encoding='utf-8') as f:
return json.load(f)
else:
# If the JSON file does not exist, use the default definitions.
return []
except Exception as e:
print(f"Warning: Unable to load tool definition JSON file: {str(e)}")
return []
def create_error_response(request_id: Any, code: int, message: str) -> Dict[str, Any]:
"""Create a standardized error response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": code,
"message": message
}
}
def create_success_response(request_id: Any, result: Any) -> Dict[str, Any]:
"""Create a standardized success response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": result
}
def create_initialize_response(request_id: Any, server_name: str, server_version: str = "1.0.0") -> Dict[str, Any]:
"""Create a standardized initialize response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": server_name,
"version": server_version
}
}
}
def create_ping_response(request_id: Any) -> Dict[str, Any]:
"""Create a standardized ping response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"pong": True
}
}
def create_tools_list_response(request_id: Any, tools: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Create a standardized tools/list response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"tools": tools
}
}
def is_regex_pattern(pattern: str) -> bool:
"""Check whether a string should be treated as a regular expression pattern."""
# Check the /pattern/ format.
if pattern.startswith('/') and pattern.endswith('/') and len(pattern) > 2:
return True
# Check the r"pattern" or r'pattern' format.
if pattern.startswith(('r"', "r'")) and pattern.endswith(('"', "'")) and len(pattern) > 3:
return True
# Check whether it contains regex metacharacters.
regex_chars = {'*', '+', '?', '|', '(', ')', '[', ']', '{', '}', '^', '$', '\\', '.'}
return any(char in pattern for char in regex_chars)
def compile_pattern(pattern: str) -> Union[re.Pattern, str, None]:
"""Compile a regex pattern, or return the original string if it is not regex."""
if not is_regex_pattern(pattern):
return pattern
try:
# Handle the /pattern/ format.
if pattern.startswith('/') and pattern.endswith('/'):
regex_body = pattern[1:-1]
return re.compile(regex_body)
# Handle the r"pattern" or r'pattern' format.
if pattern.startswith(('r"', "r'")) and pattern.endswith(('"', "'")):
regex_body = pattern[2:-1]
return re.compile(regex_body)
# Directly compile strings that contain regex metacharacters.
return re.compile(pattern)
except re.error as e:
# If compilation fails, return None to indicate an invalid regex.
print(f"Warning: Regular expression '{pattern}' compilation failed: {e}")
return None
async def handle_mcp_streaming(request_handler):
"""Handle the standard main loop for MCP requests."""
try:
while True:
# Read from stdin
line = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline)
if not line:
break
line = line.strip()
if not line:
continue
try:
request = json.loads(line)
response = await request_handler(request)
# Write to stdout
sys.stdout.write(json.dumps(response, ensure_ascii=False) + "\n")
sys.stdout.flush()
except json.JSONDecodeError:
error_response = {
"jsonrpc": "2.0",
"error": {
"code": -32700,
"message": "Parse error"
}
}
sys.stdout.write(json.dumps(error_response, ensure_ascii=False) + "\n")
sys.stdout.flush()
except Exception as e:
error_response = {
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
sys.stdout.write(json.dumps(error_response, ensure_ascii=False) + "\n")
sys.stdout.flush()
except KeyboardInterrupt:
pass

View File

@ -0,0 +1,71 @@
[
{
"name": "render_ui",
"description": "Render an interactive UI widget in the chat. Supports two modes: (1) raw HTML — provide html_content to render custom HTML/CSS/JS, (2) external URL — provide url to embed an external webpage in an iframe. Use html_content OR url, not both.",
"inputSchema": {
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "A descriptive title for the UI widget"
},
"html_content": {
"type": "string",
"description": "Complete HTML content to render. Can include inline CSS and JavaScript within <style> and <script> tags. Use this OR url, not both."
},
"url": {
"type": "string",
"description": "External URL to embed in an iframe. Use this OR html_content, not both."
},
"width": {
"type": "string",
"description": "CSS width for the iframe. Default: '100%'",
"default": "100%"
},
"height": {
"type": "string",
"description": "CSS height for the iframe. Set to a fixed value (e.g. '300px', '600px') matching the HTML content height. Use 'auto' only when the HTML is responsive and has no fixed height. Default: 'auto'",
"default": "auto"
}
},
"required": ["title", "html_content"]
}
},
{
"name": "ask_user",
"description": "Present questions with selectable options to the user. CRITICAL: Every question MUST have at least 2 options in the options array — generate reasonable suggestions based on context. Do NOT call this tool with empty options arrays. See the ask_user usage guide in system prompt for detailed rules.",
"inputSchema": {
"type": "object",
"properties": {
"questions": {
"type": "array",
"description": "Array of questions to ask the user. Each question is an object with its own question text, options, and multi_select setting.",
"items": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question to ask the user"
},
"options": {
"type": "array",
"minItems": 2,
"items": {
"type": "string"
},
"description": "REQUIRED array with at least 2 options. Put choices here, NOT in the question text."
},
"multi_select": {
"type": "boolean",
"description": "If true, the user can select multiple options for this question. Default: false.",
"default": false
}
},
"required": ["question", "options"]
}
}
},
"required": ["questions"]
}
}
]

View File

@ -0,0 +1,175 @@
#!/usr/bin/env python3
"""
MCP UI Server - provides interactive UI rendering tools.
"""
import asyncio
import json
import sys
from typing import Any, Dict
from mcp_ui_server import create_ui_resource, UIMetadataKey
from mcp_common import (
create_error_response,
create_initialize_response,
create_ping_response,
create_tools_list_response,
load_tools_from_json,
handle_mcp_streaming,
)
def _serialize_ui_resource(ui_resource) -> str:
"""Serialize a UIResource to JSON string."""
return json.dumps(ui_resource.model_dump(mode="json"), ensure_ascii=False)
def ask_user() -> Dict[str, Any]:
"""Return a UIResource response for ask_user tool.
The actual questions are in the TOOL_CALL arguments. The frontend
detects ui_type from the UIResource metadata and extracts content
from the corresponding TOOL_CALL args.
"""
resource = create_ui_resource({
"uri": "ui://mcp-ui/ask-user",
"content": {"type": "rawHtml", "htmlString": "Questions sent to user."},
"encoding": "text",
"uiMetadata": {
"type": "ask_user",
"interactive": True,
},
})
return {"content": [{"type": "text", "text": _serialize_ui_resource(resource)}]}
def render_ui(arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Return a UIResource response for render_ui tool.
The actual html_content/url is in the TOOL_CALL arguments. The frontend
detects ui_type from the UIResource metadata and extracts content
from the corresponding TOOL_CALL args.
"""
html_content = arguments.get("html_content", "")
url = arguments.get("url", "")
width = arguments.get("width", "100%")
height = arguments.get("height", "auto")
if html_content:
resource = create_ui_resource({
"uri": "ui://mcp-ui/render-ui",
"content": {"type": "rawHtml", "htmlString": "UI rendered."},
"encoding": "text",
"uiMetadata": {
UIMetadataKey.PREFERRED_FRAME_SIZE: [width, height],
"type": "render_ui_html",
"interactive": False,
},
})
else:
resource = create_ui_resource({
"uri": "ui://mcp-ui/render-ui",
"content": {"type": "externalUrl", "iframeUrl": url},
"encoding": "text",
"uiMetadata": {
UIMetadataKey.PREFERRED_FRAME_SIZE: [width, height],
"type": "render_ui_url",
"interactive": False,
},
})
return {"content": [{"type": "text", "text": _serialize_ui_resource(resource)}]}
async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle an MCP request."""
try:
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
if method == "initialize":
return create_initialize_response(request_id, "mcp-ui")
elif method == "ping":
return create_ping_response(request_id)
elif method == "tools/list":
tools = load_tools_from_json("mcp_ui_tools.json")
if not tools:
tools = [
{
"name": "render_ui",
"description": "Render an interactive UI widget in the chat.",
"inputSchema": {
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "A descriptive title for the UI widget",
},
"html_content": {
"type": "string",
"description": "Complete HTML content to render. Use this OR url.",
},
"url": {
"type": "string",
"description": "External URL to embed. Use this OR html_content.",
},
},
"required": ["title"],
},
}
]
return create_tools_list_response(request_id, tools)
elif method == "tools/call":
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name == "render_ui":
html_content = arguments.get("html_content", "")
url = arguments.get("url", "")
if not html_content and not url:
return create_error_response(
request_id, -32602, "Missing required parameter: html_content or url"
)
result = render_ui(arguments)
return {"jsonrpc": "2.0", "id": request_id, "result": result}
elif tool_name == "ask_user":
questions = arguments.get("questions", [])
if not questions:
return create_error_response(
request_id, -32602, "Missing required parameter: questions"
)
result = ask_user()
return {"jsonrpc": "2.0", "id": request_id, "result": result}
else:
return create_error_response(
request_id, -32601, f"Unknown tool: {tool_name}"
)
else:
return create_error_response(
request_id, -32601, f"Unknown method: {method}"
)
except Exception as e:
return create_error_response(
request.get("id"), -32603, f"Internal error: {str(e)}"
)
async def main():
"""Main entry point."""
await handle_mcp_streaming(handle_request)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,79 @@
---
name: static-hosting
description: Serve static HTML/CSS/JS/images from robot project directories via the built-in FastAPI static file server. Use when generating web pages, reports, or interactive content for a bot.
---
# Static Hosting
Provide external access to static files (HTML, CSS, JS, images, fonts, etc.) stored under each robot's project directory.
## Use when
- Generate HTML pages, reports, or dashboards for a bot and need a URL to access them
- Create interactive web content (with CSS/JS) that users can open in a browser
- Need to host images or other assets referenced by bot-generated HTML
## How it works
FastAPI mounts `projects/robot/` at the `/robots` URL path with `html=True` enabled.
```
/app/projects/robot/{bot_id}/ --> {FASTAPI_URL}/robots/{bot_id}/
/app/projects/robot/{bot_id}/foo.html --> {FASTAPI_URL}/robots/{bot_id}/foo.html
```
`FASTAPI_URL` : `http://127.0.0.1:8001`.
## URL rules
| Local path | External URL | Notes |
|---|---|---|
| `/app/projects/robot/{bot_id}/index.html` | `/robots/{bot_id}/` | `html=True` auto-resolves `index.html` |
| `/app/projects/robot/{bot_id}/page.html` | `/robots/{bot_id}/page.html` | Direct HTML access |
| `/app/projects/robot/{bot_id}/css/style.css` | `/robots/{bot_id}/css/style.css` | CSS files |
| `/app/projects/robot/{bot_id}/js/app.js` | `/robots/{bot_id}/js/app.js` | JavaScript files |
| `/app/projects/robot/{bot_id}/images/logo.png` | `/robots/{bot_id}/images/logo.png` | Images (png/jpg/svg/etc.) |
| `/app/projects/robot/{bot_id}/fonts/custom.woff2` | `/robots/{bot_id}/fonts/custom.woff2` | Font files |
## Recommended directory structure
```
/app/projects/robot/{bot_id}/
├── index.html # Entry page (auto-served at /robots/{bot_id}/)
├── css/
│ └── style.css
├── js/
│ └── app.js
├── images/
│ └── logo.png
└── fonts/
└── custom.woff2
```
## HTML referencing rules
Inside HTML files, use **relative paths** to reference local assets:
```html
<!-- In projects/robot/{bot_id}/index.html -->
<link rel="stylesheet" href="css/style.css">
<script src="js/app.js"></script>
<img src="images/logo.png" alt="logo">
```
Do NOT use absolute paths like `/css/style.css` — this would resolve to the server root, not the bot directory.
## Quick example
To generate and serve a simple page for bot `63069654-7750-409d-9a58-a0960d899a20`:
1. Write HTML to `/app/projects/robot/63069654-7750-409d-9a58-a0960d899a20/index.html`
2. Access at `{FASTAPI_URL}/robots/63069654-7750-409d-9a58-a0960d899a20/`
## Notes
- Content-Type is auto-detected from file extension
- CORS is already configured (allow all origins) so frontend JS can fetch APIs on the same server
- Subdirectories of any depth are supported
- No authentication — all files under `/robots/` are publicly accessible
- The mount is defined in `fastapi_app.py`