"""Document chunking, embedding, and term-similarity helpers.

The module offers three groups of functionality:

* text cleaning and chunking strategies (line, paragraph, smart),
* ``embed_document`` which chunks a file, encodes the chunks through the
  embedding HTTP API and pickles the result,
* term-embedding caching and cosine-similarity search.
"""

import asyncio
import hashlib
import json
import logging
import os
import pickle
import re
from typing import Any, Dict, List, Optional

import numpy as np
import requests

from utils.settings import FASTAPI_URL

# Configure logger
logger = logging.getLogger('app')

# Patterns used by clean_text(); compiled once because the function is called
# for every line of a document.
_HTML_TAG_RE = re.compile(r'<[^>]+>')
_WHITESPACE_RE = re.compile(r'\s+')
_CONTROL_CHARS_RE = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')

# Patterns used by is_meaningful_line().
_SYMBOLS_ONLY_RE = re.compile(r'^[^\w\u4e00-\u9fa5]+$')
_MEANINGLESS_LINE_PATTERNS = (
    re.compile(r'^[-=_]{3,}$'),      # separator line
    re.compile(r'^Page\s+\d+$'),     # page number
    re.compile(r'^\d+\.\s*$'),       # number only
    re.compile(r'^[a-zA-Z]\.\s*$'),  # single-letter item label
)

# Splits after '.', '!' or '?' when followed by whitespace and then a digit,
# an upper-case ASCII letter or a CJK character, so "3.14" is left intact.
_SENTENCE_BOUNDARY_RE = re.compile(r'(?<=[.!?])\s+(?=[\dA-Z\u4e00-\u9fa5])')

# Page / file markers such as "# Page 12" or "# File foo.pdf".
_PAGE_MARKER_RE = re.compile(r'#\s*(Page\s+\d+|File\s+[^\n]+)')
_PAGE_SEPARATOR_LINE_RE = re.compile(r'^#\s*(Page|File)', re.IGNORECASE)

# Characters treated as sentence ends by the fixed-length chunker, and how far
# back from the hard limit it searches for one.
_SENTENCE_END_CHARS = '.!?。'
_SENTENCE_LOOKBACK = 100

# Defaults shared by the 'paragraph' and 'smart' strategies of embed_document.
_DEFAULT_CHUNK_PARAMS = {
    'max_chunk_size': 1000,
    'overlap': 100,
    'min_chunk_size': 200,
}


def encode_texts_via_api(texts, batch_size=32):
    """Encode texts through the embedding API endpoint.

    Args:
        texts (list): texts to encode
        batch_size (int): batch size forwarded to the service

    Returns:
        np.ndarray: array built from the ``embeddings`` field of the
        response; an empty array when ``texts`` is empty

    Raises:
        RuntimeError: the service answered with a non-200 status or with a
            payload whose ``success`` flag is falsy
        Exception: any transport or decoding error raised by ``requests`` is
            logged and re-raised unchanged
    """
    if not texts:
        return np.array([])

    try:
        # FastAPI service endpoint
        api_endpoint = f"{FASTAPI_URL}/api/v1/embedding/encode"
        request_data = {
            "texts": texts,
            "batch_size": batch_size,
        }
        response = requests.post(
            api_endpoint,
            json=request_data,
            timeout=60,  # generous timeout: encoding large batches is slow
        )

        if response.status_code != 200:
            logger.error(f"API request failed: {response.status_code} - {response.text}")
            raise RuntimeError(f"API request failed: {response.status_code}")

        result_data = response.json()
        if not result_data.get("success"):
            error_msg = result_data.get('error', 'Unknown error')
            logger.error(f"API encoding failed: {error_msg}")
            raise RuntimeError(f"API encoding failed: {error_msg}")

        embeddings_list = result_data.get("embeddings", [])
        logger.info(f"API encoding succeeded, processed {len(texts)} texts, embedding dimension: {len(embeddings_list[0]) if embeddings_list else 0}")
        return np.array(embeddings_list)
    except Exception as e:
        logger.error(f"API encoding exception: {e}")
        raise


def clean_text(text):
    """Clean text by removing markup, redundant whitespace and control characters.

    Args:
        text (str): original text

    Returns:
        str: cleaned text
    """
    # Remove HTML tags
    text = _HTML_TAG_RE.sub('', text)
    # Collapse every whitespace run (including newlines) into a single space
    text = _WHITESPACE_RE.sub(' ', text)
    # Remove control and non-printable characters while preserving Unicode text
    text = _CONTROL_CHARS_RE.sub('', text)
    # Trim leading and trailing whitespace
    return text.strip()


