参考高级学习处理流程修改音视频文档处理

- 修改批量插入段落逻辑,跳过音视频文档的段落插入
- 音视频文档段落由异步任务处理,不在此处插入
- 保持与高级学习文档相同的处理模式
- 添加详细的日志记录用于调试

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
朱潮 2025-08-31 01:01:51 +08:00
parent 86ef54fb75
commit 459b0c8307

View File

@ -1205,73 +1205,12 @@ class DocumentSerializers(serializers.Serializer):
document['paragraphs'] = [] # 清空段落,等待异步处理
# 检查是否是音视频类型的文档
elif stt_model_id:
maxkb_logger.info(f"Document {document.get('name')} is media type, processing synchronously")
# 音视频类型的文档,直接处理
source_file_id = document.get('source_file_id')
if source_file_id:
try:
source_file = QuerySet(File).filter(id=source_file_id).first()
if source_file:
workspace_id_value = self.data.get('workspace_id', '')
maxkb_logger.info(f"Processing media file: {source_file.file_name}")
maxkb_logger.info(f" - STT model ID: {stt_model_id}")
maxkb_logger.info(f" - Workspace ID: {workspace_id_value}")
maxkb_logger.info(f" - LLM model ID: {llm_model_id}")
# 使用MediaSplitHandle处理音视频文件
from common.handle.impl.media.media_split_handle import MediaSplitHandle
media_handler = MediaSplitHandle()
# 准备文件对象
# Minimal file-like adapter around a stored file record: it exposes `name`,
# `size`, `read()` and `seek()` on top of the record's `file_name`,
# `file_size` and `get_bytes()`.
class FileWrapper:
# Keep the wrapped record and mirror its name/size under file-like attribute names.
def __init__(self, file_obj):
self.file_obj = file_obj
self.name = file_obj.file_name
self.size = file_obj.file_size
# Return the result of the record's get_bytes(); takes no size argument
# and does not track a read position.
def read(self):
return self.file_obj.get_bytes()
# No-op: the requested position is ignored.
def seek(self, pos):
pass
file_wrapper = FileWrapper(source_file)
# 处理音视频文件
result = media_handler.handle(
file_wrapper,
pattern_list=[],
with_filter=False,
limit=0,
get_buffer=lambda f: f.read(),
save_image=lambda x: None,
workspace_id=workspace_id_value,
stt_model_id=stt_model_id,
llm_model_id=llm_model_id
)
# 将处理结果添加到文档
if result and result.get('content'):
document['paragraphs'] = result.get('content', [])
maxkb_logger.info(f"Media file processed, got {len(document['paragraphs'])} paragraphs")
else:
maxkb_logger.warning(f"No content extracted from media file, using default")
document['paragraphs'] = [{
'content': f'[音视频文件: {source_file.file_name}]',
'title': '音视频内容'
}]
except Exception as e:
maxkb_logger.error(f"Failed to process media file: {str(e)}")
import traceback
maxkb_logger.error(traceback.format_exc())
# 如果处理失败,创建一个默认段落
document['paragraphs'] = [{
'content': f'[音视频文件处理失败: {str(e)}]',
'title': '处理失败'
}]
else:
maxkb_logger.warning(f"No source file for media document")
maxkb_logger.info(f"Document {document.get('name')} is media type, will process asynchronously")
# 音视频类型的文档,设置为异步处理
# 清空段落,等待异步任务处理
document['paragraphs'] = []
# 标记为异步音视频文档,用于后续异步处理
document['is_media_async'] = True
# 插入文档
for document in instance_list:
@ -1373,10 +1312,25 @@ class DocumentSerializers(serializers.Serializer):
State.FAILURE
)
# 批量插入段落(只为非高级学习文档
# 批量插入段落(只为非高级学习文档和非音视频文档)
if len(paragraph_model_list) > 0:
maxkb_logger.info(f"Total paragraphs to insert: {len(paragraph_model_list)}")
# 获取音视频文档ID列表
media_document_ids = []
for idx, document in enumerate(instance_list):
stt_model_id = document.get('stt_model_id')
if stt_model_id and idx < len(document_model_list):
media_document_ids.append(str(document_model_list[idx].id))
maxkb_logger.info(f"Media document IDs to skip paragraph insertion: {media_document_ids}")
for document in document_model_list:
# 跳过音视频文档的段落插入(此处仅检查 media_document_ids;音视频文档的段落由异步任务处理)
if str(document.id) in media_document_ids:
maxkb_logger.info(f"Skipping paragraph insertion for media document: {document.id}")
continue
max_position = Paragraph.objects.filter(document_id=document.id).aggregate(
max_position=Max('position')
)['max_position'] or 0
@ -1410,17 +1364,67 @@ class DocumentSerializers(serializers.Serializer):
with_search_one=False
)
# 标记高级学习文档和音视频文档
# 标记高级学习文档和音视频文档,并触发异步任务
for idx, document in enumerate(instance_list):
llm_model_id = document.get('llm_model_id')
vision_model_id = document.get('vision_model_id')
stt_model_id = document.get('stt_model_id')
if idx < len(document_result_list):
document_id = document_result_list[idx].get('id')
if llm_model_id and vision_model_id:
document_result_list[idx]['is_advanced_learning'] = True
# 触发高级学习异步任务
try:
from knowledge.tasks.advanced_learning import batch_advanced_learning
batch_advanced_learning.delay(
[document_id],
str(knowledge_id),
workspace_id,
llm_model_id,
vision_model_id
)
maxkb_logger.info(f"Submitted advanced learning task for document: {document_id}")
except Exception as e:
maxkb_logger.error(f"Failed to submit advanced learning task: {str(e)}")
elif stt_model_id:
document_result_list[idx]['is_media_learning'] = True
# 设置排队状态并触发音视频异步任务
try:
from common.event import ListenerManagement
from knowledge.models import TaskType, State
# 更新文档状态为排队中
ListenerManagement.update_status(
QuerySet(Document).filter(id=document_id),
TaskType.EMBEDDING,
State.PENDING
)
# 触发音视频异步处理任务
from knowledge.tasks.media_learning import media_learning_by_document
media_learning_by_document.delay(
document_id,
str(knowledge_id),
workspace_id,
stt_model_id,
llm_model_id
)
maxkb_logger.info(f"Submitted media learning task for document: {document_id}, status: PENDING")
except Exception as e:
maxkb_logger.error(f"Failed to submit media learning task: {str(e)}")
# 如果提交任务失败,更新状态为失败
try:
ListenerManagement.update_status(
QuerySet(Document).filter(id=document_id),
TaskType.EMBEDDING,
State.FAILURE
)
except Exception as status_error:
maxkb_logger.error(f"Failed to update status to FAILURE: {str(status_error)}")
return document_result_list, knowledge_id, workspace_id