maxkb/apps/ops/celery/__init__.py
朱潮 c4eaeb6499 修复Celery递归调用错误
- 移除Django应用配置中的手动任务注册
- 使用Celery的imports配置确保任务模块被导入
- 避免在ready()方法中访问Celery应用
- 使用Celery的自动发现机制处理任务注册

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-31 01:33:18 +08:00

46 lines
1.4 KiB
Python

# -*- coding: utf-8 -*-
import os
from celery import Celery
from celery.schedules import crontab
from kombu import Exchange, Queue
from maxkb import settings
from .heartbeat import *
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings')
app = Celery('MaxKB')
configs = {k: v for k, v in settings.__dict__.items() if k.startswith('CELERY')}
configs['worker_concurrency'] = 5
# Using a string here means the worker will not have to
# pickle the object when using Windows.
# app.config_from_object('django.conf:settings', namespace='CELERY')
configs["task_queues"] = [
Queue("celery", Exchange("celery"), routing_key="celery"),
Queue("model", Exchange("model"), routing_key="model")
]
app.namespace = 'CELERY'
app.conf.update(
{key.replace('CELERY_', '') if key.replace('CELERY_', '').lower() == key.replace('CELERY_',
'') else key: configs.get(
key) for
key
in configs.keys()})
# 配置任务自动发现
app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS])
# 确保任务模块被导入
app.conf.update(
imports=[
'knowledge.tasks.advanced_learning',
'knowledge.tasks.media_learning',
'knowledge.tasks.embedding',
'knowledge.tasks.generate',
'knowledge.tasks.sync'
]
)