def is_meaningful_line(text):
    """Determine whether a line of text is worth embedding.

    Args:
        text (str): text line

    Returns:
        bool: False for empty/very short lines, digit-only lines,
        symbol-only lines and a few boilerplate patterns; True otherwise
    """
    if not text:
        return False

    stripped = text.strip()
    if len(stripped) < 5:
        return False

    # Filter out lines containing only numbers
    if stripped.isdigit():
        return False

    # Filter out lines containing only symbols (checked on the raw text)
    if _SYMBOLS_ONLY_RE.match(text):
        return False

    # Filter out common meaningless lines
    return not any(pattern.match(stripped) for pattern in _MEANINGLESS_LINE_PATTERNS)


def _log_chunk_stats(chunks):
    """Log average / maximum / minimum chunk length at debug level."""
    if not chunks:
        return
    sizes = [len(chunk) for chunk in chunks]
    logger.debug(f"Average chunk size: {sum(sizes) / len(sizes):.1f} characters")
    logger.debug(f"Maximum chunk size: {max(sizes)} characters")
    logger.debug(f"Minimum chunk size: {min(sizes)} characters")


def embed_document(input_file='document.txt', output_file='embedding.pkl',
                   chunking_strategy='line', **chunking_params):
    """Chunk a document, embed the chunks and save the result as a pickle file.

    Args:
        input_file (str): input document file path
        output_file (str): output pickle file path
        chunking_strategy (str): 'line', 'paragraph' or 'smart'
        **chunking_params: chunking parameters
            - for the 'line' strategy: no additional parameters
            - for the 'paragraph' and 'smart' strategies:
                - max_chunk_size: maximum chunk size (default 1000)
                - overlap: overlap size (default 100)
                - min_chunk_size: minimum chunk size (default 200)
            - for the 'paragraph' strategy only:
                - separator: paragraph separator (default '\\n')

    Returns:
        dict | None: the pickled dictionary (keys ``chunks``, ``embeddings``,
        ``chunking_strategy``, ``chunking_params``, ``model_path``), or None
        when the file is missing, no chunk is produced, or any error occurs
        (errors are logged, not raised).
    """
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            content = f.read()

        if chunking_strategy == 'line':
            # Original line-based processing logic: clean every line and keep
            # the meaningful ones.
            lines = content.split('\n')
            original_count = len(lines)  # str.split always yields >= 1 item
            cleaned_lines = (clean_text(line) for line in lines)
            chunks = [line for line in cleaned_lines if is_meaningful_line(line)]

            logger.info("Use line-based chunking strategy")
            logger.info(f"Original line count: {original_count}")
            logger.info(f"Valid sentence count after cleaning: {len(chunks)}")
            logger.info(f"Filter ratio: {((original_count - len(chunks)) / original_count * 100):.1f}%")

        elif chunking_strategy == 'paragraph':
            # Paragraph-level chunking strategy
            params = dict(_DEFAULT_CHUNK_PARAMS, separator='\n')
            params.update(chunking_params)

            # Clean whitespace in the whole document first
            cleaned_content = clean_text(content)
            chunks = paragraph_chunking(cleaned_content, **params)

            logger.info("Use paragraph-level chunking strategy")
            logger.info(f"Total document length: {len(content)} characters")
            logger.info(f"Chunk count: {len(chunks)}")
            _log_chunk_stats(chunks)

        elif chunking_strategy == 'smart':
            # Smart chunking strategy that automatically detects document format
            params = dict(_DEFAULT_CHUNK_PARAMS)
            params.update(chunking_params)

            chunks = smart_chunking(content, **params)

            logger.info("Use smart chunking strategy")
            logger.info(f"Total document length: {len(content)} characters")
            logger.info(f"Chunk count: {len(chunks)}")
            _log_chunk_stats(chunks)

        else:
            # Caught by the generic handler below, which logs and returns None.
            raise ValueError(f"Unsupported chunking strategy: {chunking_strategy}")

        if not chunks:
            logger.warning("Warning: no valid content chunks were found.")
            return None

        logger.info(f"Processing {len(chunks)} content chunks...")

        # Encode through the API endpoint
        logger.info("Encoding through the API endpoint...")
        chunk_embeddings = encode_texts_via_api(chunks, batch_size=32)

        embedding_data = {
            'chunks': chunks,
            'embeddings': chunk_embeddings,
            'chunking_strategy': chunking_strategy,
            'chunking_params': chunking_params,
            'model_path': 'api_service',
        }

        with open(output_file, 'wb') as f:
            pickle.dump(embedding_data, f)

        logger.info(f"Saved embeddings to {output_file}")
        return embedding_data

    except FileNotFoundError:
        logger.error(f"Error: file not found: {input_file}")
        return None
    except Exception as e:
        # Deliberate best-effort boundary: callers receive None on any failure.
        logger.error(f"Error processing document: {e}")
        return None


