remove test
Some checks failed
sync2gitee / repo-sync (push) Has been cancelled
Typos Check / Spell Check with Typos (push) Has been cancelled

This commit is contained in:
朱潮 2025-08-31 11:18:01 +08:00
parent 5f9f2a9325
commit ec6e699390
21 changed files with 0 additions and 2754 deletions

View File

@ -1,168 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试异步音频处理功能
"""
import os
import sys
import asyncio
import time
from unittest.mock import Mock, MagicMock
# 添加项目路径
sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB')
from apps.common.handle.impl.media.media_adapter.async_audio_processor import AsyncAudioProcessor
from apps.common.handle.impl.media.media_adapter.logger import MediaLogger
class MockLogger:
    """Minimal stand-in logger that prints level-tagged messages to stdout."""

    def info(self, msg):
        """Print ``msg`` prefixed with ``[INFO]``."""
        print("[INFO] {}".format(msg))

    def warning(self, msg):
        """Print ``msg`` prefixed with ``[WARNING]``."""
        print("[WARNING] {}".format(msg))

    def error(self, msg, exc_info=False):
        """Print ``msg`` prefixed with ``[ERROR]``.

        ``exc_info`` is accepted but not used by this stub.
        """
        print("[ERROR] {}".format(msg))
async def test_async_processor():
    """Run one fake audio clip through ``AsyncAudioProcessor`` with mocked models.

    The processor's ``_get_audio_duration_async`` is replaced by a coroutine
    that always reports 180 seconds. The result summary and queue status are
    printed. Any exception is reported on stdout with a traceback and is not
    propagated to the caller.

    The processor is shut down even when the run fails after
    ``initialize_threads()`` succeeded, so a failed run does not leave the
    processor running.
    """
    print("=== 测试异步音频处理器 ===")

    # Processor configuration.
    config = {
        'queue_size': 5,
        'async_processing': True
    }

    # Wrap the stub logger and build the processor under test.
    logger_wrapper = MediaLogger(MockLogger())
    processor = AsyncAudioProcessor(config, logger_wrapper)

    # Placeholder payload; these bytes are not a real audio file.
    test_audio_content = b"fake audio content for testing"
    test_file_name = "test_audio.mp3"

    # Mocked STT and LLM models with canned replies.
    stt_model = Mock()
    stt_model.invoke = Mock(return_value="这是测试转写结果")
    llm_model = Mock()
    llm_model.invoke = Mock(return_value="这是增强后的文本,带有标点符号。")

    options = {
        'enable_punctuation': True,
        'enable_summary': True,
        'segment_duration': 60,  # one-minute segments
        'language': 'zh-CN'
    }

    threads_started = False
    shutdown_attempted = False
    try:
        print("开始测试异步音频处理...")
        processor.initialize_threads()
        threads_started = True
        # Give the processor a moment to start up.
        await asyncio.sleep(1)

        # Pretend the clip is three minutes long.
        async def mock_get_duration(content):
            return 180.0

        processor._get_audio_duration_async = mock_get_duration

        start_time = time.time()
        result = await processor.process_audio_async(
            test_audio_content, test_file_name, stt_model, llm_model, options
        )
        end_time = time.time()
        print(f"处理完成,耗时: {end_time - start_time:.2f}")
        print(f"结果状态: {result['status']}")
        print(f"音频时长: {result['duration']:.1f}")
        print(f"分段数量: {len(result['segments'])}")
        print(f"完整文本长度: {len(result['full_text'])}")

        queue_status = processor.get_queue_status()
        print(f"队列状态: {queue_status}")

        # Normal-path shutdown; flagged first so the cleanup below never retries it.
        shutdown_attempted = True
        await processor.shutdown()
        print("测试完成!")
    except Exception as e:
        print(f"测试失败: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Failure path: the run aborted before reaching the normal shutdown.
        if threads_started and not shutdown_attempted:
            try:
                await processor.shutdown()
            except Exception as cleanup_error:
                print(f"关闭处理器失败: {cleanup_error}")
def test_sync_fallback():
    """Run the synchronous ``AudioProcessor.process`` path with async disabled.

    Uses mocked STT/LLM models and a placeholder payload, prints a short
    result summary, and reports any processing exception on stdout with a
    traceback instead of propagating it.
    """
    print("\n=== 测试同步回退功能 ===")
    from apps.common.handle.impl.media.media_adapter.processors.audio_processor import AudioProcessor

    # Async processing is switched off so the synchronous code path is used.
    processor = AudioProcessor({'async_processing': False}, MockLogger())

    # Placeholder payload; these bytes are not a real audio file.
    audio_bytes = b"fake audio content for testing"
    audio_name = "test_audio.mp3"

    # Mocked STT and LLM models with canned replies.
    stt_model = Mock()
    stt_model.invoke = Mock(return_value="这是测试转写结果")
    llm_model = Mock()
    llm_model.invoke = Mock(return_value="这是增强后的文本,带有标点符号。")

    options = dict(
        enable_punctuation=True,
        enable_summary=True,
        segment_duration=60,
        language='zh-CN',
    )

    try:
        print("开始测试同步音频处理...")
        started = time.time()
        result = processor.process(
            audio_bytes, audio_name, stt_model, llm_model, options
        )
        elapsed = time.time() - started
        print(f"处理完成,耗时: {elapsed:.2f}")
        print(f"结果状态: {result['status']}")
        duration = result.get('duration', 0)
        print(f"音频时长: {duration:.1f}")
        segments = result.get('segments', [])
        print(f"分段数量: {len(segments)}")
        print("同步回退测试完成!")
    except Exception as e:
        print(f"同步回退测试失败: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    # Run the async scenario to completion first, then the synchronous fallback check.
    asyncio.run(test_async_processor())
    test_sync_fallback()

View File

@ -1,61 +0,0 @@
#!/usr/bin/env python
"""
简单测试异步修复
"""
import asyncio
from asgiref.sync import sync_to_async
class TestModel:
    """Stand-in model exposing a synchronous ``invoke``."""

    def invoke(self, messages):
        """Return a fresh ``Response`` object whose ``content`` is ``'Test response'``.

        ``messages`` is accepted but ignored.
        """
        response_cls = type('Response', (), {'content': 'Test response'})
        return response_cls()
def get_model_sync():
    """Announce the synchronous lookup and return a new ``TestModel``."""
    print("同步获取模型...")
    model = TestModel()
    return model
async def get_model_async():
    """Fetch the model from async code by wrapping ``get_model_sync`` with ``sync_to_async``."""
    print("异步获取模型...")
    fetch_model = sync_to_async(get_model_sync)
    return await fetch_model()
async def call_model_async():
    """Obtain the model asynchronously and invoke it with one user message.

    Returns the response's ``content`` attribute when present, otherwise
    ``str(response)``.
    """
    print("异步调用模型...")
    model = await get_model_async()
    # ``invoke`` is synchronous, so it is wrapped before being awaited.
    invoke_async = sync_to_async(model.invoke)
    response = await invoke_async([{"role": "user", "content": "test"}])
    if not hasattr(response, 'content'):
        return str(response)
    return response.content
async def main():
    """Run ``call_model_async`` once and print the outcome between banner lines."""
    banner = "=" * 60
    print(banner)
    print("测试异步修复")
    print(banner)
    try:
        result = await call_model_async()
        print(f"✓ 异步调用成功: {result}")
    except Exception as e:
        print(f"✗ 异步调用失败: {e}")
    print(banner)
    print("测试完成!")
    print(banner)
if __name__ == "__main__":
    # Drive the async entry point with a fresh event loop when run as a script.
    asyncio.run(main())

View File

@ -1,112 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试音频分段处理改为默认文本
"""
import sys
import os
# 添加项目路径
sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings')
import django
django.setup()
from common.handle.impl.media.media_split_handle import MediaSplitHandle
from unittest.mock import Mock
class MockFile:
    """Bare file stand-in that only carries a ``name`` attribute."""

    def __init__(self, name):
        # The file name is the only state this stub stores.
        setattr(self, "name", name)
def mock_get_buffer(file):
    """Return a fixed placeholder payload; ``file`` is ignored."""
    fake_payload = b"fake audio content"
    return fake_payload
def test_audio_default_segments():
    """Call ``MediaSplitHandle.handle`` on four sample media names and print the paragraphs.

    Each file is handled independently; a failure for one file is printed
    with a traceback and the loop continues with the next file.
    """
    print("=== 测试音频默认分段生成 ===")
    handler = MediaSplitHandle()

    sample_names = ("会议录音.mp3", "产品演示.mp4", "培训录音.wav", "介绍视频.mov")
    for file_name in sample_names:
        print(f"\n📄 测试文件: {file_name}")
        mock_file = MockFile(file_name)
        try:
            result = handler.handle(
                file=mock_file,
                pattern_list=[],
                with_filter=False,
                limit=10,
                get_buffer=mock_get_buffer,
                save_image=False
            )
            print("✅ 处理成功")
            paragraphs = result['content']
            print(f"📊 段落数量: {len(paragraphs)}")
            metadata = result['metadata']
            print(f"🏷️ 媒体类型: {metadata['media_type']}")
            print(f"🎭 演示模式: {metadata['is_demo_content']}")

            # Show each generated paragraph.
            for index, paragraph in enumerate(paragraphs, 1):
                print(f"\n{index}. {paragraph['title']}")
                preview = paragraph['content'][:100]
                print(f" 内容预览: {preview}...")
                span = paragraph['metadata']
                print(f" 时间范围: {span['start_time']}s - {span['end_time']}s")
        except Exception as e:
            print(f"❌ 处理失败: {e}")
            import traceback
            traceback.print_exc()
def test_file_support():
    """Check ``MediaSplitHandle.support`` against expected answers per file name.

    Prints one line per file, marked with ✅ when the handler's answer equals
    the expectation and ❌ otherwise. The original used the same (empty)
    marker for both outcomes, which made mismatches hard to spot.
    """
    print("\n=== 测试文件类型支持 ===")
    handler = MediaSplitHandle()

    # (file name, whether the handler is expected to accept it)
    test_files = [
        ("音频.mp3", True),
        ("视频.mp4", True),
        ("文档.pdf", False),
        ("图片.jpg", False),
        ("录音.wav", True),
        ("电影.avi", True)
    ]
    for file_name, expected in test_files:
        mock_file = MockFile(file_name)
        result = handler.support(mock_file, mock_get_buffer)
        status = "✅" if result == expected else "❌"
        print(f"{status} {file_name}: 支持={result}, 期望={expected}")
def main():
    """Run both handler checks, then print a fixed summary of the change."""
    print("🚀 测试音频分段处理改为默认文本")
    print("=" * 50)
    test_file_support()
    test_audio_default_segments()
    # NOTE(review): the summary below is printed unconditionally, whatever the checks reported.
    closing_lines = (
        "\n" + "=" * 50,
        "🎉 测试完成!",
        "\n📋 修改总结:",
        "✅ 音频分段处理已改为默认文本",
        "✅ 不再进行实际的音频处理",
        "✅ 根据文件类型生成合适的演示内容",
        "✅ 保留了完整的元数据信息",
        "✅ 支持音频和视频文件",
    )
    for line in closing_lines:
        print(line)
if __name__ == "__main__":
    # Run the handler checks when executed as a script.
    main()

View File

