Normalize OpenAI-style and LangChain standard image blocks into LangChain standard content blocks so provider block_translators auto-convert for either OpenAI or Anthropic. Flatten multimodal content to plain text when persisting history and computing term embeddings. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
98 lines
4.0 KiB
Python
98 lines
4.0 KiB
Python
"""Utility functions for handling LangGraph checkpoint data."""
|
|
|
|
import logging
|
|
from typing import List, Dict, Any, Optional
|
|
from langgraph.checkpoint.memory import MemorySaver
|
|
|
|
# Module-wide logger; bound to the fixed logger name 'app' rather than this module's name.
logger = logging.getLogger('app')
|
|
|
|
|
|
async def get_checkpoint_history(checkpointer: MemorySaver, thread_id: str) -> List:
    """
    Load the stored message list for a conversation thread.

    The checkpoint for ``thread_id`` is fetched with ``checkpointer.aget_tuple``
    and its ``channel_values`` entry is inspected: when it is a dict, the value
    under ``"messages"`` is returned; when it is a list, the list itself is
    returned.

    Args:
        checkpointer: MemorySaver instance to read from
        thread_id: Thread ID used as ``configurable.thread_id`` for the lookup

    Returns:
        List: The stored messages, or an empty list when either argument is
        falsy, no checkpoint exists, the checkpoint has an unexpected layout,
        or any exception is raised while reading it
    """
    if not (checkpointer and thread_id):
        logger.debug(f"No checkpointer or thread_id: checkpointer={bool(checkpointer)}, thread_id={thread_id}")
        return []

    try:
        snapshot = await checkpointer.aget_tuple(
            {"configurable": {"thread_id": thread_id}}
        )

        if snapshot is None or snapshot.checkpoint is None:
            logger.debug(f"No checkpoint found for thread_id: {thread_id}")
            return []

        state = snapshot.checkpoint

        # Messages normally live under channel_values['messages'].
        if "channel_values" not in state:
            logger.debug(f"No channel_values in checkpoint for thread_id: {thread_id}")
            return []

        channel_values = state["channel_values"]

        if isinstance(channel_values, dict) and "messages" in channel_values:
            converted = channel_values["messages"]
        elif isinstance(channel_values, list):
            # Some checkpoints store the message list directly.
            converted = channel_values
        else:
            logger.debug(f"Unexpected channel_values format: {type(channel_values)}")
            return []

        # Kept inside the try so a failing len() is reported and mapped to [].
        logger.info(f"Loaded {len(converted)} messages from checkpoint for thread_id: {thread_id}")
        return converted

    except Exception as e:
        import traceback

        logger.error(f"Error getting checkpoint history for thread_id {thread_id}: {e}")
        logger.error(f"Full traceback: {traceback.format_exc()}")
        return []
|
|
|
|
|
|
async def prepare_checkpoint_message(config, checkpointer):
    """
    Prepare ``config`` for a checkpoint-backed request.

    1. Fetch the history for ``config.session_id``, keep only human/ai
       messages whose content has no ``<think>`` tag, and store the most
       recent 20 of them on ``config._session_history``.
    2. If any history exists, replace ``config.messages`` with just the latest
       user message; otherwise leave ``config.messages`` untouched so all
       messages are sent.

    Args:
        config: Object exposing ``session_id`` and ``messages``; mutated in
            place. Messages are read with ``m.get('role')``, so they are
            presumably dicts - verify against the caller.
        checkpointer: Checkpointer handed to ``get_checkpoint_history``

    Returns:
        None: Results are written onto ``config``
    """
    # Truthiness (rather than len() == 0) also skips cleanly when messages is
    # None instead of raising TypeError.
    if not config.session_id or not checkpointer or not config.messages:
        logger.debug("No session_id/checkpointer or empty messages, skipping checkpoint")
        return

    # Load history.
    history = await get_checkpoint_history(checkpointer, config.session_id)

    if not history:
        logger.info(f"No history, sending all {len(config.messages)} messages")
        return

    # Filter history and keep the most recent 20 messages.
    filtered_history = [
        h for h in history
        if getattr(h, "type", None) in ("human", "ai")
        and "<think>" not in str(getattr(h, "content", "")).lower()
    ]
    logger.info(f"Filtered {len(filtered_history)} human/ai messages from history")
    config._session_history = filtered_history[-20:]

    # History exists, so only the latest user message is sent. The decision is
    # based on the unfiltered history, matching the original behavior. When no
    # user message is present, config.messages is left unchanged.
    last_user_msg = next((m for m in reversed(config.messages) if m.get('role') == 'user'), None)
    if last_user_msg:
        config.messages = [last_user_msg]
        from utils.fastapi_utils import extract_text_from_content

        preview = extract_text_from_content(last_user_msg.get('content', ''))
        logger.info(f"Has history, sending last user message: {preview[:50]}...")
|