diff --git a/docs/solutions/runtime-errors/sse-mcp-tool-error-handling.md b/docs/solutions/runtime-errors/sse-mcp-tool-error-handling.md new file mode 100644 index 0000000..48e9472 --- /dev/null +++ b/docs/solutions/runtime-errors/sse-mcp-tool-error-handling.md @@ -0,0 +1,361 @@ +--- +title: Fix RemoteProtocolError in SSE Streams When MCP Tool Calls Fail +category: runtime-errors +severity: critical +status: fixed +components: [routes/chat.py, agent/deep_assistant.py] +symptoms: [RemoteProtocolError, incomplete chunked read, connection drops, hanging SSE streams] +error_type: SSE/stream termination +related_issues: [] +tags: [sse, mcp, error-handling, async, streaming, remote-protocol-error] +date_solved: 2026-01-07 +--- + +# Fix RemoteProtocolError in SSE Streams When MCP Tool Calls Fail + +## Problem Symptom + +When MCP (Model Context Protocol) tool calls failed during agent execution: + +1. The exception was caught and logged in `agent_task()` but **no error message was sent to the client** via the SSE (Server-Sent Events) stream +2. The `[DONE]` marker was **missing from the outer exception handler** in `enhanced_generate_stream_response()` +3. This caused `RemoteProtocolError: incomplete chunked read` - the client connection was left hanging expecting more data +4. Client connections would timeout or drop unexpectedly + +### Error Messages + +``` +httpx.RemoteProtocolError: incomplete chunked read +``` + +### Observable Behavior + +- SSE stream would terminate prematurely without sending completion +- Client-side: `IncompleteRead` errors or connection timeouts +- Server logs showed the error was caught, but client received no notification + +## Root Cause Analysis + +The issue occurred in the async SSE streaming architecture where: + +1. **Inner exception handler** (`agent_task()` in `routes/chat.py:116-118`): + - Caught exceptions but only logged them + - Did not send error response to client via SSE + - Only sent `agent_done` signal to output queue + +2. **Outer exception handler** (`routes/chat.py:191-204`): + - Caught exceptions in the main stream generator + - Sent error data but **missing `[DONE]` marker** + - Left client connection hanging without proper stream termination + +3. **MCP tool loading** (`agent/deep_assistant.py:102-108`): + - Used generic logger.info for errors (should be logger.error) + - No traceback for debugging + - No graceful handling at init level + +## Investigation Steps Tried + +1. **Reviewed SSE protocol**: Confirmed SSE streams must end with `[DONE]` marker +2. **Analyzed error flow**: Traced exception from `agent_task()` through output queue to client +3. **Examined similar code**: Checked other streaming endpoints for proper patterns +4. **Tested MCP failure scenarios**: Simulated tool loading failures to reproduce + +## Working Solution + +### 1. Send Structured Error Messages in Inner Exception Handler + +**File**: `routes/chat.py` (lines 116-133) + +**Before**: +```python +except Exception as e: + logger.error(f"Error in agent task: {e}") + await output_queue.put(("agent_done", None)) +``` + +**After**: +```python +except Exception as e: + import traceback + error_details = traceback.format_exc() + logger.error(f"Error in agent task: {str(e)}") + logger.error(f"Full traceback: {error_details}") + + # Send error message to client via SSE stream + error_data = { + "error": { + "message": f"Agent execution failed: {str(e)}", + "type": "agent_error", + "details": error_details if __debug__ else str(e) + } + } + error_chunk = create_stream_chunk( + f"chatcmpl-error", + config.model_name, + json.dumps(error_data, ensure_ascii=False) + ) + await output_queue.put(("agent", f"data: {json.dumps(error_chunk, ensure_ascii=False)}\n\n")) + # Send completion signal to ensure output controller can terminate properly + await output_queue.put(("agent_done", None)) +``` + +**Why this works**: +- Client receives structured error response before stream terminates +- Error follows same chunk format as normal responses for consistent parsing +- Includes traceback in debug mode for troubleshooting +- `agent_done` signal ensures output queue consumer can exit + +### 2. Add [DONE] Marker to Outer Exception Handler + +**File**: `routes/chat.py` (line 204) + +**Before**: +```python +except Exception as e: + # ... error handling ... + yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n" + # Missing: yield "data: [DONE]\n\n" +``` + +**After**: +```python +except Exception as e: + # ... error handling ... + yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n" + yield "data: [DONE]\n\n" # Ensure proper stream termination +``` + +**Why this works**: +- SSE protocol requires `[DONE]` to signal stream end +- Without it, client waits indefinitely for more chunks +- Prevents `RemoteProtocolError: incomplete chunked read` + +### 3. Enhance MCP Tool Loading Error Handling + +**File**: `agent/deep_assistant.py` (lines 102-108) + +**Before**: +```python +except Exception as e: + # Log at info level, return empty list + logger.info(f"get_tools_from_mcp: error {e}, elapsed: {time.time() - start_time:.3f}s") + return [] +``` + +**After**: +```python +except Exception as e: + import traceback + error_details = traceback.format_exc() + # Log at ERROR level with full traceback + logger.error(f"get_tools_from_mcp: error {str(e)}, elapsed: {time.time() - start_time:.3f}s") + logger.error(f"Full traceback: {error_details}") + return [] +``` + +**File**: `agent/deep_assistant.py` (lines 142-148) + +**Added**: +```python +try: + mcp_tools = await get_tools_from_mcp(mcp_settings) + logger.info(f"Successfully loaded {len(mcp_tools)} MCP tools") +except Exception as e: + logger.error(f"Failed to load MCP tools: {str(e)}, using empty tool list") + mcp_tools = [] +``` + +**Why this works**: +- Prevents cascading failures when MCP tools fail to load +- Agent can continue without tools rather than crashing +- Better logging aids in debugging root cause + +## Security Considerations + +### CRITICAL: `__debug__` Flag Issue + +**SECURITY VULNERABILITY**: The current implementation uses `__debug__` conditional which is **always True** in normal Python execution (unless Python runs with `-O` optimization flag). + +```python +# CURRENT - VULNERABLE IN PRODUCTION +"details": error_details if __debug__ else str(e) +``` + +**Impact**: Full tracebacks containing: +- File paths revealing server directory structure +- Library versions and dependency information +- Internal variable names and state +- Database connection strings (if present in stack trace) +- API keys or secrets if logged during initialization + +**Recommended Fix**: +```python +# In utils/settings.py +ENVIRONMENT = os.getenv("ENVIRONMENT", "development") +DEBUG_MODE = ENVIRONMENT != "production" + +# In routes/chat.py +from utils.settings import DEBUG_MODE + +# Secure implementation +"details": error_details if DEBUG_MODE else "An internal error occurred" +``` + +**Verification**: +```bash +# Test current behavior +poetry run python -c "print(__debug__)" +# Output: True (even in production!) +``` + +### Additional Security Recommendations + +1. **Sanitize error messages** - Remove sensitive data (passwords, API keys, file paths) before sending to clients +2. **Implement environment-based configuration** - Use `ENVIRONMENT` variable to distinguish production from development +3. **Log security-sensitive events** - Track attempted exploits, excessive errors, unusual patterns + +## Code Simplification Opportunities + +### Identified Over-Engineering + +1. **Nested error structure** (lines 85-96): Error type categorization provides no value to clients + - **Recommendation**: Simplify to `{"error": str(e)}` + +2. **Redundant exception handling** (agent/deep_assistant.py:145-150): Wrapper try/except duplicates inner handler logic + - **Recommendation**: Remove wrapper, let inner handler manage errors + +3. **Double JSON serialization**: Creates chunk then serializes again for queue + - **Recommendation**: Direct JSON string construction + +### Minimal Implementation + +```python +# Minimal functional implementation (~15 lines vs ~30 lines) +except Exception as e: + logger.error(f"Agent error: {e}") + await output_queue.put(("agent", f'data: {{"error": str(e)}}\n\n')) + await output_queue.put(("agent_done", None)) +``` + +**Trade-off**: Minimal implementation is simpler but provides less debugging context. Choose based on team needs. + +## Prevention Strategies + +### Best Practices for SSE Stream Error Handling + +1. **Always send error responses before termination** + ```python + except Exception as e: + error_chunk = create_error_chunk(e) + yield f"data: {error_chunk}\n\n" + yield "data: [DONE]\n\n" + ``` + +2. **Use structured error responses** + ```python + error_data = { + "error": { + "message": str(e), + "type": "error_type", + "details": traceback if debug else None + } + } + ``` + +3. **Log at appropriate levels** + - Use `logger.error()` for exceptions (not `logger.info()`) + - Include tracebacks for debugging + - Add context (elapsed time, component name) + +4. **Ensure graceful degradation** + - Return empty lists instead of crashing + - Allow system to continue with reduced functionality + - Document what happens when components fail + +### Code Review Checklist for SSE/Streaming Code + +- [ ] Every exception handler sends error response to client +- [ ] Every exception handler yields `[DONE]` marker +- [ ] Error responses follow same format as success responses +- [ ] Tracebacks logged for debugging (in non-production paths) +- [ ] Output queues properly signaled on error (`_done` events) +- [ ] Async tasks properly cleaned up on exception +- [ ] File descriptors/connections properly closed + +### Testing Strategy + +**Test Cases**: +1. **Normal flow**: Verify SSE stream ends with `[DONE]` +2. **MCP tool failure**: Simulate tool loading failure, verify graceful handling +3. **Agent execution error**: Inject exception during `astream()`, verify error response +4. **Network timeout**: Test slow/failed MCP servers +5. **Concurrent requests**: Ensure error handling works under load + +**Test Commands**: +```bash +# Test with invalid MCP configuration +curl -X POST http://localhost:8001/api/v2/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "messages": [{"role": "user", "content": "test"}], + "stream": true, + "mcp_settings": {"invalid": "config"} + }' + +# Monitor for [DONE] marker in response +# Check server logs for proper error logging +``` + +### Monitoring Guidelines + +**Key Metrics**: +- SSE streams that end without `[DONE]` +- Rate of `RemoteProtocolError` exceptions +- MCP tool loading failures +- Agent execution errors by type + +**Log Patterns**: +``` +ERROR:Error in agent task: +ERROR:Full traceback: +ERROR:get_tools_from_mcp: error +ERROR:Failed to load MCP tools: +``` + +**Alerting**: +- Alert on spike in `RemoteProtocolError` +- Monitor for streams ending without `[DONE]` +- Track MCP tool loading success rate + +## Related Documentation + +- [SSE Specification](https://html.spec.whatwg.org/multipage/server-sent-events.html) +- [FastAPI StreamingResponse docs](https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse) +- MCP integration docs (internal) + +## Cross-References + +- Commit: `8a85e9025e183e80f4370d4251ca0d5af6203a41` +- Files: `routes/chat.py`, `agent/deep_assistant.py` +- Issue: N/A (internal fix) + +## Verification + +To verify the fix works: + +1. **Check logs for proper error handling**: + ```bash + tail -f logs/app.log | grep -E "(Error in agent task|Full traceback)" + ``` + +2. **Test SSE stream termination**: + ```bash + curl -N http://localhost:8001/api/v2/chat/completions \ + -H "Content-Type: application/json" \ + -d '{"stream": true, ...}' | grep "\[DONE\]" + ``` + +3. **Verify error response format**: + - Error should have `error.type: "agent_error"` + - Error should include message and details + - Stream should end with `data: [DONE]`