def paragraph_chunking(text, max_chunk_size=1000, overlap=100, min_chunk_size=200, separator='\n\n'):
    """Chunk text into fixed-size pieces, preferring sentence boundaries.

    Args:
        text (str): input text
        max_chunk_size (int): maximum chunk size in characters
        overlap (int): overlap size in characters
        min_chunk_size (int): minimum chunk size in characters
        separator (str): accepted for interface compatibility; the current
            implementation delegates to fixed-length chunking and does not
            use it

    Returns:
        list: list of chunked text
    """
    if not text or not text.strip():
        return []

    # Directly use the fixed-length chunking strategy without considering page markers
    return _fixed_length_chunking(text, max_chunk_size, overlap, min_chunk_size)


def _pack_pieces(pieces, joiner, max_size):
    """Greedily join consecutive pieces with ``joiner`` up to ``max_size`` characters.

    A single piece longer than ``max_size`` is emitted as-is (not split).
    """
    packed = []
    current = ""
    for piece in pieces:
        if not current:
            current = piece
        elif len(current) + len(joiner) + len(piece) <= max_size:
            current += joiner + piece
        else:
            packed.append(current)
            current = piece
    if current:
        packed.append(current)
    return packed


def _split_long_content(content, max_size, min_size, separator):
    """Split overly long content, first by ``separator`` and otherwise by sentence.

    Args:
        content (str): content to split
        max_size (int): maximum size
        min_size (int): minimum size (currently unused by the implementation)
        separator (str): separator

    Returns:
        list: list of split chunks; an individual paragraph or sentence that
        exceeds ``max_size`` on its own is returned unsplit
    """
    if len(content) <= max_size:
        return [content]

    # Try splitting by paragraph
    paragraphs = content.split(separator)
    if len(paragraphs) > 1:
        return _pack_pieces(paragraphs, separator, max_size)

    # If paragraph splitting is not possible, split by sentence
    return _pack_pieces(_split_into_sentences(content), " ", max_size)


def _split_into_sentences(text):
    """Split text into sentences.

    Args:
        text (str): input text

    Returns:
        list: list of non-empty, stripped sentences
    """
    # Simple sentence splitting that can be improved if needed
    sentences = _SENTENCE_BOUNDARY_RE.split(text.strip())
    return [s.strip() for s in sentences if s.strip()]


def _tail(text, size):
    """Return the last ``size`` characters of ``text`` (all of it when shorter)."""
    return text[-size:] if len(text) > size else text


def _create_overlap_chunk(previous_chunk, new_paragraph, overlap_size):
    """Create a new chunk that starts with overlap taken from the previous chunk.

    Args:
        previous_chunk (str): previous chunk
        new_paragraph (str): new paragraph
        overlap_size (int): overlap size

    Returns:
        str: ``new_paragraph`` prefixed with the overlap, or ``new_paragraph``
        alone when overlap is disabled or too short to be useful
    """
    if overlap_size <= 0:
        return new_paragraph

    # Get overlapping content from the end of the previous chunk
    overlap_text = _tail(previous_chunk, overlap_size)

    # Try splitting the overlapping content at sentence boundaries
    sentences = _split_into_sentences(overlap_text)
    if len(sentences) > 1:
        # Drop the possibly incomplete first sentence
        overlap_text = " ".join(sentences[1:])
    elif len(overlap_text) <= overlap_size * 0.5:
        # If the overlap content is too short, do not use overlap
        return new_paragraph
    # Otherwise there is a single sentence of adequate length: keep it as-is.

    return overlap_text + "\n\n" + new_paragraph


