From 3b143448a6d554212d999ef8bed51abdda280f60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Thu, 18 Dec 2025 22:02:49 +0800 Subject: [PATCH] =?UTF-8?q?transaction.on=5Fcommit=20=E6=98=AF=20Django=20?= =?UTF-8?q?=E6=8F=90=E4=BE=9B=E7=9A=84=E6=9C=BA=E5=88=B6=EF=BC=8C=E5=AE=83?= =?UTF-8?q?=E6=B3=A8=E5=86=8C=E7=9A=84=E5=9B=9E=E8=B0=83=E5=87=BD=E6=95=B0?= =?UTF-8?q?=E5=8F=AA=E4=BC=9A=E5=9C=A8=E5=BD=93=E5=89=8D=E4=BA=8B=E5=8A=A1?= =?UTF-8?q?=E6=88=90=E5=8A=9F=E6=8F=90=E4=BA=A4=E5=88=B0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E5=90=8E=E6=89=A7=E8=A1=8C=E3=80=82=E8=BF=99=E6=A0=B7?= =?UTF-8?q?=E7=A1=AE=E4=BF=9D=E4=BA=86=EF=BC=9A=20=20=20-=20=E4=BA=8B?= =?UTF-8?q?=E5=8A=A1=E6=88=90=E5=8A=9F=E6=97=B6=EF=BC=9A=E6=96=87=E6=A1=A3?= =?UTF-8?q?=E5=B7=B2=E4=BF=9D=E5=AD=98=E5=9C=A8=E6=95=B0=E6=8D=AE=E5=BA=93?= =?UTF-8?q?=E4=B8=AD=EF=BC=8C=E5=BC=82=E6=AD=A5=E4=BB=BB=E5=8A=A1=E5=8F=AF?= =?UTF-8?q?=E4=BB=A5=E6=AD=A3=E5=B8=B8=E6=9F=A5=E8=AF=A2=20=20=20-=20?= =?UTF-8?q?=E4=BA=8B=E5=8A=A1=E5=A4=B1=E8=B4=A5=E6=97=B6=EF=BC=9A=E5=9B=9E?= =?UTF-8?q?=E8=B0=83=E4=B8=8D=E4=BC=9A=E6=89=A7=E8=A1=8C=EF=BC=8C=E9=81=BF?= =?UTF-8?q?=E5=85=8D=E4=BA=86=E5=A4=84=E7=90=86=E4=B8=8D=E5=AD=98=E5=9C=A8?= =?UTF-8?q?=E7=9A=84=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/knowledge/serializers/document.py | 122 +++++++++++++------------ 1 file changed, 66 insertions(+), 56 deletions(-) diff --git a/apps/knowledge/serializers/document.py b/apps/knowledge/serializers/document.py index 443932b7..62f75aa0 100644 --- a/apps/knowledge/serializers/document.py +++ b/apps/knowledge/serializers/document.py @@ -225,6 +225,48 @@ class BatchEditHitHandlingSerializer(serializers.Serializer): raise AppApiException(500, _('The type only supports optimization|directly_return')) +def submit_advanced_learning_task(document_id, knowledge_id, workspace_id, llm_model_id, vision_model_id): + """提交高级学习任务的辅助函数""" + try: + from knowledge.tasks.advanced_learning import batch_advanced_learning + batch_advanced_learning.delay( + [document_id], + knowledge_id, + workspace_id, + llm_model_id, + vision_model_id + ) + maxkb_logger.info(f"Advanced learning task submitted for document: {document_id}") + except Exception as e: + maxkb_logger.error(f"Failed to submit advanced learning task: {str(e)}") + ListenerManagement.update_status( + QuerySet(Document).filter(id=document_id), + TaskType.EMBEDDING, + State.FAILURE + ) + + +def submit_media_learning_task(document_id, knowledge_id, workspace_id, stt_model_id, llm_model_id): + """提交音视频学习任务的辅助函数""" + try: + from knowledge.tasks.media_learning import media_learning_by_document + media_learning_by_document.delay( + document_id, + knowledge_id, + workspace_id, + stt_model_id, + llm_model_id + ) + maxkb_logger.info(f"Media learning task submitted for document: {document_id}") + except Exception as e: + maxkb_logger.error(f"Failed to submit media learning task: {str(e)}") + ListenerManagement.update_status( + QuerySet(Document).filter(id=document_id), + TaskType.GENERATE, + State.FAILURE + ) + + class DocumentSerializers(serializers.Serializer): class Export(serializers.Serializer): type = serializers.CharField(required=True, validators=[ @@ -1290,16 +1332,7 @@ class DocumentSerializers(serializers.Serializer): try: from knowledge.tasks.advanced_learning import advanced_learning_by_document # 使用 apply_async 并添加延迟,确保事务提交后再执行 - advanced_learning_by_document.apply_async( - args=[ - str(document_model.id), - str(knowledge_id), - self.data.get('workspace_id', ''), - llm_model_id, - vision_model_id - ], - countdown=2 # 延迟2秒执行 - ) + # 高级学习任务已使用 transaction.on_commit 在批量保存后统一提交 maxkb_logger.info(f"Advanced learning task submitted for document {document_model.id}") except Exception as e: maxkb_logger.error(f"Failed to submit advanced learning task: {str(e)}") @@ -1367,60 +1400,37 @@ class DocumentSerializers(serializers.Serializer): llm_model_id = document.get('llm_model_id') vision_model_id = document.get('vision_model_id') stt_model_id = document.get('stt_model_id') - + if idx < len(document_result_list): document_id = document_result_list[idx].get('id') - + if llm_model_id and vision_model_id: document_result_list[idx]['is_advanced_learning'] = True - # 触发高级学习异步任务 - try: - from knowledge.tasks.advanced_learning import batch_advanced_learning - batch_advanced_learning.delay( - [document_id], - str(knowledge_id), - workspace_id, - llm_model_id, - vision_model_id + # 使用 transaction.on_commit 提交高级学习任务 + transaction.on_commit( + lambda did=document_id: submit_advanced_learning_task( + did, str(knowledge_id), workspace_id, + llm_model_id, vision_model_id ) - maxkb_logger.info(f"Submitted advanced learning task for document: {document_id}") - except Exception as e: - maxkb_logger.error(f"Failed to submit advanced learning task: {str(e)}") - + ) + elif stt_model_id: document_result_list[idx]['is_media_learning'] = True - # 设置排队状态并触发音视频异步任务 - try: - # 更新文档状态为排队中 - ListenerManagement.update_status( - QuerySet(Document).filter(id=document_id), - TaskType.GENERATE, - State.PENDING + # 设置排队状态(在事务内立即执行) + ListenerManagement.update_status( + QuerySet(Document).filter(id=document_id), + TaskType.GENERATE, + State.PENDING + ) + + # 使用 transaction.on_commit 提交音视频学习任务 + transaction.on_commit( + lambda did=document_id: submit_media_learning_task( + did, str(knowledge_id), workspace_id, + stt_model_id, llm_model_id ) - - # 触发音视频异步处理任务 - from knowledge.tasks.media_learning import media_learning_by_document - media_learning_by_document.delay( - document_id, - str(knowledge_id), - workspace_id, - stt_model_id, - llm_model_id - ) - maxkb_logger.info(f"Submitted media learning task for document: {document_id}, status: PENDING") - - except Exception as e: - maxkb_logger.error(f"Failed to submit media learning task: {str(e)}") - # 如果提交任务失败,更新状态为失败 - try: - ListenerManagement.update_status( - QuerySet(Document).filter(id=document_id), - TaskType.GENERATE, - State.FAILURE - ) - except Exception as status_error: - maxkb_logger.error(f"Failed to update status to FAILURE: {str(status_error)}") - + ) + return document_result_list, knowledge_id, workspace_id def batch_sync(self, instance: Dict, with_valid=True):