From 459b0c8307c356af0aeb9666ce7436ccbc223ebc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Sun, 31 Aug 2025 01:01:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=82=E8=80=83=E9=AB=98=E7=BA=A7=E5=AD=A6?= =?UTF-8?q?=E4=B9=A0=E5=A4=84=E7=90=86=E6=B5=81=E7=A8=8B=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E9=9F=B3=E8=A7=86=E9=A2=91=E6=96=87=E6=A1=A3=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 修改批量插入段落逻辑,跳过音视频文档的段落插入 - 音视频文档段落由异步任务处理,不在此处插入 - 保持与高级学习文档相同的处理模式 - 添加详细的日志记录用于调试 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- apps/knowledge/serializers/document.py | 142 +++++++++++++------------ 1 file changed, 73 insertions(+), 69 deletions(-) diff --git a/apps/knowledge/serializers/document.py b/apps/knowledge/serializers/document.py index 78f93e40..66c947e1 100644 --- a/apps/knowledge/serializers/document.py +++ b/apps/knowledge/serializers/document.py @@ -1205,73 +1205,12 @@ class DocumentSerializers(serializers.Serializer): document['paragraphs'] = [] # 清空段落,等待异步处理 # 检查是否是音视频类型的文档 elif stt_model_id: - maxkb_logger.info(f"Document {document.get('name')} is media type, processing synchronously") - # 音视频类型的文档,直接处理 - source_file_id = document.get('source_file_id') - if source_file_id: - try: - source_file = QuerySet(File).filter(id=source_file_id).first() - if source_file: - workspace_id_value = self.data.get('workspace_id', '') - maxkb_logger.info(f"Processing media file: {source_file.file_name}") - maxkb_logger.info(f" - STT model ID: {stt_model_id}") - maxkb_logger.info(f" - Workspace ID: {workspace_id_value}") - maxkb_logger.info(f" - LLM model ID: {llm_model_id}") - - # 使用MediaSplitHandle处理音视频文件 - from common.handle.impl.media.media_split_handle import MediaSplitHandle - media_handler = MediaSplitHandle() - - # 准备文件对象 - class FileWrapper: - def __init__(self, file_obj): - self.file_obj = file_obj - self.name = file_obj.file_name - self.size = file_obj.file_size - - def read(self): - return self.file_obj.get_bytes() - - def seek(self, pos): - pass - - file_wrapper = FileWrapper(source_file) - - # 处理音视频文件 - result = media_handler.handle( - file_wrapper, - pattern_list=[], - with_filter=False, - limit=0, - get_buffer=lambda f: f.read(), - save_image=lambda x: None, - workspace_id=workspace_id_value, - stt_model_id=stt_model_id, - llm_model_id=llm_model_id - ) - - # 将处理结果添加到文档 - if result and result.get('content'): - document['paragraphs'] = result.get('content', []) - maxkb_logger.info(f"Media file processed, got {len(document['paragraphs'])} paragraphs") - else: - maxkb_logger.warning(f"No content extracted from media file, using default") - document['paragraphs'] = [{ - 'content': f'[音视频文件: {source_file.file_name}]', - 'title': '音视频内容' - }] - except Exception as e: - maxkb_logger.error(f"Failed to process media file: {str(e)}") - import traceback - maxkb_logger.error(traceback.format_exc()) - # 如果处理失败,创建一个默认段落 - document['paragraphs'] = [{ - 'content': f'[音视频文件处理失败: {str(e)}]', - 'title': '处理失败' - }] - else: - maxkb_logger.warning(f"No source file for media document") - document['paragraphs'] = [] + maxkb_logger.info(f"Document {document.get('name')} is media type, will process asynchronously") + # 音视频类型的文档,设置为异步处理 + # 清空段落,等待异步任务处理 + document['paragraphs'] = [] + # 标记为异步音视频文档,用于后续异步处理 + document['is_media_async'] = True # 插入文档 for document in instance_list: @@ -1373,10 +1312,25 @@ class DocumentSerializers(serializers.Serializer): State.FAILURE ) - # 批量插入段落(只为非高级学习文档) + # 批量插入段落(只为非高级学习文档和非音视频文档) if len(paragraph_model_list) > 0: maxkb_logger.info(f"Total paragraphs to insert: {len(paragraph_model_list)}") + + # 获取音视频文档ID列表 + media_document_ids = [] + for idx, document in enumerate(instance_list): + stt_model_id = document.get('stt_model_id') + if stt_model_id and idx < len(document_model_list): + media_document_ids.append(str(document_model_list[idx].id)) + + maxkb_logger.info(f"Media document IDs to skip paragraph insertion: {media_document_ids}") + for document in document_model_list: + # 跳过高级学习文档和音视频文档的段落插入 + if str(document.id) in media_document_ids: + maxkb_logger.info(f"Skipping paragraph insertion for media document: {document.id}") + continue + max_position = Paragraph.objects.filter(document_id=document.id).aggregate( max_position=Max('position') )['max_position'] or 0 @@ -1410,17 +1364,67 @@ class DocumentSerializers(serializers.Serializer): with_search_one=False ) - # 标记高级学习文档和音视频文档 + # 标记高级学习文档和音视频文档,并触发异步任务 for idx, document in enumerate(instance_list): llm_model_id = document.get('llm_model_id') vision_model_id = document.get('vision_model_id') stt_model_id = document.get('stt_model_id') if idx < len(document_result_list): + document_id = document_result_list[idx].get('id') + if llm_model_id and vision_model_id: document_result_list[idx]['is_advanced_learning'] = True + # 触发高级学习异步任务 + try: + from knowledge.tasks.advanced_learning import batch_advanced_learning + batch_advanced_learning.delay( + [document_id], + str(knowledge_id), + workspace_id, + llm_model_id, + vision_model_id + ) + maxkb_logger.info(f"Submitted advanced learning task for document: {document_id}") + except Exception as e: + maxkb_logger.error(f"Failed to submit advanced learning task: {str(e)}") + elif stt_model_id: document_result_list[idx]['is_media_learning'] = True + # 设置排队状态并触发音视频异步任务 + try: + from common.event import ListenerManagement + from knowledge.models import TaskType, State + + # 更新文档状态为排队中 + ListenerManagement.update_status( + QuerySet(Document).filter(id=document_id), + TaskType.EMBEDDING, + State.PENDING + ) + + # 触发音视频异步处理任务 + from knowledge.tasks.media_learning import media_learning_by_document + media_learning_by_document.delay( + document_id, + str(knowledge_id), + workspace_id, + stt_model_id, + llm_model_id + ) + maxkb_logger.info(f"Submitted media learning task for document: {document_id}, status: PENDING") + + except Exception as e: + maxkb_logger.error(f"Failed to submit media learning task: {str(e)}") + # 如果提交任务失败,更新状态为失败 + try: + ListenerManagement.update_status( + QuerySet(Document).filter(id=document_id), + TaskType.EMBEDDING, + State.FAILURE + ) + except Exception as status_error: + maxkb_logger.error(f"Failed to update status to FAILURE: {str(status_error)}") return document_result_list, knowledge_id, workspace_id