def _add_overlap_to_chunk(previous_chunk, current_chunk, overlap_size):
    """Prefix the current chunk with overlap from the end of the previous chunk.

    Args:
        previous_chunk (str): previous chunk
        current_chunk (str): current chunk
        overlap_size (int): overlap size

    Returns:
        str: chunk with overlap
    """
    if overlap_size <= 0:
        return current_chunk

    # Get overlapping content from the end of the previous chunk
    overlap_text = _tail(previous_chunk, overlap_size)

    # Try splitting at sentence boundaries; drop the partial first sentence
    sentences = _split_into_sentences(overlap_text)
    if len(sentences) > 1:
        overlap_text = " ".join(sentences[1:])

    return overlap_text + "\n\n" + current_chunk


def smart_chunking(text, max_chunk_size=1000, overlap=100, min_chunk_size=200):
    """Detect the document structure and apply the matching chunking strategy.

    Args:
        text (str): input text
        max_chunk_size (int): maximum chunk size in characters
        overlap (int): overlap size in characters
        min_chunk_size (int): minimum chunk size in characters

    Returns:
        list: list of chunked text
    """
    if not text or not text.strip():
        return []

    # Detect document type, supporting both # Page and # File formats
    if '# Page' in text or '# File' in text:
        # Use the page separator
        return _page_based_chunking(text, max_chunk_size, overlap, min_chunk_size)
    if '\n\n' in text:
        # Use the paragraph separator
        return paragraph_chunking(text, max_chunk_size, overlap, min_chunk_size, '\n\n')
    if '\n' in text:
        # Use the line separator
        return _line_based_chunking(text, max_chunk_size, overlap, min_chunk_size)
    # Chunk by fixed length
    return _fixed_length_chunking(text, max_chunk_size, overlap, min_chunk_size)


def _page_based_chunking(text, max_chunk_size, overlap, min_chunk_size):
    """Page-based chunking strategy.

    Splits on ``# Page N`` / ``# File name`` markers, drops pages shorter than
    30% of ``min_chunk_size``, splits pages longer than ``max_chunk_size`` and
    finally adds overlap between consecutive chunks.
    """
    # NOTE(review): the pattern has a capturing group, so re.split also returns
    # the marker labels ("Page 3", "File foo.pdf") as list items. They are only
    # removed by the length filter below, so a long file name survives as its
    # own chunk - confirm whether that is intended.
    pages = _PAGE_MARKER_RE.split(text)

    # Clean and filter page content; filter out pages that are too small
    min_page_len = min_chunk_size * 0.3
    cleaned_pages = []
    for page in pages:
        page = page.strip()
        if page and len(page) > min_page_len:
            cleaned_pages.append(page)

    if not cleaned_pages:
        return []

    # If page content is too large, split it further
    chunks = []
    for page in cleaned_pages:
        if len(page) <= max_chunk_size:
            chunks.append(page)
        else:
            chunks.extend(_split_long_content(page, max_chunk_size, min_chunk_size, '\n'))

    # Add overlap
    if overlap > 0 and len(chunks) > 1:
        chunks = _add_overlaps_to_chunks(chunks, overlap)

    return chunks


def _line_based_chunking(text, max_chunk_size, overlap, min_chunk_size):
    """Line-based chunking strategy.

    Accumulates stripped, non-empty lines into chunks of at most
    ``max_chunk_size`` characters and seeds each new chunk with overlap from
    the previous one.
    """
    chunks = []
    current_chunk = ""

    for line in text.split('\n'):
        line = line.strip()
        if not line:
            continue

        if not current_chunk:
            current_chunk = line
        elif len(current_chunk) + 1 + len(line) <= max_chunk_size:
            current_chunk += '\n' + line
        elif len(current_chunk) >= min_chunk_size:
            chunks.append(current_chunk)
            current_chunk = _create_overlap_for_line(current_chunk, line, overlap)
        else:
            # The accumulated chunk is still short, so the incoming line is
            # the long part: split the combination instead.
            split_chunks = _split_long_content(
                current_chunk + '\n' + line, max_chunk_size, min_chunk_size, '\n')
            if chunks and split_chunks:
                split_chunks[0] = _add_overlap_to_chunk(chunks[-1], split_chunks[0], overlap)
            chunks.extend(split_chunks[:-1])
            current_chunk = split_chunks[-1] if split_chunks else ""

    if current_chunk:
        if len(current_chunk) >= min_chunk_size or not chunks:
            # Also keep a short tail when it is the only content; previously
            # a document shorter than min_chunk_size produced no chunk at all.
            chunks.append(current_chunk)
        else:
            # A short tail is merged into the preceding chunk.
            chunks[-1] += '\n' + current_chunk

    return chunks


