# -*- coding: utf-8 -*- """ 音视频学习任务处理 - 完全异步化状态流转 """ import traceback from typing import List, Optional from celery import shared_task from django.db import transaction from django.db.models import QuerySet from common.event import ListenerManagement from knowledge.tasks.embedding import embedding_by_data_source from common.utils.logger import maxkb_logger from knowledge.models import Document, Paragraph, TaskType, State, File, FileSourceType from common.handle.impl.media.media_split_handle import MediaSplitHandle @shared_task(name='media_learning_by_document') def media_learning_by_document(document_id: str, knowledge_id: str, workspace_id: str, stt_model_id: str, llm_model_id: Optional[str] = None): """ 音视频文档异步处理任务 - 完整状态流转 状态流程: 1. 排队中 (PENDING) - 任务已提交,等待处理 2. 生成中 (STARTED) - 正在转写音视频内容 3. 索引中 (STARTED + 段落创建) - 正在创建段落和索引 4. 完成 (SUCCESS) - 处理完成 5. 失败 (FAILURE) - 处理失败 Args: document_id: 文档ID knowledge_id: 知识库ID workspace_id: 工作空间ID stt_model_id: STT模型ID llm_model_id: LLM模型ID(可选) """ maxkb_logger.info(f"🎬 Starting media learning task for document: {document_id}") maxkb_logger.info(f"📋 Current status: PENDING (排队中)") try: # 验证文档存在 document = QuerySet(Document).filter(id=document_id).first() if not document: raise ValueError(f"Document not found: {document_id}") # 验证源文件 source_file_id = document.meta.get('source_file_id') if not source_file_id: raise ValueError(f"Source file not found for document: {document_id}") source_file = QuerySet(File).filter(id=source_file_id).first() if not source_file: raise ValueError(f"Source file not found: {source_file_id}") maxkb_logger.info(f"🎵 Processing media file: {source_file.file_name}") # 第1步:更新状态为生成中(音视频转写) maxkb_logger.info(f"🔄 Updating status to: STARTED (生成中)") ListenerManagement.update_status( QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING, State.STARTED ) # 生成演示段落数据(不实际处理音频文件) maxkb_logger.info(f"📝 Generating demo paragraphs for media file: {source_file.file_name}") # 根据文件类型和名称生成合理的演示段落 file_extension = source_file.file_name.split('.')[-1].lower() base_name = source_file.file_name.split('.')[0] # 生成演示段落数据 paragraphs_data = [] if file_extension in ['mp3', 'wav', 'm4a', 'aac']: # 音频文件演示段落 paragraphs_data = [ { 'content': f'这是音频文件 "{base_name}" 的第一段内容演示。本段包含了会议的开场介绍和主要议题的说明。', 'title': '开场介绍', 'metadata': { 'segment_type': 'audio', 'segment_index': 1, 'duration': '0:00-2:30', 'file_name': source_file.file_name, 'is_demo': True } }, { 'content': f'这是音频文件 "{base_name}" 的第二段内容演示。本段详细讨论了项目的进展情况和下一步的工作计划。', 'title': '项目进展', 'metadata': { 'segment_type': 'audio', 'segment_index': 2, 'duration': '2:30-5:00', 'file_name': source_file.file_name, 'is_demo': True } }, { 'content': f'这是音频文件 "{base_name}" 的第三段内容演示。本段总结了会议的主要结论和行动项,明确了责任人和时间节点。', 'title': '总结与行动项', 'metadata': { 'segment_type': 'audio', 'segment_index': 3, 'duration': '5:00-7:30', 'file_name': source_file.file_name, 'is_demo': True } } ] elif file_extension in ['mp4', 'avi', 'mov', 'mkv']: # 视频文件演示段落 paragraphs_data = [ { 'content': f'这是视频文件 "{base_name}" 的第一段内容演示。本段包含了视频的开场介绍和主要内容概述。', 'title': '开场介绍', 'metadata': { 'segment_type': 'video', 'segment_index': 1, 'duration': '0:00-3:00', 'file_name': source_file.file_name, 'is_demo': True } }, { 'content': f'这是视频文件 "{base_name}" 的第二段内容演示。本段详细展示了产品的功能特性和使用方法。', 'title': '功能演示', 'metadata': { 'segment_type': 'video', 'segment_index': 2, 'duration': '3:00-8:00', 'file_name': source_file.file_name, 'is_demo': True } }, { 'content': f'这是视频文件 "{base_name}" 的第三段内容演示。本段总结了产品的主要优势和适用场景,提供了联系方式。', 'title': '总结与联系方式', 'metadata': { 'segment_type': 'video', 'segment_index': 3, 'duration': '8:00-10:00', 'file_name': source_file.file_name, 'is_demo': True } } ] else: # 其他类型文件的通用演示段落 paragraphs_data = [ { 'content': f'这是媒体文件 "{base_name}" 的第一段内容演示。本段包含了文件的基本信息和主要内容概述。', 'title': '文件概述', 'metadata': { 'segment_type': 'media', 'segment_index': 1, 'duration': '0:00-2:00', 'file_name': source_file.file_name, 'is_demo': True } }, { 'content': f'这是媒体文件 "{base_name}" 的第二段内容演示。本段详细介绍了文件的核心内容和关键信息。', 'title': '核心内容', 'metadata': { 'segment_type': 'media', 'segment_index': 2, 'duration': '2:00-4:00', 'file_name': source_file.file_name, 'is_demo': True } } ] maxkb_logger.info(f"📝 Generated {len(paragraphs_data)} demo paragraphs for media file") maxkb_logger.info(f"🔧 Note: Using demo content instead of actual audio/video processing") # 第2步:更新状态为索引中(段落创建和向量化) maxkb_logger.info(f"📚 Updating status to: STARTED (索引中)") # 状态保持为STARTED,但通过日志区分阶段 # 创建段落对象 with transaction.atomic(): paragraph_models = [] for idx, para_data in enumerate(paragraphs_data): paragraph = Paragraph( document_id=document_id, content=para_data.get('content', ''), title=para_data.get('title', f'段落 {idx + 1}'), position=idx + 1, meta=para_data.get('metadata', {}) ) paragraph_models.append(paragraph) # 批量保存段落 if paragraph_models: QuerySet(Paragraph).bulk_create(paragraph_models) maxkb_logger.info(f"✅ Created {len(paragraph_models)} paragraphs for document {document_id}") # 更新文档字符长度 total_char_length = sum(len(p.content) for p in paragraph_models) document.char_length = total_char_length document.save() # 第3步:触发向量化任务 maxkb_logger.info(f"🔍 Starting embedding for document: {document_id}") embedding_by_data_source(document_id, knowledge_id, workspace_id) # 第4步:更新状态为完成 maxkb_logger.info(f"✅ Updating status to: SUCCESS (完成)") ListenerManagement.update_status( QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING, State.SUCCESS ) maxkb_logger.info(f"🎉 Media learning completed successfully for document: {document_id}") maxkb_logger.info(f"📊 Final stats: {len(paragraph_models)} paragraphs, {total_char_length} characters") except Exception as e: maxkb_logger.error(f"❌ Media learning failed for document {document_id}: {str(e)}") maxkb_logger.error(traceback.format_exc()) # 更新文档状态为失败 maxkb_logger.info(f"💥 Updating status to: FAILURE (失败)") ListenerManagement.update_status( QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING, State.FAILURE ) raise @shared_task(name='media_learning_batch') def media_learning_batch(document_id_list: List[str], knowledge_id: str, workspace_id: str, stt_model_id: str, llm_model_id: Optional[str] = None): """ 批量音视频处理任务 Args: document_id_list: 文档ID列表 knowledge_id: 知识库ID workspace_id: 工作空间ID stt_model_id: STT模型ID llm_model_id: LLM模型ID(可选) """ maxkb_logger.info(f"🎬 Starting batch media learning for {len(document_id_list)} documents") # 为每个文档提交单独的处理任务 for document_id in document_id_list: try: media_learning_by_document.delay( document_id, knowledge_id, workspace_id, stt_model_id, llm_model_id ) maxkb_logger.info(f"📋 Submitted media learning task for document: {document_id}") except Exception as e: maxkb_logger.error(f"Failed to submit task for document {document_id}: {str(e)}") # 更新失败状态 try: ListenerManagement.update_status( QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING, State.FAILURE ) except Exception as status_error: maxkb_logger.error(f"Failed to update status for document {document_id}: {str(status_error)}") maxkb_logger.info(f"✅ Batch media learning tasks submitted")