docs(solutions): add SSE error handling solution documentation
Add comprehensive documentation for the SSE stream termination fix: - Problem analysis and root cause - Step-by-step solution with code examples - Security considerations (__debug__ vulnerability) - Code simplification recommendations - Prevention strategies and best practices - Testing and monitoring guidelines Location: docs/solutions/runtime-errors/sse-mcp-tool-error-handling.md 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
b8e57b2f51
commit
ac8782e1a7
361
docs/solutions/runtime-errors/sse-mcp-tool-error-handling.md
Normal file
361
docs/solutions/runtime-errors/sse-mcp-tool-error-handling.md
Normal file
@ -0,0 +1,361 @@
|
|||||||
|
---
|
||||||
|
title: Fix RemoteProtocolError in SSE Streams When MCP Tool Calls Fail
|
||||||
|
category: runtime-errors
|
||||||
|
severity: critical
|
||||||
|
status: fixed
|
||||||
|
components: [routes/chat.py, agent/deep_assistant.py]
|
||||||
|
symptoms: [RemoteProtocolError, incomplete chunked read, connection drops, hanging SSE streams]
|
||||||
|
error_type: SSE/stream termination
|
||||||
|
related_issues: []
|
||||||
|
tags: [sse, mcp, error-handling, async, streaming, remote-protocol-error]
|
||||||
|
date_solved: 2026-01-07
|
||||||
|
---
|
||||||
|
|
||||||
|
# Fix RemoteProtocolError in SSE Streams When MCP Tool Calls Fail
|
||||||
|
|
||||||
|
## Problem Symptom
|
||||||
|
|
||||||
|
When MCP (Model Context Protocol) tool calls failed during agent execution:
|
||||||
|
|
||||||
|
1. The exception was caught and logged in `agent_task()` but **no error message was sent to the client** via the SSE (Server-Sent Events) stream
|
||||||
|
2. The `[DONE]` marker was **missing from the outer exception handler** in `enhanced_generate_stream_response()`
|
||||||
|
3. This caused `RemoteProtocolError: incomplete chunked read` - the client connection was left hanging expecting more data
|
||||||
|
4. Client connections would timeout or drop unexpectedly
|
||||||
|
|
||||||
|
### Error Messages
|
||||||
|
|
||||||
|
```
|
||||||
|
httpx.RemoteProtocolError: incomplete chunked read
|
||||||
|
```
|
||||||
|
|
||||||
|
### Observable Behavior
|
||||||
|
|
||||||
|
- SSE stream would terminate prematurely without sending completion
|
||||||
|
- Client-side: `IncompleteRead` errors or connection timeouts
|
||||||
|
- Server logs showed the error was caught, but client received no notification
|
||||||
|
|
||||||
|
## Root Cause Analysis
|
||||||
|
|
||||||
|
The issue occurred in the async SSE streaming architecture where:
|
||||||
|
|
||||||
|
1. **Inner exception handler** (`agent_task()` in `routes/chat.py:116-118`):
|
||||||
|
- Caught exceptions but only logged them
|
||||||
|
- Did not send error response to client via SSE
|
||||||
|
- Only sent `agent_done` signal to output queue
|
||||||
|
|
||||||
|
2. **Outer exception handler** (`routes/chat.py:191-204`):
|
||||||
|
- Caught exceptions in the main stream generator
|
||||||
|
- Sent error data but **missing `[DONE]` marker**
|
||||||
|
- Left client connection hanging without proper stream termination
|
||||||
|
|
||||||
|
3. **MCP tool loading** (`agent/deep_assistant.py:102-108`):
|
||||||
|
- Used generic logger.info for errors (should be logger.error)
|
||||||
|
- No traceback for debugging
|
||||||
|
- No graceful handling at init level
|
||||||
|
|
||||||
|
## Investigation Steps Tried
|
||||||
|
|
||||||
|
1. **Reviewed SSE protocol**: Confirmed SSE streams must end with `[DONE]` marker
|
||||||
|
2. **Analyzed error flow**: Traced exception from `agent_task()` through output queue to client
|
||||||
|
3. **Examined similar code**: Checked other streaming endpoints for proper patterns
|
||||||
|
4. **Tested MCP failure scenarios**: Simulated tool loading failures to reproduce
|
||||||
|
|
||||||
|
## Working Solution
|
||||||
|
|
||||||
|
### 1. Send Structured Error Messages in Inner Exception Handler
|
||||||
|
|
||||||
|
**File**: `routes/chat.py` (lines 116-133)
|
||||||
|
|
||||||
|
**Before**:
|
||||||
|
```python
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in agent task: {e}")
|
||||||
|
await output_queue.put(("agent_done", None))
|
||||||
|
```
|
||||||
|
|
||||||
|
**After**:
|
||||||
|
```python
|
||||||
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
|
logger.error(f"Error in agent task: {str(e)}")
|
||||||
|
logger.error(f"Full traceback: {error_details}")
|
||||||
|
|
||||||
|
# Send error message to client via SSE stream
|
||||||
|
error_data = {
|
||||||
|
"error": {
|
||||||
|
"message": f"Agent execution failed: {str(e)}",
|
||||||
|
"type": "agent_error",
|
||||||
|
"details": error_details if __debug__ else str(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
error_chunk = create_stream_chunk(
|
||||||
|
f"chatcmpl-error",
|
||||||
|
config.model_name,
|
||||||
|
json.dumps(error_data, ensure_ascii=False)
|
||||||
|
)
|
||||||
|
await output_queue.put(("agent", f"data: {json.dumps(error_chunk, ensure_ascii=False)}\n\n"))
|
||||||
|
# Send completion signal to ensure output controller can terminate properly
|
||||||
|
await output_queue.put(("agent_done", None))
|
||||||
|
```
|
||||||
|
|
||||||
|
**Why this works**:
|
||||||
|
- Client receives structured error response before stream terminates
|
||||||
|
- Error follows same chunk format as normal responses for consistent parsing
|
||||||
|
- Includes traceback in debug mode for troubleshooting
|
||||||
|
- `agent_done` signal ensures output queue consumer can exit
|
||||||
|
|
||||||
|
### 2. Add [DONE] Marker to Outer Exception Handler
|
||||||
|
|
||||||
|
**File**: `routes/chat.py` (line 204)
|
||||||
|
|
||||||
|
**Before**:
|
||||||
|
```python
|
||||||
|
except Exception as e:
|
||||||
|
# ... error handling ...
|
||||||
|
yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"
|
||||||
|
# Missing: yield "data: [DONE]\n\n"
|
||||||
|
```
|
||||||
|
|
||||||
|
**After**:
|
||||||
|
```python
|
||||||
|
except Exception as e:
|
||||||
|
# ... error handling ...
|
||||||
|
yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"
|
||||||
|
yield "data: [DONE]\n\n" # Ensure proper stream termination
|
||||||
|
```
|
||||||
|
|
||||||
|
**Why this works**:
|
||||||
|
- SSE protocol requires `[DONE]` to signal stream end
|
||||||
|
- Without it, client waits indefinitely for more chunks
|
||||||
|
- Prevents `RemoteProtocolError: incomplete chunked read`
|
||||||
|
|
||||||
|
### 3. Enhance MCP Tool Loading Error Handling
|
||||||
|
|
||||||
|
**File**: `agent/deep_assistant.py` (lines 102-108)
|
||||||
|
|
||||||
|
**Before**:
|
||||||
|
```python
|
||||||
|
except Exception as e:
|
||||||
|
# Log at info level, return empty list
|
||||||
|
logger.info(f"get_tools_from_mcp: error {e}, elapsed: {time.time() - start_time:.3f}s")
|
||||||
|
return []
|
||||||
|
```
|
||||||
|
|
||||||
|
**After**:
|
||||||
|
```python
|
||||||
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
|
# Log at ERROR level with full traceback
|
||||||
|
logger.error(f"get_tools_from_mcp: error {str(e)}, elapsed: {time.time() - start_time:.3f}s")
|
||||||
|
logger.error(f"Full traceback: {error_details}")
|
||||||
|
return []
|
||||||
|
```
|
||||||
|
|
||||||
|
**File**: `agent/deep_assistant.py` (lines 142-148)
|
||||||
|
|
||||||
|
**Added**:
|
||||||
|
```python
|
||||||
|
try:
|
||||||
|
mcp_tools = await get_tools_from_mcp(mcp_settings)
|
||||||
|
logger.info(f"Successfully loaded {len(mcp_tools)} MCP tools")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to load MCP tools: {str(e)}, using empty tool list")
|
||||||
|
mcp_tools = []
|
||||||
|
```
|
||||||
|
|
||||||
|
**Why this works**:
|
||||||
|
- Prevents cascading failures when MCP tools fail to load
|
||||||
|
- Agent can continue without tools rather than crashing
|
||||||
|
- Better logging aids in debugging root cause
|
||||||
|
|
||||||
|
## Security Considerations
|
||||||
|
|
||||||
|
### CRITICAL: `__debug__` Flag Issue
|
||||||
|
|
||||||
|
**SECURITY VULNERABILITY**: The current implementation uses `__debug__` conditional which is **always True** in normal Python execution (unless Python runs with `-O` optimization flag).
|
||||||
|
|
||||||
|
```python
|
||||||
|
# CURRENT - VULNERABLE IN PRODUCTION
|
||||||
|
"details": error_details if __debug__ else str(e)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Impact**: Full tracebacks containing:
|
||||||
|
- File paths revealing server directory structure
|
||||||
|
- Library versions and dependency information
|
||||||
|
- Internal variable names and state
|
||||||
|
- Database connection strings (if present in stack trace)
|
||||||
|
- API keys or secrets if logged during initialization
|
||||||
|
|
||||||
|
**Recommended Fix**:
|
||||||
|
```python
|
||||||
|
# In utils/settings.py
|
||||||
|
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
|
||||||
|
DEBUG_MODE = ENVIRONMENT != "production"
|
||||||
|
|
||||||
|
# In routes/chat.py
|
||||||
|
from utils.settings import DEBUG_MODE
|
||||||
|
|
||||||
|
# Secure implementation
|
||||||
|
"details": error_details if DEBUG_MODE else "An internal error occurred"
|
||||||
|
```
|
||||||
|
|
||||||
|
**Verification**:
|
||||||
|
```bash
|
||||||
|
# Test current behavior
|
||||||
|
poetry run python -c "print(__debug__)"
|
||||||
|
# Output: True (even in production!)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Additional Security Recommendations
|
||||||
|
|
||||||
|
1. **Sanitize error messages** - Remove sensitive data (passwords, API keys, file paths) before sending to clients
|
||||||
|
2. **Implement environment-based configuration** - Use `ENVIRONMENT` variable to distinguish production from development
|
||||||
|
3. **Log security-sensitive events** - Track attempted exploits, excessive errors, unusual patterns
|
||||||
|
|
||||||
|
## Code Simplification Opportunities
|
||||||
|
|
||||||
|
### Identified Over-Engineering
|
||||||
|
|
||||||
|
1. **Nested error structure** (lines 85-96): Error type categorization provides no value to clients
|
||||||
|
- **Recommendation**: Simplify to `{"error": str(e)}`
|
||||||
|
|
||||||
|
2. **Redundant exception handling** (agent/deep_assistant.py:145-150): Wrapper try/except duplicates inner handler logic
|
||||||
|
- **Recommendation**: Remove wrapper, let inner handler manage errors
|
||||||
|
|
||||||
|
3. **Double JSON serialization**: Creates chunk then serializes again for queue
|
||||||
|
- **Recommendation**: Direct JSON string construction
|
||||||
|
|
||||||
|
### Minimal Implementation
|
||||||
|
|
||||||
|
```python
|
||||||
|
# Minimal functional implementation (~15 lines vs ~30 lines)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Agent error: {e}")
|
||||||
|
await output_queue.put(("agent", f'data: {{"error": str(e)}}\n\n'))
|
||||||
|
await output_queue.put(("agent_done", None))
|
||||||
|
```
|
||||||
|
|
||||||
|
**Trade-off**: Minimal implementation is simpler but provides less debugging context. Choose based on team needs.
|
||||||
|
|
||||||
|
## Prevention Strategies
|
||||||
|
|
||||||
|
### Best Practices for SSE Stream Error Handling
|
||||||
|
|
||||||
|
1. **Always send error responses before termination**
|
||||||
|
```python
|
||||||
|
except Exception as e:
|
||||||
|
error_chunk = create_error_chunk(e)
|
||||||
|
yield f"data: {error_chunk}\n\n"
|
||||||
|
yield "data: [DONE]\n\n"
|
||||||
|
```
|
||||||
|
|
||||||
|
2. **Use structured error responses**
|
||||||
|
```python
|
||||||
|
error_data = {
|
||||||
|
"error": {
|
||||||
|
"message": str(e),
|
||||||
|
"type": "error_type",
|
||||||
|
"details": traceback if debug else None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
3. **Log at appropriate levels**
|
||||||
|
- Use `logger.error()` for exceptions (not `logger.info()`)
|
||||||
|
- Include tracebacks for debugging
|
||||||
|
- Add context (elapsed time, component name)
|
||||||
|
|
||||||
|
4. **Ensure graceful degradation**
|
||||||
|
- Return empty lists instead of crashing
|
||||||
|
- Allow system to continue with reduced functionality
|
||||||
|
- Document what happens when components fail
|
||||||
|
|
||||||
|
### Code Review Checklist for SSE/Streaming Code
|
||||||
|
|
||||||
|
- [ ] Every exception handler sends error response to client
|
||||||
|
- [ ] Every exception handler yields `[DONE]` marker
|
||||||
|
- [ ] Error responses follow same format as success responses
|
||||||
|
- [ ] Tracebacks logged for debugging (in non-production paths)
|
||||||
|
- [ ] Output queues properly signaled on error (`_done` events)
|
||||||
|
- [ ] Async tasks properly cleaned up on exception
|
||||||
|
- [ ] File descriptors/connections properly closed
|
||||||
|
|
||||||
|
### Testing Strategy
|
||||||
|
|
||||||
|
**Test Cases**:
|
||||||
|
1. **Normal flow**: Verify SSE stream ends with `[DONE]`
|
||||||
|
2. **MCP tool failure**: Simulate tool loading failure, verify graceful handling
|
||||||
|
3. **Agent execution error**: Inject exception during `astream()`, verify error response
|
||||||
|
4. **Network timeout**: Test slow/failed MCP servers
|
||||||
|
5. **Concurrent requests**: Ensure error handling works under load
|
||||||
|
|
||||||
|
**Test Commands**:
|
||||||
|
```bash
|
||||||
|
# Test with invalid MCP configuration
|
||||||
|
curl -X POST http://localhost:8001/api/v2/chat/completions \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d '{
|
||||||
|
"messages": [{"role": "user", "content": "test"}],
|
||||||
|
"stream": true,
|
||||||
|
"mcp_settings": {"invalid": "config"}
|
||||||
|
}'
|
||||||
|
|
||||||
|
# Monitor for [DONE] marker in response
|
||||||
|
# Check server logs for proper error logging
|
||||||
|
```
|
||||||
|
|
||||||
|
### Monitoring Guidelines
|
||||||
|
|
||||||
|
**Key Metrics**:
|
||||||
|
- SSE streams that end without `[DONE]`
|
||||||
|
- Rate of `RemoteProtocolError` exceptions
|
||||||
|
- MCP tool loading failures
|
||||||
|
- Agent execution errors by type
|
||||||
|
|
||||||
|
**Log Patterns**:
|
||||||
|
```
|
||||||
|
ERROR:Error in agent task: <error details>
|
||||||
|
ERROR:Full traceback: <traceback>
|
||||||
|
ERROR:get_tools_from_mcp: error <error>
|
||||||
|
ERROR:Failed to load MCP tools: <error>
|
||||||
|
```
|
||||||
|
|
||||||
|
**Alerting**:
|
||||||
|
- Alert on spike in `RemoteProtocolError`
|
||||||
|
- Monitor for streams ending without `[DONE]`
|
||||||
|
- Track MCP tool loading success rate
|
||||||
|
|
||||||
|
## Related Documentation
|
||||||
|
|
||||||
|
- [SSE Specification](https://html.spec.whatwg.org/multipage/server-sent-events.html)
|
||||||
|
- [FastAPI StreamingResponse docs](https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse)
|
||||||
|
- MCP integration docs (internal)
|
||||||
|
|
||||||
|
## Cross-References
|
||||||
|
|
||||||
|
- Commit: `8a85e9025e183e80f4370d4251ca0d5af6203a41`
|
||||||
|
- Files: `routes/chat.py`, `agent/deep_assistant.py`
|
||||||
|
- Issue: N/A (internal fix)
|
||||||
|
|
||||||
|
## Verification
|
||||||
|
|
||||||
|
To verify the fix works:
|
||||||
|
|
||||||
|
1. **Check logs for proper error handling**:
|
||||||
|
```bash
|
||||||
|
tail -f logs/app.log | grep -E "(Error in agent task|Full traceback)"
|
||||||
|
```
|
||||||
|
|
||||||
|
2. **Test SSE stream termination**:
|
||||||
|
```bash
|
||||||
|
curl -N http://localhost:8001/api/v2/chat/completions \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d '{"stream": true, ...}' | grep "\[DONE\]"
|
||||||
|
```
|
||||||
|
|
||||||
|
3. **Verify error response format**:
|
||||||
|
- Error should have `error.type: "agent_error"`
|
||||||
|
- Error should include message and details
|
||||||
|
- Stream should end with `data: [DONE]`
|
||||||
Loading…
Reference in New Issue
Block a user