def _fixed_length_chunking(text, max_chunk_size, overlap, min_chunk_size):
    """Fixed-length chunking strategy.

    Cuts ``text`` roughly every ``max_chunk_size`` characters, moving the cut
    back (by at most 100 characters) to just after a sentence-ending character
    when one is available. Consecutive chunks share ``overlap`` characters.
    Every character of ``text`` ends up in at least one chunk.

    Raises:
        ValueError: ``max_chunk_size`` is not positive
    """
    if max_chunk_size <= 0:
        # A non-positive size can never make progress through the text.
        raise ValueError("max_chunk_size must be a positive integer")

    chunks = []
    text_len = len(text)
    start = 0

    while start < text_len:
        end = start + max_chunk_size
        if end >= text_len:
            chunks.append(text[start:])
            break

        # Try splitting at periods, question marks, or exclamation marks
        split_pos = end
        for i in range(end, max(start, end - _SENTENCE_LOOKBACK), -1):
            if text[i] in _SENTENCE_END_CHARS:
                split_pos = i + 1
                break

        if split_pos - start < min_chunk_size:
            # The sentence-boundary cut would be too short. Fall back to the
            # hard cut instead of skipping ahead, which silently dropped text.
            split_pos = end

        chunks.append(text[start:split_pos])

        next_start = split_pos - overlap if overlap > 0 else split_pos
        if next_start <= start:
            # Overlap is at least as large as the chunk: drop it for this step
            # so the loop is guaranteed to terminate.
            next_start = split_pos
        start = next_start

    return chunks


def _create_overlap_for_line(previous_chunk, new_line, overlap_size):
    """Create the start of a new line-based chunk, prefixed with overlap.

    The overlap is the tail of ``previous_chunk``, trimmed to begin after its
    last newline when it contains one (not at position 0).
    """
    if overlap_size <= 0:
        return new_line

    # Get overlapping content from the end of the previous chunk
    overlap_text = _tail(previous_chunk, overlap_size)

    # Try splitting at an appropriate boundary
    last_newline = overlap_text.rfind('\n')
    if last_newline > 0:
        overlap_text = overlap_text[last_newline + 1:]

    return overlap_text + '\n' + new_line


def _add_overlaps_to_chunks(chunks, overlap_size):
    """Prefix every chunk but the first with overlap from its predecessor.

    The overlap is the tail of the previous chunk, trimmed to start after its
    last newline, or otherwise after its last period.
    """
    if overlap_size <= 0 or len(chunks) <= 1:
        return chunks

    result = [chunks[0]]
    for previous_chunk, current_chunk in zip(chunks, chunks[1:]):
        overlap_text = _tail(previous_chunk, overlap_size)

        # Try splitting at an appropriate boundary
        last_newline = overlap_text.rfind('\n')
        if last_newline > 0:
            overlap_text = overlap_text[last_newline + 1:]
        else:
            # Try splitting at a period
            last_period = overlap_text.rfind('.')
            if last_period > 0:
                overlap_text = overlap_text[last_period + 1:].strip()

        if overlap_text:
            result.append(overlap_text + '\n\n' + current_chunk)
        else:
            result.append(current_chunk)

    return result


def split_document_by_pages(input_file='document.txt', output_file='pagination.txt'):
    """Split a document by page/file markers and write one page per output line.

    Args:
        input_file (str): input document file path
        output_file (str): output serialized file path

    Returns:
        list: the non-empty pages, each flattened to a single line; an empty
        list when the input is missing or any error occurs (errors are logged)
    """
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        pages = []
        current_page = []

        def flush_page():
            # Merge the current page content into a single line; save
            # non-empty pages only.
            page_content = ' '.join(current_page).strip()
            if page_content:
                pages.append(page_content)

        for line in lines:
            line = line.strip()

            # Check whether this is a page separator, supporting both
            # # Page and # File formats
            if _PAGE_SEPARATOR_LINE_RE.match(line):
                if current_page:
                    flush_page()
                    current_page = []
                continue

            # Not a separator: keep the line if it has content
            if line:
                current_page.append(line)

        # Process the last page
        if current_page:
            flush_page()

        logger.info(f"Split into a total of {len(pages)} pages")

        # Write the serialized file
        with open(output_file, 'w', encoding='utf-8') as f:
            f.writelines(f"{page_content}\n" for page_content in pages)

        logger.info(f"Serialized page content to {output_file}")
        return pages

    except FileNotFoundError:
        logger.error(f"Error: file not found: {input_file}")
        return []
    except Exception as e:
        logger.error(f"Error while splitting the document: {e}")
        return []


