qwen_agent/embedding/embedding.py
朱潮 425f3c5bb4 chore: replace Chinese comments and log messages with English
Convert all Chinese comments, docstrings, logger/print output,
HTTPException detail messages, and API response messages to English
across the entire codebase. Functional zh/ja localized strings
(e.g. prompt templates, timezone display names, date formats) are
preserved as-is.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-30 19:45:35 +08:00

961 lines
33 KiB
Python

import pickle
import re
import numpy as np
import os
from typing import Optional, List, Dict, Any
import requests
import asyncio
import hashlib
import json
import logging
from utils.settings import FASTAPI_URL
# Configure logger
logger = logging.getLogger('app')
def encode_texts_via_api(texts, batch_size=32):
    """
    Encode texts through the embedding endpoint of the FastAPI service.

    Args:
        texts: texts to encode; a falsy value short-circuits to an empty array
        batch_size (int): batch size forwarded to the service in the request body

    Returns:
        np.ndarray: array built from the "embeddings" field of the response

    Raises:
        Exception: when the service answers with a non-200 status or reports
            success=False; any error raised while sending the request or
            decoding the response is logged and re-raised unchanged
    """
    if not texts:
        return np.array([])

    try:
        endpoint = f"{FASTAPI_URL}/api/v1/embedding/encode"
        payload = {
            "texts": texts,
            "batch_size": batch_size
        }
        # A generous timeout, since a whole batch is encoded in one request
        response = requests.post(endpoint, json=payload, timeout=60)

        if response.status_code != 200:
            logger.error(f"API request failed: {response.status_code} - {response.text}")
            raise Exception(f"API request failed: {response.status_code}")

        body = response.json()
        if not body.get("success"):
            error_msg = body.get('error', 'Unknown error')
            logger.error(f"API encoding failed: {error_msg}")
            raise Exception(f"API encoding failed: {error_msg}")

        vectors = body.get("embeddings", [])
        logger.info(f"API encoding succeeded, processed {len(texts)} texts, embedding dimension: {len(vectors[0]) if vectors else 0}")
        return np.array(vectors)
    except Exception as e:
        # Failures raised above pass through here too, so they are logged a second time
        logger.error(f"API encoding exception: {e}")
        raise