@ -1,144 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试Celery修复后的效果
"""
import os
import sys
# 添加项目路径
sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings')
def test_celery_basic():
    """Set up Django, import ``ops.celery_app`` and print its name and ``imports`` setting.

    Returns:
        True when every step succeeds; False when any step raises (the error
        and a traceback are printed).
    """
    print("=== 测试Celery基本功能 ===")
    try:
        # Django must be configured before the Celery app is imported.
        import django
        django.setup()
        print("✅ Django设置成功")

        from ops import celery_app
        print("✅ Celery应用导入成功")
        app_name = celery_app.main
        print(f"📋 应用名称: {app_name}")

        imported_modules = celery_app.conf.get('imports', [])
        print(f"📊 导入的模块: {imported_modules}")
    except Exception as e:
        print(f"❌ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False
    return True
def test_task_availability():
    """Import the four learning tasks and print each task's ``name``.

    Returns:
        True when all imports and attribute reads succeed; False otherwise
        (the error and a traceback are printed).
    """
    print("\n=== 测试任务可用性 ===")
    try:
        from knowledge.tasks.media_learning import media_learning_by_document, media_learning_batch
        print("✅ media_learning任务导入成功")
        from knowledge.tasks.advanced_learning import advanced_learning_by_document, batch_advanced_learning
        print("✅ advanced_learning任务导入成功")

        # Print the name each task object reports.
        labelled_tasks = (
            ("media_learning_by_document", media_learning_by_document),
            ("media_learning_batch", media_learning_batch),
            ("advanced_learning_by_document", advanced_learning_by_document),
            ("batch_advanced_learning", batch_advanced_learning),
        )
        for label, task in labelled_tasks:
            print(f"📋 {label}: {task.name}")
    except Exception as e:
        print(f"❌ 任务导入失败: {e}")
        import traceback
        traceback.print_exc()
        return False
    return True
def test_celery_worker_check():
    """Check that the four target task names are keys of ``celery_app.tasks``.

    Returns:
        True only when every target name is found; False when any is missing
        or when an exception occurs (printed with a traceback).
    """
    print("\n=== 测试Celery Worker ===")
    try:
        from ops import celery_app

        # Snapshot of the registered task names.
        tasks = list(celery_app.tasks.keys())
        print(f"📊 已注册任务总数: {len(tasks)}")

        # NOTE(review): names are compared verbatim with the registry keys — confirm
        # the tasks are registered under these short names.
        target_tasks = [
            'media_learning_by_document',
            'media_learning_batch',
            'advanced_learning_by_document',
            'batch_advanced_learning'
        ]
        found_tasks = []
        for task in target_tasks:
            if task not in tasks:
                print(f"{task} - 未注册")
                continue
            found_tasks.append(task)
            print(f"{task} - 已注册")
        print(f"\n📈 找到 {len(found_tasks)}/{len(target_tasks)} 个目标任务")
        return len(found_tasks) == len(target_tasks)
    except Exception as e:
        print(f"❌ Worker检查失败: {e}")
        import traceback
        traceback.print_exc()
        return False
def main():
    """Run the three Celery checks and print an overall verdict.

    All three checks always run, even after an earlier one fails.

    Returns:
        True when every check returned a truthy value, else False.
    """
    print("🚀 测试Celery递归调用修复")
    print("=" * 50)

    checks = (test_celery_basic, test_task_availability, test_celery_worker_check)
    # A list (not a generator) so no check is skipped by short-circuiting.
    outcomes = [check() for check in checks]
    success = all(outcomes)

    print("\n" + "=" * 50)
    if success:
        report = (
            "🎉 所有测试通过!",
            "\n📋 修复总结:",
            "✅ 递归调用问题已解决",
            "✅ Celery应用正常启动",
            "✅ 任务导入无错误",
            "✅ 任务注册成功",
            "✅ 自动发现机制正常工作",
        )
    else:
        report = (
            "❌ 部分测试失败",
            "\n🔧 可能需要:",
            " - 重启Celery Worker",
            " - 检查Django设置",
            " - 验证任务模块路径",
        )
    for line in report:
        print(line)
    return success
if __name__ == "__main__":
    # Propagate the overall result as the process exit status (0 = all checks passed).
    success = main()
    sys.exit(0 if success else 1)

View File

@ -1,103 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试Celery任务注册
"""
import os
import sys
# 添加项目路径
sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings')
import django
django.setup()
def test_celery_task_registration():
    """Print which target task names are keys of ``celery_app.tasks``.

    For each registered target the task's docstring and ``name`` are printed,
    followed by up to ten other registered task names. Nothing is returned;
    exceptions are printed with a traceback.
    """
    print("=== 测试Celery任务注册 ===")
    try:
        from ops import celery_app
        print("✅ Celery应用导入成功")
        print(f"📋 应用名称: {celery_app.main}")

        registered_tasks = list(celery_app.tasks.keys())
        print(f"📊 已注册任务数量: {len(registered_tasks)}")

        target_tasks = [
            'media_learning_by_document',
            'media_learning_batch',
            'advanced_learning_by_document',
            'batch_advanced_learning'
        ]
        print("\n🔍 检查目标任务:")
        for task_name in target_tasks:
            if task_name not in registered_tasks:
                print(f"{task_name} - 未注册")
                continue
            print(f"{task_name} - 已注册")
            task = celery_app.tasks.get(task_name)
            print(f" 📝 任务描述: {task.__doc__}")
            print(f" 🏷️ 任务名称: {task.name}")

        # Show a sample of the remaining registry entries.
        print("\n📋 其他已注册任务 (前10个):")
        remaining = [name for name in registered_tasks if name not in target_tasks]
        for task in remaining[:10]:
            print("{}".format(task))
    except Exception as e:
        print(f"❌ 测试失败: {e}")
        import traceback
        traceback.print_exc()
def test_task_import():
    """Import the four learning tasks directly and print each task's ``name``.

    Nothing is returned; an import or attribute failure is printed with a
    traceback.
    """
    print("\n=== 测试任务导入 ===")
    try:
        from knowledge.tasks.media_learning import media_learning_by_document, media_learning_batch
        print("✅ media_learning任务导入成功")
        from knowledge.tasks.advanced_learning import advanced_learning_by_document, batch_advanced_learning
        print("✅ advanced_learning任务导入成功")

        print("\n🔧 测试任务调用:")
        labelled_tasks = (
            ("media_learning_by_document", media_learning_by_document),
            ("media_learning_batch", media_learning_batch),
            ("advanced_learning_by_document", advanced_learning_by_document),
            ("batch_advanced_learning", batch_advanced_learning),
        )
        for label, task in labelled_tasks:
            print(f"📋 {label}.name: {task.name}")
    except Exception as e:
        print(f"❌ 任务导入失败: {e}")
        import traceback
        traceback.print_exc()
def main():
    """Run the import check, then the registration check, then print a fixed summary."""
    print("🚀 测试Celery任务注册")
    print("=" * 50)
    test_task_import()
    test_celery_task_registration()
    # NOTE(review): the summary is printed unconditionally; the two checks above
    # return nothing, so their outcome cannot influence it.
    closing_lines = (
        "\n" + "=" * 50,
        "🎉 测试完成!",
        "\n📋 修复总结:",
        "✅ 修复了任务导入问题",
        "✅ 修复了任务注册问题",
        "✅ 验证了Celery自动发现功能",
        "✅ 确保了音视频异步任务可以正常执行",
    )
    for line in closing_lines:
        print(line)
if __name__ == "__main__":
    # Run the registration checks when executed as a script.
    main()

View File

@ -1,71 +0,0 @@
#!/usr/bin/env python3
"""
测试配置对象的传递链
"""
import os
import sys
# Blank out both model-ID environment variables so defaults are not taken from the environment
os.environ['MAXKB_LLM_MODEL_ID'] = ''
os.environ['MAXKB_VISION_MODEL_ID'] = ''
print("Testing config chain")
print("=" * 60)
# 模拟 dataclass
from dataclasses import dataclass
@dataclass
class BaseConfig:
    """Base configuration holding a single ``api_url`` field."""

    api_url: str = "default_url"

    def __post_init__(self):
        """Announce on stdout that the dataclass post-init hook ran."""
        print(" BaseConfig.__post_init__ called")
class TestConfig(BaseConfig):
    """Configuration that adds ``llm_id`` / ``vision_id`` on top of ``BaseConfig``.

    The class is not decorated with ``@dataclass`` itself; the two IDs are
    plain attributes assigned in ``__post_init__``.
    """

    @classmethod
    def create(cls, llm_id=None, vision_id=None):
        """Build an instance and override the default IDs with any truthy argument.

        Falsy arguments (``None``, ``""``) leave the corresponding default in
        place. Progress is traced on stdout.
        """
        print(f"TestConfig.create() called with llm_id={llm_id}, vision_id={vision_id}")
        instance = cls()
        llm_after_init = getattr(instance, 'llm_id', 'NOT SET')
        vision_after_init = getattr(instance, 'vision_id', 'NOT SET')
        print(f" After cls(): llm={llm_after_init}, vision={vision_after_init}")
        if llm_id:
            instance.llm_id = llm_id
            print(f" Set llm_id to {llm_id}")
        if vision_id:
            instance.vision_id = vision_id
            print(f" Set vision_id to {vision_id}")
        print(f" Final: llm={instance.llm_id}, vision={instance.vision_id}")
        return instance

    def __post_init__(self):
        """Run the base hook, then assign the default model IDs."""
        print(" TestConfig.__post_init__ called")
        super().__post_init__()
        # Defaults; ``create`` may overwrite them afterwards.
        self.llm_id = "default_llm"
        self.vision_id = "default_vision"
        print(f" Set defaults: llm={self.llm_id}, vision={self.vision_id}")
# Test 1: direct construction should keep the defaults set in __post_init__
print("\nTest 1: Direct creation (should use defaults)")
config1 = TestConfig()
print(f"Result: llm={config1.llm_id}, vision={config1.vision_id}")

# Test 2: the factory should replace both defaults with the supplied IDs
print("\nTest 2: Factory method with IDs")
config2 = TestConfig.create(llm_id="llm_123", vision_id="vision_456")
print(f"Result: llm={config2.llm_id}, vision={config2.vision_id}")

print("\n" + "=" * 60)
print("Analysis:")
factory_ok = (config2.llm_id, config2.vision_id) == ("llm_123", "vision_456")
if not factory_ok:
    print("❌ Problem: Factory method failed to override defaults")
    print(" Expected: llm=llm_123, vision=vision_456")
    print(f" Got: llm={config2.llm_id}, vision={config2.vision_id}")
else:
    print("✅ Factory method correctly overrides defaults")

View File

@ -1,67 +0,0 @@
#!/usr/bin/env python3
"""
简单测试配置逻辑
"""
# 模拟配置类的行为
class TestConfig:
    """Small configuration holder with an LLM model ID and a vision model ID."""

    def __init__(self):
        # Both IDs start unset.
        self.llm_model_id = None
        self.vision_model_id = None

    @classmethod
    def create(cls, llm_model_id=None, vision_model_id=None):
        """Build an instance, applying only the truthy IDs, and print the result.

        Falsy arguments (``None``, ``""``) leave the attribute at ``None``.
        """
        instance = cls()
        overrides = (
            ("llm_model_id", llm_model_id),
            ("vision_model_id", vision_model_id),
        )
        for attribute, value in overrides:
            if value:
                setattr(instance, attribute, value)
        llm, vision = instance.llm_model_id, instance.vision_model_id
        print(f"Config created with LLM={llm}, Vision={vision}")
        return instance
def test_model_selection():
    """Check that the ``use_llm`` flag selects the matching model ID from the config.

    Runs the selection once with ``use_llm=False`` (vision model expected) and
    once with ``use_llm=True`` (LLM model expected), printing the verdict for
    each case.
    """
    TEST_LLM_ID = "0198e029-bfeb-7d43-a6ee-c88662697d3c"
    TEST_VISION_ID = "0198e02c-9f2e-7520-a27b-6376ad42d520"

    config = TestConfig.create(
        llm_model_id=TEST_LLM_ID,
        vision_model_id=TEST_VISION_ID
    )

    # (heading, use_llm flag, expected model ID, label used in the success message)
    scenarios = (
        ("\nTest 1: use_llm=False (should use vision model)", False, TEST_VISION_ID, "vision"),
        ("\nTest 2: use_llm=True (should use LLM model)", True, TEST_LLM_ID, "LLM"),
    )
    for heading, use_llm, expected_id, label in scenarios:
        print(heading)
        if use_llm:
            model_id = config.llm_model_id
            print(f" Using LLM model: {model_id}")
        else:
            model_id = config.vision_model_id
            print(f" Using Vision model: {model_id}")

        if model_id == expected_id:
            print(f" ✅ Correct! Using {label} model ID: {expected_id}")
        else:
            print(f" ❌ Wrong! Using: {model_id}, Expected: {expected_id}")
