diff --git a/test_async_audio.py b/test_async_audio.py deleted file mode 100644 index d4a69e25..00000000 --- a/test_async_audio.py +++ /dev/null @@ -1,168 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -测试异步音频处理功能 -""" -import os -import sys -import asyncio -import time -from unittest.mock import Mock, MagicMock - -# 添加项目路径 -sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB') - -from apps.common.handle.impl.media.media_adapter.async_audio_processor import AsyncAudioProcessor -from apps.common.handle.impl.media.media_adapter.logger import MediaLogger - - -class MockLogger: - """模拟日志器""" - def info(self, msg): - print(f"[INFO] {msg}") - - def warning(self, msg): - print(f"[WARNING] {msg}") - - def error(self, msg, exc_info=False): - print(f"[ERROR] {msg}") - - -async def test_async_processor(): - """测试异步处理器""" - print("=== 测试异步音频处理器 ===") - - # 创建配置 - config = { - 'queue_size': 5, - 'async_processing': True - } - - # 创建日志包装器 - mock_logger = MockLogger() - logger_wrapper = MediaLogger(mock_logger) - - # 创建异步处理器 - processor = AsyncAudioProcessor(config, logger_wrapper) - - # 模拟音频数据(创建一个简单的测试音频文件) - test_audio_content = b"fake audio content for testing" - test_file_name = "test_audio.mp3" - - # 模拟STT和LLM模型 - stt_model = Mock() - stt_model.invoke = Mock(return_value="这是测试转写结果") - - llm_model = Mock() - llm_model.invoke = Mock(return_value="这是增强后的文本,带有标点符号。") - - # 测试选项 - options = { - 'enable_punctuation': True, - 'enable_summary': True, - 'segment_duration': 60, # 1分钟分段 - 'language': 'zh-CN' - } - - try: - print("开始测试异步音频处理...") - - # 初始化线程 - processor.initialize_threads() - - # 等待线程启动 - await asyncio.sleep(1) - - # 模拟音频时长为3分钟 - async def mock_get_duration(content): - return 180.0 - processor._get_audio_duration_async = mock_get_duration - - # 处理音频 - start_time = time.time() - result = await processor.process_audio_async( - test_audio_content, test_file_name, stt_model, llm_model, options - ) - end_time = time.time() - - print(f"处理完成,耗时: {end_time - start_time:.2f}秒") - print(f"结果状态: {result['status']}") - print(f"音频时长: {result['duration']:.1f}秒") - print(f"分段数量: {len(result['segments'])}") - print(f"完整文本长度: {len(result['full_text'])}") - - # 显示队列状态 - queue_status = processor.get_queue_status() - print(f"队列状态: {queue_status}") - - # 关闭处理器 - await processor.shutdown() - - print("测试完成!") - - except Exception as e: - print(f"测试失败: {e}") - import traceback - traceback.print_exc() - - -def test_sync_fallback(): - """测试同步回退功能""" - print("\n=== 测试同步回退功能 ===") - - from apps.common.handle.impl.media.media_adapter.processors.audio_processor import AudioProcessor - - # 创建配置 - config = { - 'async_processing': False # 禁用异步处理 - } - - # 创建处理器 - processor = AudioProcessor(config, MockLogger()) - - # 模拟音频数据 - test_audio_content = b"fake audio content for testing" - test_file_name = "test_audio.mp3" - - # 模拟STT和LLM模型 - stt_model = Mock() - stt_model.invoke = Mock(return_value="这是测试转写结果") - - llm_model = Mock() - llm_model.invoke = Mock(return_value="这是增强后的文本,带有标点符号。") - - # 测试选项 - options = { - 'enable_punctuation': True, - 'enable_summary': True, - 'segment_duration': 60, - 'language': 'zh-CN' - } - - try: - print("开始测试同步音频处理...") - - # 处理音频 - start_time = time.time() - result = processor.process( - test_audio_content, test_file_name, stt_model, llm_model, options - ) - end_time = time.time() - - print(f"处理完成,耗时: {end_time - start_time:.2f}秒") - print(f"结果状态: {result['status']}") - print(f"音频时长: {result.get('duration', 0):.1f}秒") - print(f"分段数量: {len(result.get('segments', []))}") - - print("同步回退测试完成!") - - except Exception as e: - print(f"同步回退测试失败: {e}") - import traceback - traceback.print_exc() - - -if __name__ == "__main__": - # 运行测试 - asyncio.run(test_async_processor()) - test_sync_fallback() \ No newline at end of file diff --git a/test_async_simple.py b/test_async_simple.py deleted file mode 100644 index 4c1ce64f..00000000 --- a/test_async_simple.py +++ /dev/null @@ -1,61 +0,0 @@ -#!/usr/bin/env python -""" -简单测试异步修复 -""" - -import asyncio -from asgiref.sync import sync_to_async - - -class TestModel: - """模拟的模型类""" - def invoke(self, messages): - """同步调用方法""" - return type('Response', (), {'content': 'Test response'})() - - -def get_model_sync(): - """模拟同步获取模型""" - print("同步获取模型...") - return TestModel() - - -async def get_model_async(): - """异步获取模型""" - print("异步获取模型...") - return await sync_to_async(get_model_sync)() - - -async def call_model_async(): - """异步调用模型""" - print("异步调用模型...") - model = await get_model_async() - - # 使用 sync_to_async 包装同步的 invoke 方法 - response = await sync_to_async(model.invoke)([{"role": "user", "content": "test"}]) - - if hasattr(response, 'content'): - return response.content - else: - return str(response) - - -async def main(): - """主测试函数""" - print("=" * 60) - print("测试异步修复") - print("=" * 60) - - try: - result = await call_model_async() - print(f"✓ 异步调用成功: {result}") - except Exception as e: - print(f"✗ 异步调用失败: {e}") - - print("=" * 60) - print("测试完成!") - print("=" * 60) - - -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file diff --git a/test_audio_default_text.py b/test_audio_default_text.py deleted file mode 100644 index a4d9f620..00000000 --- a/test_audio_default_text.py +++ /dev/null @@ -1,112 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -测试音频分段处理改为默认文本 -""" -import sys -import os - -# 添加项目路径 -sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB') -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings') - -import django -django.setup() - -from common.handle.impl.media.media_split_handle import MediaSplitHandle -from unittest.mock import Mock - -class MockFile: - def __init__(self, name): - self.name = name - -def mock_get_buffer(file): - return b"fake audio content" - -def test_audio_default_segments(): - """测试音频默认分段生成""" - print("=== 测试音频默认分段生成 ===") - - handler = MediaSplitHandle() - - # 测试音频文件 - test_files = [ - "会议录音.mp3", - "产品演示.mp4", - "培训录音.wav", - "介绍视频.mov" - ] - - for file_name in test_files: - print(f"\n📄 测试文件: {file_name}") - - mock_file = MockFile(file_name) - - try: - result = handler.handle( - file=mock_file, - pattern_list=[], - with_filter=False, - limit=10, - get_buffer=mock_get_buffer, - save_image=False - ) - - print(f"✅ 处理成功") - print(f"📊 段落数量: {len(result['content'])}") - print(f"🏷️ 媒体类型: {result['metadata']['media_type']}") - print(f"🎭 演示模式: {result['metadata']['is_demo_content']}") - - # 显示段落内容 - for i, paragraph in enumerate(result['content'], 1): - print(f"\n{i}. {paragraph['title']}") - print(f" 内容预览: {paragraph['content'][:100]}...") - print(f" 时间范围: {paragraph['metadata']['start_time']}s - {paragraph['metadata']['end_time']}s") - - except Exception as e: - print(f"❌ 处理失败: {e}") - import traceback - traceback.print_exc() - -def test_file_support(): - """测试文件类型支持""" - print("\n=== 测试文件类型支持 ===") - - handler = MediaSplitHandle() - - test_files = [ - ("音频.mp3", True), - ("视频.mp4", True), - ("文档.pdf", False), - ("图片.jpg", False), - ("录音.wav", True), - ("电影.avi", True) - ] - - for file_name, expected in test_files: - mock_file = MockFile(file_name) - result = handler.support(mock_file, mock_get_buffer) - - status = "✅" if result == expected else "❌" - print(f"{status} {file_name}: 支持={result}, 期望={expected}") - -def main(): - """主测试函数""" - print("🚀 测试音频分段处理改为默认文本") - print("=" * 50) - - test_file_support() - test_audio_default_segments() - - print("\n" + "=" * 50) - print("🎉 测试完成!") - - print("\n📋 修改总结:") - print("✅ 音频分段处理已改为默认文本") - print("✅ 不再进行实际的音频处理") - print("✅ 根据文件类型生成合适的演示内容") - print("✅ 保留了完整的元数据信息") - print("✅ 支持音频和视频文件") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/test_celery_recursion_fix.py b/test_celery_recursion_fix.py deleted file mode 100644 index bf0fe645..00000000 --- a/test_celery_recursion_fix.py +++ /dev/null @@ -1,144 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -测试Celery修复后的效果 -""" -import os -import sys - -# 添加项目路径 -sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB') -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings') - -def test_celery_basic(): - """测试Celery基本功能""" - print("=== 测试Celery基本功能 ===") - - try: - # 设置Django - import django - django.setup() - - print("✅ Django设置成功") - - # 导入Celery应用 - from ops import celery_app - - print("✅ Celery应用导入成功") - print(f"📋 应用名称: {celery_app.main}") - - # 检查配置 - print(f"📊 导入的模块: {celery_app.conf.get('imports', [])}") - - return True - - except Exception as e: - print(f"❌ 测试失败: {e}") - import traceback - traceback.print_exc() - return False - -def test_task_availability(): - """测试任务可用性""" - print("\n=== 测试任务可用性 ===") - - try: - # 测试任务导入 - from knowledge.tasks.media_learning import media_learning_by_document, media_learning_batch - print("✅ media_learning任务导入成功") - - from knowledge.tasks.advanced_learning import advanced_learning_by_document, batch_advanced_learning - print("✅ advanced_learning任务导入成功") - - # 检查任务名称 - print(f"📋 media_learning_by_document: {media_learning_by_document.name}") - print(f"📋 media_learning_batch: {media_learning_batch.name}") - print(f"📋 advanced_learning_by_document: {advanced_learning_by_document.name}") - print(f"📋 batch_advanced_learning: {batch_advanced_learning.name}") - - return True - - except Exception as e: - print(f"❌ 任务导入失败: {e}") - import traceback - traceback.print_exc() - return False - -def test_celery_worker_check(): - """测试Celery worker检查""" - print("\n=== 测试Celery Worker ===") - - try: - # 模拟worker检查 - from ops import celery_app - - # 获取已注册的任务 - tasks = list(celery_app.tasks.keys()) - print(f"📊 已注册任务总数: {len(tasks)}") - - # 检查我们的任务 - target_tasks = [ - 'media_learning_by_document', - 'media_learning_batch', - 'advanced_learning_by_document', - 'batch_advanced_learning' - ] - - found_tasks = [] - for task in target_tasks: - if task in tasks: - found_tasks.append(task) - print(f"✅ {task} - 已注册") - else: - print(f"❌ {task} - 未注册") - - print(f"\n📈 找到 {len(found_tasks)}/{len(target_tasks)} 个目标任务") - - return len(found_tasks) == len(target_tasks) - - except Exception as e: - print(f"❌ Worker检查失败: {e}") - import traceback - traceback.print_exc() - return False - -def main(): - """主测试函数""" - print("🚀 测试Celery递归调用修复") - print("=" * 50) - - success = True - - # 测试基本功能 - if not test_celery_basic(): - success = False - - # 测试任务可用性 - if not test_task_availability(): - success = False - - # 测试Worker检查 - if not test_celery_worker_check(): - success = False - - print("\n" + "=" * 50) - if success: - print("🎉 所有测试通过!") - print("\n📋 修复总结:") - print("✅ 递归调用问题已解决") - print("✅ Celery应用正常启动") - print("✅ 任务导入无错误") - print("✅ 任务注册成功") - print("✅ 自动发现机制正常工作") - else: - print("❌ 部分测试失败") - print("\n🔧 可能需要:") - print(" - 重启Celery Worker") - print(" - 检查Django设置") - print(" - 验证任务模块路径") - - return success - -if __name__ == "__main__": - success = main() - sys.exit(0 if success else 1) \ No newline at end of file diff --git a/test_celery_tasks.py b/test_celery_tasks.py deleted file mode 100644 index 8e42bd60..00000000 --- a/test_celery_tasks.py +++ /dev/null @@ -1,103 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -测试Celery任务注册 -""" -import os -import sys - -# 添加项目路径 -sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB') -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings') - -import django -django.setup() - -def test_celery_task_registration(): - """测试Celery任务是否正确注册""" - print("=== 测试Celery任务注册 ===") - - try: - # 导入Celery应用 - from ops import celery_app - - print(f"✅ Celery应用导入成功") - print(f"📋 应用名称: {celery_app.main}") - - # 检查已注册的任务 - registered_tasks = list(celery_app.tasks.keys()) - print(f"📊 已注册任务数量: {len(registered_tasks)}") - - # 检查我们的任务是否已注册 - target_tasks = [ - 'media_learning_by_document', - 'media_learning_batch', - 'advanced_learning_by_document', - 'batch_advanced_learning' - ] - - print(f"\n🔍 检查目标任务:") - for task_name in target_tasks: - if task_name in registered_tasks: - print(f"✅ {task_name} - 已注册") - # 获取任务对象 - task = celery_app.tasks.get(task_name) - print(f" 📝 任务描述: {task.__doc__}") - print(f" 🏷️ 任务名称: {task.name}") - else: - print(f"❌ {task_name} - 未注册") - - # 显示部分其他任务 - print(f"\n📋 其他已注册任务 (前10个):") - other_tasks = [t for t in registered_tasks if t not in target_tasks][:10] - for task in other_tasks: - print(f" • {task}") - - except Exception as e: - print(f"❌ 测试失败: {e}") - import traceback - traceback.print_exc() - -def test_task_import(): - """测试任务导入""" - print("\n=== 测试任务导入 ===") - - try: - # 测试直接导入任务 - from knowledge.tasks.media_learning import media_learning_by_document, media_learning_batch - print("✅ media_learning任务导入成功") - - from knowledge.tasks.advanced_learning import advanced_learning_by_document, batch_advanced_learning - print("✅ advanced_learning任务导入成功") - - # 测试任务调用 - print(f"\n🔧 测试任务调用:") - print(f"📋 media_learning_by_document.name: {media_learning_by_document.name}") - print(f"📋 media_learning_batch.name: {media_learning_batch.name}") - print(f"📋 advanced_learning_by_document.name: {advanced_learning_by_document.name}") - print(f"📋 batch_advanced_learning.name: {batch_advanced_learning.name}") - - except Exception as e: - print(f"❌ 任务导入失败: {e}") - import traceback - traceback.print_exc() - -def main(): - """主测试函数""" - print("🚀 测试Celery任务注册") - print("=" * 50) - - test_task_import() - test_celery_task_registration() - - print("\n" + "=" * 50) - print("🎉 测试完成!") - - print("\n📋 修复总结:") - print("✅ 修复了任务导入问题") - print("✅ 修复了任务注册问题") - print("✅ 验证了Celery自动发现功能") - print("✅ 确保了音视频异步任务可以正常执行") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/test_config_chain.py b/test_config_chain.py deleted file mode 100644 index b8645c4f..00000000 --- a/test_config_chain.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env python3 -""" -测试配置对象的传递链 -""" - -import os -import sys - -# 设置环境变量,避免从环境获取默认值 -os.environ['MAXKB_LLM_MODEL_ID'] = '' -os.environ['MAXKB_VISION_MODEL_ID'] = '' - -print("Testing config chain") -print("=" * 60) - -# 模拟 dataclass -from dataclasses import dataclass - -@dataclass -class BaseConfig: - """Base configuration""" - api_url: str = "default_url" - - def __post_init__(self): - print(f" BaseConfig.__post_init__ called") - -class TestConfig(BaseConfig): - """Test configuration with model IDs""" - - @classmethod - def create(cls, llm_id=None, vision_id=None): - print(f"TestConfig.create() called with llm_id={llm_id}, vision_id={vision_id}") - instance = cls() - print(f" After cls(): llm={getattr(instance, 'llm_id', 'NOT SET')}, vision={getattr(instance, 'vision_id', 'NOT SET')}") - - if llm_id: - instance.llm_id = llm_id - print(f" Set llm_id to {llm_id}") - if vision_id: - instance.vision_id = vision_id - print(f" Set vision_id to {vision_id}") - - print(f" Final: llm={instance.llm_id}, vision={instance.vision_id}") - return instance - - def __post_init__(self): - print(f" TestConfig.__post_init__ called") - super().__post_init__() - # Set defaults - self.llm_id = "default_llm" - self.vision_id = "default_vision" - print(f" Set defaults: llm={self.llm_id}, vision={self.vision_id}") - -# Test 1: Direct creation -print("\nTest 1: Direct creation (should use defaults)") -config1 = TestConfig() -print(f"Result: llm={config1.llm_id}, vision={config1.vision_id}") - -# Test 2: Factory method -print("\nTest 2: Factory method with IDs") -config2 = TestConfig.create(llm_id="llm_123", vision_id="vision_456") -print(f"Result: llm={config2.llm_id}, vision={config2.vision_id}") - -print("\n" + "=" * 60) -print("Analysis:") -if config2.llm_id == "llm_123" and config2.vision_id == "vision_456": - print("✅ Factory method correctly overrides defaults") -else: - print("❌ Problem: Factory method failed to override defaults") - print(f" Expected: llm=llm_123, vision=vision_456") - print(f" Got: llm={config2.llm_id}, vision={config2.vision_id}") \ No newline at end of file diff --git a/test_config_simple.py b/test_config_simple.py deleted file mode 100644 index 1bf8cdf8..00000000 --- a/test_config_simple.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python3 -""" -简单测试配置逻辑 -""" - -# 模拟配置类的行为 -class TestConfig: - def __init__(self): - self.llm_model_id = None - self.vision_model_id = None - - @classmethod - def create(cls, llm_model_id=None, vision_model_id=None): - instance = cls() - if llm_model_id: - instance.llm_model_id = llm_model_id - if vision_model_id: - instance.vision_model_id = vision_model_id - print(f"Config created with LLM={instance.llm_model_id}, Vision={instance.vision_model_id}") - return instance - -def test_model_selection(): - """测试模型选择逻辑""" - - TEST_LLM_ID = "0198e029-bfeb-7d43-a6ee-c88662697d3c" - TEST_VISION_ID = "0198e02c-9f2e-7520-a27b-6376ad42d520" - - # 创建配置 - config = TestConfig.create( - llm_model_id=TEST_LLM_ID, - vision_model_id=TEST_VISION_ID - ) - - print("\nTest 1: use_llm=False (should use vision model)") - use_llm = False - if use_llm: - model_id = config.llm_model_id - print(f" Using LLM model: {model_id}") - else: - model_id = config.vision_model_id - print(f" Using Vision model: {model_id}") - - if model_id == TEST_VISION_ID: - print(f" ✅ Correct! Using vision model ID: {TEST_VISION_ID}") - else: - print(f" ❌ Wrong! Using: {model_id}, Expected: {TEST_VISION_ID}") - - print("\nTest 2: use_llm=True (should use LLM model)") - use_llm = True - if use_llm: - model_id = config.llm_model_id - print(f" Using LLM model: {model_id}") - else: - model_id = config.vision_model_id - print(f" Using Vision model: {model_id}") - - if model_id == TEST_LLM_ID: - print(f" ✅ Correct! Using LLM model ID: {TEST_LLM_ID}") - else: - print(f" ❌ Wrong! Using: {model_id}, Expected: {TEST_LLM_ID}") - -if __name__ == "__main__": - print("=" * 60) - print("Testing Model Selection Logic") - print("=" * 60) - test_model_selection() - print("=" * 60) \ No newline at end of file diff --git a/test_django_celery_fix.py b/test_django_celery_fix.py deleted file mode 100644 index c87896dc..00000000 --- a/test_django_celery_fix.py +++ /dev/null @@ -1,140 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -测试Django应用启动和Celery任务注册 -""" -import os -import sys - -# 添加项目路径 -sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB') -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings') - -def test_django_startup(): - """测试Django应用启动""" - print("=== 测试Django应用启动 ===") - - try: - # 设置Django - import django - django.setup() - - print("✅ Django应用启动成功") - print(f"📊 已安装应用数量: {len(django.apps.apps.get_app_configs())}") - - # 检查knowledge应用 - knowledge_app = django.apps.apps.get_app_config('knowledge') - print(f"✅ Knowledge应用已加载: {knowledge_app.name}") - - # 检查应用是否准备好 - if django.apps.apps.ready: - print("✅ Django应用已完全准备好") - else: - print("⚠️ Django应用还未完全准备好") - - return True - - except Exception as e: - print(f"❌ Django应用启动失败: {e}") - import traceback - traceback.print_exc() - return False - -def test_celery_tasks(): - """测试Celery任务""" - print("\n=== 测试Celery任务 ===") - - try: - # 导入Celery应用 - from ops import celery_app - - print(f"✅ Celery应用导入成功") - print(f"📋 应用名称: {celery_app.main}") - - # 检查任务 - registered_tasks = list(celery_app.tasks.keys()) - print(f"📊 已注册任务数量: {len(registered_tasks)}") - - # 检查目标任务 - target_tasks = [ - 'media_learning_by_document', - 'media_learning_batch', - 'advanced_learning_by_document', - 'batch_advanced_learning' - ] - - print(f"\n🔍 检查目标任务:") - for task_name in target_tasks: - if task_name in registered_tasks: - print(f"✅ {task_name} - 已注册") - else: - print(f"❌ {task_name} - 未注册") - - return True - - except Exception as e: - print(f"❌ Celery任务测试失败: {e}") - import traceback - traceback.print_exc() - return False - -def test_task_import(): - """测试任务导入""" - print("\n=== 测试任务导入 ===") - - try: - # 测试导入 - from knowledge.tasks.media_learning import media_learning_by_document, media_learning_batch - print("✅ media_learning任务导入成功") - - from knowledge.tasks.advanced_learning import advanced_learning_by_document, batch_advanced_learning - print("✅ advanced_learning任务导入成功") - - # 测试任务属性 - print(f"\n🔧 任务信息:") - print(f"📋 media_learning_by_document.name: {media_learning_by_document.name}") - print(f"📋 media_learning_batch.name: {media_learning_batch.name}") - - return True - - except Exception as e: - print(f"❌ 任务导入失败: {e}") - import traceback - traceback.print_exc() - return False - -def main(): - """主测试函数""" - print("🚀 测试Django应用启动和Celery任务注册") - print("=" * 60) - - success = True - - # 测试Django启动 - if not test_django_startup(): - success = False - - # 测试任务导入 - if not test_task_import(): - success = False - - # 测试Celery任务 - if not test_celery_tasks(): - success = False - - print("\n" + "=" * 60) - if success: - print("🎉 所有测试通过!") - print("\n📋 修复总结:") - print("✅ Django应用启动正常") - print("✅ 任务导入无错误") - print("✅ Celery任务正确注册") - print("✅ 应用启动顺序正确") - else: - print("❌ 部分测试失败") - - return success - -if __name__ == "__main__": - success = main() - sys.exit(0 if success else 1) \ No newline at end of file diff --git a/test_fixed_media_async.py b/test_fixed_media_async.py deleted file mode 100644 index 15300d3a..00000000 --- a/test_fixed_media_async.py +++ /dev/null @@ -1,213 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -测试修复后的音视频异步处理流程 -""" -import time - - -def test_fixed_media_processing(): - """测试修复后的音视频处理流程""" - print("🔧 测试修复后的音视频异步处理流程") - print("=" * 50) - - # 模拟文档信息 - test_files = [ - { - 'name': '会议录音.mp3', - 'type': 'audio', - 'expected_segments': 3 - }, - { - 'name': '产品演示.mp4', - 'type': 'video', - 'expected_segments': 3 - }, - { - 'name': '培训录音.wav', - 'type': 'audio', - 'expected_segments': 3 - }, - { - 'name': '介绍视频.mov', - 'type': 'video', - 'expected_segments': 3 - } - ] - - for i, file_info in enumerate(test_files, 1): - print(f"\n📄 测试文件 {i}: {file_info['name']}") - print(f"🎵 文件类型: {file_info['type']}") - print(f"📊 预期分段数: {file_info['expected_segments']}") - - # 模拟处理流程 - print(f"\n🔄 处理流程:") - - # 1. 排队中 - print(f" 📋 状态: 排队中 (PENDING)") - print(f" 📝 任务已提交到异步队列") - time.sleep(0.5) - - # 2. 生成中 - print(f" 🔄 状态: 生成中 (STARTED)") - print(f" 🔧 开始生成演示段落(不实际处理音频)") - time.sleep(0.5) - - # 3. 索引中 - print(f" 📚 状态: 索引中 (STARTED)") - print(f" 📝 创建段落对象") - print(f" 🔍 生成向量索引") - time.sleep(0.5) - - # 4. 完成 - print(f" ✅ 状态: 完成 (SUCCESS)") - print(f" 📊 生成 {file_info['expected_segments']} 个演示段落") - - # 显示演示段落内容 - print(f"\n📝 演示段落内容:") - if file_info['type'] == 'audio': - segments = [ - "开场介绍 - 包含会议的开场介绍和主要议题的说明", - "项目进展 - 详细讨论了项目的进展情况和下一步的工作计划", - "总结与行动项 - 总结了会议的主要结论和行动项" - ] - else: - segments = [ - "开场介绍 - 包含视频的开场介绍和主要内容概述", - "功能演示 - 详细展示了产品的功能特性和使用方法", - "总结与联系方式 - 总结了产品的主要优势和适用场景" - ] - - for j, segment in enumerate(segments, 1): - print(f" {j}. {segment}") - - print(f"\n📊 处理统计:") - print(f" 📝 段落数量: {file_info['expected_segments']}") - print(f" 🔤 字符数量: ~{file_info['expected_segments'] * 200}") - print(f" ⏱️ 处理时长: < 1秒(演示模式)") - print(f" 🏷️ 标记: 演示内容 (is_demo: True)") - - print(f"\n" + "-" * 30) - - print(f"\n🎉 所有测试文件处理完成!") - - -def test_error_handling(): - """测试错误处理""" - print(f"\n❌ 测试错误处理场景") - print("=" * 30) - - # 模拟错误场景 - error_scenarios = [ - { - 'scenario': '导入错误修复', - 'description': 'embedding_by_data_source 导入路径已修复', - 'status': '✅ 已解决' - }, - { - 'scenario': '任务提交失败', - 'description': '异步任务提交失败时的处理', - 'status': '✅ 已实现' - }, - { - 'scenario': '文件不存在', - 'description': '源文件不存在时的错误处理', - 'status': '✅ 已实现' - }, - { - 'scenario': '处理失败', - 'description': '处理过程中的异常处理', - 'status': '✅ 已实现' - } - ] - - for i, scenario in enumerate(error_scenarios, 1): - print(f"\n{i}. {scenario['scenario']}") - print(f" 描述: {scenario['description']}") - print(f" 状态: {scenario['status']}") - time.sleep(0.3) - - print(f"\n🔧 错误处理特性:") - print(f" ✅ 详细的错误日志") - print(f" ✅ 状态正确更新为 FAILURE") - print(f" ✅ 支持手动重新处理") - print(f" ✅ 异常捕获和优雅降级") - - -def test_demo_content_features(): - """测试演示内容特性""" - print(f"\n🎭 测试演示内容特性") - print("=" * 30) - - features = [ - { - 'feature': '智能分段', - 'description': '根据文件类型生成合适的演示段落', - 'benefit': '更真实的处理体验' - }, - { - 'feature': '元数据标记', - 'description': '每个段落都标记为演示内容 (is_demo: True)', - 'benefit': '便于区分真实处理和演示内容' - }, - { - 'feature': '文件类型识别', - 'description': '自动识别音频/视频文件类型', - 'benefit': '生成更贴合的演示内容' - }, - { - 'feature': '时长信息', - 'description': '为每个段落添加模拟的时长信息', - 'benefit': '更真实的分段效果' - } - ] - - for i, feature in enumerate(features, 1): - print(f"\n{i}. {feature['feature']}") - print(f" 描述: {feature['description']}") - print(f" 优势: {feature['benefit']}") - time.sleep(0.3) - - print(f"\n🎯 演示内容适用场景:") - print(f" 🧪 开发和测试环境") - print(f" 📚 功能演示和展示") - print(f" 🔧 系统集成测试") - print(f" 🎓 用户培训和指导") - - -def main(): - """主测试函数""" - print("🚀 音视频异步处理修复验证测试") - print("=" * 60) - - # 运行测试 - test_fixed_media_processing() - test_error_handling() - test_demo_content_features() - - print(f"\n" + "=" * 60) - print("🎊 修复验证测试完成!") - - print(f"\n📋 修复内容总结:") - print(f"✅ 修复了 embedding_by_data_source 导入错误") - print(f"✅ 实现了演示内容生成(不实际处理音频)") - print(f"✅ 保持了完整的状态流转") - print(f"✅ 完善了错误处理机制") - print(f"✅ 支持多种音视频文件类型") - - print(f"\n🔄 状态流程(修复后):") - print(f"📋 排队中 → 🔄 生成中 → 📚 索引中 → ✅ 完成") - print(f" ↓") - print(f"💥 失败") - - print(f"\n🎭 演示模式特性:") - print(f"🔧 不实际处理音频文件") - print(f"📝 生成合理的演示段落") - print(f"🏷️ 标记为演示内容") - print(f"⚡ 快速处理,无延迟") - - print(f"\n🚀 现在可以正常使用音视频异步处理功能!") - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/test_image_access.py b/test_image_access.py deleted file mode 100644 index 227739a1..00000000 --- a/test_image_access.py +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/env python -""" -测试图片存储和访问 - -这个脚本会: -1. 创建一个测试图片在存储目录 -2. 打印正确的访问URL -""" - -import os -import sys - -def main(): - # 设置存储路径(本地开发环境) - storage_path = os.getenv('MAXKB_STORAGE_PATH', './tmp/maxkb/storage') - - print("=" * 60) - print("MaxKB 图片存储和访问测试") - print("=" * 60) - - # 创建目录结构 - image_dir = os.path.join(storage_path, 'mineru', 'images') - os.makedirs(image_dir, exist_ok=True) - print(f"\n1. 存储目录:{image_dir}") - - # 创建测试图片文件 - test_image = os.path.join(image_dir, 'ac3681aaa7a346b49ef9c7ceb7b94058.jpg') - with open(test_image, 'wb') as f: - # 写入一个简单的测试内容(实际应该是图片二进制数据) - f.write(b'TEST IMAGE CONTENT') - print(f"2. 创建测试文件:{test_image}") - - # 生成访问URL - print("\n3. 访问URL:") - print(f" 本地开发:http://localhost:8080/storage/mineru/images/ac3681aaa7a346b49ef9c7ceb7b94058.jpg") - print(f" Docker环境:http://localhost:8080/storage/mineru/images/ac3681aaa7a346b49ef9c7ceb7b94058.jpg") - - # 列出当前存储目录的所有文件 - print(f"\n4. 存储目录内容:") - for root, dirs, files in os.walk(storage_path): - level = root.replace(storage_path, '').count(os.sep) - indent = ' ' * level - print(f'{indent}{os.path.basename(root)}/') - subindent = ' ' * (level + 1) - for file in files: - file_path = os.path.join(root, file) - file_size = os.path.getsize(file_path) - print(f'{subindent}{file} ({file_size} bytes)') - - print("\n" + "=" * 60) - print("测试完成!") - print("\n注意事项:") - print("1. 确保Django服务器正在运行") - print("2. URL路径现在是 /storage/ 开头,简洁直接") - print("3. 如果使用Docker,确保volume正确挂载") - print("=" * 60) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/test_maxkb_adapter.py b/test_maxkb_adapter.py deleted file mode 100644 index 9d211ab1..00000000 --- a/test_maxkb_adapter.py +++ /dev/null @@ -1,289 +0,0 @@ -#!/usr/bin/env python3 -""" -MaxKB Adapter Import and Basic Functionality Test - -This script specifically tests the MaxKB adapter imports and basic functionality. -""" - -import sys -import os -from pathlib import Path - -# Add the project root to Python path -project_root = Path(__file__).parent -sys.path.insert(0, str(project_root)) - -# For MaxKB, also add the apps directory to the path -apps_path = project_root / 'apps' -if apps_path.exists(): - sys.path.insert(0, str(apps_path)) - print(f"✅ Added apps directory to Python path: {apps_path}") - -# Setup Django environment if we're in MaxKB -try: - import django - os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings') - django.setup() - print("✅ Django environment initialized") -except ImportError: - print("ℹ️ Django not available - running in standalone mode") -except Exception as e: - print(f"ℹ️ Could not initialize Django: {e}") - -def test_imports(): - """Test MaxKB adapter imports""" - print("=" * 60) - print("🔍 Testing MaxKB Adapter Imports") - print("=" * 60) - - results = [] - - # Test 1: Import main adapter module - print("\n1. Testing main adapter import...") - try: - from common.handle.impl.mineru.maxkb_adapter import adapter - print(" ✅ Successfully imported adapter module") - results.append(("adapter module", True)) - - # Check for required classes - assert hasattr(adapter, 'MaxKBAdapter'), "MaxKBAdapter class not found" - print(" ✅ MaxKBAdapter class found") - - assert hasattr(adapter, 'MinerUExtractor'), "MinerUExtractor class not found" - print(" ✅ MinerUExtractor class found") - - assert hasattr(adapter, 'MinerUAdapter'), "MinerUAdapter class not found" - print(" ✅ MinerUAdapter class found") - - except ImportError as e: - print(f" ❌ Failed to import adapter: {e}") - results.append(("adapter module", False)) - except AssertionError as e: - print(f" ❌ Assertion failed: {e}") - results.append(("adapter module", False)) - - # Test 2: Import file storage client - print("\n2. Testing file storage client import...") - try: - from common.handle.impl.mineru.maxkb_adapter import file_storage_client - print(" ✅ Successfully imported file_storage_client module") - - assert hasattr(file_storage_client, 'FileStorageClient'), "FileStorageClient class not found" - print(" ✅ FileStorageClient class found") - results.append(("file_storage_client", True)) - - except ImportError as e: - print(f" ❌ Failed to import file_storage_client: {e}") - results.append(("file_storage_client", False)) - except AssertionError as e: - print(f" ❌ Assertion failed: {e}") - results.append(("file_storage_client", False)) - - # Test 3: Import model client - print("\n3. Testing model client import...") - try: - from common.handle.impl.mineru.maxkb_adapter import maxkb_model_client - print(" ✅ Successfully imported maxkb_model_client module") - - assert hasattr(maxkb_model_client, 'MaxKBModelClient'), "MaxKBModelClient class not found" - print(" ✅ MaxKBModelClient class found") - - assert hasattr(maxkb_model_client, 'maxkb_model_client'), "maxkb_model_client instance not found" - print(" ✅ maxkb_model_client instance found") - results.append(("maxkb_model_client", True)) - - except ImportError as e: - print(f" ❌ Failed to import maxkb_model_client: {e}") - results.append(("maxkb_model_client", False)) - except AssertionError as e: - print(f" ❌ Assertion failed: {e}") - results.append(("maxkb_model_client", False)) - - # Test 4: Import configuration - print("\n4. Testing configuration import...") - try: - from common.handle.impl.mineru.maxkb_adapter import config_maxkb - print(" ✅ Successfully imported config_maxkb module") - - assert hasattr(config_maxkb, 'MaxKBMinerUConfig'), "MaxKBMinerUConfig class not found" - print(" ✅ MaxKBMinerUConfig class found") - results.append(("config_maxkb", True)) - - except ImportError as e: - print(f" ❌ Failed to import config_maxkb: {e}") - results.append(("config_maxkb", False)) - except AssertionError as e: - print(f" ❌ Assertion failed: {e}") - results.append(("config_maxkb", False)) - - # Test 5: Import logger - print("\n5. Testing logger import...") - try: - from common.handle.impl.mineru.maxkb_adapter import logger - print(" ✅ Successfully imported logger module") - results.append(("logger", True)) - - except ImportError as e: - print(f" ❌ Failed to import logger: {e}") - results.append(("logger", False)) - - # Test 6: Import base parser (parent module) - print("\n6. Testing base parser import...") - try: - from common.handle.impl.mineru import base_parser - print(" ✅ Successfully imported base_parser module") - - assert hasattr(base_parser, 'PlatformAdapter'), "PlatformAdapter class not found" - print(" ✅ PlatformAdapter class found") - - assert hasattr(base_parser, 'BaseMinerUExtractor'), "BaseMinerUExtractor class not found" - print(" ✅ BaseMinerUExtractor class found") - results.append(("base_parser", True)) - - except ImportError as e: - print(f" ❌ Failed to import base_parser: {e}") - results.append(("base_parser", False)) - except AssertionError as e: - print(f" ❌ Assertion failed: {e}") - results.append(("base_parser", False)) - - # Print summary - print("\n" + "=" * 60) - print("📊 Import Test Summary") - print("=" * 60) - - passed = sum(1 for _, success in results if success) - failed = len(results) - passed - - for module_name, success in results: - status = "✅ PASS" if success else "❌ FAIL" - print(f"{status:10} {module_name}") - - print("-" * 60) - print(f"Total: {len(results)} tests") - print(f"Passed: {passed}") - print(f"Failed: {failed}") - - if failed == 0: - print("\n🎉 All import tests passed!") - else: - print(f"\n⚠️ {failed} import test(s) failed") - - return failed == 0 - -def test_basic_instantiation(): - """Test basic instantiation of MaxKB adapter classes""" - print("\n" + "=" * 60) - print("🔧 Testing Basic Instantiation") - print("=" * 60) - - results = [] - - # Test 1: Instantiate MaxKBAdapter - print("\n1. Testing MaxKBAdapter instantiation...") - try: - from common.handle.impl.mineru.maxkb_adapter.adapter import MaxKBAdapter - - adapter = MaxKBAdapter() - assert adapter is not None, "Adapter is None" - assert adapter.file_storage is not None, "File storage not initialized" - assert adapter.model_client is not None, "Model client not initialized" - - print(" ✅ MaxKBAdapter instantiated successfully") - results.append(("MaxKBAdapter", True)) - - except Exception as e: - print(f" ❌ Failed to instantiate MaxKBAdapter: {e}") - results.append(("MaxKBAdapter", False)) - - # Test 2: Instantiate MinerUExtractor - print("\n2. Testing MinerUExtractor instantiation...") - try: - from common.handle.impl.mineru.maxkb_adapter.adapter import MinerUExtractor - - extractor = MinerUExtractor( - llm_model_id="test_model", - vision_model_id="test_vision" - ) - assert extractor is not None, "Extractor is None" - assert extractor.llm_model_id == "test_model", "LLM model ID not set correctly" - assert extractor.vision_model_id == "test_vision", "Vision model ID not set correctly" - - print(" ✅ MinerUExtractor instantiated successfully") - results.append(("MinerUExtractor", True)) - - except Exception as e: - print(f" ❌ Failed to instantiate MinerUExtractor: {e}") - results.append(("MinerUExtractor", False)) - - # Test 3: Instantiate MinerUAdapter (with mocked init) - print("\n3. Testing MinerUAdapter instantiation...") - try: - from common.handle.impl.mineru.maxkb_adapter.adapter import MinerUAdapter - from unittest.mock import patch - - with patch.object(MinerUAdapter, '_init_extractor'): - adapter = MinerUAdapter() - assert adapter is not None, "Adapter is None" - - print(" ✅ MinerUAdapter instantiated successfully") - results.append(("MinerUAdapter", True)) - - except Exception as e: - print(f" ❌ Failed to instantiate MinerUAdapter: {e}") - results.append(("MinerUAdapter", False)) - - # Print summary - print("\n" + "=" * 60) - print("📊 Instantiation Test Summary") - print("=" * 60) - - passed = sum(1 for _, success in results if success) - failed = len(results) - passed - - for class_name, success in results: - status = "✅ PASS" if success else "❌ FAIL" - print(f"{status:10} {class_name}") - - print("-" * 60) - print(f"Total: {len(results)} tests") - print(f"Passed: {passed}") - print(f"Failed: {failed}") - - if failed == 0: - print("\n🎉 All instantiation tests passed!") - else: - print(f"\n⚠️ {failed} instantiation test(s) failed") - - return failed == 0 - -def main(): - """Main test function""" - print("\n" + "🚀 MaxKB Adapter Test Suite" + "\n") - - # Run import tests - import_success = test_imports() - - # Run instantiation tests only if imports succeeded - if import_success: - instantiation_success = test_basic_instantiation() - else: - print("\n⚠️ Skipping instantiation tests due to import failures") - instantiation_success = False - - # Final summary - print("\n" + "=" * 60) - print("🏁 Final Test Results") - print("=" * 60) - - if import_success and instantiation_success: - print("✅ All tests passed successfully!") - print("\nThe MaxKB adapter is properly configured and ready to use.") - return 0 - else: - print("❌ Some tests failed.") - print("\nPlease review the errors above and ensure all dependencies are installed.") - return 1 - -if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file diff --git a/test_media_async_demo.py b/test_media_async_demo.py deleted file mode 100644 index 3606d5c5..00000000 --- a/test_media_async_demo.py +++ /dev/null @@ -1,193 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -测试音视频异步处理流程 - 简化版本 -""" -import time - - -def test_async_flow_simulation(): - """模拟异步处理流程""" - print("🚀 音视频异步处理流程演示") - print("=" * 50) - - # 模拟文档信息 - document_id = "media-doc-001" - file_name = "会议录音.mp3" - stt_model = "whisper-large" - llm_model = "gpt-4" - - print(f"📄 文档信息:") - print(f" ID: {document_id}") - print(f" 文件名: {file_name}") - print(f" STT模型: {stt_model}") - print(f" LLM模型: {llm_model}") - - # 状态流程演示 - print(f"\n🔄 状态变更流程:") - - steps = [ - { - 'status': '排队中', - 'code': 'PENDING', - 'emoji': '📋', - 'description': '任务已提交,等待处理', - 'details': '文档已创建,异步任务已加入队列' - }, - { - 'status': '生成中', - 'code': 'STARTED', - 'emoji': '🔄', - 'description': '正在转写音视频内容', - 'details': '调用STT模型进行语音转写,LLM模型进行文本优化' - }, - { - 'status': '索引中', - 'code': 'STARTED', - 'emoji': '📚', - 'description': '正在创建段落和索引', - 'details': '创建段落对象,生成向量索引,更新文档统计' - }, - { - 'status': '完成', - 'code': 'SUCCESS', - 'emoji': '✅', - 'description': '处理完成', - 'details': '音视频内容已成功转写并索引,可供搜索' - } - ] - - for i, step in enumerate(steps, 1): - print(f"\n{i}. {step['emoji']} {step['status']} ({step['code']})") - print(f" 描述: {step['description']}") - print(f" 详情: {step['details']}") - - # 模拟处理时间 - if step['status'] == '排队中': - print(" ⏳ 等待工作线程处理...") - time.sleep(1) - elif step['status'] == '生成中': - print(" 🎵 正在转写音频内容...") - print(" 🤖 正在优化转写文本...") - time.sleep(2) - elif step['status'] == '索引中': - print(" 📝 创建段落对象...") - print(" 🔍 生成向量索引...") - time.sleep(1) - elif step['status'] == '完成': - print(" 📊 生成统计信息...") - print(" 🎉 处理完成!") - time.sleep(1) - - print(f"\n📊 处理结果:") - print(f" 📝 段落数量: 8") - print(f" 🔤 字符数量: 2,456") - print(f" ⏱️ 处理时长: 15分32秒") - print(f" 📝 内容预览: '今天的会议主要讨论了产品开发进度...'") - - print(f"\n🎯 用户可执行的操作:") - print(f" 🔍 搜索文档内容") - print(f" 📖 查看完整转写") - print(f" 📊 查看处理统计") - print(f" 🔄 重新处理(如需要)") - - -def test_error_scenario(): - """测试错误场景""" - print(f"\n❌ 错误处理场景演示:") - print("=" * 30) - - error_steps = [ - { - 'status': '排队中', - 'code': 'PENDING', - 'emoji': '📋', - 'description': '任务已提交,等待处理' - }, - { - 'status': '生成中', - 'code': 'STARTED', - 'emoji': '🔄', - 'description': '正在转写音视频内容' - }, - { - 'status': '失败', - 'code': 'FAILURE', - 'emoji': '💥', - 'description': '处理失败', - 'details': 'STT模型调用失败,请检查模型配置' - } - ] - - for i, step in enumerate(error_steps, 1): - print(f"\n{i}. {step['emoji']} {step['status']} ({step['code']})") - print(f" 描述: {step['description']}") - if 'details' in step: - print(f" 详情: {step['details']}") - time.sleep(1) - - print(f"\n🔧 错误处理:") - print(f" 📋 自动重试机制") - print(f" 📊 详细的错误日志") - print(f" 🔄 用户可手动重新处理") - print(f" 📧 系统管理员通知") - - -def test_batch_processing(): - """测试批量处理场景""" - print(f"\n📦 批量处理演示:") - print("=" * 30) - - documents = [ - {'name': '会议录音1.mp3', 'duration': '15:32'}, - {'name': '培训视频.mp4', 'duration': '45:18'}, - {'name': '产品介绍.mp3', 'duration': '8:45'}, - ] - - print(f"📋 批量上传 {len(documents)} 个音视频文件:") - - for i, doc in enumerate(documents, 1): - print(f"\n{i}. 📄 {doc['name']} ({doc['duration']})") - print(f" 📋 状态: 排队中 (PENDING)") - print(f" 🎬 任务已提交到异步队列") - time.sleep(0.5) - - print(f"\n🔄 并行处理中...") - print(f" 🎵 3个工作线程同时处理") - print(f" ⚡ 每个文件独立处理") - - time.sleep(2) - - print(f"\n✅ 批量处理完成:") - for i, doc in enumerate(documents, 1): - print(f" {i}. {doc['name']}: 完成 (SUCCESS)") - - -def main(): - """主函数""" - print("🎬 音视频异步处理完整流程演示") - print("=" * 60) - - # 运行测试 - test_async_flow_simulation() - test_error_scenario() - test_batch_processing() - - print(f"\n" + "=" * 60) - print("🎊 演示完成!") - - print(f"\n📋 核心特性:") - print(f"✅ 完全异步化处理") - print(f"✅ 详细的状态追踪") - print(f"✅ 错误处理和重试") - print(f"✅ 批量处理支持") - print(f"✅ 复用现有状态系统") - - print(f"\n🔄 状态流转:") - print(f"📋 排队中 → 🔄 生成中 → 📚 索引中 → ✅ 完成") - print(f" ↓") - print(f" 💥 失败") - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/test_media_async_flow.py b/test_media_async_flow.py deleted file mode 100644 index 5d0a175a..00000000 --- a/test_media_async_flow.py +++ /dev/null @@ -1,249 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -测试音视频异步处理流程 -""" -import os -import sys -import django -import time -from unittest.mock import Mock - -# 设置Django环境 -sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB') -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings') -django.setup() - -from django.db.models import QuerySet -from knowledge.models import Document, Paragraph, TaskType, State -from common.event import ListenerManagement -from knowledge.tasks.media_learning import media_learning_by_document -from knowledge.serializers.document import DocumentSerializers - - -class MockLogger: - """模拟日志器""" - def info(self, msg): - print(f"[INFO] {msg}") - - def warning(self, msg): - print(f"[WARNING] {msg}") - - def error(self, msg, exc_info=False): - print(f"[ERROR] {msg}") - - -def test_status_flow(): - """测试状态流程""" - print("=== 测试音视频异步处理状态流程 ===") - - # 创建模拟文档 - document_id = "test-media-doc-001" - knowledge_id = "test-knowledge-001" - workspace_id = "test-workspace-001" - stt_model_id = "test-stt-model" - llm_model_id = "test-llm-model" - - print(f"📋 测试文档ID: {document_id}") - print(f"🎵 STT模型ID: {stt_model_id}") - print(f"🤖 LLM模型ID: {llm_model_id}") - - # 模拟文档对象 - mock_document = Mock() - mock_document.id = document_id - mock_document.name = "测试音视频文件.mp3" - mock_document.meta = {'source_file_id': 'test-file-001'} - - # 模拟查询集 - mock_queryset = Mock() - mock_queryset.filter.return_value.first.return_value = mock_document - - # 模拟源文件 - mock_file = Mock() - mock_file.file_name = "测试音视频文件.mp3" - mock_file.get_bytes.return_value = b"fake audio content" - - # 模拟文件查询 - original_file_filter = QuerySet.__dict__['filter'] - - def mock_filter(self, **kwargs): - if 'id' in kwargs and kwargs['id'] == 'test-file-001': - file_queryset = Mock() - file_queryset.first.return_value = mock_file - return file_queryset - elif 'id' in kwargs and kwargs['id'] == document_id: - doc_queryset = Mock() - doc_queryset.first.return_value = mock_document - return doc_queryset - return mock_queryset - - # 临时替换查询方法 - QuerySet.filter = mock_filter - - try: - print("\n🔄 测试状态更新流程...") - - # 1. 测试排队中状态 - print("\n1️⃣ 设置排队中状态 (PENDING)") - ListenerManagement.update_status( - QuerySet(Document).filter(id=document_id), - TaskType.EMBEDDING, - State.PENDING - ) - print(f"✅ 状态已更新为: PENDING") - - # 等待1秒模拟排队时间 - time.sleep(1) - - # 2. 测试生成中状态 - print("\n2️⃣ 设置生成中状态 (STARTED - 生成中)") - ListenerManagement.update_status( - QuerySet(Document).filter(id=document_id), - TaskType.EMBEDDING, - State.STARTED - ) - print(f"✅ 状态已更新为: STARTED (生成中)") - - # 等待2秒模拟处理时间 - time.sleep(2) - - # 3. 测试索引中状态(通过日志区分) - print("\n3️⃣ 设置索引中状态 (STARTED - 索引中)") - print("📚 状态保持为STARTED,但进入索引中阶段") - - # 等待1秒模拟索引时间 - time.sleep(1) - - # 4. 测试完成状态 - print("\n4️⃣ 设置完成状态 (SUCCESS)") - ListenerManagement.update_status( - QuerySet(Document).filter(id=document_id), - TaskType.EMBEDDING, - State.SUCCESS - ) - print(f"✅ 状态已更新为: SUCCESS") - - print("\n🎉 状态流程测试完成!") - - except Exception as e: - print(f"❌ 测试失败: {e}") - import traceback - traceback.print_exc() - - finally: - # 恢复原始查询方法 - QuerySet.filter = original_file_filter - - -def test_document_creation(): - """测试文档创建流程""" - print("\n=== 测试文档创建和异步任务触发 ===") - - # 模拟文档数据 - document_data = { - 'name': '测试音视频文件.mp3', - 'source_file_id': 'test-file-001', - 'stt_model_id': 'test-stt-model', - 'llm_model_id': 'test-llm-model', - 'paragraphs': [], # 异步处理时为空 - 'is_media_async': True - } - - print(f"📄 创建音视频文档: {document_data['name']}") - print(f"🎵 STT模型: {document_data['stt_model_id']}") - print(f"🤖 LLM模型: {document_data['llm_model_id']}") - print(f"⏳ 异步处理: {'是' if document_data.get('is_media_async') else '否'}") - - # 模拟批量保存过程 - instance_list = [document_data] - knowledge_id = "test-knowledge-001" - workspace_id = "test-workspace-001" - - print("\n🔄 模拟批量保存流程...") - - # 模拟文档ID生成 - document_id = "generated-doc-001" - document_result_list = [{'id': document_id}] - - print(f"📋 生成文档ID: {document_id}") - - # 模拟异步任务触发 - for idx, document in enumerate(instance_list): - stt_model_id = document.get('stt_model_id') - - if idx < len(document_result_list) and stt_model_id: - doc_id = document_result_list[idx].get('id') - - print(f"\n🎬 触发音视频异步任务...") - print(f"📋 文档ID: {doc_id}") - print(f"🎵 STT模型: {stt_model_id}") - print(f"📊 状态: PENDING (排队中)") - - # 模拟任务提交 - print(f"✅ 异步任务已提交到队列") - - print("\n🎉 文档创建流程测试完成!") - - -def test_async_task_simulation(): - """模拟异步任务执行""" - print("\n=== 模拟异步任务执行流程 ===") - - document_id = "test-media-doc-001" - - print(f"🎬 开始异步处理文档: {document_id}") - - # 模拟任务执行步骤 - steps = [ - ("📋", "排队中", "PENDING", "任务已提交,等待处理"), - ("🔄", "生成中", "STARTED", "正在转写音视频内容"), - ("📚", "索引中", "STARTED", "正在创建段落和索引"), - ("✅", "完成", "SUCCESS", "处理完成"), - ] - - for emoji, stage, status, description in steps: - print(f"\n{emoji} {stage} ({status})") - print(f" {description}") - - if stage == "排队中": - print(" ⏳ 等待工作线程处理...") - elif stage == "生成中": - print(" 🎵 正在调用STT模型转写音频...") - print(" 🤖 正在调用LLM模型优化文本...") - elif stage == "索引中": - print(" 📝 正在创建段落对象...") - print(" 🔍 正在生成向量索引...") - elif stage == "完成": - print(" 🎉 音视频处理完成!") - print(" 📊 段落数量: 5") - print(" 📝 字符数量: 1,234") - - # 模拟处理时间 - time.sleep(1) - - print("\n🎉 异步任务执行流程测试完成!") - - -def main(): - """主测试函数""" - print("🚀 开始音视频异步处理流程测试") - print("=" * 50) - - # 运行测试 - test_status_flow() - test_document_creation() - test_async_task_simulation() - - print("\n" + "=" * 50) - print("🎊 所有测试完成!") - - print("\n📋 状态流程总结:") - print("1. 排队中 (PENDING) - 文档创建,任务提交") - print("2. 生成中 (STARTED) - 音视频转写处理") - print("3. 索引中 (STARTED) - 段落创建和向量化") - print("4. 完成 (SUCCESS) - 处理完成") - print("5. 失败 (FAILURE) - 处理失败") - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/test_media_processing.py b/test_media_processing.py deleted file mode 100644 index 22aa453d..00000000 --- a/test_media_processing.py +++ /dev/null @@ -1,134 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -测试音视频处理功能 -""" -import sys -import os -sys.path.append('apps') - -def test_media_handler(): - """测试音视频处理器""" - print("测试音视频处理器...") - - try: - from common.handle.impl.media.media_split_handle import MediaSplitHandle - from common.handle.impl.media.media_adapter import MediaAdapter - - # 创建处理器 - handler = MediaSplitHandle() - print("✓ MediaSplitHandle 创建成功") - - # 测试文件类型支持 - class MockFile: - def __init__(self, name, content=b'test'): - self.name = name - self.content = content - self.size = len(content) - - def read(self): - return self.content - - def seek(self, pos): - pass - - # 测试音频文件支持 - audio_files = ['test.mp3', 'test.wav', 'test.m4a', 'test.flac'] - for filename in audio_files: - file = MockFile(filename) - if handler.support(file, lambda x: x.read()): - print(f"✓ {filename} 支持") - else: - print(f"✗ {filename} 不支持") - - # 测试视频文件支持 - video_files = ['test.mp4', 'test.avi', 'test.mov', 'test.mkv'] - for filename in video_files: - file = MockFile(filename) - if handler.support(file, lambda x: x.read()): - print(f"✓ {filename} 支持") - else: - print(f"✗ {filename} 不支持") - - # 测试非媒体文件 - other_files = ['test.txt', 'test.pdf', 'test.docx'] - for filename in other_files: - file = MockFile(filename) - if not handler.support(file, lambda x: x.read()): - print(f"✓ {filename} 正确排除") - else: - print(f"✗ {filename} 错误支持") - - print("\n✓ 所有文件类型测试通过") - - except Exception as e: - print(f"✗ 测试失败: {e}") - import traceback - traceback.print_exc() - return False - - return True - -def test_media_adapter(): - """测试媒体适配器""" - print("\n测试媒体适配器...") - - try: - from common.handle.impl.media.media_adapter import MediaAdapter - - # 创建适配器 - adapter = MediaAdapter() - print("✓ MediaAdapter 创建成功") - - # 测试配置 - if adapter.config: - print("✓ 配置加载成功") - print(f" - STT Provider: {adapter.config.get('stt_provider')}") - print(f" - Max Duration: {adapter.config.get('max_duration')}秒") - print(f" - Segment Duration: {adapter.config.get('segment_duration')}秒") - - # 测试媒体类型检测 - test_cases = [ - ('test.mp3', 'audio'), - ('test.mp4', 'video'), - ('test.wav', 'audio'), - ('test.avi', 'video'), - ] - - for filename, expected_type in test_cases: - detected_type = adapter._detect_media_type(filename) - if detected_type == expected_type: - print(f"✓ {filename} -> {detected_type}") - else: - print(f"✗ {filename} -> {detected_type} (期望: {expected_type})") - - print("\n✓ 适配器测试通过") - - except Exception as e: - print(f"✗ 测试失败: {e}") - import traceback - traceback.print_exc() - return False - - return True - -if __name__ == '__main__': - print("=" * 50) - print("音视频学习模块测试") - print("=" * 50) - - success = True - - # 运行测试 - if not test_media_handler(): - success = False - - if not test_media_adapter(): - success = False - - print("\n" + "=" * 50) - if success: - print("✅ 所有测试通过!") - else: - print("❌ 部分测试失败") - print("=" * 50) \ No newline at end of file diff --git a/test_mineru_async_fix.py b/test_mineru_async_fix.py deleted file mode 100644 index a9a59aa6..00000000 --- a/test_mineru_async_fix.py +++ /dev/null @@ -1,116 +0,0 @@ -#!/usr/bin/env python -""" -测试 MinerU 异步上下文修复 -""" - -import os -import sys -import asyncio -import django - -# 设置 Django 环境 -sys.path.append(os.path.dirname(os.path.abspath(__file__))) -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'smartdoc.settings') -django.setup() - -from apps.common.handle.impl.mineru.maxkb_adapter.maxkb_model_client import maxkb_model_client - - -async def test_async_model_calls(): - """测试异步模型调用""" - print("测试异步模型调用...") - - # 测试获取 LLM 模型 - try: - print("\n1. 测试获取 LLM 模型...") - llm_model = await maxkb_model_client.get_llm_model("0198cbd9-c1a6-7b13-b16d-d85ad77ac03d") - if llm_model: - print(" ✓ LLM 模型获取成功") - else: - print(" ✗ LLM 模型获取失败") - except Exception as e: - print(f" ✗ LLM 模型获取出错: {e}") - - # 测试获取视觉模型 - try: - print("\n2. 测试获取视觉模型...") - vision_model = await maxkb_model_client.get_vision_model("0198cbd9-c1a6-7b13-b16d-d85ad77ac03d") - if vision_model: - print(" ✓ 视觉模型获取成功") - else: - print(" ✗ 视觉模型获取失败") - except Exception as e: - print(f" ✗ 视觉模型获取出错: {e}") - - # 测试聊天完成 - try: - print("\n3. 测试聊天完成...") - messages = [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": "Hello, this is a test."} - ] - response = await maxkb_model_client.chat_completion( - "0198cbd9-c1a6-7b13-b16d-d85ad77ac03d", - messages - ) - if response: - print(f" ✓ 聊天完成成功: {response[:100]}...") - else: - print(" ✗ 聊天完成返回空响应") - except Exception as e: - print(f" ✗ 聊天完成出错: {e}") - - # 测试模型验证 - try: - print("\n4. 测试模型验证...") - is_valid = await maxkb_model_client.validate_model("0198cbd9-c1a6-7b13-b16d-d85ad77ac03d") - if is_valid: - print(" ✓ 模型验证成功") - else: - print(" ✗ 模型不存在或无效") - except Exception as e: - print(f" ✗ 模型验证出错: {e}") - - print("\n测试完成!") - - -async def test_mineru_image_processing(): - """测试 MinerU 图像处理流程""" - print("\n测试 MinerU 图像处理流程...") - - from apps.common.handle.impl.mineru.config_base import MinerUConfig - from apps.common.handle.impl.mineru.image_processor import MinerUImageProcessor - - # 创建配置 - config = MinerUConfig() - - # 创建图像处理器 - processor = MinerUImageProcessor(config) - await processor.initialize() - - print("✓ 图像处理器初始化成功") - - # 清理资源 - await processor.cleanup() - print("✓ 图像处理器清理成功") - - -async def main(): - """主测试函数""" - print("=" * 60) - print("MinerU 异步上下文修复测试") - print("=" * 60) - - # 测试异步模型调用 - await test_async_model_calls() - - # 测试图像处理流程 - await test_mineru_image_processing() - - print("\n" + "=" * 60) - print("所有测试完成!") - print("=" * 60) - - -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file diff --git a/test_model_config.py b/test_model_config.py deleted file mode 100644 index 4827345f..00000000 --- a/test_model_config.py +++ /dev/null @@ -1,101 +0,0 @@ -#!/usr/bin/env python3 -""" -测试模型ID配置是否正确传递 -""" - -import os -import sys -from pathlib import Path - -# Add paths -project_root = Path(__file__).parent -sys.path.insert(0, str(project_root)) -apps_path = project_root / 'apps' -if apps_path.exists(): - sys.path.insert(0, str(apps_path)) - -# 模拟传入的模型ID -TEST_LLM_ID = "0198e029-bfeb-7d43-a6ee-c88662697d3c" -TEST_VISION_ID = "0198e02c-9f2e-7520-a27b-6376ad42d520" - -def test_config_creation(): - """测试配置创建""" - print("=" * 60) - print("Testing MaxKBMinerUConfig creation") - print("=" * 60) - - from apps.common.handle.impl.mineru.maxkb_adapter.config_maxkb import MaxKBMinerUConfig - - # 方法1:直接创建(使用默认值或环境变量) - print("\n1. Default creation:") - config1 = MaxKBMinerUConfig() - print(f" LLM ID: {config1.llm_model_id}") - print(f" Vision ID: {config1.vision_model_id}") - - # 方法2:使用工厂方法 - print("\n2. Factory method creation:") - config2 = MaxKBMinerUConfig.create( - llm_model_id=TEST_LLM_ID, - vision_model_id=TEST_VISION_ID - ) - print(f" LLM ID: {config2.llm_model_id}") - print(f" Vision ID: {config2.vision_model_id}") - - # 验证 - print("\n3. Verification:") - if config2.llm_model_id == TEST_LLM_ID: - print(" ✅ LLM ID correctly set") - else: - print(f" ❌ LLM ID mismatch: expected {TEST_LLM_ID}, got {config2.llm_model_id}") - - if config2.vision_model_id == TEST_VISION_ID: - print(" ✅ Vision ID correctly set") - else: - print(f" ❌ Vision ID mismatch: expected {TEST_VISION_ID}, got {config2.vision_model_id}") - - return config2 - -def test_model_selection(): - """测试模型选择逻辑""" - print("\n" + "=" * 60) - print("Testing model selection logic") - print("=" * 60) - - config = MaxKBMinerUConfig.create( - llm_model_id=TEST_LLM_ID, - vision_model_id=TEST_VISION_ID - ) - - # 模拟 call_litellm 中的逻辑 - print("\n1. When use_llm=True:") - use_llm = True - if use_llm: - model_id = config.llm_model_id - else: - model_id = config.vision_model_id - print(f" Selected model ID: {model_id}") - print(f" Expected: {TEST_LLM_ID}") - print(f" Match: {model_id == TEST_LLM_ID}") - - print("\n2. When use_llm=False:") - use_llm = False - if use_llm: - model_id = config.llm_model_id - else: - model_id = config.vision_model_id - print(f" Selected model ID: {model_id}") - print(f" Expected: {TEST_VISION_ID}") - print(f" Match: {model_id == TEST_VISION_ID}") - -if __name__ == "__main__": - print("Testing Model Configuration") - print("=" * 60) - print(f"Test LLM ID: {TEST_LLM_ID}") - print(f"Test Vision ID: {TEST_VISION_ID}") - - config = test_config_creation() - test_model_selection() - - print("\n" + "=" * 60) - print("Test completed!") - print("=" * 60) \ No newline at end of file diff --git a/test_simple_async_audio.py b/test_simple_async_audio.py deleted file mode 100644 index 1ce498b2..00000000 --- a/test_simple_async_audio.py +++ /dev/null @@ -1,166 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -测试简化异步音频处理功能 -""" -import os -import sys -import asyncio -import time -from unittest.mock import Mock - -# 添加项目路径 -sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB') - -from apps.common.handle.impl.media.media_adapter.simple_async_audio_processor import SimpleAsyncAudioProcessor -from apps.common.handle.impl.media.media_adapter.logger import MediaLogger - - -class MockLogger: - """模拟日志器""" - def info(self, msg): - print(f"[INFO] {msg}") - - def warning(self, msg): - print(f"[WARNING] {msg}") - - def error(self, msg, exc_info=False): - print(f"[ERROR] {msg}") - - -async def test_simple_async_processor(): - """测试简化异步处理器""" - print("=== 测试简化异步音频处理器 ===") - - # 创建配置 - config = { - 'queue_size': 10, - 'worker_count': 2, # 2个工作线程 - 'async_processing': True - } - - # 创建日志包装器 - mock_logger = MockLogger() - logger_wrapper = MediaLogger(mock_logger) - - # 创建简化异步处理器 - processor = SimpleAsyncAudioProcessor(config, logger_wrapper) - - # 模拟音频数据 - test_audio_content = b"fake audio content for testing" - test_file_name = "test_audio.mp3" - - # 模拟STT和LLM模型 - stt_model = Mock() - stt_model.invoke = Mock(return_value="这是测试转写结果") - - llm_model = Mock() - llm_model.invoke = Mock(return_value="这是增强后的文本,带有标点符号。") - - # 测试选项 - options = { - 'enable_punctuation': True, - 'enable_summary': True, - 'segment_duration': 60, # 1分钟分段 - 'language': 'zh-CN' - } - - try: - print("开始测试简化异步音频处理...") - - # 模拟音频时长为3分钟 - async def mock_get_duration(content): - return 180.0 - processor._get_audio_duration_async = mock_get_duration - - # 处理音频 - start_time = time.time() - result = await processor.process_audio_async( - test_audio_content, test_file_name, stt_model, llm_model, options - ) - end_time = time.time() - - print(f"处理完成,耗时: {end_time - start_time:.2f}秒") - print(f"结果状态: {result['status']}") - print(f"音频时长: {result['duration']:.1f}秒") - print(f"分段数量: {len(result['segments'])}") - print(f"完整文本长度: {len(result['full_text'])}") - print(f"工作线程数: {result['metadata']['worker_count']}") - - # 显示队列状态 - queue_status = processor.get_queue_status() - print(f"队列状态: {queue_status}") - - # 关闭处理器 - await processor.shutdown() - - print("简化版本测试完成!") - - except Exception as e: - print(f"测试失败: {e}") - import traceback - traceback.print_exc() - - -def test_audio_processor_integration(): - """测试音频处理器集成""" - print("\n=== 测试音频处理器集成 ===") - - from apps.common.handle.impl.media.media_adapter.processors.audio_processor import AudioProcessor - - # 创建配置 - config = { - 'async_processing': True, # 启用异步处理 - 'worker_count': 2 - } - - # 创建处理器 - processor = AudioProcessor(config, MockLogger()) - - # 模拟音频数据 - test_audio_content = b"fake audio content for testing" - test_file_name = "test_audio.mp3" - - # 模拟STT和LLM模型 - stt_model = Mock() - stt_model.invoke = Mock(return_value="这是测试转写结果") - - llm_model = Mock() - llm_model.invoke = Mock(return_value="这是增强后的文本,带有标点符号。") - - # 测试选项 - options = { - 'async_processing': True, # 显式启用异步 - 'enable_punctuation': True, - 'enable_summary': True, - 'segment_duration': 60, - 'language': 'zh-CN' - } - - try: - print("开始测试音频处理器异步集成...") - - # 处理音频 - start_time = time.time() - result = processor.process( - test_audio_content, test_file_name, stt_model, llm_model, options - ) - end_time = time.time() - - print(f"处理完成,耗时: {end_time - start_time:.2f}秒") - print(f"结果状态: {result['status']}") - print(f"音频时长: {result.get('duration', 0):.1f}秒") - print(f"分段数量: {len(result.get('segments', []))}") - - print("音频处理器集成测试完成!") - - except Exception as e: - print(f"音频处理器集成测试失败: {e}") - import traceback - traceback.print_exc() - - -if __name__ == "__main__": - # 运行测试 - asyncio.run(test_simple_async_processor()) - test_audio_processor_integration() \ No newline at end of file diff --git a/test_storage.py b/test_storage.py deleted file mode 100644 index 84feeefe..00000000 --- a/test_storage.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/env python -""" -测试MinerU图片存储和访问功能 - -使用方法: -1. 在本地开发环境:python test_storage.py -2. 在Docker环境:docker exec -it maxkb-dev python /opt/maxkb-app/test_storage.py -""" - -import os -import sys -import tempfile -import shutil -from pathlib import Path - -def test_storage(): - """测试存储功能""" - print("=" * 60) - print("MinerU 图片存储测试") - print("=" * 60) - - # 1. 检查存储路径配置 - storage_path = os.getenv('MAXKB_STORAGE_PATH', '/opt/maxkb/storage') - print(f"\n1. 存储路径配置:{storage_path}") - - # 2. 创建测试目录结构 - test_dir = os.path.join(storage_path, 'test', 'images') - print(f"\n2. 创建测试目录:{test_dir}") - os.makedirs(test_dir, exist_ok=True) - - # 3. 创建测试图片文件 - test_image_path = os.path.join(test_dir, 'test_image.txt') - print(f"\n3. 创建测试文件:{test_image_path}") - with open(test_image_path, 'w') as f: - f.write("This is a test image file for MinerU storage") - - # 4. 验证文件创建 - if os.path.exists(test_image_path): - print(" ✓ 文件创建成功") - file_size = os.path.getsize(test_image_path) - print(f" 文件大小:{file_size} bytes") - else: - print(" ✗ 文件创建失败") - return False - - # 5. 生成访问URL - relative_path = os.path.relpath(test_image_path, storage_path) - access_url = f"/api/storage/{relative_path}" - print(f"\n4. 生成的访问URL:{access_url}") - - # 6. 列出存储目录内容 - print(f"\n5. 存储目录内容:") - for root, dirs, files in os.walk(storage_path): - level = root.replace(storage_path, '').count(os.sep) - indent = ' ' * 2 * level - print(f'{indent}{os.path.basename(root)}/') - subindent = ' ' * 2 * (level + 1) - for file in files: - print(f'{subindent}{file}') - - print("\n" + "=" * 60) - print("测试完成!") - print("\n配置建议:") - print("1. 确保Docker volume正确挂载:~/.maxkb/storage:/opt/maxkb/storage") - print("2. 确保环境变量设置:MAXKB_STORAGE_PATH=/opt/maxkb/storage") - print("3. 访问图片URL格式:http://localhost:8080/api/storage/mineru/images/xxx.jpg") - print("=" * 60) - - return True - -def test_mineru_adapter(): - """测试MinerU适配器""" - print("\n" + "=" * 60) - print("测试MinerU适配器") - print("=" * 60) - - # 添加apps目录到Python路径 - sys.path.insert(0, '/opt/maxkb-app/apps' if os.path.exists('/opt/maxkb-app/apps') else './apps') - - try: - from common.handle.impl.mineru.maxkb_adapter.adapter import MaxKBAdapter - - print("\n1. 创建MaxKB适配器实例") - adapter = MaxKBAdapter() - print(f" 存储路径:{adapter.storage_path}") - - # 创建临时测试文件 - with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as tmp: - tmp.write(b"Test image content") - tmp_path = tmp.name - - print(f"\n2. 测试upload_file方法") - print(f" 源文件:{tmp_path}") - - # 使用异步方式调用 - import asyncio - async def test_upload(): - result = await adapter.upload_file(tmp_path, options=['test_knowledge']) - return result - - # 运行异步测试 - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - result_url = loop.run_until_complete(test_upload()) - print(f" 返回URL:{result_url}") - - # 清理临时文件 - os.unlink(tmp_path) - - print("\n✓ MinerU适配器测试成功") - - except ImportError as e: - print(f"\n✗ 无法导入MinerU适配器:{e}") - print(" 请确保在MaxKB环境中运行此测试") - except Exception as e: - print(f"\n✗ 测试失败:{e}") - import traceback - traceback.print_exc() - -if __name__ == "__main__": - # 运行存储测试 - if test_storage(): - # 如果基础存储测试成功,尝试测试适配器 - try: - test_mineru_adapter() - except: - print("\n提示:适配器测试需要在MaxKB环境中运行") \ No newline at end of file diff --git a/test_storage_simple.py b/test_storage_simple.py deleted file mode 100644 index ecf2958f..00000000 --- a/test_storage_simple.py +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env python -""" -简单的存储测试 - 创建测试图片 -""" -import os - -# 创建存储目录 -storage_path = './tmp/maxkb/storage/mineru/images' -os.makedirs(storage_path, exist_ok=True) - -# 创建测试图片(实际是一个文本文件,但后缀是.jpg) -test_file = os.path.join(storage_path, 'ac3681aaa7a346b49ef9c7ceb7b94058.jpg') -with open(test_file, 'wb') as f: - # 写入一个最小的JPEG文件头(这样浏览器会识别为图片) - # FF D8 FF E0 是JPEG文件的魔术数字 - f.write(bytes.fromhex('FFD8FFE000104A46494600010101006000600000FFDB004300080606070605080707070909080A0C140D0C0B0B0C1912130F141D1A1F1E1D1A1C1C20242E2720222C231C1C2837292C30313434341F27393D38323C2E333432FFDB0043010909090C0B0C180D0D1832211C2132323232323232323232323232323232323232323232323232323232323232323232323232323232323232323232323232FFC00011080001000103012200021101031101FFC4001F0000010501010101010100000000000000000102030405060708090A0BFFC400B5100002010303020403050504040000017D01020300041105122131410613516107227114328191A1082342B1C11552D1F02433627282090A161718191A25262728292A3435363738393A434445464748494A535455565758595A636465666768696A737475767778797A838485868788898A92939495969798999AA2A3A4A5A6A7A8A9AAB2B3B4B5B6B7B8B9BAC2C3C4C5C6C7C8C9CAD2D3D4D5D6D7D8D9DAE1E2E3E4E5E6E7E8E9EAF1F2F3F4F5F6F7F8F9FAFFC4001F0100030101010101010101010000000000000102030405060708090A0BFFC400B51100020102040403040705040400010277000102031104052131061241510761711322328108144291A1B1C109233352F0156272D10A162434E125F11718191A262728292A35363738393A434445464748494A535455565758595A636465666768696A737475767778797A82838485868788898A92939495969798999AA2A3A4A5A6A7A8A9AAB2B3B4B5B6B7B8B9BAC2C3C4C5C6C7C8C9CAD2D3D4D5D6D7D8D9DAE2E3E4E5E6E7E8E9EAF2F3F4F5F6F7F8F9FAFFDA000C03010002110311003F00F9FFD9')) - -print(f"测试文件已创建:{test_file}") -print(f"文件大小:{os.path.getsize(test_file)} bytes") -print("\n访问URL:") -print("http://localhost:8080/storage/mineru/images/ac3681aaa7a346b49ef9c7ceb7b94058.jpg") -print("\n如果Django服务正在运行,可以直接在浏览器中访问上述URL") \ No newline at end of file diff --git a/test_url_fix.py b/test_url_fix.py deleted file mode 100644 index 0833b3f5..00000000 --- a/test_url_fix.py +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/env python3 -""" -测试URL修复 - 验证platform_adapter是否正确传递 -""" - -import os -import sys -import asyncio -from pathlib import Path - -# Add paths -project_root = Path(__file__).parent -sys.path.insert(0, str(project_root)) -apps_path = project_root / 'apps' -if apps_path.exists(): - sys.path.insert(0, str(apps_path)) - -# Set environment variables for testing -os.environ['MAXKB_BASE_URL'] = 'http://xbase.aitravelmaster.com' -os.environ['MINERU_API_TYPE'] = 'cloud' # Force cloud mode for testing - -async def test_url_generation(): - """Test that URLs are generated correctly""" - - # Import after setting environment - from apps.common.handle.impl.mineru.maxkb_adapter.adapter import MaxKBAdapter - - # Create adapter - adapter = MaxKBAdapter() - - # Create a test file - import tempfile - with tempfile.NamedTemporaryFile(mode='w', suffix='.pdf', delete=False) as f: - f.write('test') - test_file = f.name - - try: - # Test upload_file - print("Testing MaxKBAdapter.upload_file()...") - url = await adapter.upload_file(test_file, ['test_knowledge_id']) - - print(f"\n✅ Generated URL: {url}") - - # Verify URL format - if url.startswith('http://') or url.startswith('https://'): - print("✅ URL is properly formatted for Cloud API") - else: - print(f"❌ URL is not valid for Cloud API: {url}") - - # Check if MAXKB_BASE_URL is used - base_url = os.environ.get('MAXKB_BASE_URL', '') - if base_url and url.startswith(base_url): - print(f"✅ URL correctly uses MAXKB_BASE_URL: {base_url}") - else: - print(f"❌ URL does not use MAXKB_BASE_URL") - - finally: - # Clean up - if os.path.exists(test_file): - os.unlink(test_file) - -async def test_api_client_with_adapter(): - """Test that MinerUAPIClient receives platform_adapter correctly""" - - from apps.common.handle.impl.mineru.api_client import MinerUAPIClient - from apps.common.handle.impl.mineru.maxkb_adapter.adapter import MaxKBAdapter - from apps.common.handle.impl.mineru.maxkb_adapter.config_maxkb import MaxKBMinerUConfig - - print("\nTesting MinerUAPIClient with platform_adapter...") - - # Create components - adapter = MaxKBAdapter() - config = MaxKBMinerUConfig() - - # Create API client with adapter - api_client = MinerUAPIClient(config, adapter) - - # Check if adapter is set - if api_client.platform_adapter is not None: - print("✅ platform_adapter is correctly set in MinerUAPIClient") - else: - print("❌ platform_adapter is None in MinerUAPIClient") - - # Test _upload_file_to_accessible_url - import tempfile - with tempfile.NamedTemporaryFile(mode='w', suffix='.pdf', delete=False) as f: - f.write('test') - test_file = f.name - - try: - # Test upload through API client - async with api_client: - url = await api_client._upload_file_to_accessible_url(test_file, 'test_src_id') - print(f"✅ URL from _upload_file_to_accessible_url: {url}") - - if url.startswith('http://') or url.startswith('https://'): - print("✅ API client generates valid URL for Cloud API") - else: - print(f"❌ API client generates invalid URL: {url}") - - finally: - if os.path.exists(test_file): - os.unlink(test_file) - -if __name__ == "__main__": - print("=" * 60) - print("Testing MinerU Cloud API URL Fix") - print("=" * 60) - - # Check environment - print("\nEnvironment:") - print(f"MAXKB_BASE_URL: {os.environ.get('MAXKB_BASE_URL', 'NOT SET')}") - print(f"MINERU_API_TYPE: {os.environ.get('MINERU_API_TYPE', 'NOT SET')}") - - # Run tests - asyncio.run(test_url_generation()) - asyncio.run(test_api_client_with_adapter()) - - print("\n" + "=" * 60) - print("Test completed!") - print("=" * 60) \ No newline at end of file diff --git a/test_url_simple.py b/test_url_simple.py deleted file mode 100644 index dfab4242..00000000 --- a/test_url_simple.py +++ /dev/null @@ -1,94 +0,0 @@ -#!/usr/bin/env python3 -""" -简单测试URL生成逻辑 -""" - -import os -import tempfile -import shutil -import uuid - -# 设置环境变量 -os.environ['MAXKB_BASE_URL'] = 'http://xbase.aitravelmaster.com' - -def test_url_generation(): - """模拟adapter.py中的upload_file逻辑""" - - # 创建测试文件 - with tempfile.NamedTemporaryFile(mode='w', suffix='.pdf', delete=False) as f: - f.write('test') - file_path = f.name - - try: - # 模拟upload_file的逻辑 - storage_path = '/tmp/storage' # 模拟存储路径 - - # 创建存储目录 - sub_dir = 'mineru' - storage_dir = os.path.join(storage_path, sub_dir, 'images') - os.makedirs(storage_dir, exist_ok=True) - - # 生成文件名 - file_ext = os.path.splitext(file_path)[1] - file_name = f"{uuid.uuid4().hex}{file_ext}" - dest_path = os.path.join(storage_dir, file_name) - - # 复制文件 - shutil.copy2(file_path, dest_path) - - # 生成URL(这是关键部分) - relative_path = os.path.relpath(dest_path, storage_path) - relative_path = relative_path.replace(os.path.sep, '/') - - # 检查环境变量 - base_url = os.getenv('MAXKB_BASE_URL', '') - print(f"MAXKB_BASE_URL from env: '{base_url}'") - print(f"Relative path: {relative_path}") - - if base_url: - result_url = f"{base_url.rstrip('/')}/storage/{relative_path}" - print(f"✅ Generated full URL: {result_url}") - else: - result_url = f"/storage/{relative_path}" - print(f"⚠️ Generated relative URL: {result_url}") - - # 验证URL格式 - if result_url.startswith(('http://', 'https://')): - print("✅ URL is valid for Cloud API") - else: - print("❌ URL is NOT valid for Cloud API (must start with http:// or https://)") - - return result_url - - finally: - # 清理 - if os.path.exists(file_path): - os.unlink(file_path) - # 清理存储目录 - if os.path.exists('/tmp/storage'): - shutil.rmtree('/tmp/storage') - -if __name__ == "__main__": - print("=" * 60) - print("Testing URL Generation Logic") - print("=" * 60) - print() - - # 测试1:有MAXKB_BASE_URL - print("Test 1: With MAXKB_BASE_URL set") - print("-" * 40) - url1 = test_url_generation() - - print("\n" + "=" * 60) - - # 测试2:没有MAXKB_BASE_URL - print("\nTest 2: Without MAXKB_BASE_URL") - print("-" * 40) - os.environ['MAXKB_BASE_URL'] = '' - url2 = test_url_generation() - - print("\n" + "=" * 60) - print("Summary:") - print(f"With MAXKB_BASE_URL: {url1}") - print(f"Without MAXKB_BASE_URL: {url2}") - print("=" * 60) \ No newline at end of file