def test_chunking_strategies():
    """Run paragraph chunking with several settings and log the results."""
    # Test text
    test_text = """
First paragraph: This is a test paragraph. It contains multiple sentences. It is used to test chunking functionality.

Second paragraph: This is another paragraph. It also contains multiple sentences to validate the chunking strategy. We need to ensure chunk quality.

Third paragraph: This is the third paragraph, and it is relatively long with more information. It may trigger the chunking logic because it could exceed the maximum chunk size limit. We need to ensure the algorithm handles this case correctly and splits at sentence boundaries.

Fourth paragraph: This is the fourth paragraph. It is relatively short.

Fifth paragraph: This is the final paragraph. It is used to test the completeness and accuracy of the chunking strategy.
"""

    logger.debug("=" * 60)
    logger.debug("Chunking strategy test")
    logger.debug("=" * 60)

    # Test 1: Paragraph-level chunking (small chunks)
    logger.debug("\n1. Paragraph-level chunking - small chunks (max_size=200):")
    chunks_small = paragraph_chunking(test_text, max_chunk_size=200, overlap=50)
    for i, chunk in enumerate(chunks_small):
        logger.debug(f"Chunk {i+1} (length: {len(chunk)}): {chunk[:50]}...")

    # Test 2: Paragraph-level chunking (large chunks)
    logger.debug("\n2. Paragraph-level chunking - large chunks (max_size=500):")
    chunks_large = paragraph_chunking(test_text, max_chunk_size=500, overlap=100)
    for i, chunk in enumerate(chunks_large):
        logger.debug(f"Chunk {i+1} (length: {len(chunk)}): {chunk[:50]}...")

    # Test 3: Paragraph-level chunking (no overlap)
    logger.debug("\n3. Paragraph-level chunking - no overlap:")
    chunks_no_overlap = paragraph_chunking(test_text, max_chunk_size=300, overlap=0)
    for i, chunk in enumerate(chunks_no_overlap):
        logger.debug(f"Chunk {i+1} (length: {len(chunk)}): {chunk[:50]}...")

    logger.debug("\nTest summary:")
    logger.debug(f"- Small-chunk strategy: {len(chunks_small)} chunks")
    logger.debug(f"- Large-chunk strategy: {len(chunks_large)} chunks")
    logger.debug(f"- No-overlap strategy: {len(chunks_no_overlap)} chunks")


def demo_usage():
    """Log example invocations of ``embed_document``."""
    logger.debug("=" * 60)
    logger.debug("Usage examples")
    logger.debug("=" * 60)

    logger.debug("\n1. Use traditional line-based chunking:")
    logger.debug("embed_document('document.txt', 'line_embedding.pkl', chunking_strategy='line')")

    logger.debug("\n2. Use paragraph-level chunking with default parameters:")
    logger.debug("embed_document('document.txt', 'paragraph_embedding.pkl', chunking_strategy='paragraph')")

    logger.debug("\n3. Use paragraph-level chunking with custom parameters:")
    logger.debug("embed_document('document.txt', 'custom_embedding.pkl',")
    logger.debug("               chunking_strategy='paragraph',")
    logger.debug("               max_chunk_size=1500,")
    logger.debug("               overlap=200,")
    logger.debug("               min_chunk_size=300)")


def _normalize_synonyms(raw_synonyms):
    """Return synonyms as a list; accepts a list or a comma-separated string."""
    if not raw_synonyms:
        return []
    if isinstance(raw_synonyms, list):
        return raw_synonyms
    if isinstance(raw_synonyms, str):
        return [s.strip() for s in raw_synonyms.split(',') if s.strip()]
    return []