if __name__ == "__main__":
    # Frame the selection checks with banner lines when run as a script.
    print("=" * 60)
    print("Testing Model Selection Logic")
    print("=" * 60)
    test_model_selection()
    print("=" * 60)

View File

@ -1,140 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试Django应用启动和Celery任务注册
"""
import os
import sys
# 添加项目路径
sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings')
def test_django_startup():
    """Set up Django and print basic facts about the app registry.

    Prints the number of app configs, looks up the ``knowledge`` app, and
    reports the registry's ``ready`` flag.

    Returns:
        True when all steps succeed; False when any step raises (the error and
        a traceback are printed).
    """
    print("=== 测试Django应用启动 ===")
    try:
        import django
        django.setup()
        print("✅ Django应用启动成功")

        registry = django.apps.apps
        print(f"📊 已安装应用数量: {len(registry.get_app_configs())}")

        # Raises if the knowledge app is not installed, which fails the check.
        knowledge_app = registry.get_app_config('knowledge')
        print(f"✅ Knowledge应用已加载: {knowledge_app.name}")

        if registry.ready:
            print("✅ Django应用已完全准备好")
        else:
            print("⚠️ Django应用还未完全准备好")
    except Exception as e:
        print(f"❌ Django应用启动失败: {e}")
        import traceback
        traceback.print_exc()
        return False
    return True
def test_celery_tasks():
    """Check that the four target task names are keys of ``celery_app.tasks``.

    The original version printed the per-task status but returned True no
    matter what, so a missing task could never fail the run. The result now
    reflects the check.

    NOTE(review): names are compared verbatim with the registry keys; if the
    tasks are registered under module-qualified names they will be reported as
    missing — confirm the registered names.

    Returns:
        True when every target task name is registered; False when any is
        missing or an exception occurs (printed with a traceback).
    """
    print("\n=== 测试Celery任务 ===")
    try:
        from ops import celery_app
        print(f"✅ Celery应用导入成功")
        print(f"📋 应用名称: {celery_app.main}")

        registered_tasks = list(celery_app.tasks.keys())
        print(f"📊 已注册任务数量: {len(registered_tasks)}")

        target_tasks = [
            'media_learning_by_document',
            'media_learning_batch',
            'advanced_learning_by_document',
            'batch_advanced_learning'
        ]
        print(f"\n🔍 检查目标任务:")
        missing_tasks = []
        for task_name in target_tasks:
            if task_name in registered_tasks:
                print(f"{task_name} - 已注册")
            else:
                print(f"{task_name} - 未注册")
                missing_tasks.append(task_name)
        # Succeed only when nothing is missing.
        return not missing_tasks
    except Exception as e:
        print(f"❌ Celery任务测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False
def test_task_import():
    """Import the four learning tasks and print the ``name`` of the two media tasks.

    Returns:
        True when the imports and attribute reads succeed; False otherwise
        (the error and a traceback are printed).
    """
    print("\n=== 测试任务导入 ===")
    try:
        from knowledge.tasks.media_learning import media_learning_by_document, media_learning_batch
        print("✅ media_learning任务导入成功")
        from knowledge.tasks.advanced_learning import advanced_learning_by_document, batch_advanced_learning
        print("✅ advanced_learning任务导入成功")

        print("\n🔧 任务信息:")
        for label, task in (
            ("media_learning_by_document", media_learning_by_document),
            ("media_learning_batch", media_learning_batch),
        ):
            print(f"📋 {label}.name: {task.name}")
    except Exception as e:
        print(f"❌ 任务导入失败: {e}")
        import traceback
        traceback.print_exc()
        return False
    return True
def main():
    """Run the Django, task-import and Celery checks and print an overall verdict.

    All three checks always run, even after an earlier one fails.

    Returns:
        True when every check returned a truthy value, else False.
    """
    print("🚀 测试Django应用启动和Celery任务注册")
    print("=" * 60)

    checks = (test_django_startup, test_task_import, test_celery_tasks)
    # A list (not a generator) so no check is skipped by short-circuiting.
    outcomes = [check() for check in checks]
    success = all(outcomes)

    print("\n" + "=" * 60)
    if not success:
        print("❌ 部分测试失败")
        return success

    for line in (
        "🎉 所有测试通过!",
        "\n📋 修复总结:",
        "✅ Django应用启动正常",
        "✅ 任务导入无错误",
        "✅ Celery任务正确注册",
        "✅ 应用启动顺序正确",
    ):
        print(line)
    return success
if __name__ == "__main__":
    # Propagate the overall result as the process exit status (0 = all checks passed).
    success = main()
    sys.exit(0 if success else 1)

View File

@ -1,213 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试修复后的音视频异步处理流程
"""
import time
def test_fixed_media_processing():
    """Print a scripted walkthrough of the media-processing status flow for four sample files.

    This is a narrated demo: no media is processed and nothing is asserted.
    Short sleeps separate the status stages.
    """
    print("🔧 测试修复后的音视频异步处理流程")
    print("=" * 50)

    # Sample documents: (file name, media type); each expects three segments.
    samples = (
        ('会议录音.mp3', 'audio'),
        ('产品演示.mp4', 'video'),
        ('培训录音.wav', 'audio'),
        ('介绍视频.mov', 'video'),
    )
    test_files = [
        {'name': name, 'type': media_type, 'expected_segments': 3}
        for name, media_type in samples
    ]

    # Status stages that are each followed by a pause: queued, generating, indexing.
    paused_stages = (
        (" 📋 状态: 排队中 (PENDING)", " 📝 任务已提交到异步队列"),
        (" 🔄 状态: 生成中 (STARTED)", " 🔧 开始生成演示段落(不实际处理音频)"),
        (" 📚 状态: 索引中 (STARTED)", " 📝 创建段落对象", " 🔍 生成向量索引"),
    )
    audio_segments = [
        "开场介绍 - 包含会议的开场介绍和主要议题的说明",
        "项目进展 - 详细讨论了项目的进展情况和下一步的工作计划",
        "总结与行动项 - 总结了会议的主要结论和行动项"
    ]
    video_segments = [
        "开场介绍 - 包含视频的开场介绍和主要内容概述",
        "功能演示 - 详细展示了产品的功能特性和使用方法",
        "总结与联系方式 - 总结了产品的主要优势和适用场景"
    ]

    for i, file_info in enumerate(test_files, 1):
        expected = file_info['expected_segments']
        print(f"\n📄 测试文件 {i}: {file_info['name']}")
        print(f"🎵 文件类型: {file_info['type']}")
        print(f"📊 预期分段数: {expected}")

        print("\n🔄 处理流程:")
        for stage_lines in paused_stages:
            for line in stage_lines:
                print(line)
            time.sleep(0.5)

        # Final stage: completed.
        print(" ✅ 状态: 完成 (SUCCESS)")
        print(f" 📊 生成 {expected} 个演示段落")

        print("\n📝 演示段落内容:")
        segments = audio_segments if file_info['type'] == 'audio' else video_segments
        for j, segment in enumerate(segments, 1):
            print(f" {j}. {segment}")

        print("\n📊 处理统计:")
        print(f" 📝 段落数量: {expected}")
        print(f" 🔤 字符数量: ~{expected * 200}")
        print(" ⏱️ 处理时长: < 1秒演示模式")
        print(" 🏷️ 标记: 演示内容 (is_demo: True)")
        print("\n" + "-" * 30)

    print("\n🎉 所有测试文件处理完成!")
def test_error_handling():
    """Print a scripted list of error scenarios and their claimed status.

    Nothing is executed or asserted; the statuses are hard-coded text.
    """
    print("\n❌ 测试错误处理场景")
    print("=" * 30)

    # (scenario, description, status)
    error_scenarios = (
        ('导入错误修复', 'embedding_by_data_source 导入路径已修复', '✅ 已解决'),
        ('任务提交失败', '异步任务提交失败时的处理', '✅ 已实现'),
        ('文件不存在', '源文件不存在时的错误处理', '✅ 已实现'),
        ('处理失败', '处理过程中的异常处理', '✅ 已实现'),
    )
    for i, (title, description, status) in enumerate(error_scenarios, 1):
        print(f"\n{i}. {title}")
        print(f" 描述: {description}")
        print(f" 状态: {status}")
        time.sleep(0.3)

    for line in (
        "\n🔧 错误处理特性:",
        " ✅ 详细的错误日志",
        " ✅ 状态正确更新为 FAILURE",
        " ✅ 支持手动重新处理",
        " ✅ 异常捕获和优雅降级",
    ):
        print(line)
def test_demo_content_features():
    """Print a scripted description of the demo-content features.

    Nothing is executed or asserted; all text is hard-coded.
    """
    print("\n🎭 测试演示内容特性")
    print("=" * 30)

    # (feature, description, benefit)
    features = (
        ('智能分段', '根据文件类型生成合适的演示段落', '更真实的处理体验'),
        ('元数据标记', '每个段落都标记为演示内容 (is_demo: True)', '便于区分真实处理和演示内容'),
        ('文件类型识别', '自动识别音频/视频文件类型', '生成更贴合的演示内容'),
        ('时长信息', '为每个段落添加模拟的时长信息', '更真实的分段效果'),
    )
    for i, (title, description, benefit) in enumerate(features, 1):
        print(f"\n{i}. {title}")
        print(f" 描述: {description}")
        print(f" 优势: {benefit}")
        time.sleep(0.3)

    for line in (
        "\n🎯 演示内容适用场景:",
        " 🧪 开发和测试环境",
        " 📚 功能演示和展示",
        " 🔧 系统集成测试",
        " 🎓 用户培训和指导",
    ):
        print(line)
def main():
    """Run the three scripted demos in order, then print a fixed closing summary."""
    print("🚀 音视频异步处理修复验证测试")
    print("=" * 60)

    for demo in (test_fixed_media_processing, test_error_handling, test_demo_content_features):
        demo()

    closing_lines = (
        "\n" + "=" * 60,
        "🎊 修复验证测试完成!",
        "\n📋 修复内容总结:",
        "✅ 修复了 embedding_by_data_source 导入错误",
        "✅ 实现了演示内容生成(不实际处理音频)",
        "✅ 保持了完整的状态流转",
        "✅ 完善了错误处理机制",
        "✅ 支持多种音视频文件类型",
        "\n🔄 状态流程(修复后):",
        "📋 排队中 → 🔄 生成中 → 📚 索引中 → ✅ 完成",
        "",  # the original prints an empty line here
        "💥 失败",
        "\n🎭 演示模式特性:",
        "🔧 不实际处理音频文件",
        "📝 生成合理的演示段落",
        "🏷️ 标记为演示内容",
        "⚡ 快速处理,无延迟",
        "\n🚀 现在可以正常使用音视频异步处理功能!",
    )
    for line in closing_lines:
        print(line)
if __name__ == "__main__":
    # Run the scripted demos when executed as a script.
    main()

View File

