maxkb/test_celery_recursion_fix.py
朱潮 565a07f9c6 添加Celery递归调用修复测试
- 创建测试脚本验证修复效果
- 测试Celery基本功能、任务可用性和Worker检查
- 提供详细的诊断信息和修复建议

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-31 01:33:38 +08:00

144 lines
4.1 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试Celery修复后的效果
"""
import os
import sys
# 添加项目路径
sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings')
def test_celery_basic():
"""测试Celery基本功能"""
print("=== 测试Celery基本功能 ===")
try:
# 设置Django
import django
django.setup()
print("✅ Django设置成功")
# 导入Celery应用
from ops import celery_app
print("✅ Celery应用导入成功")
print(f"📋 应用名称: {celery_app.main}")
# 检查配置
print(f"📊 导入的模块: {celery_app.conf.get('imports', [])}")
return True
except Exception as e:
print(f"❌ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_task_availability():
"""测试任务可用性"""
print("\n=== 测试任务可用性 ===")
try:
# 测试任务导入
from knowledge.tasks.media_learning import media_learning_by_document, media_learning_batch
print("✅ media_learning任务导入成功")
from knowledge.tasks.advanced_learning import advanced_learning_by_document, batch_advanced_learning
print("✅ advanced_learning任务导入成功")
# 检查任务名称
print(f"📋 media_learning_by_document: {media_learning_by_document.name}")
print(f"📋 media_learning_batch: {media_learning_batch.name}")
print(f"📋 advanced_learning_by_document: {advanced_learning_by_document.name}")
print(f"📋 batch_advanced_learning: {batch_advanced_learning.name}")
return True
except Exception as e:
print(f"❌ 任务导入失败: {e}")
import traceback
traceback.print_exc()
return False
def test_celery_worker_check():
"""测试Celery worker检查"""
print("\n=== 测试Celery Worker ===")
try:
# 模拟worker检查
from ops import celery_app
# 获取已注册的任务
tasks = list(celery_app.tasks.keys())
print(f"📊 已注册任务总数: {len(tasks)}")
# 检查我们的任务
target_tasks = [
'media_learning_by_document',
'media_learning_batch',
'advanced_learning_by_document',
'batch_advanced_learning'
]
found_tasks = []
for task in target_tasks:
if task in tasks:
found_tasks.append(task)
print(f"{task} - 已注册")
else:
print(f"{task} - 未注册")
print(f"\n📈 找到 {len(found_tasks)}/{len(target_tasks)} 个目标任务")
return len(found_tasks) == len(target_tasks)
except Exception as e:
print(f"❌ Worker检查失败: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""主测试函数"""
print("🚀 测试Celery递归调用修复")
print("=" * 50)
success = True
# 测试基本功能
if not test_celery_basic():
success = False
# 测试任务可用性
if not test_task_availability():
success = False
# 测试Worker检查
if not test_celery_worker_check():
success = False
print("\n" + "=" * 50)
if success:
print("🎉 所有测试通过!")
print("\n📋 修复总结:")
print("✅ 递归调用问题已解决")
print("✅ Celery应用正常启动")
print("✅ 任务导入无错误")
print("✅ 任务注册成功")
print("✅ 自动发现机制正常工作")
else:
print("❌ 部分测试失败")
print("\n🔧 可能需要:")
print(" - 重启Celery Worker")
print(" - 检查Django设置")
print(" - 验证任务模块路径")
return success
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)