From 653ee4af13dda29e5101fd27536abe454c85ef7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Fri, 19 Dec 2025 11:15:50 +0800 Subject: [PATCH] add logs --- apps/common/event/listener_manage.py | 79 ++++++++++++++---- .../handle/impl/media/media_split_handle.py | 81 +++++++++++++++---- apps/common/utils/page_utils.py | 15 ++++ apps/knowledge/tasks/embedding.py | 15 +++- apps/knowledge/tasks/media_learning.py | 46 ++++++++--- 5 files changed, 192 insertions(+), 44 deletions(-) diff --git a/apps/common/event/listener_manage.py b/apps/common/event/listener_manage.py index 167e5bc4..e447145a 100644 --- a/apps/common/event/listener_manage.py +++ b/apps/common/event/listener_manage.py @@ -126,17 +126,35 @@ class ListenerManagement: @param paragraph_id: 段落id @param embedding_model: 向量模型 """ - maxkb_logger.info(_('Start--->Embedding paragraph: {paragraph_id}').format(paragraph_id=paragraph_id)) + maxkb_logger.info(f"🚀 Starting embedding for paragraph: {paragraph_id}") + + # 获取段落信息 + paragraph = QuerySet(Paragraph).filter(id=paragraph_id).first() + if paragraph: + content_length = len(paragraph.content) + maxkb_logger.info(f"📄 Paragraph info - title: '{paragraph.title}', content length: {content_length}") + maxkb_logger.info(f"📝 Content preview: '{paragraph.content[:100]}...'") + else: + maxkb_logger.error(f"❌ Paragraph {paragraph_id} not found in database") + return + # 更新到开始状态 + maxkb_logger.info(f"🔄 Updating paragraph status to STARTED") ListenerManagement.update_status(QuerySet(Paragraph).filter(id=paragraph_id), TaskType.EMBEDDING, State.STARTED) + try: + maxkb_logger.info(f"🔍 Searching data for paragraph embedding") data_list = native_search( {'problem': QuerySet(get_dynamics_model({'paragraph.id': django.db.models.CharField()})).filter( **{'paragraph.id': paragraph_id}), 'paragraph': QuerySet(Paragraph).filter(id=paragraph_id)}, select_string=get_file_content( os.path.join(PROJECT_DIR, "apps", "common", 'sql', 'list_embedding_text.sql'))) - # 删除段落 + + maxkb_logger.info(f"📊 Found {len(data_list) if data_list else 0} data items for embedding") + + # 删除旧向量 + maxkb_logger.info(f"🗑️ Deleting old embedding for paragraph {paragraph_id}") VectorStore.get_embedding_vector().delete_by_paragraph_id(paragraph_id) def is_the_task_interrupted(): @@ -146,13 +164,17 @@ class ListenerManagement: return False # 批量向量化 + maxkb_logger.info(f"💾 Saving embeddings to vector store") VectorStore.get_embedding_vector().batch_save(data_list, embedding_model, is_the_task_interrupted) - # 更新到开始状态 + + # 更新到成功状态 + maxkb_logger.info(f"✅ Updating paragraph status to SUCCESS") ListenerManagement.update_status(QuerySet(Paragraph).filter(id=paragraph_id), TaskType.EMBEDDING, State.SUCCESS) except Exception as e: - maxkb_logger.error(_('Vectorized paragraph: {paragraph_id} error {error} {traceback}').format( - paragraph_id=paragraph_id, error=str(e), traceback=traceback.format_exc())) + maxkb_logger.error(f"❌ Failed to embed paragraph {paragraph_id}: {str(e)}") + maxkb_logger.error(f"🔍 Full error traceback: {traceback.format_exc()}") + maxkb_logger.info(f"🔄 Updating paragraph status to FAILURE") ListenerManagement.update_status(QuerySet(Paragraph).filter(id=paragraph_id), TaskType.EMBEDDING, State.FAILURE) finally: @@ -166,10 +188,24 @@ class ListenerManagement: @staticmethod def get_embedding_paragraph_apply(embedding_model, is_the_task_interrupted, post_apply=lambda: None): def embedding_paragraph_apply(paragraph_list): - for paragraph in paragraph_list: + maxkb_logger.info(f"🔍 Processing batch of {len(paragraph_list)} paragraphs for embedding") + + for idx, paragraph in enumerate(paragraph_list): if is_the_task_interrupted(): + maxkb_logger.warning(f"⚠️ Embedding task interrupted, stopping at paragraph {idx+1}/{len(paragraph_list)}") break - ListenerManagement.embedding_by_paragraph(str(paragraph.get('id')), embedding_model) + + paragraph_id = str(paragraph.get('id')) + maxkb_logger.info(f"📊 Embedding paragraph {idx+1}/{len(paragraph_list)}: ID={paragraph_id}") + + try: + ListenerManagement.embedding_by_paragraph(paragraph_id, embedding_model) + maxkb_logger.info(f"✅ Successfully embedded paragraph {paragraph_id}") + except Exception as e: + maxkb_logger.error(f"❌ Failed to embed paragraph {paragraph_id}: {str(e)}") + # 继续处理其他段落,不中断整个批次 + + maxkb_logger.info(f"🎉 Completed embedding batch processing") post_apply() return embedding_paragraph_apply @@ -272,21 +308,32 @@ class ListenerManagement: return False if is_the_task_interrupted(): + maxkb_logger.warning(f"⚠️ Embedding task for document {document_id} was interrupted before starting") return - maxkb_logger.info(_('Start--->Embedding document: {document_id}').format(document_id=document_id) - ) + + maxkb_logger.info(f"🚀 Starting embedding for document: {document_id}") # 批量修改状态为STARTED ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING, State.STARTED) - # 根据段落进行向量化处理 - page_desc(QuerySet(Paragraph) - .annotate( + # 获取需要处理的段落 + paragraph_queryset = QuerySet(Paragraph).annotate( reversed_status=Reverse('status'), - task_type_status=Substr('reversed_status', TaskType.EMBEDDING.value, - 1), - ).filter(task_type_status__in=state_list, document_id=document_id) - .values('id'), 5, + task_type_status=Substr('reversed_status', TaskType.EMBEDDING.value, 1), + ).filter(task_type_status__in=state_list, document_id=document_id).values('id') + + # 统计需要处理的段落数量 + total_paragraphs = paragraph_queryset.count() + maxkb_logger.info(f"📊 Found {total_paragraphs} paragraphs to embed for document {document_id}") + + if total_paragraphs == 0: + maxkb_logger.warning(f"⚠️ No paragraphs found for embedding in document {document_id}") + return + + # 根据段落进行向量化处理(每批5个) + batch_size = 5 + maxkb_logger.info(f"🔄 Processing paragraphs in batches of {batch_size}") + page_desc(paragraph_queryset, batch_size, ListenerManagement.get_embedding_paragraph_apply(embedding_model, is_the_task_interrupted, ListenerManagement.get_aggregation_document_status( document_id)), diff --git a/apps/common/handle/impl/media/media_split_handle.py b/apps/common/handle/impl/media/media_split_handle.py index a80aa70c..d1432205 100644 --- a/apps/common/handle/impl/media/media_split_handle.py +++ b/apps/common/handle/impl/media/media_split_handle.py @@ -31,29 +31,50 @@ class MediaSplitHandle(BaseSplitHandle): """处理音视频文件""" maxkb_logger.info(f"MediaSplitHandle.handle called with file: {file.name}") - maxkb_logger.info(f"Split parameters - limit: {limit}, patterns: {pattern_list}, with_filter: {with_filter}") + maxkb_logger.info(f"📋 Split parameters - limit: {limit}, patterns: {pattern_list}, with_filter: {with_filter}") # 检查是否需要实际处理 use_actual_processing = kwargs.get('use_actual_processing', False) stt_model_id = kwargs.get('stt_model_id') + llm_model_id = kwargs.get('llm_model_id') + workspace_id = kwargs.get('workspace_id') + + maxkb_logger.info(f"🎯 Processing mode: {'Actual processing' if use_actual_processing else 'Default text'}") + maxkb_logger.info(f"🔧 Model IDs - STT: {stt_model_id}, LLM: {llm_model_id}, Workspace: {workspace_id}") if use_actual_processing and stt_model_id: # 进行实际处理 + maxkb_logger.info(f"🎬 Starting actual media processing for {file.name}") result = self._handle_actual_processing(file, get_buffer, **kwargs) + # 检查处理结果 + paragraphs_count = len(result.get('content', [])) + maxkb_logger.info(f"📝 Actual processing generated {paragraphs_count} paragraphs") + # 应用智能分块(如果需要) chunk_limit = limit if limit > 0 else 1000 # 默认1000字符 - if len(result.get('content', [])) > 0: + maxkb_logger.info(f"🔧 Chunk limit set to: {chunk_limit}") + + if paragraphs_count > 0: + maxkb_logger.info(f"✂️ Applying smart split to {paragraphs_count} paragraphs") result = self._apply_smart_split(result, chunk_limit, with_filter) + final_chunks = len(result.get('content', [])) + maxkb_logger.info(f"✅ Smart split completed, final chunks: {final_chunks}") + else: + maxkb_logger.warning(f"⚠️ No paragraphs generated from actual processing") return result else: # 使用默认文本 + maxkb_logger.info(f"📄 Using default text mode for {file.name}") result = self._handle_default_text(file, **kwargs) # 即使是默认文本也应用分块(如果有limit参数) if limit > 0: + maxkb_logger.info(f"✂️ Applying smart split to default text with limit {limit}") result = self._apply_smart_split(result, limit, with_filter) + final_chunks = len(result.get('content', [])) + maxkb_logger.info(f"✅ Smart split completed, final chunks: {final_chunks}") return result @@ -362,43 +383,68 @@ class MediaSplitHandle(BaseSplitHandle): """应用智能分块到转录结果""" overlap = 100 # 前后重叠字符数 + maxkb_logger.info(f"🔧 Starting smart split process - limit: {limit}, with_filter: {with_filter}") + original_paragraphs = result.get('content', []) + maxkb_logger.info(f"📊 Original paragraphs count: {len(original_paragraphs)}") + new_paragraphs = [] - for paragraph in result.get('content', []): + total_chunks_created = 0 + + for idx, paragraph in enumerate(original_paragraphs): + maxkb_logger.info(f"🔍 Processing paragraph {idx+1}/{len(original_paragraphs)}: {paragraph.get('title', 'Untitled')}") content = paragraph.get('content', '') + content_length = len(content) + maxkb_logger.info(f"📏 Paragraph {idx+1} content length: {content_length} characters") # 应用文本过滤(如果需要) if with_filter: + maxkb_logger.info(f"🧹 Applying text filter to paragraph {idx+1}") content = self._clean_text(content) + maxkb_logger.info(f"🧹 Filtered content length: {len(content)} characters") if content: - # 应用智能分块 - chunks = self.smart_split_transcription(content, limit, overlap) + # 判断是否需要分块 + if content_length > limit: + maxkb_logger.info(f"✂️ Paragraph {idx+1} needs splitting (length {content_length} > limit {limit})") + + # 应用智能分块 + chunks = self.smart_split_transcription(content, limit, overlap) + maxkb_logger.info(f"✂️ Split paragraph {idx+1} into {len(chunks)} chunks") + + # 记录每个chunk的详细信息 + for c_idx, chunk in enumerate(chunks): + maxkb_logger.info(f"📦 Chunk {c_idx+1}/{len(chunks)}: length={len(chunk)}, preview='{chunk[:50]}...'") - # 如果只有一个块且没有变化,保持原样 - if len(chunks) == 1 and chunks[0] == content: - new_paragraphs.append(paragraph) - else: # 创建新的段落 - for idx, chunk in enumerate(chunks): + for c_idx, chunk in enumerate(chunks): # 保留原始元数据,但更新分段相关信息 metadata = paragraph.get('metadata', {}).copy() metadata.update({ - 'chunk_index': idx, + 'chunk_index': c_idx, 'total_chunks': len(chunks), 'split_method': 'smart_transcription', 'split_limit': limit, 'split_overlap': overlap, - 'with_filter': with_filter + 'with_filter': with_filter, + 'original_paragraph_index': idx, + 'original_content_length': content_length }) new_paragraph = { 'content': chunk, - 'title': f"{paragraph.get('title', '段落')} - 第{idx + 1}部分" if len(chunks) > 1 else paragraph.get('title', '段落'), + 'title': f"{paragraph.get('title', '段落')} - 第{c_idx + 1}部分" if len(chunks) > 1 else paragraph.get('title', '段落'), 'metadata': metadata } new_paragraphs.append(new_paragraph) + total_chunks_created += 1 + else: + maxkb_logger.info(f"📄 Paragraph {idx+1} does not need splitting (length {content_length} <= limit {limit})") + new_paragraphs.append(paragraph) + total_chunks_created += 1 else: + maxkb_logger.warning(f"⚠️ Paragraph {idx+1} has empty content after processing") new_paragraphs.append(paragraph) + total_chunks_created += 1 # 更新结果 result['content'] = new_paragraphs @@ -407,9 +453,16 @@ class MediaSplitHandle(BaseSplitHandle): metadata = result.get('metadata', {}) metadata['smart_split_applied'] = True metadata['total_chunks'] = len(new_paragraphs) + metadata['original_paragraphs'] = len(original_paragraphs) + metadata['split_parameters'] = { + 'limit': limit, + 'overlap': overlap, + 'with_filter': with_filter + } result['metadata'] = metadata - maxkb_logger.info(f"Applied smart transcription split: {len(new_paragraphs)} chunks") + maxkb_logger.info(f"✅ Smart split completed - original: {len(original_paragraphs)} paragraphs, final: {len(new_paragraphs)} chunks") + maxkb_logger.info(f"📈 Total chunks created: {total_chunks_created}") return result diff --git a/apps/common/utils/page_utils.py b/apps/common/utils/page_utils.py index 61c52920..de476d7d 100644 --- a/apps/common/utils/page_utils.py +++ b/apps/common/utils/page_utils.py @@ -37,11 +37,26 @@ def page_desc(query_set, page_size, handler, is_the_task_interrupted=lambda: Fal @param is_the_task_interrupted: 任务是否被中断 @return: """ + from common.utils.logger import maxkb_logger + query = query_set.order_by("id") count = query_set.count() + + maxkb_logger.info(f"🔍 page_desc: Processing {count} items in batches of {page_size}") + maxkb_logger.info(f"📊 Total batches to process: {ceil(count / page_size)}") + + batch_count = 0 for i in sorted(range(0, ceil(count / page_size)), reverse=True): if is_the_task_interrupted(): + maxkb_logger.warning(f"⚠️ page_desc: Task interrupted during batch processing") return + + batch_count += 1 offset = i * page_size paragraph_list = query.all()[offset: offset + page_size] + + maxkb_logger.info(f"🔄 Processing batch {batch_count}/{ceil(count / page_size)}: {len(paragraph_list)} items") handler(paragraph_list) + maxkb_logger.info(f"✅ Completed batch {batch_count}") + + maxkb_logger.info(f"🎉 page_desc: Completed all {batch_count} batches") diff --git a/apps/knowledge/tasks/embedding.py b/apps/knowledge/tasks/embedding.py index b7cc8863..7d05c56e 100644 --- a/apps/knowledge/tasks/embedding.py +++ b/apps/knowledge/tasks/embedding.py @@ -144,16 +144,25 @@ def embedding_by_data_source(document_id, knowledge_id, workspace_id): @param knowledge_id: 知识库id @param workspace_id: 工作空间id """ + maxkb_logger.info(f"🔍 Starting embedding_by_data_source for document {document_id}") + maxkb_logger.info(f"📊 Parameters - knowledge_id: {knowledge_id}, workspace_id: {workspace_id}") + try: from knowledge.serializers.common import get_embedding_model_id_by_knowledge_id + maxkb_logger.info(f"🔧 Getting embedding model ID for knowledge base {knowledge_id}") embedding_model_id = get_embedding_model_id_by_knowledge_id(knowledge_id) + if embedding_model_id: + maxkb_logger.info(f"✅ Found embedding model: {embedding_model_id}") + maxkb_logger.info(f"🚀 Submitting embedding task for document {document_id}") embedding_by_document.delay(document_id, embedding_model_id) - maxkb_logger.info(f"Started embedding for document {document_id} with model {embedding_model_id}") + maxkb_logger.info(f"✅ Successfully submitted embedding task for document {document_id} with model {embedding_model_id}") else: - maxkb_logger.warning(f"No embedding model found for knowledge {knowledge_id}") + maxkb_logger.error(f"❌ No embedding model found for knowledge base {knowledge_id}") + maxkb_logger.error(f"⚠️ Cannot proceed with embedding for document {document_id}") except Exception as e: - maxkb_logger.error(f"Failed to start embedding for document {document_id}: {str(e)}") + maxkb_logger.error(f"❌ Failed to start embedding for document {document_id}: {str(e)}") + maxkb_logger.error(f"🔍 Full error traceback: {traceback.format_exc()}") raise diff --git a/apps/knowledge/tasks/media_learning.py b/apps/knowledge/tasks/media_learning.py index bee96818..1a2bcc9f 100644 --- a/apps/knowledge/tasks/media_learning.py +++ b/apps/knowledge/tasks/media_learning.py @@ -132,32 +132,56 @@ def media_learning_by_document(document_id: str, knowledge_id: str, workspace_id # 状态保持为STARTED,但通过日志区分阶段 # 创建段落对象 + maxkb_logger.info(f"🔧 Starting paragraph creation for document {document_id}") with transaction.atomic(): paragraph_models = [] for idx, para_data in enumerate(paragraphs_data): + content = para_data.get('content', '') + title = para_data.get('title', f'段落 {idx + 1}') + content_length = len(content) + + maxkb_logger.info(f"📝 Creating paragraph {idx+1}/{len(paragraphs_data)}: title='{title}', length={content_length}") + maxkb_logger.info(f"📄 Paragraph {idx+1} preview: '{content[:100]}...'") + paragraph = Paragraph( document_id=document_id, knowledge_id=knowledge_id, - content=para_data.get('content', ''), - title=para_data.get('title', f'段落 {idx + 1}'), + content=content, + title=title, position=idx + 1, status_meta=para_data.get('metadata', {}) ) paragraph_models.append(paragraph) - + # 批量保存段落 if paragraph_models: + maxkb_logger.info(f"💾 Saving {len(paragraph_models)} paragraphs to database") QuerySet(Paragraph).bulk_create(paragraph_models) - maxkb_logger.info(f"✅ Created {len(paragraph_models)} paragraphs for document {document_id}") - - # 更新文档字符长度 - total_char_length = sum(len(p.content) for p in paragraph_models) - document.char_length = total_char_length - document.save() + maxkb_logger.info(f"✅ Successfully created {len(paragraph_models)} paragraphs for document {document_id}") + + # 统计信息 + total_char_length = sum(len(p.content) for p in paragraph_models) + avg_char_length = total_char_length / len(paragraph_models) + maxkb_logger.info(f"📊 Paragraph statistics - total chars: {total_char_length}, average: {avg_char_length:.2f}") + + # 更新文档字符长度 + document.char_length = total_char_length + document.save() + maxkb_logger.info(f"📄 Updated document char_length to {total_char_length}") + else: + maxkb_logger.warning(f"⚠️ No paragraph models to save for document {document_id}") # 第3步:触发向量化任务 - maxkb_logger.info(f"🔍 Starting embedding for document: {document_id}") - embedding_by_data_source(document_id, knowledge_id, workspace_id) + maxkb_logger.info(f"🔍 Starting embedding process for document: {document_id}") + maxkb_logger.info(f"📊 Document {document_id} has {len(paragraph_models)} paragraphs to embed") + + try: + embedding_by_data_source(document_id, knowledge_id, workspace_id) + maxkb_logger.info(f"✅ Embedding task successfully submitted for document {document_id}") + except Exception as embedding_error: + maxkb_logger.error(f"❌ Failed to start embedding for document {document_id}: {str(embedding_error)}") + maxkb_logger.error(f"🔍 Embedding error details: {traceback.format_exc()}") + # 注意:这里不抛出异常,让文档状态保持成功,但记录embedding失败 # 第4步:更新状态为完成 maxkb_logger.info(f"✅ Updating status to: SUCCESS (完成)")