@ -1,59 +0,0 @@
#!/usr/bin/env python
"""
测试图片存储和访问
这个脚本会
1. 创建一个测试图片在存储目录
2. 打印正确的访问URL
"""
import os
import sys
def main():
    """Write a placeholder image file into the storage tree and print how to reach it.

    The storage root comes from ``MAXKB_STORAGE_PATH`` (default
    ``./tmp/maxkb/storage``). The function creates ``mineru/images`` beneath
    it, writes a fixed file there, prints the access URLs, and lists every
    file under the storage root with its size.
    """
    # Storage root (local development default).
    storage_path = os.getenv('MAXKB_STORAGE_PATH', './tmp/maxkb/storage')
    rule = "=" * 60
    print(rule)
    print("MaxKB 图片存储和访问测试")
    print(rule)

    # Create the directory layout.
    image_dir = os.path.join(storage_path, 'mineru', 'images')
    os.makedirs(image_dir, exist_ok=True)
    print(f"\n1. 存储目录:{image_dir}")

    # Write the test file; the content is placeholder bytes, not real image data.
    image_name = 'ac3681aaa7a346b49ef9c7ceb7b94058.jpg'
    test_image = os.path.join(image_dir, image_name)
    with open(test_image, 'wb') as f:
        f.write(b'TEST IMAGE CONTENT')
    print(f"2. 创建测试文件:{test_image}")

    # Both environments are shown with the same URL.
    image_url = "http://localhost:8080/storage/mineru/images/" + image_name
    print("\n3. 访问URL")
    print(f" 本地开发{image_url}")
    print(f" Docker环境{image_url}")

    # List everything currently under the storage root.
    print("\n4. 存储目录内容:")
    for root, dirs, files in os.walk(storage_path):
        depth = root.replace(storage_path, '').count(os.sep)
        print(' ' * depth + os.path.basename(root) + '/')
        file_indent = ' ' * (depth + 1)
        for file in files:
            size = os.path.getsize(os.path.join(root, file))
            print(f'{file_indent}{file} ({size} bytes)')

    for line in (
        "\n" + rule,
        "测试完成!",
        "\n注意事项:",
        "1. 确保Django服务器正在运行",
        "2. URL路径现在是 /storage/ 开头,简洁直接",
        "3. 如果使用Docker确保volume正确挂载",
        rule,
    ):
        print(line)
if __name__ == "__main__":
    # Create the test file and print the access hints when run as a script.
    main()

View File

@ -1,289 +0,0 @@
#!/usr/bin/env python3
"""
MaxKB Adapter Import and Basic Functionality Test
This script specifically tests the MaxKB adapter imports and basic functionality.
"""
import sys
import os
from pathlib import Path
# Add the directory containing this script to the front of the import path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
# For MaxKB, also put the `apps` directory (if present) ahead of it, so the
# `common...` imports used by the tests below can resolve from there
apps_path = project_root / 'apps'
if apps_path.exists():
    sys.path.insert(0, str(apps_path))
    print(f"✅ Added apps directory to Python path: {apps_path}")
# Set up Django if it is importable; both failure modes are reported and the
# script continues without Django
try:
    import django
    # setdefault keeps any DJANGO_SETTINGS_MODULE already set in the environment
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings')
    django.setup()
    print("✅ Django environment initialized")
except ImportError:
    print(" Django not available - running in standalone mode")
except Exception as e:
    print(f" Could not initialize Django: {e}")
def test_imports():
    """Import each MaxKB adapter module and check that its expected names exist.

    Six modules are checked in turn. Each check records exactly one
    ``(module name, passed)`` entry: True only after the import and all of its
    attribute checks succeeded. The original recorded the adapter module as
    passed *before* running its attribute checks, so an assertion failure
    added a second, contradictory entry and inflated the totals.

    Returns:
        True when all six checks passed, else False. A per-module summary is
        printed either way.
    """
    print("=" * 60)
    print("🔍 Testing MaxKB Adapter Imports")
    print("=" * 60)
    results = []

    # Test 1: Import main adapter module
    print("\n1. Testing main adapter import...")
    try:
        from common.handle.impl.mineru.maxkb_adapter import adapter
        print(" ✅ Successfully imported adapter module")
        # Check for required classes
        assert hasattr(adapter, 'MaxKBAdapter'), "MaxKBAdapter class not found"
        print(" ✅ MaxKBAdapter class found")
        assert hasattr(adapter, 'MinerUExtractor'), "MinerUExtractor class not found"
        print(" ✅ MinerUExtractor class found")
        assert hasattr(adapter, 'MinerUAdapter'), "MinerUAdapter class not found"
        print(" ✅ MinerUAdapter class found")
        # Recorded only after every attribute check passed.
        results.append(("adapter module", True))
    except ImportError as e:
        print(f" ❌ Failed to import adapter: {e}")
        results.append(("adapter module", False))
    except AssertionError as e:
        print(f" ❌ Assertion failed: {e}")
        results.append(("adapter module", False))

    # Test 2: Import file storage client
    print("\n2. Testing file storage client import...")
    try:
        from common.handle.impl.mineru.maxkb_adapter import file_storage_client
        print(" ✅ Successfully imported file_storage_client module")
        assert hasattr(file_storage_client, 'FileStorageClient'), "FileStorageClient class not found"
        print(" ✅ FileStorageClient class found")
        results.append(("file_storage_client", True))
    except ImportError as e:
        print(f" ❌ Failed to import file_storage_client: {e}")
        results.append(("file_storage_client", False))
    except AssertionError as e:
        print(f" ❌ Assertion failed: {e}")
        results.append(("file_storage_client", False))

    # Test 3: Import model client
    print("\n3. Testing model client import...")
    try:
        from common.handle.impl.mineru.maxkb_adapter import maxkb_model_client
        print(" ✅ Successfully imported maxkb_model_client module")
        assert hasattr(maxkb_model_client, 'MaxKBModelClient'), "MaxKBModelClient class not found"
        print(" ✅ MaxKBModelClient class found")
        assert hasattr(maxkb_model_client, 'maxkb_model_client'), "maxkb_model_client instance not found"
        print(" ✅ maxkb_model_client instance found")
        results.append(("maxkb_model_client", True))
    except ImportError as e:
        print(f" ❌ Failed to import maxkb_model_client: {e}")
        results.append(("maxkb_model_client", False))
    except AssertionError as e:
        print(f" ❌ Assertion failed: {e}")
        results.append(("maxkb_model_client", False))

    # Test 4: Import configuration
    print("\n4. Testing configuration import...")
    try:
        from common.handle.impl.mineru.maxkb_adapter import config_maxkb
        print(" ✅ Successfully imported config_maxkb module")
        assert hasattr(config_maxkb, 'MaxKBMinerUConfig'), "MaxKBMinerUConfig class not found"
        print(" ✅ MaxKBMinerUConfig class found")
        results.append(("config_maxkb", True))
    except ImportError as e:
        print(f" ❌ Failed to import config_maxkb: {e}")
        results.append(("config_maxkb", False))
    except AssertionError as e:
        print(f" ❌ Assertion failed: {e}")
        results.append(("config_maxkb", False))

    # Test 5: Import logger (import only; no attribute checks)
    print("\n5. Testing logger import...")
    try:
        from common.handle.impl.mineru.maxkb_adapter import logger
        print(" ✅ Successfully imported logger module")
        results.append(("logger", True))
    except ImportError as e:
        print(f" ❌ Failed to import logger: {e}")
        results.append(("logger", False))

    # Test 6: Import base parser (parent module)
    print("\n6. Testing base parser import...")
    try:
        from common.handle.impl.mineru import base_parser
        print(" ✅ Successfully imported base_parser module")
        assert hasattr(base_parser, 'PlatformAdapter'), "PlatformAdapter class not found"
        print(" ✅ PlatformAdapter class found")
        assert hasattr(base_parser, 'BaseMinerUExtractor'), "BaseMinerUExtractor class not found"
        print(" ✅ BaseMinerUExtractor class found")
        results.append(("base_parser", True))
    except ImportError as e:
        print(f" ❌ Failed to import base_parser: {e}")
        results.append(("base_parser", False))
    except AssertionError as e:
        print(f" ❌ Assertion failed: {e}")
        results.append(("base_parser", False))

    # Print summary
    print("\n" + "=" * 60)
    print("📊 Import Test Summary")
    print("=" * 60)
    passed = sum(1 for _, success in results if success)
    failed = len(results) - passed
    for module_name, success in results:
        status = "✅ PASS" if success else "❌ FAIL"
        print(f"{status:10} {module_name}")
    print("-" * 60)
    print(f"Total: {len(results)} tests")
    print(f"Passed: {passed}")
    print(f"Failed: {failed}")
    if failed == 0:
        print("\n🎉 All import tests passed!")
    else:
        print(f"\n⚠️ {failed} import test(s) failed")
    return failed == 0
