transaction.on_commit 是 Django 提供的机制,它注册的回调函数只会在当前事务成功提交到数据库后执行。这样确保了:
Some checks are pending
sync2gitee / repo-sync (push) Waiting to run
Typos Check / Spell Check with Typos (push) Waiting to run

- 事务成功时:文档已保存在数据库中,异步任务可以正常查询
  - 事务失败时:回调不会执行,避免了处理不存在的数据
This commit is contained in:
朱潮 2025-12-18 22:02:49 +08:00
parent 3f85269df0
commit 3b143448a6

View File

@ -225,6 +225,48 @@ class BatchEditHitHandlingSerializer(serializers.Serializer):
raise AppApiException(500, _('The type only supports optimization|directly_return')) raise AppApiException(500, _('The type only supports optimization|directly_return'))
def submit_advanced_learning_task(document_id, knowledge_id, workspace_id, llm_model_id, vision_model_id):
"""提交高级学习任务的辅助函数"""
try:
from knowledge.tasks.advanced_learning import batch_advanced_learning
batch_advanced_learning.delay(
[document_id],
knowledge_id,
workspace_id,
llm_model_id,
vision_model_id
)
maxkb_logger.info(f"Advanced learning task submitted for document: {document_id}")
except Exception as e:
maxkb_logger.error(f"Failed to submit advanced learning task: {str(e)}")
ListenerManagement.update_status(
QuerySet(Document).filter(id=document_id),
TaskType.EMBEDDING,
State.FAILURE
)
def submit_media_learning_task(document_id, knowledge_id, workspace_id, stt_model_id, llm_model_id):
"""提交音视频学习任务的辅助函数"""
try:
from knowledge.tasks.media_learning import media_learning_by_document
media_learning_by_document.delay(
document_id,
knowledge_id,
workspace_id,
stt_model_id,
llm_model_id
)
maxkb_logger.info(f"Media learning task submitted for document: {document_id}")
except Exception as e:
maxkb_logger.error(f"Failed to submit media learning task: {str(e)}")
ListenerManagement.update_status(
QuerySet(Document).filter(id=document_id),
TaskType.GENERATE,
State.FAILURE
)
class DocumentSerializers(serializers.Serializer): class DocumentSerializers(serializers.Serializer):
class Export(serializers.Serializer): class Export(serializers.Serializer):
type = serializers.CharField(required=True, validators=[ type = serializers.CharField(required=True, validators=[
@ -1290,16 +1332,7 @@ class DocumentSerializers(serializers.Serializer):
try: try:
from knowledge.tasks.advanced_learning import advanced_learning_by_document from knowledge.tasks.advanced_learning import advanced_learning_by_document
# 使用 apply_async 并添加延迟,确保事务提交后再执行 # 使用 apply_async 并添加延迟,确保事务提交后再执行
advanced_learning_by_document.apply_async( # 高级学习任务已使用 transaction.on_commit 在批量保存后统一提交
args=[
str(document_model.id),
str(knowledge_id),
self.data.get('workspace_id', ''),
llm_model_id,
vision_model_id
],
countdown=2 # 延迟2秒执行
)
maxkb_logger.info(f"Advanced learning task submitted for document {document_model.id}") maxkb_logger.info(f"Advanced learning task submitted for document {document_model.id}")
except Exception as e: except Exception as e:
maxkb_logger.error(f"Failed to submit advanced learning task: {str(e)}") maxkb_logger.error(f"Failed to submit advanced learning task: {str(e)}")
@ -1367,60 +1400,37 @@ class DocumentSerializers(serializers.Serializer):
llm_model_id = document.get('llm_model_id') llm_model_id = document.get('llm_model_id')
vision_model_id = document.get('vision_model_id') vision_model_id = document.get('vision_model_id')
stt_model_id = document.get('stt_model_id') stt_model_id = document.get('stt_model_id')
if idx < len(document_result_list): if idx < len(document_result_list):
document_id = document_result_list[idx].get('id') document_id = document_result_list[idx].get('id')
if llm_model_id and vision_model_id: if llm_model_id and vision_model_id:
document_result_list[idx]['is_advanced_learning'] = True document_result_list[idx]['is_advanced_learning'] = True
# 触发高级学习异步任务 # 使用 transaction.on_commit 提交高级学习任务
try: transaction.on_commit(
from knowledge.tasks.advanced_learning import batch_advanced_learning lambda did=document_id: submit_advanced_learning_task(
batch_advanced_learning.delay( did, str(knowledge_id), workspace_id,
[document_id], llm_model_id, vision_model_id
str(knowledge_id),
workspace_id,
llm_model_id,
vision_model_id
) )
maxkb_logger.info(f"Submitted advanced learning task for document: {document_id}") )
except Exception as e:
maxkb_logger.error(f"Failed to submit advanced learning task: {str(e)}")
elif stt_model_id: elif stt_model_id:
document_result_list[idx]['is_media_learning'] = True document_result_list[idx]['is_media_learning'] = True
# 设置排队状态并触发音视频异步任务 # 设置排队状态(在事务内立即执行)
try: ListenerManagement.update_status(
# 更新文档状态为排队中 QuerySet(Document).filter(id=document_id),
ListenerManagement.update_status( TaskType.GENERATE,
QuerySet(Document).filter(id=document_id), State.PENDING
TaskType.GENERATE, )
State.PENDING
# 使用 transaction.on_commit 提交音视频学习任务
transaction.on_commit(
lambda did=document_id: submit_media_learning_task(
did, str(knowledge_id), workspace_id,
stt_model_id, llm_model_id
) )
)
# 触发音视频异步处理任务
from knowledge.tasks.media_learning import media_learning_by_document
media_learning_by_document.delay(
document_id,
str(knowledge_id),
workspace_id,
stt_model_id,
llm_model_id
)
maxkb_logger.info(f"Submitted media learning task for document: {document_id}, status: PENDING")
except Exception as e:
maxkb_logger.error(f"Failed to submit media learning task: {str(e)}")
# 如果提交任务失败,更新状态为失败
try:
ListenerManagement.update_status(
QuerySet(Document).filter(id=document_id),
TaskType.GENERATE,
State.FAILURE
)
except Exception as status_error:
maxkb_logger.error(f"Failed to update status to FAILURE: {str(status_error)}")
return document_result_list, knowledge_id, workspace_id return document_result_list, knowledge_id, workspace_id
def batch_sync(self, instance: Dict, with_valid=True): def batch_sync(self, instance: Dict, with_valid=True):