def clean_text(text):
    """
    Normalize a piece of text before chunking or embedding.

    The substitutions run in a fixed order: HTML-like tags are removed, every
    run of whitespace (newlines included) collapses to a single space, the
    remaining control characters are deleted, and the result is trimmed.

    Args:
        text (str): original text

    Returns:
        str: cleaned text
    """
    substitutions = (
        (r'<[^>]+>', ''),                          # HTML tags
        (r'\s+', ' '),                             # runs of whitespace
        (r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', ''),  # control / non-printable characters
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()
def is_meaningful_line(text):
    """
    Decide whether a line carries enough content to be worth embedding.

    A line is rejected when it is empty, shorter than five characters after
    trimming, purely numeric, made only of symbols, or matches one of the
    known "noise" layouts (separator rule, page number, bare list label).

    Args:
        text (str): text line

    Returns:
        bool: True when the line is considered meaningful
    """
    if not text:
        return False

    stripped = text.strip()
    if len(stripped) < 5 or stripped.isdigit():
        return False

    # Symbol-only check runs against the untrimmed line
    if re.match(r'^[^\w\u4e00-\u9fa5]+$', text):
        return False

    noise_patterns = (
        r'^[-=_]{3,}$',      # separator line
        r'^Page\s+\d+$',     # page number
        r'^\d+\.\s*$',       # number only
        r'^[a-zA-Z]\.\s*$',  # single-letter item label
    )
    return not any(re.match(pattern, stripped) for pattern in noise_patterns)
def embed_document(input_file='document.txt', output_file='embedding.pkl',
                   chunking_strategy='line', **chunking_params):
    """
    Read a document, chunk it, embed the chunks through the API and pickle the result.

    Args:
        input_file (str): input document file path
        output_file (str): output pickle file path
        chunking_strategy (str): one of 'line', 'paragraph' or 'smart'
        **chunking_params: overrides for the chunking defaults
            - 'line': no additional parameters are used
            - 'paragraph': max_chunk_size (1000), overlap (100),
              min_chunk_size (200), separator ('\\n')
            - 'smart': max_chunk_size (1000), overlap (100), min_chunk_size (200)

    Returns:
        dict | None: the saved payload (chunks, embeddings, strategy, the raw
        chunking_params and a model_path tag), or None when no chunk was
        produced or any error occurred (errors are logged, not raised)
    """

    def _log_chunk_overview(document, pieces):
        # Statistics shared by the 'paragraph' and 'smart' strategies
        logger.info(f"Total document length: {len(document)} characters")
        logger.info(f"Chunk count: {len(pieces)}")
        if pieces:
            logger.debug(f"Average chunk size: {sum(len(piece) for piece in pieces) / len(pieces):.1f} characters")
            logger.debug(f"Maximum chunk size: {max(len(piece) for piece in pieces)} characters")
            logger.debug(f"Minimum chunk size: {min(len(piece) for piece in pieces)} characters")

    try:
        with open(input_file, 'r', encoding='utf-8') as source:
            content = source.read()

        if chunking_strategy == 'line':
            # One candidate chunk per line, kept only if it survives cleaning and filtering
            raw_lines = content.split('\n')
            original_count = len(raw_lines)
            chunks = [
                cleaned
                for cleaned in map(clean_text, raw_lines)
                if is_meaningful_line(cleaned)
            ]
            logger.info(f"Use line-based chunking strategy")
            logger.info(f"Original line count: {original_count}")
            logger.info(f"Valid sentence count after cleaning: {len(chunks)}")
            logger.info(f"Filter ratio: {((original_count - len(chunks)) / original_count * 100):.1f}%")
        elif chunking_strategy == 'paragraph':
            options = {
                'max_chunk_size': 1000,
                'overlap': 100,
                'min_chunk_size': 200,
                'separator': '\n',
                **chunking_params,
            }
            # The whole document is cleaned first, then chunked
            chunks = paragraph_chunking(clean_text(content), **options)
            logger.info(f"Use paragraph-level chunking strategy")
            _log_chunk_overview(content, chunks)
        elif chunking_strategy == 'smart':
            options = {
                'max_chunk_size': 1000,
                'overlap': 100,
                'min_chunk_size': 200,
                **chunking_params,
            }
            # smart_chunking inspects the raw content to pick a sub-strategy
            chunks = smart_chunking(content, **options)
            logger.info(f"Use smart chunking strategy")
            _log_chunk_overview(content, chunks)
        else:
            # Caught by the generic handler below, so the caller receives None
            raise ValueError(f"Unsupported chunking strategy: {chunking_strategy}")

        if not chunks:
            logger.warning("Warning: no valid content chunks were found.")
            return None

        logger.info(f"Processing {len(chunks)} content chunks...")
        logger.info("Encoding through the API endpoint...")
        chunk_embeddings = encode_texts_via_api(chunks, batch_size=32)

        embedding_data = {
            'chunks': chunks,
            'embeddings': chunk_embeddings,
            'chunking_strategy': chunking_strategy,
            'chunking_params': chunking_params,
            'model_path': 'api_service'
        }
        with open(output_file, 'wb') as sink:
            pickle.dump(embedding_data, sink)
        logger.info(f"Saved embeddings to {output_file}")
        return embedding_data
    except FileNotFoundError:
        logger.error(f"Error: file not found: {input_file}")
        return None
    except Exception as e:
        logger.error(f"Error processing document: {e}")
        return None
def paragraph_chunking(text, max_chunk_size=1000, overlap=100, min_chunk_size=200, separator='\n\n'):
    """
    Chunk text into fixed-size pieces.

    Despite its name this function delegates directly to the fixed-length
    strategy; page markers are not considered and ``separator`` is accepted
    but not used.

    Args:
        text (str): input text
        max_chunk_size (int): maximum chunk size in characters
        overlap (int): overlap size in characters
        min_chunk_size (int): minimum chunk size in characters
        separator (str): paragraph separator (currently unused)

    Returns:
        list: list of chunked text; empty for empty or whitespace-only input
    """
    has_content = bool(text) and bool(text.strip())
    if has_content:
        return _fixed_length_chunking(text, max_chunk_size, overlap, min_chunk_size)
    return []
def _split_long_content(content, max_size, min_size, separator):
    """
    Split overly long content into chunks of at most ``max_size`` where possible.

    Content is first split on ``separator``; when that yields more than one
    piece the pieces are greedily re-joined. Otherwise the content is split
    into sentences, which are re-joined with single spaces. A single piece
    that is itself longer than ``max_size`` is emitted unchanged.

    Args:
        content (str): content to split
        max_size (int): maximum size
        min_size (int): minimum size (accepted but not used)
        separator (str): separator

    Returns:
        list: list of split chunks
    """

    def _pack(pieces, joiner):
        # Greedily glue consecutive pieces while the result stays within max_size
        packed = []
        buffer = ""
        for piece in pieces:
            if not buffer:
                buffer = piece
                continue
            candidate = buffer + joiner + piece
            if len(candidate) <= max_size:
                buffer = candidate
            else:
                packed.append(buffer)
                buffer = piece
        if buffer:
            packed.append(buffer)
        return packed

    if len(content) <= max_size:
        return [content]

    paragraphs = content.split(separator)
    if len(paragraphs) > 1:
        return _pack(paragraphs, separator)

    # No separator present: fall back to sentence boundaries
    return _pack(_split_into_sentences(content), " ")
def _split_into_sentences(text):
    """
    Split text into sentences with a simple punctuation heuristic.

    A boundary is whitespace that follows '.', '!' or '?' and precedes a
    digit, an ASCII capital letter or a CJK character. Because whitespace is
    required, decimal points such as "3.14" are left intact.

    Args:
        text (str): input text

    Returns:
        list: list of non-empty, trimmed sentences
    """
    boundary = r'(?<=[.!?])\s+(?=[\dA-Z\u4e00-\u9fa5])'
    trimmed = (piece.strip() for piece in re.split(boundary, text.strip()))
    return [piece for piece in trimmed if piece]
def _create_overlap_chunk(previous_chunk, new_paragraph, overlap_size):
    """
    Start a new chunk that is prefixed with the tail of the previous one.

    The tail is the last ``overlap_size`` characters of ``previous_chunk``
    (or all of it when shorter). If the tail spans several sentences, its
    first, possibly truncated, sentence is dropped. A single-sentence tail is
    kept only when it is longer than half of ``overlap_size``; otherwise no
    overlap is added.

    Args:
        previous_chunk (str): previous chunk
        new_paragraph (str): new paragraph
        overlap_size (int): overlap size

    Returns:
        str: new chunk, with the overlap and a blank line in front when used
    """
    if overlap_size <= 0:
        return new_paragraph

    if len(previous_chunk) <= overlap_size:
        tail = previous_chunk
    else:
        tail = previous_chunk[-overlap_size:]

    tail_sentences = _split_into_sentences(tail)
    if len(tail_sentences) > 1:
        # The first sentence may have been cut mid-way by the slice above
        tail = " ".join(tail_sentences[1:])
    elif len(tail) <= overlap_size * 0.5:
        # Too little context to be worth repeating
        return new_paragraph

    return tail + "\n\n" + new_paragraph
def _add_overlap_to_chunk(previous_chunk, current_chunk, overlap_size):
    """
    Prefix the current chunk with the tail of the previous chunk.

    The tail is the last ``overlap_size`` characters of ``previous_chunk``
    (or all of it when shorter). When the tail contains more than one
    sentence its first sentence is dropped.

    Args:
        previous_chunk (str): previous chunk
        current_chunk (str): current chunk
        overlap_size (int): overlap size

    Returns:
        str: the overlap, a blank line, then the current chunk; just the
        current chunk when ``overlap_size`` is not positive
    """
    if overlap_size <= 0:
        return current_chunk

    if len(previous_chunk) <= overlap_size:
        tail = previous_chunk
    else:
        tail = previous_chunk[-overlap_size:]

    tail_sentences = _split_into_sentences(tail)
    if len(tail_sentences) > 1:
        tail = " ".join(tail_sentences[1:])

    return tail + "\n\n" + current_chunk
def smart_chunking(text, max_chunk_size=1000, overlap=100, min_chunk_size=200):
    """
    Pick a chunking strategy from the structure found in the text.

    The first matching rule wins:
      1. '# Page' or '# File' markers present -> page-based chunking
      2. blank-line paragraph breaks present  -> paragraph_chunking
      3. any line break present               -> line-based chunking
      4. otherwise                            -> fixed-length chunking

    Args:
        text (str): input text
        max_chunk_size (int): maximum chunk size in characters
        overlap (int): overlap size in characters
        min_chunk_size (int): minimum chunk size in characters

    Returns:
        list: list of chunked text; empty for empty or whitespace-only input
    """
    if not text or not text.strip():
        return []

    sizing = (max_chunk_size, overlap, min_chunk_size)

    if '# Page' in text or '# File' in text:
        return _page_based_chunking(text, *sizing)
    if '\n\n' in text:
        return paragraph_chunking(text, *sizing, '\n\n')
    if '\n' in text:
        return _line_based_chunking(text, *sizing)
    return _fixed_length_chunking(text, *sizing)
def _page_based_chunking(text, max_chunk_size, overlap, min_chunk_size):
    """
    Page-based chunking strategy.

    The text is cut at every '# Page <n>' / '# File <name>' marker. Pages
    whose trimmed length is not greater than 30% of ``min_chunk_size`` are
    discarded, pages longer than ``max_chunk_size`` are split further on line
    breaks, and overlap is finally added between consecutive chunks.

    Args:
        text (str): input text containing page or file markers
        max_chunk_size (int): maximum chunk size in characters
        overlap (int): overlap size in characters
        min_chunk_size (int): minimum chunk size in characters

    Returns:
        list: list of chunked text (marker lines themselves are not included)
    """
    # The group must be non-capturing: re.split() inserts the text of every
    # capturing group into its result, which would turn the markers
    # ("Page 3", "File foo.txt") into pseudo-pages that can pass the size filter.
    page_marker = r'#\s*(?:Page\s+\d+|File\s+[^\n]+)'
    size_floor = min_chunk_size * 0.3

    cleaned_pages = []
    for raw_page in re.split(page_marker, text):
        page = raw_page.strip()
        if page and len(page) > size_floor:  # Filter out pages that are too small
            cleaned_pages.append(page)

    if not cleaned_pages:
        return []

    chunks = []
    for page in cleaned_pages:
        if len(page) <= max_chunk_size:
            chunks.append(page)
        else:
            # Page is too large and must be split
            chunks.extend(_split_long_content(page, max_chunk_size, min_chunk_size, '\n'))

    if overlap > 0 and len(chunks) > 1:
        chunks = _add_overlaps_to_chunks(chunks, overlap)
    return chunks
def _line_based_chunking(text, max_chunk_size, overlap, min_chunk_size):
    """
    Line-based chunking strategy.

    Non-empty, trimmed lines are accumulated into a pending chunk until the
    next line would push it past ``max_chunk_size``. A pending chunk that has
    reached ``min_chunk_size`` is emitted and the next chunk starts with an
    overlap; a shorter one is merged with the line and re-split. A trailing
    remainder below ``min_chunk_size`` is appended to the last chunk, and is
    dropped when no chunk has been emitted yet.

    Args:
        text (str): input text
        max_chunk_size (int): maximum chunk size in characters
        overlap (int): overlap size in characters
        min_chunk_size (int): minimum chunk size in characters

    Returns:
        list: list of chunked text
    """
    chunks = []
    pending = ""

    for raw_line in text.split('\n'):
        line = raw_line.strip()
        if not line:
            continue

        if not pending:
            pending = line
            continue

        merged = pending + '\n' + line
        if len(merged) <= max_chunk_size:
            pending = merged
            continue

        if len(pending) >= min_chunk_size:
            # Emit the pending chunk; the next one starts with its tail plus this line
            chunks.append(pending)
            pending = _create_overlap_for_line(pending, line, overlap)
            continue

        # Pending chunk is still too small: combine it with the line and re-split
        pieces = _split_long_content(merged, max_chunk_size, min_chunk_size, '\n')
        if chunks and pieces:
            pieces[0] = _add_overlap_to_chunk(chunks[-1], pieces[0], overlap)
        chunks.extend(pieces[:-1])
        pending = pieces[-1] if pieces else ""

    if pending:
        if len(pending) >= min_chunk_size:
            chunks.append(pending)
        elif chunks:
            chunks[-1] += '\n' + pending

    return chunks
def _fixed_length_chunking(text, max_chunk_size, overlap, min_chunk_size):
    """
    Fixed-length chunking strategy.

    The text is cut into windows of ``max_chunk_size`` characters. Each cut is
    moved back to the nearest sentence terminator found within the last 100
    characters of the window (the character right after the window is checked
    too, so a chunk may be ``max_chunk_size + 1`` long). If that would leave
    a chunk shorter than ``min_chunk_size`` the window is cut at its hard
    limit instead, so no text is skipped. Consecutive chunks share
    ``overlap`` characters; the overlap is ignored for a step where it would
    keep the window from advancing.

    Args:
        text (str): input text
        max_chunk_size (int): maximum chunk size in characters, must be positive
        overlap (int): overlap size in characters
        min_chunk_size (int): minimum chunk size in characters

    Returns:
        list: list of chunked text; the final chunk holds whatever remains
        and may be shorter than ``min_chunk_size``

    Raises:
        ValueError: if the text is non-empty and ``max_chunk_size`` is not positive
    """
    if not text:
        return []
    if max_chunk_size <= 0:
        # A non-positive window can never advance through the text
        raise ValueError("max_chunk_size must be a positive integer")

    # ASCII and CJK sentence terminators (full-width '！' and '？' included)
    sentence_terminators = '.!?。！？'
    lookback = 100
    text_length = len(text)

    chunks = []
    start = 0
    while start < text_length:
        end = start + max_chunk_size
        if end >= text_length:
            chunks.append(text[start:])
            break

        # Try splitting at periods, question marks, or exclamation marks
        split_pos = end
        for i in range(end, max(start, end - lookback), -1):
            if text[i] in sentence_terminators:
                split_pos = i + 1
                break

        # A sentence cut that is too short falls back to the hard limit
        # instead of skipping ahead and losing the text in between
        if split_pos - start < min_chunk_size:
            split_pos = end

        chunks.append(text[start:split_pos])

        next_start = split_pos - overlap if overlap > 0 else split_pos
        # Guarantee forward progress even when overlap >= chunk length
        start = next_start if next_start > start else split_pos

    return chunks
def _create_overlap_for_line(previous_chunk, new_line, overlap_size):
    """
    Start a new line-based chunk prefixed with the tail of the previous chunk.

    The tail is the last ``overlap_size`` characters of ``previous_chunk``
    (or all of it when shorter), trimmed to begin after its last newline when
    that newline is not its first character.

    Args:
        previous_chunk (str): previous chunk
        new_line (str): line that opens the new chunk
        overlap_size (int): overlap size

    Returns:
        str: the tail, a newline, then ``new_line``; just ``new_line`` when
        ``overlap_size`` is not positive
    """
    if overlap_size <= 0:
        return new_line

    if len(previous_chunk) <= overlap_size:
        tail = previous_chunk
    else:
        tail = previous_chunk[-overlap_size:]

    # Prefer to begin the overlap on a line boundary
    newline_at = tail.rfind('\n')
    if newline_at > 0:
        tail = tail[newline_at + 1:]

    return tail + '\n' + new_line
def _add_overlaps_to_chunks(chunks, overlap_size):
    """
    Prefix every chunk except the first with the tail of its predecessor.

    The tail is taken from the original (un-overlapped) predecessor: its last
    ``overlap_size`` characters, shortened to start after the last newline,
    or failing that after the last period. An empty tail adds nothing.

    Args:
        chunks (list): chunks in document order
        overlap_size (int): overlap size

    Returns:
        list: new list of chunks with overlaps; the input list itself when
        ``overlap_size`` is not positive or there are fewer than two chunks
    """
    if overlap_size <= 0 or len(chunks) <= 1:
        return chunks

    result = [chunks[0]]
    for previous_chunk, current_chunk in zip(chunks, chunks[1:]):
        if len(previous_chunk) <= overlap_size:
            tail = previous_chunk
        else:
            tail = previous_chunk[-overlap_size:]

        newline_at = tail.rfind('\n')
        if newline_at > 0:
            tail = tail[newline_at + 1:]
        else:
            # No usable newline: fall back to the last period
            period_at = tail.rfind('.')
            if period_at > 0:
                tail = tail[period_at + 1:].strip()

        result.append(tail + '\n\n' + current_chunk if tail else current_chunk)

    return result
def split_document_by_pages(input_file='document.txt', output_file='pagination.txt'):
    """
    Split a document on page/file markers and write one page per line.

    A marker is any trimmed line that starts with '#', optional whitespace and
    'Page' or 'File' (case-insensitive). The non-empty lines between markers
    are joined with single spaces to form one page.

    Args:
        input_file (str): input document file path
        output_file (str): output serialized file path

    Returns:
        list: the page strings that were written; an empty list when the
        input file is missing or any other error occurs (errors are logged)
    """
    marker = re.compile(r'^#\s*(Page|File)', re.IGNORECASE)

    try:
        with open(input_file, 'r', encoding='utf-8') as source:
            raw_lines = source.readlines()

        pages = []
        buffered = []

        def _flush():
            # Turn the buffered lines into one page, skipping empty results
            if buffered:
                merged = ' '.join(buffered).strip()
                if merged:
                    pages.append(merged)
                buffered.clear()

        for raw_line in raw_lines:
            line = raw_line.strip()
            if marker.match(line):
                # A marker closes the page collected so far
                _flush()
                continue
            if line:
                buffered.append(line)

        # Content after the last marker forms the final page
        _flush()

        logger.info(f"Split into a total of {len(pages)} pages")

        with open(output_file, 'w', encoding='utf-8') as sink:
            for page_content in pages:
                sink.write(f"{page_content}\n")

        logger.info(f"Serialized page content to {output_file}")
        return pages
    except FileNotFoundError:
        logger.error(f"Error: file not found: {input_file}")
        return []
    except Exception as e:
        logger.error(f"Error while splitting the document: {e}")
        return []
def test_chunking_strategies():
    """
    Run paragraph_chunking with three configurations and log the results.

    Each run logs a preview of every produced chunk at debug level; a final
    summary reports the number of chunks per configuration.
    """
    # Sample text
    test_text = """
First paragraph: This is a test paragraph. It contains multiple sentences. It is used to test chunking functionality.
Second paragraph: This is another paragraph. It also contains multiple sentences to validate the chunking strategy. We need to ensure chunk quality.
Third paragraph: This is the third paragraph, and it is relatively long with more information. It may trigger the chunking logic because it could exceed the maximum chunk size limit. We need to ensure the algorithm handles this case correctly and splits at sentence boundaries.
Fourth paragraph: This is the fourth paragraph. It is relatively short.
Fifth paragraph: This is the final paragraph. It is used to test the completeness and accuracy of the chunking strategy.
"""
    banner = "=" * 60
    logger.debug(banner)
    logger.debug("Chunking strategy test")
    logger.debug(banner)

    # (heading, max_chunk_size, overlap) for each scenario
    scenarios = (
        ("\n1. Paragraph-level chunking - small chunks (max_size=200):", 200, 50),
        ("\n2. Paragraph-level chunking - large chunks (max_size=500):", 500, 100),
        ("\n3. Paragraph-level chunking - no overlap:", 300, 0),
    )

    outcomes = []
    for heading, max_size, overlap_size in scenarios:
        logger.debug(heading)
        produced = paragraph_chunking(test_text, max_chunk_size=max_size, overlap=overlap_size)
        for position, chunk in enumerate(produced, start=1):
            logger.debug(f"Chunk {position} (length: {len(chunk)}): {chunk[:50]}...")
        outcomes.append(produced)

    chunks_small, chunks_large, chunks_no_overlap = outcomes
    logger.debug("\nTest summary:")
    logger.debug(f"- Small-chunk strategy: {len(chunks_small)} chunks")
    logger.debug(f"- Large-chunk strategy: {len(chunks_large)} chunks")
    logger.debug(f"- No-overlap strategy: {len(chunks_no_overlap)} chunks")
def demo_usage():
    """
    Log example embed_document invocations for each chunking style.

    Nothing is executed; the examples are only written to the debug log.
    """
    banner = "=" * 60
    messages = (
        banner,
        "Usage examples",
        banner,
        "\n1. Use traditional line-based chunking:",
        "embed_document('document.txt', 'line_embedding.pkl', chunking_strategy='line')",
        "\n2. Use paragraph-level chunking with default parameters:",
        "embed_document('document.txt', 'paragraph_embedding.pkl', chunking_strategy='paragraph')",
        "\n3. Use paragraph-level chunking with custom parameters:",
        "embed_document('document.txt', 'custom_embedding.pkl',",
        " chunking_strategy='paragraph',",
        " max_chunk_size=1500,",
        " overlap=200,",
        " min_chunk_size=300)",
    )
    for message in messages:
        logger.debug(message)
# Script entry point: build smart-chunked embeddings for the sample dataset
if __name__ == "__main__":
    # Alternative entry points, kept for manual experiments:
    # test_chunking_strategies()
    # demo_usage()
    embed_document(
        input_file="./projects/test/dataset/all_hp_product_spec_book2506/document.txt",
        output_file="./projects/test/dataset/all_hp_product_spec_book2506/smart_embedding.pkl",
        chunking_strategy='smart',  # let smart_chunking pick the sub-strategy
        max_chunk_size=800,         # smaller than the default of 1000
        overlap=100,
    )
def cache_terms_embeddings(bot_id: str, terms_list: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Build embeddings for a terms list, reusing an on-disk cache when valid.

    The cache lives at projects/cache/<bot_id>_terms.pkl and is reused only
    when the hash stored in it equals the hash of ``terms_list``.

    Args:
        bot_id: bot ID used as the cache key
        terms_list: list of terms, where each term may contain the fields
            name, description and synonyms (a list, or a comma-separated string)

    Returns:
        Dict: cache payload with the keys hash, term_info, embeddings and
        texts; an empty dict when ``terms_list`` is empty or encoding fails
    """
    if not terms_list:
        return {}

    cache_key = f"{bot_id}_terms"
    cache_file = f"projects/cache/{cache_key}.pkl"
    os.makedirs("projects/cache", exist_ok=True)

    if os.path.exists(cache_file):
        try:
            # NOTE: pickle.load can execute arbitrary code from a crafted file,
            # so the cache directory must only contain files written here.
            with open(cache_file, 'rb') as cache_handle:
                cached_data = pickle.load(cache_handle)
            current_hash = _generate_terms_hash(terms_list)
            if cached_data.get('hash') == current_hash:
                logger.info(f"Using cached terms embeddings for {cache_key}")
                return cached_data
        except Exception as e:
            # An unreadable cache is not fatal: fall through and rebuild it
            logger.error(f"Error loading cache: {e}")

    term_texts = []
    term_info = []
    for term in terms_list:
        parts = []
        if term.get('name'):
            parts.append(f"Name: {term['name']}")
        if term.get('description'):
            parts.append(f"Description: {term['description']}")

        # Synonyms may arrive as a list or as a comma-separated string
        synonyms = []
        raw_synonyms = term.get('synonyms')
        if raw_synonyms:
            if isinstance(raw_synonyms, list):
                synonyms = raw_synonyms
            elif isinstance(raw_synonyms, str):
                synonyms = [piece.strip() for piece in raw_synonyms.split(',') if piece.strip()]
        if synonyms:
            parts.append(f"Synonyms: {', '.join(synonyms)}")

        term_texts.append(" | ".join(parts))
        term_info.append({
            'name': term.get('name', ''),
            'description': term.get('description', ''),
            'synonyms': synonyms
        })

    try:
        embeddings = encode_texts_via_api(term_texts, batch_size=16)
        cache_data = {
            'hash': _generate_terms_hash(terms_list),
            'term_info': term_info,
            'embeddings': embeddings,
            'texts': term_texts
        }
        with open(cache_file, 'wb') as cache_handle:
            pickle.dump(cache_data, cache_handle)
        logger.info(f"Cached {len(term_texts)} terms embeddings to {cache_file}")
        return cache_data
    except Exception as e:
        logger.error(f"Error generating terms embeddings: {e}")
        return {}
def search_similar_terms(query_text: str, cached_terms_data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Rank cached terms by cosine similarity to the query text.

    Args:
        query_text: query text
        cached_terms_data: cached term data; must contain 'embeddings' and
            'term_info'

    Returns:
        List[Dict]: up to five matches, best first, each with the keys
        term_info, similarity and index; an empty list when an input is
        missing or any error occurs (errors are logged)
    """
    if not cached_terms_data or not query_text or 'embeddings' not in cached_terms_data:
        return []

    try:
        query_embedding = encode_texts_via_api([query_text], batch_size=1)
        if len(query_embedding) == 0:
            return []

        query_vector = query_embedding[0]
        term_embeddings = cached_terms_data['embeddings']
        term_info = cached_terms_data['term_info']

        logger.debug(f"DEBUG: Query text: '{query_text}'")
        logger.debug(f"DEBUG: Query vector shape: {query_vector.shape}, norm: {np.linalg.norm(query_vector)}")

        similarities = _cosine_similarity(query_vector, term_embeddings)
        logger.debug(f"DEBUG: Similarities: {similarities}")
        logger.debug(f"DEBUG: Max similarity: {np.max(similarities):.3f}, Mean similarity: {np.mean(similarities):.3f}")

        # Score every term, then keep the five best
        scored = [
            {
                'term_info': term_info[position],
                'similarity': float(score),
                'index': position
            }
            for position, score in enumerate(similarities)
        ]
        ranked = sorted(scored, key=lambda match: match['similarity'], reverse=True)
        return ranked[:5]
    except Exception as e:
        logger.error(f"Error in similarity search: {e}")
        return []
def format_terms_analysis(similar_terms: List[Dict[str, Any]]) -> str:
    """
    Render matched terms as a numbered, newline-separated list.

    Args:
        similar_terms: list of matches, each holding 'term_info' and 'similarity'

    Returns:
        str: one line per term in the form
        "<n>) Name: ..., Description: ..., Synonyms: ... (Similarity: 0.000)";
        an empty string when there are no matches
    """
    if not similar_terms:
        return ""

    def _render(rank, match):
        info = match['term_info']
        similarity = match['similarity']
        name = info.get('name', '')
        description = info.get('description', '')
        synonyms = info.get('synonyms', [])
        # Terms without synonyms are shown as 'N/A'
        synonyms_str = ', '.join(synonyms) if synonyms else 'N/A'
        return f"{rank}) Name: {name}, Description: {description}, Synonyms: {synonyms_str} (Similarity: {similarity:.3f})"

    return "\n".join(_render(rank, match) for rank, match in enumerate(similar_terms, 1))
def _generate_terms_hash(terms_list: List[Dict[str, Any]]) -> str:
    """
    Fingerprint a terms list so a cached copy can be validated.

    The list is serialized to JSON with sorted keys and non-ASCII characters
    kept as-is, so dictionaries that differ only in key order hash alike.

    Args:
        terms_list: list of terms

    Returns:
        str: hexadecimal MD5 digest of the UTF-8 encoded JSON text
    """
    canonical = json.dumps(terms_list, sort_keys=True, ensure_ascii=False)
    digest = hashlib.md5()
    digest.update(canonical.encode('utf-8'))
    return digest.hexdigest()
def _cosine_similarity(query_vector: np.ndarray, term_embeddings: np.ndarray) -> np.ndarray:
    """
    Compute cosine similarity between the query vector and every term embedding.

    Each dot product is divided by the product of the two vector norms plus a
    small epsilon (1e-8) that guards against division by zero. According to
    the original note this mirrors semantic_search_server.py.

    Args:
        query_vector: query vector (shape: [embedding_dim])
        term_embeddings: term embedding matrix (shape: [n_terms, embedding_dim])

    Returns:
        np.ndarray: similarity array (shape: [n_terms]); when
        ``term_embeddings`` is not at least two-dimensional, an array of
        zeros with one entry per element of ``term_embeddings``
    """
    if term_embeddings.ndim <= 1:
        # No matrix of embeddings to compare against
        return np.zeros(len(term_embeddings))

    dot_products = np.dot(term_embeddings, query_vector)
    norm_product = np.linalg.norm(term_embeddings, axis=1) * np.linalg.norm(query_vector)
    return dot_products / (norm_product + 1e-8)
def process_terms_with_embedding(terms_list: List[Dict[str, Any]], bot_id: str, query_text: str) -> str:
    """
    Run the complete term flow: cache embeddings, search, format.

    Args:
        terms_list: list of terms
        bot_id: bot ID
        query_text: user query text

    Returns:
        str: formatted analysis of the most similar terms; an empty string
        when an input is missing, caching fails or nothing matches, leaving
        it to the caller to decide how to handle that
    """
    if not terms_list or not query_text:
        return ""

    # 1. Build (or load) the cached term embeddings
    cached_data = cache_terms_embeddings(bot_id, terms_list)
    if not cached_data:
        return ""

    # 2. Rank the terms against the query (top 5)
    similar_terms = search_similar_terms(query_text, cached_data)

    # 3. Format the matches, if any
    return format_terms_analysis(similar_terms) if similar_terms else ""
# Other example calls (commented out):
# split_document_by_pages("/Users/moshui/Documents/felo/qwen-agent/projects/test/dataset/all_hp_product_spec_book2506/document.txt")
# embed_document("/Users/moshui/Documents/felo/qwen-agent/projects/test/dataset/all_hp_product_spec_book2506/document.txt") # uncomment to run