def test_basic_instantiation():
    """Instantiate the three adapter classes and verify their basic attributes.

    Each class is checked independently; any exception marks that class as
    failed and the next one is still attempted.

    Returns:
        True when all three instantiations passed, else False. A per-class
        summary is printed either way.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("🔧 Testing Basic Instantiation")
    print(rule)
    results = []

    # Test 1: MaxKBAdapter with its default constructor.
    print("\n1. Testing MaxKBAdapter instantiation...")
    try:
        from common.handle.impl.mineru.maxkb_adapter.adapter import MaxKBAdapter
        adapter = MaxKBAdapter()
        assert adapter is not None, "Adapter is None"
        assert adapter.file_storage is not None, "File storage not initialized"
        assert adapter.model_client is not None, "Model client not initialized"
        print(" ✅ MaxKBAdapter instantiated successfully")
        results.append(("MaxKBAdapter", True))
    except Exception as e:
        print(f" ❌ Failed to instantiate MaxKBAdapter: {e}")
        results.append(("MaxKBAdapter", False))

    # Test 2: MinerUExtractor must keep the model IDs it was given.
    print("\n2. Testing MinerUExtractor instantiation...")
    try:
        from common.handle.impl.mineru.maxkb_adapter.adapter import MinerUExtractor
        extractor = MinerUExtractor(
            llm_model_id="test_model",
            vision_model_id="test_vision"
        )
        assert extractor is not None, "Extractor is None"
        assert extractor.llm_model_id == "test_model", "LLM model ID not set correctly"
        assert extractor.vision_model_id == "test_vision", "Vision model ID not set correctly"
        print(" ✅ MinerUExtractor instantiated successfully")
        results.append(("MinerUExtractor", True))
    except Exception as e:
        print(f" ❌ Failed to instantiate MinerUExtractor: {e}")
        results.append(("MinerUExtractor", False))

    # Test 3: MinerUAdapter with ``_init_extractor`` patched out.
    print("\n3. Testing MinerUAdapter instantiation...")
    try:
        from common.handle.impl.mineru.maxkb_adapter.adapter import MinerUAdapter
        from unittest.mock import patch
        with patch.object(MinerUAdapter, '_init_extractor'):
            adapter = MinerUAdapter()
            assert adapter is not None, "Adapter is None"
            print(" ✅ MinerUAdapter instantiated successfully")
            results.append(("MinerUAdapter", True))
    except Exception as e:
        print(f" ❌ Failed to instantiate MinerUAdapter: {e}")
        results.append(("MinerUAdapter", False))

    # Summary table.
    print("\n" + rule)
    print("📊 Instantiation Test Summary")
    print(rule)
    passed = len([name for name, ok in results if ok])
    failed = len(results) - passed
    for class_name, ok in results:
        status = "✅ PASS" if ok else "❌ FAIL"
        print(f"{status:10} {class_name}")
    print("-" * 60)
    print(f"Total: {len(results)} tests")
    print(f"Passed: {passed}")
    print(f"Failed: {failed}")
    if failed != 0:
        print(f"\n⚠️ {failed} instantiation test(s) failed")
    else:
        print("\n🎉 All instantiation tests passed!")
    return failed == 0
def main():
    """Run the import tests, then the instantiation tests, and report the result.

    Instantiation tests are skipped (and counted as failed) when any import
    test failed.

    Returns:
        0 when both stages passed, 1 otherwise; suitable as an exit status.
    """
    print("\n" + "🚀 MaxKB Adapter Test Suite" + "\n")

    import_success = test_imports()

    # Instantiation only makes sense once the imports worked.
    if not import_success:
        print("\n⚠️ Skipping instantiation tests due to import failures")
        instantiation_success = False
    else:
        instantiation_success = test_basic_instantiation()

    print("\n" + "=" * 60)
    print("🏁 Final Test Results")
    print("=" * 60)
    if not (import_success and instantiation_success):
        print("❌ Some tests failed.")
        print("\nPlease review the errors above and ensure all dependencies are installed.")
        return 1
    print("✅ All tests passed successfully!")
    print("\nThe MaxKB adapter is properly configured and ready to use.")
    return 0
if __name__ == "__main__":
    # main() returns 0 on success and 1 on failure; use it as the exit status.
    sys.exit(main())

View File

@ -1,193 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试音视频异步处理流程 - 简化版本
"""
import time
def test_async_flow_simulation():
    """Print a scripted walk-through of the asynchronous media pipeline.

    Nothing is really processed: the function prints fixed document
    details, steps through four demo stages (sleeping after each one to
    imitate work) and ends with canned result figures.
    """
    print("🚀 音视频异步处理流程演示")
    print("=" * 50)

    # Demo document details.
    document_id = "media-doc-001"
    file_name = "会议录音.mp3"
    stt_model = "whisper-large"
    llm_model = "gpt-4"
    print("📄 文档信息:")
    for label, value in (
        ("ID", document_id),
        ("文件名", file_name),
        ("STT模型", stt_model),
        ("LLM模型", llm_model),
    ):
        print(f" {label}: {value}")

    print("\n🔄 状态变更流程:")
    # (status, code, emoji, description, details)
    stages = (
        ('排队中', 'PENDING', '📋', '任务已提交,等待处理', '文档已创建,异步任务已加入队列'),
        ('生成中', 'STARTED', '🔄', '正在转写音视频内容', '调用STT模型进行语音转写LLM模型进行文本优化'),
        ('索引中', 'STARTED', '📚', '正在创建段落和索引', '创建段落对象,生成向量索引,更新文档统计'),
        ('完成', 'SUCCESS', '', '处理完成', '音视频内容已成功转写并索引,可供搜索'),
    )
    # status -> (progress lines to print, seconds to sleep afterwards)
    progress = {
        '排队中': ((" ⏳ 等待工作线程处理...",), 1),
        '生成中': ((" 🎵 正在转写音频内容...", " 🤖 正在优化转写文本..."), 2),
        '索引中': ((" 📝 创建段落对象...", " 🔍 生成向量索引..."), 1),
        '完成': ((" 📊 生成统计信息...", " 🎉 处理完成!"), 1),
    }
    for number, (status, code, emoji, description, details) in enumerate(stages, 1):
        print(f"\n{number}. {emoji} {status} ({code})")
        print(f" 描述: {description}")
        print(f" 详情: {details}")
        # Simulated processing time for this stage.
        lines, delay = progress[status]
        for line in lines:
            print(line)
        time.sleep(delay)

    print("\n📊 处理结果:")
    for line in (
        " 📝 段落数量: 8",
        " 🔤 字符数量: 2,456",
        " ⏱️ 处理时长: 15分32秒",
        " 📝 内容预览: '今天的会议主要讨论了产品开发进度...'",
    ):
        print(line)

    print("\n🎯 用户可执行的操作:")
    for line in (
        " 🔍 搜索文档内容",
        " 📖 查看完整转写",
        " 📊 查看处理统计",
        " 🔄 重新处理(如需要)",
    ):
        print(line)
def test_error_scenario():
    """Print a scripted failure path: queued, started, then failed.

    Each stage is followed by a one-second sleep; a list of the advertised
    error-handling features is printed at the end.
    """
    print("\n❌ 错误处理场景演示:")
    print("=" * 30)

    # (status, code, emoji, description, details-or-None)
    error_stages = (
        ('排队中', 'PENDING', '📋', '任务已提交,等待处理', None),
        ('生成中', 'STARTED', '🔄', '正在转写音视频内容', None),
        ('失败', 'FAILURE', '💥', '处理失败', 'STT模型调用失败请检查模型配置'),
    )
    for number, (status, code, emoji, description, details) in enumerate(error_stages, 1):
        print(f"\n{number}. {emoji} {status} ({code})")
        print(f" 描述: {description}")
        # Only the failing stage carries a detail line.
        if details is not None:
            print(f" 详情: {details}")
        time.sleep(1)

    print("\n🔧 错误处理:")
    for line in (
        " 📋 自动重试机制",
        " 📊 详细的错误日志",
        " 🔄 用户可手动重新处理",
        " 📧 系统管理员通知",
    ):
        print(line)
def test_batch_processing():
    """Print a scripted batch upload of three media files.

    Each file is announced as queued (0.5 s pause per file), a parallel
    processing phase is simulated with a 2 s pause, and finally every file
    is reported as finished.
    """
    print("\n📦 批量处理演示:")
    print("=" * 30)

    # (file name, duration label)
    documents = (
        ('会议录音1.mp3', '15:32'),
        ('培训视频.mp4', '45:18'),
        ('产品介绍.mp3', '8:45'),
    )
    print(f"📋 批量上传 {len(documents)} 个音视频文件:")
    for position, (name, duration) in enumerate(documents, 1):
        print(f"\n{position}. 📄 {name} ({duration})")
        print(" 📋 状态: 排队中 (PENDING)")
        print(" 🎬 任务已提交到异步队列")
        time.sleep(0.5)

    print("\n🔄 并行处理中...")
    print(" 🎵 3个工作线程同时处理")
    print(" ⚡ 每个文件独立处理")
    time.sleep(2)

    print("\n✅ 批量处理完成:")
    for position, (name, _duration) in enumerate(documents, 1):
        print(f" {position}. {name}: 完成 (SUCCESS)")
def main():
    """Run the three demo scenarios in order and print a feature summary."""
    print("🎬 音视频异步处理完整流程演示")
    print("=" * 60)

    # Run the demos: normal flow, failure flow, batch flow.
    for scenario in (test_async_flow_simulation, test_error_scenario, test_batch_processing):
        scenario()

    print("\n" + "=" * 60)
    print("🎊 演示完成!")

    print("\n📋 核心特性:")
    for feature in (
        "✅ 完全异步化处理",
        "✅ 详细的状态追踪",
        "✅ 错误处理和重试",
        "✅ 批量处理支持",
        "✅ 复用现有状态系统",
    ):
        print(feature)

    print("\n🔄 状态流转:")
    print("📋 排队中 → 🔄 生成中 → 📚 索引中 → ✅ 完成")
    print("")
    print(" 💥 失败")
if __name__ == "__main__":
main()

View File

