# -*- coding: utf-8 -*-
"""Media (audio/video) learning task processing - fully async state transitions."""
import traceback
from typing import Any, Dict, List, Optional, Tuple

from celery import shared_task
from django.core.files.base import ContentFile
from django.db import transaction
from django.db.models import QuerySet

from common.event import ListenerManagement
from common.handle.impl.media.media_split_handle import MediaSplitHandle
from common.utils.logger import maxkb_logger
from knowledge.models import Document, File, FileSourceType, Paragraph, State, TaskType
from knowledge.tasks.embedding import embedding_by_data_source


def _resolve_document_and_source(document_id: str) -> Tuple[Document, File]:
    """Load the Document and its source media File.

    Raises:
        ValueError: if the document, its ``meta['source_file_id']``, or the
            referenced File row cannot be found.
    """
    document = QuerySet(Document).filter(id=document_id).first()
    if not document:
        raise ValueError(f"Document not found: {document_id}")
    source_file_id = document.meta.get('source_file_id')
    if not source_file_id:
        raise ValueError(f"Source file not found for document: {document_id}")
    source_file = QuerySet(File).filter(id=source_file_id).first()
    if not source_file:
        raise ValueError(f"Source file not found: {source_file_id}")
    return document, source_file


def _transcribe_media(source_file: File, stt_model_id: str, llm_model_id: Optional[str],
                      workspace_id: str, limit: Optional[int],
                      patterns: Optional[List[str]],
                      with_filter: Optional[bool]) -> List[Dict[str, Any]]:
    """Run MediaSplitHandle over the media file and return paragraph dicts.

    On any processing error a single fallback paragraph describing the
    failure is returned instead of raising, so the document still gets
    persisted content (best-effort behavior kept from the original code).
    """
    try:
        handler = MediaSplitHandle()
        # Wrap the stored bytes in a file-like object the handler can read.
        temp_file = ContentFile(source_file.get_bytes(), name=source_file.file_name)
        result = handler.handle(
            file=temp_file,
            pattern_list=patterns or [],  # 使用传入的分段模式
            with_filter=with_filter if with_filter is not None else False,
            limit=limit if limit is not None else 0,  # 0表示使用默认值(在handle中会转为1000)
            get_buffer=lambda file_obj: file_obj.read(),
            save_image=False,
            stt_model_id=stt_model_id,
            llm_model_id=llm_model_id,
            workspace_id=workspace_id,
            use_actual_processing=True,  # 标记需要实际处理
        )
        paragraphs_data = [
            {
                'content': paragraph['content'],
                'title': paragraph['title'],
                'metadata': paragraph.get('metadata', {}),
            }
            for paragraph in result.get('content', [])
        ]
        maxkb_logger.info(
            f"✅ Successfully processed media file, generated {len(paragraphs_data)} paragraphs")
        return paragraphs_data
    except Exception as processing_error:
        maxkb_logger.error(f"❌ Failed to process media file: {str(processing_error)}")
        # 如果处理失败,生成基础段落 (fallback paragraph so the document is not empty)
        return [{
            'content': f'音视频文件 "{source_file.file_name}" 处理失败: {str(processing_error)}',
            'title': '处理失败',
            'metadata': {
                'error': str(processing_error),
                'file_name': source_file.file_name,
            },
        }]


def _persist_paragraphs(document: Document, document_id: str, knowledge_id: str,
                        paragraphs_data: List[Dict[str, Any]]) -> Tuple[int, int]:
    """Bulk-create Paragraph rows and update the document's char_length.

    Returns:
        (paragraph_count, total_char_length). Both are 0 when
        ``paragraphs_data`` is empty; ``total_char_length`` is always
        defined (the original code left it unbound in the empty case,
        which would have raised NameError in the caller's stats log).
    """
    with transaction.atomic():
        paragraph_models = [
            Paragraph(
                document_id=document_id,
                knowledge_id=knowledge_id,
                content=para_data.get('content', ''),
                title=para_data.get('title', f'段落 {idx + 1}'),
                position=idx + 1,
                status_meta=para_data.get('metadata', {}),
            )
            for idx, para_data in enumerate(paragraphs_data)
        ]
        total_char_length = sum(len(p.content) for p in paragraph_models)
        if paragraph_models:
            QuerySet(Paragraph).bulk_create(paragraph_models)
            maxkb_logger.info(
                f"✅ Created {len(paragraph_models)} paragraphs for document {document_id}")
            # 更新文档字符长度
            document.char_length = total_char_length
            document.save()
    return len(paragraph_models), total_char_length