def cache_terms_embeddings(bot_id: str, terms_list: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Generate embeddings for a terms list and cache them on disk.

    Args:
        bot_id: bot ID used as the cache key
        terms_list: list of terms, where each term contains fields such as
            name, description, and synonyms

    Returns:
        Dict: dictionary with keys ``hash``, ``term_info``, ``embeddings`` and
        ``texts``; an empty dict when ``terms_list`` is empty or embedding
        generation fails
    """
    if not terms_list:
        return {}

    # NOTE(review): bot_id is interpolated into a file path unchecked; confirm
    # callers only pass trusted identifiers.
    cache_key = f"{bot_id}_terms"
    cache_file = f"projects/cache/{cache_key}.pkl"

    # Ensure the cache directory exists
    os.makedirs("projects/cache", exist_ok=True)

    # Hash once up front: it validates an existing cache and stamps a new one.
    # A terms list that cannot be hashed can never be cached, so fail before
    # paying for the encoding request.
    try:
        current_hash = _generate_terms_hash(terms_list)
    except (TypeError, ValueError) as e:
        logger.error(f"Error generating terms embeddings: {e}")
        return {}

    # Check whether the cache exists and is valid
    if os.path.exists(cache_file):
        try:
            # NOTE(review): pickle.load can execute arbitrary code; the cache
            # directory must only contain files written by this process.
            with open(cache_file, 'rb') as f:
                cached_data = pickle.load(f)

            # Verify that the cached data matches the current terms
            if cached_data.get('hash') == current_hash:
                logger.info(f"Using cached terms embeddings for {cache_key}")
                return cached_data
        except Exception as e:
            # A corrupt or incompatible cache is not fatal: regenerate below.
            logger.error(f"Error loading cache: {e}")

    # Prepare the texts to encode
    term_texts = []
    term_info = []

    for term in terms_list:
        # Build the full term text for embedding
        term_text_parts = []
        if term.get('name'):
            term_text_parts.append(f"Name: {term['name']}")
        if term.get('description'):
            term_text_parts.append(f"Description: {term['description']}")

        synonyms = _normalize_synonyms(term.get('synonyms'))
        if synonyms:
            term_text_parts.append(f"Synonyms: {', '.join(synonyms)}")

        term_texts.append(" | ".join(term_text_parts))

        # Store the original information
        term_info.append({
            'name': term.get('name', ''),
            'description': term.get('description', ''),
            'synonyms': synonyms,
        })

    # Generate embeddings
    try:
        embeddings = encode_texts_via_api(term_texts, batch_size=16)

        # Prepare cache data
        cache_data = {
            'hash': current_hash,
            'term_info': term_info,
            'embeddings': embeddings,
            'texts': term_texts,
        }

        # Save to cache
        with open(cache_file, 'wb') as f:
            pickle.dump(cache_data, f)

        logger.info(f"Cached {len(term_texts)} terms embeddings to {cache_file}")
        return cache_data

    except Exception as e:
        logger.error(f"Error generating terms embeddings: {e}")
        return {}


def search_similar_terms(query_text: str, cached_terms_data: Dict[str, Any],
                         top_k: int = 5) -> List[Dict[str, Any]]:
    """Search cached terms for entries similar to the query text.

    Args:
        query_text: query text
        cached_terms_data: cached term data as returned by
            ``cache_terms_embeddings``
        top_k: maximum number of matches to return (default 5)

    Returns:
        List[Dict]: matches with keys ``term_info``, ``similarity`` and
        ``index``, sorted by similarity in descending order; an empty list on
        missing input or on any error (errors are logged)
    """
    if not cached_terms_data or not query_text or 'embeddings' not in cached_terms_data:
        return []

    try:
        # Generate an embedding for the query text
        query_embedding = encode_texts_via_api([query_text], batch_size=1)
        if len(query_embedding) == 0:
            return []

        query_vector = query_embedding[0]
        term_embeddings = cached_terms_data['embeddings']
        term_info = cached_terms_data['term_info']

        # Add debug information
        logger.debug(f"DEBUG: Query text: '{query_text}'")
        logger.debug(f"DEBUG: Query vector shape: {query_vector.shape}, norm: {np.linalg.norm(query_vector)}")

        # Compute cosine similarity
        similarities = _cosine_similarity(query_vector, term_embeddings)

        logger.debug(f"DEBUG: Similarities: {similarities}")
        logger.debug(f"DEBUG: Max similarity: {np.max(similarities):.3f}, Mean similarity: {np.mean(similarities):.3f}")

        # Get similarity scores for all terms
        matches = [
            {
                'term_info': term_info[i],
                'similarity': float(similarity),
                'index': i,
            }
            for i, similarity in enumerate(similarities)
        ]

        # Sort by similarity in descending order and keep the best results
        matches.sort(key=lambda match: match['similarity'], reverse=True)
        return matches[:top_k]

    except Exception as e:
        logger.error(f"Error in similarity search: {e}")
        return []


def format_terms_analysis(similar_terms: List[Dict[str, Any]]) -> str:
    """Format similar terms as a numbered, newline-separated listing.

    Args:
        similar_terms: list of similar terms as returned by
            ``search_similar_terms``

    Returns:
        str: formatted term analysis; an empty string for an empty list
    """
    if not similar_terms:
        return ""

    formatted_terms = []
    for i, match in enumerate(similar_terms, 1):
        term_info = match['term_info']
        similarity = match['similarity']

        name = term_info.get('name', '')
        description = term_info.get('description', '')
        synonyms = term_info.get('synonyms', [])

        # Format synonyms
        synonyms_str = ', '.join(synonyms) if synonyms else 'N/A'

        formatted_terms.append(
            f"{i}) Name: {name}, Description: {description}, Synonyms: {synonyms_str} (Similarity: {similarity:.3f})"
        )

    return "\n".join(formatted_terms)


def _generate_terms_hash(terms_list: List[Dict[str, Any]]) -> str:
    """Generate a hash for the terms list to validate the cache.

    MD5 is used here only as a change-detection fingerprint.
    """
    # Convert the terms list into a normalized string
    terms_str = json.dumps(terms_list, sort_keys=True, ensure_ascii=False)
    return hashlib.md5(terms_str.encode('utf-8')).hexdigest()


def _cosine_similarity(query_vector: np.ndarray, term_embeddings: np.ndarray) -> np.ndarray:
    """Compute cosine similarity between the query vector and all term embeddings.

    The vectors do not need to be pre-normalized: the dot products are divided
    by the product of the norms (plus a small epsilon to avoid division by
    zero).

    Args:
        query_vector: query vector (shape: [embedding_dim])
        term_embeddings: term embedding matrix (shape: [n_terms, embedding_dim])

    Returns:
        np.ndarray: similarity array (shape: [n_terms]); all zeros when
        ``term_embeddings`` is not at least 2-dimensional
    """
    if term_embeddings.ndim <= 1:
        # Not a matrix of embeddings: no meaningful per-term score.
        return np.zeros(len(term_embeddings))

    norms = np.linalg.norm(term_embeddings, axis=1) * np.linalg.norm(query_vector)
    return np.dot(term_embeddings, query_vector) / (norms + 1e-8)


def process_terms_with_embedding(terms_list: List[Dict[str, Any]], bot_id: str, query_text: str) -> str:
    """Run the complete term flow: cache, similarity search, formatted output.

    Args:
        terms_list: list of terms
        bot_id: bot ID
        query_text: user query text

    Returns:
        str: formatted term analysis result; an empty string when there is no
        input, caching fails, or no similar term is found, so the caller can
        decide how to handle it
    """
    if not terms_list or not query_text:
        return ""

    # 1. Cache term embeddings
    cached_data = cache_terms_embeddings(bot_id, terms_list)
    if not cached_data:
        return ""

    # 2. Search for similar terms and take the top 5
    similar_terms = search_similar_terms(query_text, cached_data)

    # 3. Format the output (format_terms_analysis returns "" for no matches)
    return format_terms_analysis(similar_terms)


# If this file is run directly, execute the example
if __name__ == "__main__":
    # test_chunking_strategies()
    # demo_usage()

    # Example of using the new smart chunking:
    embed_document(
        "./projects/test/dataset/all_hp_product_spec_book2506/document.txt",
        "./projects/test/dataset/all_hp_product_spec_book2506/smart_embedding.pkl",
        chunking_strategy='smart',  # Use smart chunking strategy
        max_chunk_size=800,  # smaller chunk size
        overlap=100,
    )

# Other example calls (commented out):
# split_document_by_pages("/Users/moshui/Documents/felo/qwen-agent/projects/test/dataset/all_hp_product_spec_book2506/document.txt")
# embed_document("/Users/moshui/Documents/felo/qwen-agent/projects/test/dataset/all_hp_product_spec_book2506/document.txt")  # uncomment to run