@ -1,249 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试音视频异步处理流程
"""
import os
import sys
import django
import time
from unittest.mock import Mock
# 设置Django环境
sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings')
django.setup()
from django.db.models import QuerySet
from knowledge.models import Document, Paragraph, TaskType, State
from common.event import ListenerManagement
from knowledge.tasks.media_learning import media_learning_by_document
from knowledge.serializers.document import DocumentSerializers
class MockLogger:
    """Minimal logger double that echoes every record to stdout."""

    def _emit(self, level, msg):
        # Shared formatter: "[LEVEL] message".
        print("[{}] {}".format(level, msg))

    def info(self, msg):
        """Print ``msg`` tagged ``[INFO]``."""
        self._emit("INFO", msg)

    def warning(self, msg):
        """Print ``msg`` tagged ``[WARNING]``."""
        self._emit("WARNING", msg)

    def error(self, msg, exc_info=False):
        """Print ``msg`` tagged ``[ERROR]``; ``exc_info`` is accepted but ignored."""
        self._emit("ERROR", msg)
def test_status_flow():
    """Push a mocked document through PENDING -> STARTED -> SUCCESS.

    ``QuerySet.filter`` is replaced for the duration of the run so that the
    lookups made here return ``Mock`` objects.  The replacement is scoped
    with ``patch.object``, which restores the original method on exit even
    if the body raises.  Any exception raised while updating the status is
    printed together with its traceback instead of propagating.
    """
    from unittest.mock import patch

    print("=== 测试音视频异步处理状态流程 ===")

    # Identifiers used by the simulated document.
    document_id = "test-media-doc-001"
    stt_model_id = "test-stt-model"
    llm_model_id = "test-llm-model"
    print(f"📋 测试文档ID: {document_id}")
    print(f"🎵 STT模型ID: {stt_model_id}")
    print(f"🤖 LLM模型ID: {llm_model_id}")

    # Mocked document object.
    mock_document = Mock()
    mock_document.id = document_id
    mock_document.name = "测试音视频文件.mp3"
    mock_document.meta = {'source_file_id': 'test-file-001'}

    # Fallback queryset returned for any lookup not special-cased below.
    mock_queryset = Mock()
    mock_queryset.filter.return_value.first.return_value = mock_document

    # Mocked source file.
    mock_file = Mock()
    mock_file.file_name = "测试音视频文件.mp3"
    mock_file.get_bytes.return_value = b"fake audio content"

    def mock_filter(self, **kwargs):
        # Route lookups by id: source file, document, or the generic fallback.
        if 'id' in kwargs and kwargs['id'] == 'test-file-001':
            file_queryset = Mock()
            file_queryset.first.return_value = mock_file
            return file_queryset
        elif 'id' in kwargs and kwargs['id'] == document_id:
            doc_queryset = Mock()
            doc_queryset.first.return_value = mock_document
            return doc_queryset
        return mock_queryset

    def set_state(state):
        # One status update for the simulated document.
        ListenerManagement.update_status(
            QuerySet(Document).filter(id=document_id),
            TaskType.EMBEDDING,
            state
        )

    # Temporarily replace the query method; restored automatically on exit.
    with patch.object(QuerySet, 'filter', mock_filter):
        try:
            print("\n🔄 测试状态更新流程...")

            # 1. Queued.
            print("\n1⃣ 设置排队中状态 (PENDING)")
            set_state(State.PENDING)
            print("✅ 状态已更新为: PENDING")
            time.sleep(1)  # simulated queueing time

            # 2. Generating.
            print("\n2⃣ 设置生成中状态 (STARTED - 生成中)")
            set_state(State.STARTED)
            print("✅ 状态已更新为: STARTED (生成中)")
            time.sleep(2)  # simulated processing time

            # 3. Indexing: no status call, the stage is only visible in the output.
            print("\n3⃣ 设置索引中状态 (STARTED - 索引中)")
            print("📚 状态保持为STARTED但进入索引中阶段")
            time.sleep(1)  # simulated indexing time

            # 4. Done.
            print("\n4⃣ 设置完成状态 (SUCCESS)")
            set_state(State.SUCCESS)
            print("✅ 状态已更新为: SUCCESS")

            print("\n🎉 状态流程测试完成!")
        except Exception as e:
            print(f"❌ 测试失败: {e}")
            import traceback
            traceback.print_exc()
def test_document_creation():
    """Print a simulation of document creation and async task triggering.

    A single hard-coded media document is paired with a single generated
    document id; for every pair whose document carries an ``stt_model_id``
    the function prints the messages of a task submission.  Nothing is
    saved or queued for real.
    """
    print("\n=== 测试文档创建和异步任务触发 ===")

    # Simulated document payload.
    document_data = {
        'name': '测试音视频文件.mp3',
        'source_file_id': 'test-file-001',
        'stt_model_id': 'test-stt-model',
        'llm_model_id': 'test-llm-model',
        'paragraphs': [],  # empty while processing is asynchronous
        'is_media_async': True
    }
    print(f"📄 创建音视频文档: {document_data['name']}")
    print(f"🎵 STT模型: {document_data['stt_model_id']}")
    print(f"🤖 LLM模型: {document_data['llm_model_id']}")
    print(f"⏳ 异步处理: {'' if document_data.get('is_media_async') else ''}")

    # Simulated batch save.
    instance_list = [document_data]
    print("\n🔄 模拟批量保存流程...")

    # Simulated id generation.
    document_id = "generated-doc-001"
    document_result_list = [{'id': document_id}]
    print(f"📋 生成文档ID: {document_id}")

    # Pair every submitted document with its saved result; zip stops at the
    # shorter list, so a document without a result is simply not triggered.
    for document, saved in zip(instance_list, document_result_list):
        stt_model_id = document.get('stt_model_id')
        if not stt_model_id:
            continue
        doc_id = saved.get('id')
        print("\n🎬 触发音视频异步任务...")
        print(f"📋 文档ID: {doc_id}")
        print(f"🎵 STT模型: {stt_model_id}")
        print("📊 状态: PENDING (排队中)")
        # Simulated task submission.
        print("✅ 异步任务已提交到队列")

    print("\n🎉 文档创建流程测试完成!")
def test_async_task_simulation():
    """Print a scripted run of the async task, one second per stage."""
    print("\n=== 模拟异步任务执行流程 ===")

    document_id = "test-media-doc-001"
    print(f"🎬 开始异步处理文档: {document_id}")

    # (emoji, stage, status, description)
    stages = (
        ("📋", "排队中", "PENDING", "任务已提交,等待处理"),
        ("🔄", "生成中", "STARTED", "正在转写音视频内容"),
        ("📚", "索引中", "STARTED", "正在创建段落和索引"),
        ("", "完成", "SUCCESS", "处理完成"),
    )
    # Extra progress lines printed for each stage.
    stage_lines = {
        "排队中": (" ⏳ 等待工作线程处理...",),
        "生成中": (" 🎵 正在调用STT模型转写音频...", " 🤖 正在调用LLM模型优化文本..."),
        "索引中": (" 📝 正在创建段落对象...", " 🔍 正在生成向量索引..."),
        "完成": (" 🎉 音视频处理完成!", " 📊 段落数量: 5", " 📝 字符数量: 1,234"),
    }
    for emoji, stage, status, description in stages:
        print(f"\n{emoji} {stage} ({status})")
        print(f" {description}")
        for line in stage_lines.get(stage, ()):
            print(line)
        # Simulated processing time.
        time.sleep(1)

    print("\n🎉 异步任务执行流程测试完成!")
def main():
    """Run the three status-flow checks and print the state summary."""
    rule = "=" * 50
    print("🚀 开始音视频异步处理流程测试")
    print(rule)

    # Run the checks in their original order.
    for check in (test_status_flow, test_document_creation, test_async_task_simulation):
        check()

    print("\n" + rule)
    print("🎊 所有测试完成!")
    print("\n📋 状态流程总结:")
    for line in (
        "1. 排队中 (PENDING) - 文档创建,任务提交",
        "2. 生成中 (STARTED) - 音视频转写处理",
        "3. 索引中 (STARTED) - 段落创建和向量化",
        "4. 完成 (SUCCESS) - 处理完成",
        "5. 失败 (FAILURE) - 处理失败",
    ):
        print(line)
if __name__ == "__main__":
main()

View File

@ -1,134 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试音视频处理功能
"""
import sys
import os
sys.path.append('apps')
def test_media_handler():
    """Check which file names ``MediaSplitHandle.support`` accepts.

    Audio and video names are expected to be accepted and document names
    to be rejected; each individual verdict is printed.  Unlike the
    earlier version, a wrong verdict now makes the whole check fail
    instead of being reported as a pass.

    Returns:
        bool: True only when the handler was created and every file name
        got the expected verdict; False on any mismatch or exception.
    """
    print("测试音视频处理器...")
    try:
        from common.handle.impl.media.media_split_handle import MediaSplitHandle
        # Kept from the original: importing the adapter doubles as an import check.
        from common.handle.impl.media.media_adapter import MediaAdapter  # noqa: F401

        # Create the handler.
        handler = MediaSplitHandle()
        print("✓ MediaSplitHandle 创建成功")

        class MockFile:
            """Tiny file double exposing name, size, read() and seek()."""

            def __init__(self, name, content=b'test'):
                self.name = name
                self.content = content
                self.size = len(content)

            def read(self):
                return self.content

            def seek(self, pos):
                pass

        # (file names, expected support() verdict,
        #  text printed when the verdict is as expected, text printed otherwise)
        groups = (
            (['test.mp3', 'test.wav', 'test.m4a', 'test.flac'], True, "支持", "不支持"),
            (['test.mp4', 'test.avi', 'test.mov', 'test.mkv'], True, "支持", "不支持"),
            (['test.txt', 'test.pdf', 'test.docx'], False, "正确排除", "错误支持"),
        )
        mismatches = 0
        for filenames, expected, as_expected, unexpected in groups:
            for filename in filenames:
                file = MockFile(filename)
                supported = bool(handler.support(file, lambda x: x.read()))
                if supported == expected:
                    print(f"{filename} {as_expected}")
                else:
                    mismatches += 1
                    print(f"{filename} {unexpected}")

        if mismatches:
            # Do not claim success when any verdict was wrong.
            print(f"\n✗ {mismatches} 个文件类型检测结果不符合预期")
            return False
        print("\n✓ 所有文件类型测试通过")
    except Exception as e:
        print(f"✗ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False
    return True
def test_media_adapter():
    """Create a ``MediaAdapter`` and check its media-type detection.

    Prints the loaded configuration (when there is one) and the detected
    type of four sample file names.  A detection that differs from the
    expected type now fails the check instead of being reported as a pass.

    Returns:
        bool: True only when the adapter was created and every sample was
        detected as expected; False on any mismatch or exception.
    """
    print("\n测试媒体适配器...")
    try:
        from common.handle.impl.media.media_adapter import MediaAdapter

        # Create the adapter.
        adapter = MediaAdapter()
        print("✓ MediaAdapter 创建成功")

        # Show the configuration if one was loaded.
        if adapter.config:
            print("✓ 配置加载成功")
            print(f" - STT Provider: {adapter.config.get('stt_provider')}")
            print(f" - Max Duration: {adapter.config.get('max_duration')}")
            print(f" - Segment Duration: {adapter.config.get('segment_duration')}")

        # (file name, expected media type)
        test_cases = [
            ('test.mp3', 'audio'),
            ('test.mp4', 'video'),
            ('test.wav', 'audio'),
            ('test.avi', 'video'),
        ]
        mismatches = 0
        for filename, expected_type in test_cases:
            detected_type = adapter._detect_media_type(filename)
            if detected_type == expected_type:
                print(f"{filename} -> {detected_type}")
            else:
                mismatches += 1
                print(f"{filename} -> {detected_type} (期望: {expected_type})")

        if mismatches:
            # Do not claim success when any detection was wrong.
            print(f"\n✗ {mismatches} 个媒体类型检测结果不符合预期")
            return False
        print("\n✓ 适配器测试通过")
    except Exception as e:
        print(f"✗ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False
    return True
if __name__ == '__main__':
    rule = "=" * 50
    print(rule)
    print("音视频学习模块测试")
    print(rule)

    # Run both checks unconditionally (no short-circuit), then combine.
    outcomes = [test_media_handler(), test_media_adapter()]
    success = all(outcomes)

    print("\n" + rule)
    print("✅ 所有测试通过!" if success else "❌ 部分测试失败")
    print(rule)

View File

@ -1,116 +0,0 @@
#!/usr/bin/env python
"""
测试 MinerU 异步上下文修复
"""
import os
import sys
import asyncio
import django
# 设置 Django 环境
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'smartdoc.settings')
django.setup()
from apps.common.handle.impl.mineru.maxkb_adapter.maxkb_model_client import maxkb_model_client
async def test_async_model_calls():
    """Exercise four async calls of ``maxkb_model_client`` with one model id.

    The LLM lookup, vision-model lookup, chat completion and model
    validation are attempted one after another; each step prints its own
    success/failure line and an exception in one step does not stop the
    following ones.
    """
    # The same model id is used for every call.
    model_id = "0198cbd9-c1a6-7b13-b16d-d85ad77ac03d"
    print("测试异步模型调用...")

    # 1. Fetch the LLM model.
    try:
        print("\n1. 测试获取 LLM 模型...")
        llm_model = await maxkb_model_client.get_llm_model(model_id)
        print(" ✓ LLM 模型获取成功" if llm_model else " ✗ LLM 模型获取失败")
    except Exception as e:
        print(f" ✗ LLM 模型获取出错: {e}")

    # 2. Fetch the vision model.
    try:
        print("\n2. 测试获取视觉模型...")
        vision_model = await maxkb_model_client.get_vision_model(model_id)
        print(" ✓ 视觉模型获取成功" if vision_model else " ✗ 视觉模型获取失败")
    except Exception as e:
        print(f" ✗ 视觉模型获取出错: {e}")

    # 3. Chat completion.
    try:
        print("\n3. 测试聊天完成...")
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Hello, this is a test."}
        ]
        response = await maxkb_model_client.chat_completion(model_id, messages)
        if not response:
            print(" ✗ 聊天完成返回空响应")
        else:
            print(f" ✓ 聊天完成成功: {response[:100]}...")
    except Exception as e:
        print(f" ✗ 聊天完成出错: {e}")

    # 4. Model validation.
    try:
        print("\n4. 测试模型验证...")
        is_valid = await maxkb_model_client.validate_model(model_id)
        print(" ✓ 模型验证成功" if is_valid else " ✗ 模型不存在或无效")
    except Exception as e:
        print(f" ✗ 模型验证出错: {e}")

    print("\n测试完成!")
async def test_mineru_image_processing():
    """Initialise a ``MinerUImageProcessor`` with default config, then clean it up."""
    print("\n测试 MinerU 图像处理流程...")
    from apps.common.handle.impl.mineru.config_base import MinerUConfig
    from apps.common.handle.impl.mineru.image_processor import MinerUImageProcessor

    # Build the processor from a default configuration.
    image_processor = MinerUImageProcessor(MinerUConfig())

    await image_processor.initialize()
    print("✓ 图像处理器初始化成功")

    # Release whatever initialize() acquired.
    await image_processor.cleanup()
    print("✓ 图像处理器清理成功")
async def main():
    """Run the async model-call checks, then the image-processor check."""
    rule = "=" * 60
    print(rule)
    print("MinerU 异步上下文修复测试")
    print(rule)

    # Model calls first, image-processing flow second.
    await test_async_model_calls()
    await test_mineru_image_processing()

    print("\n" + rule)
    print("所有测试完成!")
    print(rule)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,101 +0,0 @@
#!/usr/bin/env python3
"""
测试模型ID配置是否正确传递
"""
import os
import sys
from pathlib import Path
# Add paths
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
apps_path = project_root / 'apps'
if apps_path.exists():
sys.path.insert(0, str(apps_path))
# 模拟传入的模型ID
TEST_LLM_ID = "0198e029-bfeb-7d43-a6ee-c88662697d3c"
TEST_VISION_ID = "0198e02c-9f2e-7520-a27b-6376ad42d520"
def test_config_creation():
    """Create ``MaxKBMinerUConfig`` two ways and verify the factory result.

    Prints the model ids of a default-constructed config and of one built
    with ``MaxKBMinerUConfig.create`` from the module-level test ids, then
    reports whether the factory-built ids match.

    Returns:
        The config produced by ``MaxKBMinerUConfig.create``.
    """
    rule = "=" * 60
    print(rule)
    print("Testing MaxKBMinerUConfig creation")
    print(rule)

    from apps.common.handle.impl.mineru.maxkb_adapter.config_maxkb import MaxKBMinerUConfig

    # Way 1: direct construction (defaults or environment variables).
    print("\n1. Default creation:")
    default_config = MaxKBMinerUConfig()
    print(f" LLM ID: {default_config.llm_model_id}")
    print(f" Vision ID: {default_config.vision_model_id}")

    # Way 2: the factory method with explicit ids.
    print("\n2. Factory method creation:")
    factory_config = MaxKBMinerUConfig.create(
        llm_model_id=TEST_LLM_ID,
        vision_model_id=TEST_VISION_ID
    )
    print(f" LLM ID: {factory_config.llm_model_id}")
    print(f" Vision ID: {factory_config.vision_model_id}")

    # Verify both ids of the factory-built config.
    print("\n3. Verification:")
    for label, attribute, expected in (
        ("LLM", "llm_model_id", TEST_LLM_ID),
        ("Vision", "vision_model_id", TEST_VISION_ID),
    ):
        actual = getattr(factory_config, attribute)
        if actual != expected:
            print(f" ❌ {label} ID mismatch: expected {expected}, got {actual}")
        else:
            print(f" ✅ {label} ID correctly set")

    return factory_config