@shared_task(name='media_learning_by_document')
def media_learning_by_document(document_id: str, knowledge_id: str, workspace_id: str,
                               stt_model_id: str, llm_model_id: Optional[str] = None,
                               limit: Optional[int] = None,
                               patterns: Optional[List[str]] = None,
                               with_filter: Optional[bool] = None):
    """Async media-document processing task with full state transitions.

    State flow:
        1. PENDING  - task submitted, waiting
        2. STARTED  - transcribing the media content (生成中)
        3. STARTED  - creating paragraphs and indexing (索引中)
        4. SUCCESS  - done
        5. FAILURE  - processing failed (status updated, exception re-raised)

    Args:
        document_id: 文档ID
        knowledge_id: 知识库ID
        workspace_id: 工作空间ID
        stt_model_id: STT模型ID
        llm_model_id: LLM模型ID(可选)
        limit: 分段长度(可选)
        patterns: 分段正则列表(可选)
        with_filter: 是否清除特殊字符(可选)

    Raises:
        ValueError: when the document or its source file is missing.
        Exception: any unexpected error, re-raised after marking FAILURE.
    """
    maxkb_logger.info(f"🎬 Starting media learning task for document: {document_id}")
    maxkb_logger.info(f"📋 Current status: PENDING (排队中)")
    maxkb_logger.info(
        f"📝 Split parameters - limit: {limit}, patterns: {patterns}, with_filter: {with_filter}")
    try:
        # 验证文档存在 / 验证源文件
        document, source_file = _resolve_document_and_source(document_id)
        maxkb_logger.info(f"🎵 Processing media file: {source_file.file_name}")

        # 第1步:更新状态为生成中(音视频转写)
        maxkb_logger.info(f"🔄 Updating status to: STARTED (生成中)")
        ListenerManagement.update_status(
            QuerySet(Document).filter(id=document_id), TaskType.GENERATE, State.STARTED)

        # 实际处理音视频文件
        maxkb_logger.info(f"📝 Processing media file: {source_file.file_name}")
        paragraphs_data = _transcribe_media(
            source_file, stt_model_id, llm_model_id, workspace_id, limit, patterns, with_filter)
        maxkb_logger.info(f"📝 Generated {len(paragraphs_data)} paragraphs for media file")

        # 第2步:更新状态为索引中(段落创建和向量化)
        # 状态保持为STARTED,但通过日志区分阶段
        maxkb_logger.info(f"📚 Updating status to: STARTED (索引中)")
        paragraph_count, total_char_length = _persist_paragraphs(
            document, document_id, knowledge_id, paragraphs_data)

        # 第3步:触发向量化任务
        maxkb_logger.info(f"🔍 Starting embedding for document: {document_id}")
        embedding_by_data_source(document_id, knowledge_id, workspace_id)

        # 第4步:更新状态为完成
        maxkb_logger.info(f"✅ Updating status to: SUCCESS (完成)")
        ListenerManagement.update_status(
            QuerySet(Document).filter(id=document_id), TaskType.GENERATE, State.SUCCESS)
        maxkb_logger.info(f"🎉 Media learning completed successfully for document: {document_id}")
        maxkb_logger.info(
            f"📊 Final stats: {paragraph_count} paragraphs, {total_char_length} characters")
    except Exception as e:
        maxkb_logger.error(f"❌ Media learning failed for document {document_id}: {str(e)}")
        maxkb_logger.error(traceback.format_exc())
        # 更新文档状态为失败, then re-raise so Celery records the failure.
        maxkb_logger.info(f"💥 Updating status to: FAILURE (失败)")
        ListenerManagement.update_status(
            QuerySet(Document).filter(id=document_id), TaskType.GENERATE, State.FAILURE)
        raise


@shared_task(name='media_learning_batch')
def media_learning_batch(document_id_list: List[str], knowledge_id: str, workspace_id: str,
                         stt_model_id: str, llm_model_id: Optional[str] = None,
                         limit: Optional[int] = None,
                         patterns: Optional[List[str]] = None,
                         with_filter: Optional[bool] = None):
    """Fan out one ``media_learning_by_document`` task per document.

    A submission failure for one document marks that document FAILURE
    (best-effort) and does not stop the remaining submissions.

    Args:
        document_id_list: 文档ID列表
        knowledge_id: 知识库ID
        workspace_id: 工作空间ID
        stt_model_id: STT模型ID
        llm_model_id: LLM模型ID(可选)
        limit: 分段长度(可选)
        patterns: 分段正则列表(可选)
        with_filter: 是否清除特殊字符(可选)
    """
    maxkb_logger.info(f"🎬 Starting batch media learning for {len(document_id_list)} documents")
    # 为每个文档提交单独的处理任务
    for document_id in document_id_list:
        try:
            media_learning_by_document.delay(
                document_id, knowledge_id, workspace_id, stt_model_id,
                llm_model_id, limit, patterns, with_filter)
            maxkb_logger.info(f"📋 Submitted media learning task for document: {document_id}")
        except Exception as e:
            maxkb_logger.error(f"Failed to submit task for document {document_id}: {str(e)}")
            # 更新失败状态 (best-effort; a status-update failure is only logged)
            try:
                ListenerManagement.update_status(
                    QuerySet(Document).filter(id=document_id), TaskType.GENERATE, State.FAILURE)
            except Exception as status_error:
                maxkb_logger.error(
                    f"Failed to update status for document {document_id}: {str(status_error)}")
    maxkb_logger.info(f"✅ Batch media learning tasks submitted")