feat: switch Celery broker to Redis with Sentinel support

This commit is contained in:
CaptainB 2025-07-07 11:24:28 +08:00
parent 78a0a0bd50
commit 25d395d74a

View File

@ -7,7 +7,8 @@
@desc:
"""
import os
import shutil
from redis.sentinel import Sentinel
from maxkb.const import CONFIG, PROJECT_DIR, LOG_DIR
@ -15,11 +16,58 @@ from maxkb.const import CONFIG, PROJECT_DIR, LOG_DIR
celery_data_dir = os.path.join(PROJECT_DIR, 'data', 'celery_task')
if not os.path.exists(celery_data_dir) or not os.path.isdir(celery_data_dir):
os.makedirs(celery_data_dir)
broker_path = os.path.join(celery_data_dir, "celery_db.sqlite3")
backend_path = os.path.join(celery_data_dir, "celery_results.sqlite3")
# 使用sql_lite 当做broker 和 响应接收
CELERY_BROKER_URL = f'sqla+sqlite:///{broker_path}'
CELERY_result_backend = f'db+sqlite:///{backend_path}'
# Celery using redis as broker
redis_celery_once_db = CONFIG.get("REDIS_CELERY_ONCE_DB", 3)
redis_celery_db = CONFIG.get('REDIS_CELERY_DB', 2)
CELERY_BROKER_URL_FORMAT = '%(protocol)s://:%(password)s@%(host)s:%(port)s/%(db)s'
if CONFIG.get('REDIS_SENTINEL_MASTER') and CONFIG.get('REDIS_SENTINEL_SENTINELS'):
sentinels_str = CONFIG.get('REDIS_SENTINEL_SENTINELS')
sentinels = [
(host.strip(), int(port))
for hostport in sentinels_str.split(',')
for host, port in [hostport.strip().split(':')]
]
CELERY_BROKER_URL = ';'.join([CELERY_BROKER_URL_FORMAT % {
'protocol': 'sentinel', 'password': CONFIG.get('REDIS_PASSWORD'),
'host': item[0], 'port': item[1], 'db': redis_celery_db
} for item in sentinels])
SENTINEL_OPTIONS = {
'master_name': CONFIG.get('REDIS_SENTINEL_MASTER'),
}
CELERY_BROKER_TRANSPORT_OPTIONS = CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = SENTINEL_OPTIONS
# celery-once 哨兵模式配置
sentinel = Sentinel(
sentinels,
socket_timeout=5,
password=CONFIG.get('REDIS_SENTINEL_PASSWORD', CONFIG.get('REDIS_PASSWORD'))
)
master_host, master_port = sentinel.discover_master(CONFIG.get('REDIS_SENTINEL_MASTER'))
celery_once_settings = {
'url': f"redis://:{CONFIG.get('REDIS_PASSWORD')}@{master_host}:{master_port}/{redis_celery_once_db}",
'master_name': CONFIG.get('REDIS_SENTINEL_MASTER'),
'password': CONFIG.get('REDIS_PASSWORD'),
'db': redis_celery_once_db,
}
else:
CELERY_BROKER_URL = CELERY_BROKER_URL_FORMAT % {
'protocol': 'redis',
'password': CONFIG.get('REDIS_PASSWORD'),
'host': CONFIG.get('REDIS_HOST'),
'port': CONFIG.get('REDIS_PORT'),
'db': redis_celery_db
}
# celery-once 常规模式配置
celery_once_settings = {
'url': CELERY_BROKER_URL_FORMAT % {
'protocol': 'redis',
'password': CONFIG.get('REDIS_PASSWORD'),
'host': CONFIG.get('REDIS_HOST'),
'port': CONFIG.get('REDIS_PORT'),
'db': redis_celery_once_db,
}
}
CELERY_result_backend = CELERY_BROKER_URL
CELERY_timezone = CONFIG.TIME_ZONE
CELERY_ENABLE_UTC = False
CELERY_task_serializer = 'pickle'
@ -33,16 +81,11 @@ CELERY_WORKER_REDIRECT_STDOUTS = True
CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO"
CELERY_TASK_SOFT_TIME_LIMIT = 3600
CELERY_WORKER_CANCEL_LONG_RUNNING_TASKS_ON_CONNECTION_LOSS = True
CELERY_TASK_ACKS_LATE = True
celery_once_path = os.path.join(celery_data_dir, "celery_once")
try:
if os.path.exists(celery_once_path) and os.path.isdir(celery_once_path):
shutil.rmtree(celery_once_path)
except Exception as e:
pass
# celery-once 配置
celery_once_settings['default_timeout'] = 3600 # 锁的默认超时时间(秒)
CELERY_ONCE = {
'backend': 'celery_once.backends.File',
'settings': {'location': celery_once_path}
'backend': 'celery_once.backends.Redis',
'settings': celery_once_settings
}
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_LOG_DIR = os.path.join(LOG_DIR, 'celery')