def test_model_selection():
    """Show which model id is selected for ``use_llm`` True and False.

    Mirrors the selection rule noted in the original comment as the logic
    of ``call_litellm``: the LLM id when ``use_llm`` is true, the vision id
    otherwise.  For both cases the selected id, the expected id and
    whether they match are printed.
    """
    # MaxKBMinerUConfig is not imported at module level (only inside
    # test_config_creation), so it must be imported here as well; without
    # this the function fails with NameError.
    from apps.common.handle.impl.mineru.maxkb_adapter.config_maxkb import MaxKBMinerUConfig

    print("\n" + "=" * 60)
    print("Testing model selection logic")
    print("=" * 60)

    config = MaxKBMinerUConfig.create(
        llm_model_id=TEST_LLM_ID,
        vision_model_id=TEST_VISION_ID
    )

    # (case number, use_llm flag, id expected to be selected)
    for number, use_llm, expected in (
        (1, True, TEST_LLM_ID),
        (2, False, TEST_VISION_ID),
    ):
        print(f"\n{number}. When use_llm={use_llm}:")
        model_id = config.llm_model_id if use_llm else config.vision_model_id
        print(f" Selected model ID: {model_id}")
        print(f" Expected: {expected}")
        print(f" Match: {model_id == expected}")
if __name__ == "__main__":
    rule = "=" * 60
    print("Testing Model Configuration")
    print(rule)
    print(f"Test LLM ID: {TEST_LLM_ID}")
    print(f"Test Vision ID: {TEST_VISION_ID}")

    # Creation check first (its result is kept), then the selection check.
    config = test_config_creation()
    test_model_selection()

    print("\n" + rule)
    print("Test completed!")
    print(rule)

View File

