import pickle import re import numpy as np import os from typing import Optional, List, Dict, Any import requests import asyncio import hashlib import json import logging from utils.settings import FASTAPI_URL # Configure logger logger = logging.getLogger('app') def encode_texts_via_api(texts, batch_size=32): """通过 API 接口编码文本""" if not texts: return np.array([]) try: # FastAPI 服务地址 api_endpoint = f"{FASTAPI_URL}/api/v1/embedding/encode" # 调用编码接口 request_data = { "texts": texts, "batch_size": batch_size } response = requests.post( api_endpoint, json=request_data, timeout=60 # 增加超时时间 ) if response.status_code == 200: result_data = response.json() if result_data.get("success"): embeddings_list = result_data.get("embeddings", []) logger.info(f"API编码成功,处理了 {len(texts)} 个文本,embedding维度: {len(embeddings_list[0]) if embeddings_list else 0}") return np.array(embeddings_list) else: error_msg = result_data.get('error', '未知错误') logger.error(f"API编码失败: {error_msg}") raise Exception(f"API编码失败: {error_msg}") else: logger.error(f"API请求失败: {response.status_code} - {response.text}") raise Exception(f"API请求失败: {response.status_code}") except Exception as e: logger.error(f"API编码异常: {e}") raise def clean_text(text): """ 清理文本,去除特殊字符和无意义字符 Args: text (str): 原始文本 Returns: str: 清理后的文本 """ # 去除HTML标签 text = re.sub(r'<[^>]+>', '', text) # 去除多余的空白字符 text = re.sub(r'\s+', ' ', text) # 去除控制字符和非打印字符,但保留Unicode文字字符 text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text) # 去除首尾空白 text = text.strip() return text def is_meaningful_line(text): """ 判断一行文本是否有意义 Args: text (str): 文本行 Returns: bool: 是否有意义 """ if not text or len(text.strip()) < 5: return False # 过滤纯数字行 if text.strip().isdigit(): return False # 过滤只有符号的行 if re.match(r'^[^\w\u4e00-\u9fa5]+$', text): return False # 过滤常见的无意义行 meaningless_patterns = [ r'^[-=_]{3,}$', # 分割线 r'^第\d+页$', # 页码 r'^\d+\.\s*$', # 只有编号 r'^[a-zA-Z]\.\s*$', # 只有一个字母编号 ] for pattern in meaningless_patterns: if re.match(pattern, text.strip()): return False return True def embed_document(input_file='document.txt', output_file='embedding.pkl', chunking_strategy='line', **chunking_params): """ 读取文档文件,使用指定分块策略进行embedding,保存为pickle文件 Args: input_file (str): 输入文档文件路径 output_file (str): 输出pickle文件路径 chunking_strategy (str): 分块策略,可选 'line', 'paragraph' **chunking_params: 分块参数 - 对于 'line' 策略:无额外参数 - 对于 'paragraph' 策略: - max_chunk_size: 最大chunk大小(默认1000) - overlap: 重叠大小(默认100) - min_chunk_size: 最小chunk大小(默认200) - separator: 段落分隔符(默认'\n') """ try: with open(input_file, 'r', encoding='utf-8') as f: content = f.read() chunks = [] if chunking_strategy == 'line': # 原有的按行处理逻辑 lines = content.split('\n') original_count = len(lines) for line in lines: # 清理文本 cleaned_text = clean_text(line) # 检查是否有意义 if is_meaningful_line(cleaned_text): chunks.append(cleaned_text) logger.info(f"使用按行分块策略") logger.info(f"原始行数: {original_count}") logger.info(f"清理后有效句子数: {len(chunks)}") logger.info(f"过滤比例: {((original_count - len(chunks)) / original_count * 100):.1f}%") elif chunking_strategy == 'paragraph': # 新的段落级分块策略 # 设置默认参数 params = { 'max_chunk_size': 1000, 'overlap': 100, 'min_chunk_size': 200, 'separator': '\n' } params.update(chunking_params) # 先清理整个文档的空白字符 cleaned_content = clean_text(content) # 使用段落分块 chunks = paragraph_chunking(cleaned_content, **params) logger.info(f"使用段落级分块策略") logger.info(f"文档总长度: {len(content)} 字符") logger.info(f"分块数量: {len(chunks)}") if chunks: logger.debug(f"平均chunk大小: {sum(len(chunk) for chunk in chunks) / len(chunks):.1f} 字符") logger.debug(f"最大chunk大小: {max(len(chunk) for chunk in chunks)} 字符") logger.debug(f"最小chunk大小: {min(len(chunk) for chunk in chunks)} 字符") elif chunking_strategy == 'smart': # 智能分块策略,自动检测文档格式 params = { 'max_chunk_size': 1000, 'overlap': 100, 'min_chunk_size': 200 } params.update(chunking_params) # 使用智能分块 chunks = smart_chunking(content, **params) logger.info(f"使用智能分块策略") logger.info(f"文档总长度: {len(content)} 字符") logger.info(f"分块数量: {len(chunks)}") if chunks: logger.debug(f"平均chunk大小: {sum(len(chunk) for chunk in chunks) / len(chunks):.1f} 字符") logger.debug(f"最大chunk大小: {max(len(chunk) for chunk in chunks)} 字符") logger.debug(f"最小chunk大小: {min(len(chunk) for chunk in chunks)} 字符") else: raise ValueError(f"不支持的分块策略: {chunking_strategy}") if not chunks: logger.warning("警告:没有找到有效的内容块!") return None logger.info(f"正在处理 {len(chunks)} 个内容块...") # 使用API接口进行编码 logger.info("使用API接口进行编码...") chunk_embeddings = encode_texts_via_api(chunks, batch_size=32) embedding_data = { 'chunks': chunks, 'embeddings': chunk_embeddings, 'chunking_strategy': chunking_strategy, 'chunking_params': chunking_params, 'model_path': 'api_service' } with open(output_file, 'wb') as f: pickle.dump(embedding_data, f) logger.info(f"已保存嵌入向量到 {output_file}") return embedding_data except FileNotFoundError: logger.error(f"错误:找不到文件 {input_file}") return None except Exception as e: logger.error(f"处理文档时出错:{e}") return None def paragraph_chunking(text, max_chunk_size=1000, overlap=100, min_chunk_size=200, separator='\n\n'): """ 段落级智能分块函数 - 使用固定chunk大小分块,不按页面分割 Args: text (str): 输入文本 max_chunk_size (int): 最大chunk大小(字符数) overlap (int): 重叠部分大小(字符数) min_chunk_size (int): 最小chunk大小(字符数) separator (str): 段落分隔符 Returns: list: 分块后的文本列表 """ if not text or not text.strip(): return [] # 直接使用固定长度分块策略,不考虑页面标记 return _fixed_length_chunking(text, max_chunk_size, overlap, min_chunk_size) def _split_long_content(content, max_size, min_size, separator): """ 拆分过长的内容 Args: content (str): 要拆分的内容 max_size (int): 最大大小 min_size (int): 最小大小 separator (str): 分隔符 Returns: list: 拆分后的块列表 """ if len(content) <= max_size: return [content] # 尝试按段落拆分 paragraphs = content.split(separator) if len(paragraphs) > 1: chunks = [] current_chunk = "" for para in paragraphs: if not current_chunk: current_chunk = para elif len(current_chunk + separator + para) <= max_size: current_chunk += separator + para else: if current_chunk: chunks.append(current_chunk) current_chunk = para if current_chunk: chunks.append(current_chunk) return chunks # 如果不能按段落拆分,按句子拆分 sentences = _split_into_sentences(content) chunks = [] current_chunk = "" for sentence in sentences: if not current_chunk: current_chunk = sentence elif len(current_chunk + " " + sentence) <= max_size: current_chunk += " " + sentence else: if current_chunk: chunks.append(current_chunk) current_chunk = sentence if current_chunk: chunks.append(current_chunk) return chunks def _split_into_sentences(text): """ 将文本拆分为句子 Args: text (str): 输入文本 Returns: list: 句子列表 """ # 简单的句子分割(可以根据需要改进) import re # 按句号、问号、感叹号分割,但保留数字中的点 sentence_endings = re.compile(r'(?<=[.!?])\s+(?=[\dA-Z\u4e00-\u9fa5])') sentences = sentence_endings.split(text.strip()) return [s.strip() for s in sentences if s.strip()] def _create_overlap_chunk(previous_chunk, new_paragraph, overlap_size): """ 创建带有重叠内容的新chunk Args: previous_chunk (str): 前一个chunk new_paragraph (str): 新段落 overlap_size (int): 重叠大小 Returns: str: 带重叠的新chunk """ if overlap_size <= 0: return new_paragraph # 从前一个chunk的末尾获取重叠内容 overlap_text = previous_chunk[-overlap_size:] if len(previous_chunk) > overlap_size else previous_chunk # 尝试在句子边界处分割重叠内容 sentences = _split_into_sentences(overlap_text) if len(sentences) > 1: # 去掉可能不完整的第一个句子 overlap_text = " ".join(sentences[1:]) elif len(overlap_text) > overlap_size * 0.5: # 如果只有一个句子且长度合适,保留它 pass else: # 重叠内容太少,不使用重叠 return new_paragraph return overlap_text + "\n\n" + new_paragraph def _add_overlap_to_chunk(previous_chunk, current_chunk, overlap_size): """ 为当前chunk添加与前一个chunk的重叠 Args: previous_chunk (str): 前一个chunk current_chunk (str): 当前chunk overlap_size (int): 重叠大小 Returns: str: 带重叠的chunk """ if overlap_size <= 0: return current_chunk # 从前一个chunk的末尾获取重叠内容 overlap_text = previous_chunk[-overlap_size:] if len(previous_chunk) > overlap_size else previous_chunk # 尝试在句子边界处分割 sentences = _split_into_sentences(overlap_text) if len(sentences) > 1: overlap_text = " ".join(sentences[1:]) return overlap_text + "\n\n" + current_chunk def smart_chunking(text, max_chunk_size=1000, overlap=100, min_chunk_size=200): """ 智能分块函数,自动检测文档格式并选择最佳分块策略 Args: text (str): 输入文本 max_chunk_size (int): 最大chunk大小(字符数) overlap (int): 重叠部分大小(字符数) min_chunk_size (int): 最小chunk大小(字符数) Returns: list: 分块后的文本列表 """ if not text or not text.strip(): return [] # 检测文档类型(支持 # Page 和 # File 格式) has_page_markers = '# Page' in text or '# File' in text has_paragraph_breaks = '\n\n' in text has_line_breaks = '\n' in text # 选择合适的分隔符和策略 if has_page_markers: # 使用页面分隔符 return _page_based_chunking(text, max_chunk_size, overlap, min_chunk_size) elif has_paragraph_breaks: # 使用段落分隔符 return paragraph_chunking(text, max_chunk_size, overlap, min_chunk_size, '\n\n') elif has_line_breaks: # 使用行分隔符 return _line_based_chunking(text, max_chunk_size, overlap, min_chunk_size) else: # 按固定长度分块 return _fixed_length_chunking(text, max_chunk_size, overlap, min_chunk_size) def _page_based_chunking(text, max_chunk_size, overlap, min_chunk_size): """基于页面的分块策略""" import re # 使用正则表达式分割页面(支持 # Page 和 # File 格式) page_pattern = r'#\s*(Page\s+\d+|File\s+[^\n]+)' pages = re.split(page_pattern, text) # 清理和过滤页面内容 cleaned_pages = [] for page in pages: page = page.strip() if page and len(page) > min_chunk_size * 0.3: # 过滤太小的页面 cleaned_pages.append(page) if not cleaned_pages: return [] # 如果页面内容过大,需要进一步分割 chunks = [] for page in cleaned_pages: if len(page) <= max_chunk_size: chunks.append(page) else: # 页面过大,需要分割 sub_chunks = _split_long_content(page, max_chunk_size, min_chunk_size, '\n') chunks.extend(sub_chunks) # 添加重叠 if overlap > 0 and len(chunks) > 1: chunks = _add_overlaps_to_chunks(chunks, overlap) return chunks def _line_based_chunking(text, max_chunk_size, overlap, min_chunk_size): """基于行的分块策略""" lines = text.split('\n') chunks = [] current_chunk = "" for line in lines: line = line.strip() if not line: continue if not current_chunk: current_chunk = line elif len(current_chunk + '\n' + line) <= max_chunk_size: current_chunk += '\n' + line else: if len(current_chunk) >= min_chunk_size: chunks.append(current_chunk) current_chunk = _create_overlap_for_line(current_chunk, line, overlap) else: # 当前行太长,需要分割 split_chunks = _split_long_content(current_chunk + '\n' + line, max_chunk_size, min_chunk_size, '\n') if chunks and split_chunks: split_chunks[0] = _add_overlap_to_chunk(chunks[-1], split_chunks[0], overlap) chunks.extend(split_chunks[:-1]) current_chunk = split_chunks[-1] if split_chunks else "" if current_chunk and len(current_chunk) >= min_chunk_size: chunks.append(current_chunk) elif current_chunk and chunks: chunks[-1] += '\n' + current_chunk return chunks def _fixed_length_chunking(text, max_chunk_size, overlap, min_chunk_size): """固定长度分块策略""" chunks = [] start = 0 while start < len(text): end = start + max_chunk_size if end >= len(text): chunks.append(text[start:]) break # 尝试在句号、问号或感叹号处分割 split_pos = end for i in range(end, max(start, end - 100), -1): if text[i] in '.!?。!?': split_pos = i + 1 break chunk = text[start:split_pos] if len(chunk) >= min_chunk_size: chunks.append(chunk) start = split_pos - overlap if overlap > 0 else split_pos else: start += max_chunk_size // 2 return chunks def _create_overlap_for_line(previous_chunk, new_line, overlap_size): """为行分块创建重叠""" if overlap_size <= 0: return new_line # 从前一个chunk的末尾获取重叠内容 overlap_text = previous_chunk[-overlap_size:] if len(previous_chunk) > overlap_size else previous_chunk # 尝试在合适的边界分割 last_newline = overlap_text.rfind('\n') if last_newline > 0: overlap_text = overlap_text[last_newline + 1:] return overlap_text + '\n' + new_line def _add_overlaps_to_chunks(chunks, overlap_size): """为chunks添加重叠""" if overlap_size <= 0 or len(chunks) <= 1: return chunks result = [chunks[0]] for i in range(1, len(chunks)): previous_chunk = chunks[i-1] current_chunk = chunks[i] # 添加重叠 overlap_text = previous_chunk[-overlap_size:] if len(previous_chunk) > overlap_size else previous_chunk # 尝试在合适的边界分割 last_newline = overlap_text.rfind('\n') if last_newline > 0: overlap_text = overlap_text[last_newline + 1:] elif '.' in overlap_text: # 尝试在句号处分割 last_period = overlap_text.rfind('.') if last_period > 0: overlap_text = overlap_text[last_period + 1:].strip() if overlap_text: combined_chunk = overlap_text + '\n\n' + current_chunk result.append(combined_chunk) else: result.append(current_chunk) return result def split_document_by_pages(input_file='document.txt', output_file='pagination.txt'): """ 按页或文件分割document.txt文件,将每页内容整理成一行写入pagination.txt Args: input_file (str): 输入文档文件路径 output_file (str): 输出序列化文件路径 """ try: with open(input_file, 'r', encoding='utf-8') as f: lines = f.readlines() pages = [] current_page = [] for line in lines: line = line.strip() # 检查是否是页分隔符(支持 # Page 和 # File 格式) if re.match(r'^#\s*(Page|File)', line, re.IGNORECASE): # 如果当前页有内容,保存当前页 if current_page: # 将当前页内容合并成一行 page_content = ' '.join(current_page).strip() if page_content: # 只保存非空页面 pages.append(page_content) current_page = [] continue # 如果不是页分隔符且有内容,添加到当前页 if line: current_page.append(line) # 处理最后一页 if current_page: page_content = ' '.join(current_page).strip() if page_content: pages.append(page_content) logger.info(f"总共分割出 {len(pages)} 页") # 写入序列化文件 with open(output_file, 'w', encoding='utf-8') as f: for i, page_content in enumerate(pages, 1): f.write(f"{page_content}\n") logger.info(f"已将页面内容序列化到 {output_file}") return pages except FileNotFoundError: logger.error(f"错误:找不到文件 {input_file}") return [] except Exception as e: logger.error(f"分割文档时出错:{e}") return [] def test_chunking_strategies(): """ 测试不同的分块策略,比较效果 """ # 测试文本 test_text = """ 第一段:这是一个测试段落。包含了多个句子。这是为了测试分块功能。 第二段:这是另一个段落。它也包含了多个句子,用来验证分块策略的效果。我们需要确保分块的质量。 第三段:这是第三个段落,内容比较长,包含了更多的信息。这个段落可能会触发分块逻辑,因为它可能会超过最大chunk大小的限制。我们需要确保在这种情况下,分块算法能够正确地处理,并且在句子边界进行分割。 第四段:这是第四个段落。它相对较短。 第五段:这是最后一个段落。它用来测试分块策略的完整性和准确性。 """ logger.debug("=" * 60) logger.debug("分块策略测试") logger.debug("=" * 60) # 测试1: 段落级分块(小chunk) logger.debug("\n1. 段落级分块 - 小chunk (max_size=200):") chunks_small = paragraph_chunking(test_text, max_chunk_size=200, overlap=50) for i, chunk in enumerate(chunks_small): logger.debug(f"Chunk {i+1} (长度: {len(chunk)}): {chunk[:50]}...") # 测试2: 段落级分块(大chunk) logger.debug("\n2. 段落级分块 - 大chunk (max_size=500):") chunks_large = paragraph_chunking(test_text, max_chunk_size=500, overlap=100) for i, chunk in enumerate(chunks_large): logger.debug(f"Chunk {i+1} (长度: {len(chunk)}): {chunk[:50]}...") # 测试3: 段落级分块(无重叠) logger.debug("\n3. 段落级分块 - 无重叠:") chunks_no_overlap = paragraph_chunking(test_text, max_chunk_size=300, overlap=0) for i, chunk in enumerate(chunks_no_overlap): logger.debug(f"Chunk {i+1} (长度: {len(chunk)}): {chunk[:50]}...") logger.debug(f"\n测试总结:") logger.debug(f"- 小chunk策略: {len(chunks_small)} 个chunks") logger.debug(f"- 大chunk策略: {len(chunks_large)} 个chunks") logger.debug(f"- 无重叠策略: {len(chunks_no_overlap)} 个chunks") def demo_usage(): """ 演示如何使用新的分块功能 """ logger.debug("=" * 60) logger.debug("使用示例") logger.debug("=" * 60) logger.debug("\n1. 使用传统的按行分块:") logger.debug("embed_document('document.txt', 'line_embedding.pkl', chunking_strategy='line')") logger.debug("\n2. 使用段落级分块(默认参数):") logger.debug("embed_document('document.txt', 'paragraph_embedding.pkl', chunking_strategy='paragraph')") logger.debug("\n3. 使用自定义参数的段落级分块:") logger.debug("embed_document('document.txt', 'custom_embedding.pkl',") logger.debug(" chunking_strategy='paragraph',") logger.debug(" max_chunk_size=1500,") logger.debug(" overlap=200,") logger.debug(" min_chunk_size=300)") # 如果直接运行此文件,执行测试 if __name__ == "__main__": #test_chunking_strategies() #demo_usage() # 使用新的智能分块示例: embed_document("./projects/test/dataset/all_hp_product_spec_book2506/document.txt", "./projects/test/dataset/all_hp_product_spec_book2506/smart_embedding.pkl", chunking_strategy='smart', # 使用智能分块策略 max_chunk_size=800, # 较小的chunk大小 overlap=100) def cache_terms_embeddings(bot_id: str, terms_list: List[Dict[str, Any]]) -> Dict[str, Any]: """ 处理terms列表,生成embedding并缓存 Args: bot_id: 机器人ID,用于缓存key terms_list: terms列表,每个term包含name, description, synonyms等字段 Returns: Dict: 包含embedding数据的字典 """ if not terms_list: return {} cache_key = f"{bot_id}_terms" cache_file = f"projects/cache/{cache_key}.pkl" # 确保cache目录存在 os.makedirs("projects/cache", exist_ok=True) # 检查缓存是否存在且有效 if os.path.exists(cache_file): try: with open(cache_file, 'rb') as f: cached_data = pickle.load(f) # 验证缓存数据是否匹配当前的terms current_hash = _generate_terms_hash(terms_list) if cached_data.get('hash') == current_hash: logger.info(f"Using cached terms embeddings for {cache_key}") return cached_data except Exception as e: logger.error(f"Error loading cache: {e}") # 准备要编码的文本 term_texts = [] term_info = [] for term in terms_list: # 构建term的完整文本用于embedding term_text_parts = [] if 'name' in term and term['name']: term_text_parts.append(f"Name: {term['name']}") if 'description' in term and term['description']: term_text_parts.append(f"Description: {term['description']}") # 处理同义词 synonyms = [] if 'synonyms' in term and term['synonyms']: if isinstance(term['synonyms'], list): synonyms = term['synonyms'] elif isinstance(term['synonyms'], str): synonyms = [s.strip() for s in term['synonyms'].split(',') if s.strip()] if synonyms: term_text_parts.append(f"Synonyms: {', '.join(synonyms)}") term_text = " | ".join(term_text_parts) term_texts.append(term_text) # 保存原始信息 term_info.append({ 'name': term.get('name', ''), 'description': term.get('description', ''), 'synonyms': synonyms }) # 生成embeddings try: embeddings = encode_texts_via_api(term_texts, batch_size=16) # 准备缓存数据 cache_data = { 'hash': _generate_terms_hash(terms_list), 'term_info': term_info, 'embeddings': embeddings, 'texts': term_texts } # 保存到缓存 with open(cache_file, 'wb') as f: pickle.dump(cache_data, f) logger.info(f"Cached {len(term_texts)} terms embeddings to {cache_file}") return cache_data except Exception as e: logger.error(f"Error generating terms embeddings: {e}") return {} def search_similar_terms(query_text: str, cached_terms_data: Dict[str, Any]) -> List[Dict[str, Any]]: """ 在缓存的terms中搜索与查询文本相似的terms Args: query_text: 查询文本 cached_terms_data: 缓存的terms数据 Returns: List[Dict]: 匹配的terms列表,按相似度降序排列 """ if not cached_terms_data or not query_text or 'embeddings' not in cached_terms_data: return [] try: # 生成查询文本的embedding query_embedding = encode_texts_via_api([query_text], batch_size=1) if len(query_embedding) == 0: return [] query_vector = query_embedding[0] term_embeddings = cached_terms_data['embeddings'] term_info = cached_terms_data['term_info'] # 添加调试信息 logger.debug(f"DEBUG: Query text: '{query_text}'") logger.debug(f"DEBUG: Query vector shape: {query_vector.shape}, norm: {np.linalg.norm(query_vector)}") # 计算cos相似度 similarities = _cosine_similarity(query_vector, term_embeddings) logger.debug(f"DEBUG: Similarities: {similarities}") logger.debug(f"DEBUG: Max similarity: {np.max(similarities):.3f}, Mean similarity: {np.mean(similarities):.3f}") # 获取所有terms的相似度 matches = [] for i, similarity in enumerate(similarities): match = { 'term_info': term_info[i], 'similarity': float(similarity), 'index': i } matches.append(match) # 按相似度降序排列 matches.sort(key=lambda x: x['similarity'], reverse=True) # 只返回top5结果 return matches[:5] except Exception as e: logger.error(f"Error in similarity search: {e}") return [] def format_terms_analysis(similar_terms: List[Dict[str, Any]]) -> str: """ 格式化相似terms为指定格式的字符串 Args: similar_terms: 相似terms列表 Returns: str: 格式化后的terms分析 """ if not similar_terms: return "" formatted_terms = [] for i, match in enumerate(similar_terms, 1): term_info = match['term_info'] similarity = match['similarity'] name = term_info.get('name', '') description = term_info.get('description', '') synonyms = term_info.get('synonyms', []) # 格式化同义词 synonyms_str = ', '.join(synonyms) if synonyms else 'N/A' formatted_term = f"{i}) Name: {name}, Description: {description}, Synonyms: {synonyms_str} (Similarity: {similarity:.3f})" formatted_terms.append(formatted_term) return "\n".join(formatted_terms) def _generate_terms_hash(terms_list: List[Dict[str, Any]]) -> str: """生成terms列表的哈希值用于缓存验证""" # 将terms列表转换为标准化的字符串 terms_str = json.dumps(terms_list, sort_keys=True, ensure_ascii=False) return hashlib.md5(terms_str.encode('utf-8')).hexdigest() def _cosine_similarity(query_vector: np.ndarray, term_embeddings: np.ndarray) -> np.ndarray: """ 计算查询向量与所有term embeddings的cos相似度 参考semantic_search_server.py的实现,假设向量已经归一化 Args: query_vector: 查询向量 (shape: [embedding_dim]) term_embeddings: term embeddings矩阵 (shape: [n_terms, embedding_dim]) Returns: np.ndarray: 相似度数组 (shape: [n_terms]) """ # 使用与semantic_search_server.py相同的算法 if len(term_embeddings.shape) > 1: cos_scores = np.dot(term_embeddings, query_vector) / ( np.linalg.norm(term_embeddings, axis=1) * np.linalg.norm(query_vector) + 1e-8 ) else: cos_scores = np.array([0.0] * len(term_embeddings)) return cos_scores def process_terms_with_embedding(terms_list: List[Dict[str, Any]], bot_id: str, query_text: str) -> str: """ 完整的terms处理流程:缓存、搜索相似度、格式化输出 Args: terms_list: terms列表 bot_id: 机器人ID query_text: 用户查询文本 Returns: str: 格式化后的terms分析结果 """ if not terms_list or not query_text: return "" # 1. 缓存terms的embeddings cached_data = cache_terms_embeddings(bot_id, terms_list) if not cached_data: return "" # 2. 搜索相似的terms (取top5) similar_terms = search_similar_terms(query_text, cached_data) # 3. 格式化输出 if similar_terms: return format_terms_analysis(similar_terms) else: # 当没有找到相似terms时,可以返回空字符串或者提示信息 # 这里返回空字符串,让调用方决定如何处理 return "" # 其他示例调用(注释掉的): # split_document_by_pages("/Users/moshui/Documents/felo/qwen-agent/projects/test/dataset/all_hp_product_spec_book2506/document.txt") # embed_document("/Users/moshui/Documents/felo/qwen-agent/projects/test/dataset/all_hp_product_spec_book2506/document.txt") # 取消注释来运行