@ -1,166 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试简化异步音频处理功能
"""
import os
import sys
import asyncio
import time
from unittest.mock import Mock
# 添加项目路径
sys.path.insert(0, '/Users/moshui/Documents/felo/moshui/MaxKB')
from apps.common.handle.impl.media.media_adapter.simple_async_audio_processor import SimpleAsyncAudioProcessor
from apps.common.handle.impl.media.media_adapter.logger import MediaLogger
class MockLogger:
    """Logger stand-in that prints level-tagged messages to stdout."""

    @staticmethod
    def _render(tag, msg):
        # Build the "[TAG] message" line shared by all levels.
        return f"[{tag}] {msg}"

    def info(self, msg):
        """Print an ``[INFO]`` line."""
        print(self._render("INFO", msg))

    def warning(self, msg):
        """Print a ``[WARNING]`` line."""
        print(self._render("WARNING", msg))

    def error(self, msg, exc_info=False):
        """Print an ``[ERROR]`` line; ``exc_info`` is accepted but not used."""
        print(self._render("ERROR", msg))
async def test_simple_async_processor():
    """Run ``SimpleAsyncAudioProcessor`` once against mocked STT/LLM models.

    The processor's ``_get_audio_duration_async`` is replaced by a stub
    returning 180.0 so no real audio is inspected.  The result summary and
    queue status are printed.  ``processor.shutdown()`` is now awaited
    whether or not processing succeeds, so a failed run no longer skips
    the shutdown; any exception (including one from shutdown) is printed
    with its traceback instead of propagating.
    """
    print("=== 测试简化异步音频处理器 ===")

    # Processor configuration.
    config = {
        'queue_size': 10,
        'worker_count': 2,  # two workers
        'async_processing': True
    }

    # Wrap the stdout logger and build the processor.
    mock_logger = MockLogger()
    logger_wrapper = MediaLogger(mock_logger)
    processor = SimpleAsyncAudioProcessor(config, logger_wrapper)

    # Fake audio payload.
    test_audio_content = b"fake audio content for testing"
    test_file_name = "test_audio.mp3"

    # Mocked STT and LLM models.
    stt_model = Mock()
    stt_model.invoke = Mock(return_value="这是测试转写结果")
    llm_model = Mock()
    llm_model.invoke = Mock(return_value="这是增强后的文本,带有标点符号。")

    # Processing options.
    options = {
        'enable_punctuation': True,
        'enable_summary': True,
        'segment_duration': 60,  # one-minute segments
        'language': 'zh-CN'
    }

    try:
        try:
            print("开始测试简化异步音频处理...")

            # Pretend the audio is three minutes long.
            async def mock_get_duration(content):
                return 180.0

            processor._get_audio_duration_async = mock_get_duration

            # Process the audio and time it.
            start_time = time.time()
            result = await processor.process_audio_async(
                test_audio_content, test_file_name, stt_model, llm_model, options
            )
            end_time = time.time()

            print(f"处理完成,耗时: {end_time - start_time:.2f}")
            print(f"结果状态: {result['status']}")
            print(f"音频时长: {result['duration']:.1f}")
            print(f"分段数量: {len(result['segments'])}")
            print(f"完整文本长度: {len(result['full_text'])}")
            print(f"工作线程数: {result['metadata']['worker_count']}")

            # Show the queue status.
            queue_status = processor.get_queue_status()
            print(f"队列状态: {queue_status}")
        finally:
            # Always shut the processor down, even after a failure above.
            await processor.shutdown()
        print("简化版本测试完成!")
    except Exception as e:
        print(f"测试失败: {e}")
        import traceback
        traceback.print_exc()
def test_audio_processor_integration():
    """Call ``AudioProcessor.process`` with async processing switched on.

    Uses mocked STT/LLM models and a fake audio payload, prints a short
    summary of the returned result, and prints the traceback instead of
    raising when processing fails.
    """
    print("\n=== 测试音频处理器集成 ===")
    from apps.common.handle.impl.media.media_adapter.processors.audio_processor import AudioProcessor

    # Build the processor with async processing enabled.
    audio_processor = AudioProcessor(
        {
            'async_processing': True,  # enable async processing
            'worker_count': 2
        },
        MockLogger(),
    )

    # Fake audio payload.
    audio_bytes = b"fake audio content for testing"
    audio_name = "test_audio.mp3"

    # Mocked STT and LLM models.
    stt_model = Mock()
    stt_model.invoke = Mock(return_value="这是测试转写结果")
    llm_model = Mock()
    llm_model.invoke = Mock(return_value="这是增强后的文本,带有标点符号。")

    # Processing options.
    options = {
        'async_processing': True,  # explicitly request async
        'enable_punctuation': True,
        'enable_summary': True,
        'segment_duration': 60,
        'language': 'zh-CN'
    }

    try:
        print("开始测试音频处理器异步集成...")

        # Process the audio and measure the wall-clock time.
        started = time.time()
        result = audio_processor.process(
            audio_bytes, audio_name, stt_model, llm_model, options
        )
        elapsed = time.time() - started

        print(f"处理完成,耗时: {elapsed:.2f}")
        print(f"结果状态: {result['status']}")
        print(f"音频时长: {result.get('duration', 0):.1f}")
        print(f"分段数量: {len(result.get('segments', []))}")
        print("音频处理器集成测试完成!")
    except Exception as e:
        print(f"音频处理器集成测试失败: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
# 运行测试
asyncio.run(test_simple_async_processor())
test_audio_processor_integration()

View File

@ -1,131 +0,0 @@
#!/usr/bin/env python
"""
测试MinerU图片存储和访问功能
使用方法
1. 在本地开发环境python test_storage.py
2. 在Docker环境docker exec -it maxkb-dev python /opt/maxkb-app/test_storage.py
"""
import os
import sys
import tempfile
import shutil
from pathlib import Path
def test_storage():
    """Write a test file under the storage root and list the directory.

    The storage root comes from ``MAXKB_STORAGE_PATH`` (default
    ``/opt/maxkb/storage``).  A ``test/images/test_image.txt`` file is
    created there, its access URL is printed, and the whole storage tree
    is listed.  The file is left in place afterwards.

    Returns:
        bool: True when the file exists after writing, False otherwise.
    """
    rule = "=" * 60
    print(rule)
    print("MinerU 图片存储测试")
    print(rule)

    # 1. Storage root.
    storage_path = os.getenv('MAXKB_STORAGE_PATH', '/opt/maxkb/storage')
    print(f"\n1. 存储路径配置:{storage_path}")

    # 2. Test directory.
    test_dir = os.path.join(storage_path, 'test', 'images')
    print(f"\n2. 创建测试目录:{test_dir}")
    os.makedirs(test_dir, exist_ok=True)

    # 3. Test file.
    test_image_path = os.path.join(test_dir, 'test_image.txt')
    print(f"\n3. 创建测试文件:{test_image_path}")
    with open(test_image_path, 'w') as handle:
        handle.write("This is a test image file for MinerU storage")

    # 4. Confirm the file really exists.
    if not os.path.exists(test_image_path):
        print(" ✗ 文件创建失败")
        return False
    print(" ✓ 文件创建成功")
    print(f" 文件大小:{os.path.getsize(test_image_path)} bytes")

    # 5. Access URL relative to the storage root.
    relative_path = os.path.relpath(test_image_path, storage_path)
    print(f"\n4. 生成的访问URL{'/api/storage/' + relative_path}")

    # 6. Indented listing of the storage tree.
    print("\n5. 存储目录内容:")
    for root, _dirs, files in os.walk(storage_path):
        depth = root.replace(storage_path, '').count(os.sep)
        print(f"{' ' * 2 * depth}{os.path.basename(root)}/")
        file_indent = ' ' * 2 * (depth + 1)
        for entry in files:
            print(f'{file_indent}{entry}')

    print("\n" + rule)
    print("测试完成!")
    print("\n配置建议:")
    print("1. 确保Docker volume正确挂载~/.maxkb/storage:/opt/maxkb/storage")
    print("2. 确保环境变量设置MAXKB_STORAGE_PATH=/opt/maxkb/storage")
    print("3. 访问图片URL格式http://localhost:8080/api/storage/mineru/images/xxx.jpg")
    print(rule)
    return True
def test_mineru_adapter():
    """Upload a temporary ``.jpg`` through ``MaxKBAdapter.upload_file``.

    The apps directory is put on ``sys.path`` first (``/opt/maxkb-app/apps``
    when it exists, else ``./apps``).  The temporary file is now removed
    whether or not the upload succeeds, and the coroutine is driven with
    ``asyncio.run`` so the event loop is created and closed in one place.
    Import errors and any other exception are reported on stdout rather
    than raised.
    """
    print("\n" + "=" * 60)
    print("测试MinerU适配器")
    print("=" * 60)

    # Make the apps directory importable.
    sys.path.insert(0, '/opt/maxkb-app/apps' if os.path.exists('/opt/maxkb-app/apps') else './apps')

    try:
        from common.handle.impl.mineru.maxkb_adapter.adapter import MaxKBAdapter

        print("\n1. 创建MaxKB适配器实例")
        adapter = MaxKBAdapter()
        print(f" 存储路径:{adapter.storage_path}")

        # Temporary source file for the upload.
        with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as tmp:
            tmp.write(b"Test image content")
            tmp_path = tmp.name

        try:
            print("\n2. 测试upload_file方法")
            print(f" 源文件:{tmp_path}")

            import asyncio

            async def test_upload():
                result = await adapter.upload_file(tmp_path, options=['test_knowledge'])
                return result

            # asyncio.run creates a fresh loop and closes it afterwards.
            result_url = asyncio.run(test_upload())
            print(f" 返回URL{result_url}")
        finally:
            # Remove the temporary file even when the upload failed.
            os.unlink(tmp_path)

        print("\n✓ MinerU适配器测试成功")

    except ImportError as e:
        print(f"\n✗ 无法导入MinerU适配器{e}")
        print(" 请确保在MaxKB环境中运行此测试")
    except Exception as e:
        print(f"\n✗ 测试失败:{e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    # Run the plain storage check first.
    if test_storage():
        # Only when it passed, try the adapter check as well.
        try:
            test_mineru_adapter()
        except Exception:
            # A bare ``except:`` would also swallow KeyboardInterrupt and
            # SystemExit; only ordinary errors should produce this hint.
            print("\n提示适配器测试需要在MaxKB环境中运行")

View File

@ -1,22 +0,0 @@
#!/usr/bin/env python
"""
简单的存储测试 - 创建测试图片
"""
import os
# Create the storage directory.
storage_path = './tmp/maxkb/storage/mineru/images'
os.makedirs(storage_path, exist_ok=True)
# Path of the test image. The write that follows stores bytes starting with
# the JPEG magic number, so the file is meant to be recognised as an image.
test_file = os.path.join(storage_path, 'ac3681aaa7a346b49ef9c7ceb7b94058.jpg')
with open(test_file, 'wb') as f:
# 写入一个最小的JPEG文件头这样浏览器会识别为图片
# FF D8 FF E0 是JPEG文件的魔术数字
f.write(bytes.fromhex('FFD8FFE000104A46494600010101006000600000FFDB004300080606070605080707070909080A0C140D0C0B0B0C1912130F141D1A1F1E1D1A1C1C20242E2720222C231C1C2837292C30313434341F27393D38323C2E333432FFDB0043010909090C0B0C180D0D1832211C2132323232323232323232323232323232323232323232323232323232323232323232323232323232323232323232323232FFC00011080001000103012200021101031101FFC4001F0000010501010101010100000000000000000102030405060708090A0BFFC400B5100002010303020403050504040000017D01020300041105122131410613516107227114328191A1082342B1C11552D1F02433627282090A161718191A25262728292A3435363738393A434445464748494A535455565758595A636465666768696A737475767778797A838485868788898A92939495969798999AA2A3A4A5A6A7A8A9AAB2B3B4B5B6B7B8B9BAC2C3C4C5C6C7C8C9CAD2D3D4D5D6D7D8D9DAE1E2E3E4E5E6E7E8E9EAF1F2F3F4F5F6F7F8F9FAFFC4001F0100030101010101010101010000000000000102030405060708090A0BFFC400B51100020102040403040705040400010277000102031104052131061241510761711322328108144291A1B1C109233352F0156272D10A162434E125F11718191A262728292A35363738393A434445464748494A535455565758595A636465666768696A737475767778797A82838485868788898A92939495969798999AA2A3A4A5A6A7A8A9AAB2B3B4B5B6B7B8B9BAC2C3C4C5C6C7C8C9CAD2D3D4D5D6D7D8D9DAE2E3E4E5E6E7E8E9EAF2F3F4F5F6F7F8F9FAFFDA000C03010002110311003F00F9FFD9'))
print(f"测试文件已创建:{test_file}")
print(f"文件大小:{os.path.getsize(test_file)} bytes")
print("\n访问URL")
print("http://localhost:8080/storage/mineru/images/ac3681aaa7a346b49ef9c7ceb7b94058.jpg")
print("\n如果Django服务正在运行可以直接在浏览器中访问上述URL")

View File

@ -1,121 +0,0 @@
#!/usr/bin/env python3
"""
测试URL修复 - 验证platform_adapter是否正确传递
"""
import os
import sys
import asyncio
from pathlib import Path
# Add paths
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
apps_path = project_root / 'apps'
if apps_path.exists():
sys.path.insert(0, str(apps_path))
# Set environment variables for testing
os.environ['MAXKB_BASE_URL'] = 'http://xbase.aitravelmaster.com'
os.environ['MINERU_API_TYPE'] = 'cloud' # Force cloud mode for testing
async def test_url_generation():
    """Upload a throwaway PDF via ``MaxKBAdapter`` and inspect the URL.

    Prints whether the returned URL is absolute (http/https) and whether
    it starts with ``MAXKB_BASE_URL``.  The temporary file is always
    removed at the end.
    """
    # Import after the environment has been set up.
    from apps.common.handle.impl.mineru.maxkb_adapter.adapter import MaxKBAdapter

    adapter = MaxKBAdapter()

    # Create the file to upload.
    import tempfile
    with tempfile.NamedTemporaryFile(mode='w', suffix='.pdf', delete=False) as handle:
        handle.write('test')
        test_file = handle.name

    try:
        print("Testing MaxKBAdapter.upload_file()...")
        url = await adapter.upload_file(test_file, ['test_knowledge_id'])
        print(f"\n✅ Generated URL: {url}")

        # The Cloud API needs an absolute http(s) URL.
        is_absolute = url.startswith(('http://', 'https://'))
        if not is_absolute:
            print(f"❌ URL is not valid for Cloud API: {url}")
        else:
            print("✅ URL is properly formatted for Cloud API")

        # The URL should be rooted at MAXKB_BASE_URL when that is set.
        base_url = os.environ.get('MAXKB_BASE_URL', '')
        if not base_url or not url.startswith(base_url):
            print("❌ URL does not use MAXKB_BASE_URL")
        else:
            print(f"✅ URL correctly uses MAXKB_BASE_URL: {base_url}")
    finally:
        # Clean up the temporary file.
        if os.path.exists(test_file):
            os.unlink(test_file)
async def test_api_client_with_adapter():
    """Check that ``MinerUAPIClient`` keeps the adapter and yields a usable URL.

    Builds the client from a default ``MaxKBMinerUConfig`` and a
    ``MaxKBAdapter``, reports whether ``platform_adapter`` is set, then
    calls ``_upload_file_to_accessible_url`` on a throwaway PDF and reports
    whether the result is an http(s) URL.  The temporary file is always
    removed.
    """
    from apps.common.handle.impl.mineru.api_client import MinerUAPIClient
    from apps.common.handle.impl.mineru.maxkb_adapter.adapter import MaxKBAdapter
    from apps.common.handle.impl.mineru.maxkb_adapter.config_maxkb import MaxKBMinerUConfig

    print("\nTesting MinerUAPIClient with platform_adapter...")

    # Build the adapter and config, then the client that receives both.
    adapter = MaxKBAdapter()
    config = MaxKBMinerUConfig()
    api_client = MinerUAPIClient(config, adapter)

    # The adapter must have been stored on the client.
    if api_client.platform_adapter is None:
        print("❌ platform_adapter is None in MinerUAPIClient")
    else:
        print("✅ platform_adapter is correctly set in MinerUAPIClient")

    # File used for the upload call.
    import tempfile
    with tempfile.NamedTemporaryFile(mode='w', suffix='.pdf', delete=False) as handle:
        handle.write('test')
        test_file = handle.name

    try:
        # Upload through the API client.
        async with api_client:
            url = await api_client._upload_file_to_accessible_url(test_file, 'test_src_id')
            print(f"✅ URL from _upload_file_to_accessible_url: {url}")

            if not url.startswith(('http://', 'https://')):
                print(f"❌ API client generates invalid URL: {url}")
            else:
                print("✅ API client generates valid URL for Cloud API")
    finally:
        if os.path.exists(test_file):
            os.unlink(test_file)
if __name__ == "__main__":
    rule = "=" * 60
    print(rule)
    print("Testing MinerU Cloud API URL Fix")
    print(rule)

    # Show the environment the checks will run with.
    print("\nEnvironment:")
    for variable in ('MAXKB_BASE_URL', 'MINERU_API_TYPE'):
        print(f"{variable}: {os.environ.get(variable, 'NOT SET')}")

    # Each check gets its own event loop.
    for check in (test_url_generation, test_api_client_with_adapter):
        asyncio.run(check())

    print("\n" + rule)
    print("Test completed!")
    print(rule)

View File

@ -1,94 +0,0 @@
#!/usr/bin/env python3
"""
简单测试URL生成逻辑
"""
import os
import tempfile
import shutil
import uuid
# 设置环境变量
os.environ['MAXKB_BASE_URL'] = 'http://xbase.aitravelmaster.com'
def test_url_generation():
    """Replay the URL-building steps of the adapter's ``upload_file``.

    A temporary PDF is copied to ``<storage>/mineru/images/<uuid>.pdf`` and
    the public URL is derived from the path relative to the storage root.
    The storage root is now a private temporary directory instead of the
    fixed ``/tmp/storage``, so the clean-up can no longer delete an
    unrelated directory that happens to live at that path.

    Returns:
        str: ``<MAXKB_BASE_URL>/storage/<relative path>`` when the
        environment variable is non-empty, else ``/storage/<relative path>``.
    """
    # Create the source file.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.pdf', delete=False) as f:
        f.write('test')
        file_path = f.name

    try:
        # Private storage root, removed automatically when the block exits.
        with tempfile.TemporaryDirectory() as storage_path:
            # Create the storage directory.
            sub_dir = 'mineru'
            storage_dir = os.path.join(storage_path, sub_dir, 'images')
            os.makedirs(storage_dir, exist_ok=True)

            # Generate the destination file name.
            file_ext = os.path.splitext(file_path)[1]
            file_name = f"{uuid.uuid4().hex}{file_ext}"
            dest_path = os.path.join(storage_dir, file_name)

            # Copy the file into storage.
            shutil.copy2(file_path, dest_path)

            # Build the URL from the path relative to the storage root,
            # always with forward slashes.
            relative_path = os.path.relpath(dest_path, storage_path)
            relative_path = relative_path.replace(os.path.sep, '/')

            # The base URL comes from the environment.
            base_url = os.getenv('MAXKB_BASE_URL', '')
            print(f"MAXKB_BASE_URL from env: '{base_url}'")
            print(f"Relative path: {relative_path}")

            if base_url:
                result_url = f"{base_url.rstrip('/')}/storage/{relative_path}"
                print(f"✅ Generated full URL: {result_url}")
            else:
                result_url = f"/storage/{relative_path}"
                print(f"⚠️ Generated relative URL: {result_url}")

            # Validate the URL format.
            if result_url.startswith(('http://', 'https://')):
                print("✅ URL is valid for Cloud API")
            else:
                print("❌ URL is NOT valid for Cloud API (must start with http:// or https://)")

            return result_url
    finally:
        # Remove the source file.
        if os.path.exists(file_path):
            os.unlink(file_path)
if __name__ == "__main__":
    rule = "=" * 60
    dashes = "-" * 40

    print(rule)
    print("Testing URL Generation Logic")
    print(rule)
    print()

    # Case 1: MAXKB_BASE_URL is set.
    print("Test 1: With MAXKB_BASE_URL set")
    print(dashes)
    url1 = test_url_generation()
    print("\n" + rule)

    # Case 2: MAXKB_BASE_URL is blanked out.
    print("\nTest 2: Without MAXKB_BASE_URL")
    print(dashes)
    os.environ['MAXKB_BASE_URL'] = ''
    url2 = test_url_generation()

    print("\n" + rule)
    print("Summary:")
    print(f"With MAXKB_BASE_URL: {url1}")
    print(f"Without MAXKB_BASE_URL: {url2}")
    print(rule)