maxkb/test_mineru.py
2025-08-24 00:56:02 +08:00

1138 lines
45 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
MinerU Parser Test Script
This script provides comprehensive testing for the MinerU-based PDF/PPT parsing system.
It includes configuration validation, API connectivity tests, document processing examples,
and MaxKB adapter functionality tests.
"""
import asyncio
import os
import sys
import json
import tempfile
import shutil
from pathlib import Path
from typing import Optional, List, Any, Dict
from dataclasses import dataclass
from unittest.mock import Mock, patch, AsyncMock, MagicMock
# Make the directory that holds this script importable before anything else.
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))

# In a MaxKB checkout the packages live under <root>/apps, so that directory
# is put on the path as well when it is present.
apps_path = project_root.joinpath('apps')
if apps_path.exists():
    sys.path.insert(0, str(apps_path))
    print(f"✅ Added apps directory to Python path: {apps_path}")
# Pull settings from <project_root>/.env when python-dotenv is installed;
# values from the file override variables already present in the process.
try:
    from dotenv import load_dotenv

    env_path = project_root / '.env'
    if not env_path.exists():
        print(f" No .env file found at {env_path}, using system environment variables")
    else:
        load_dotenv(env_path, override=True)
        print(f"✅ Loaded environment variables from {env_path}")
except ImportError:
    # python-dotenv is optional: fall back to whatever the environment holds.
    print(" python-dotenv not installed. Using system environment variables only.")
    print(" Install with: pip install python-dotenv")
# Best-effort Django bootstrap: required when running inside MaxKB, skipped
# (with a message) when Django is missing or cannot be configured.
try:
    import django

    # Only sets the settings module if the caller has not chosen one already.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'maxkb.settings')
    django.setup()
    print("✅ Django environment initialized")
except ImportError:
    print(" Django not available - running in standalone mode")
except Exception as setup_error:
    print(f" Could not initialize Django: {setup_error}")
# The script can exercise two backends (GPTBase and MaxKB). Each import group
# is optional; the flags below record which one is usable in this environment.
mineru_modules_loaded = False
maxkb_adapter_loaded = False

# --- GPTBase backend --------------------------------------------------------
try:
    from loader.trace import init_trace_logging

    # Trace logging is initialised before the remaining loader imports run.
    init_trace_logging()

    from loader.mineru.gbase_adapter import MinerUExtractor
    from loader.mineru.config_base import MinerUConfig
    from loader.mineru.api_client import MinerUAPIClient
    from loader.mineru.utils import get_file_hash, get_temp_dir
    from loader.schema_extractor.document_integration import extract_schemas_from_docs
    from gptbase import settings

    print("✅ Successfully imported GPTBase MinerU modules")
    mineru_modules_loaded = True
except ImportError as gptbase_error:
    print(f" GPTBase modules not available: {gptbase_error}")

# --- MaxKB adapter backend --------------------------------------------------
try:
    from common.handle.impl.mineru.maxkb_adapter.adapter import (
        MaxKBAdapter,
        MinerUExtractor as MaxKBMinerUExtractor,
        MinerUAdapter,
    )
    from common.handle.impl.mineru.maxkb_adapter.file_storage_client import FileStorageClient
    from common.handle.impl.mineru.maxkb_adapter.maxkb_model_client import MaxKBModelClient
    from common.handle.impl.mineru.maxkb_adapter.config_maxkb import MaxKBMinerUConfig

    print("✅ Successfully imported MaxKB adapter modules")
    maxkb_adapter_loaded = True
except ImportError as maxkb_error:
    print(f"⚠️ MaxKB adapter modules not available: {maxkb_error}")

# Nothing to test if neither backend could be imported.
if not (mineru_modules_loaded or maxkb_adapter_loaded):
    print("❌ Neither GPTBase nor MaxKB modules could be loaded")
    print("Please ensure you're running this script from the project root directory")
    sys.exit(1)
@dataclass
class TestConfig:
    """Settings shared by the tester classes in this script."""

    # Path of a document to run through the real-file processing test, if any.
    test_file_path: Optional[str] = None
    # Base URL of the MinerU service.
    api_url: str = "http://mineru:8000"
    # Either "cloud" or "self_hosted".
    api_type: str = "self_hosted"
    # Bearer token sent to the cloud API; may be unset.
    api_key: Optional[str] = None
    # Passed through to MinerUExtractor(learn_type=...).
    learn_type: int = 9
    # Copied into the MinerU config's max_concurrent_api_calls.
    max_concurrent: int = 2
    # Verbose-output flag loaded from MINERU_VERBOSE.
    verbose: bool = True
class MinerUTester:
    """Run the GPTBase-side MinerU checks and collect their outcomes.

    Every ``test_*`` coroutine appends a ``(name, passed, message)`` tuple to
    ``self.results``; ``show_test_summary`` prints them as a table.
    """

    def __init__(self, test_config: "TestConfig"):
        self.config = test_config
        # (test name, passed flag, human-readable message)
        self.results = []

    def log(self, message: str, level: str = "INFO"):
        """Print ``message`` prefixed with the icon for ``level``.

        Unknown levels fall back to a generic note icon.
        """
        icons = {
            "INFO": "ℹ️",
            "SUCCESS": "✅",
            "WARNING": "⚠️",
            "ERROR": "❌",
            "DEBUG": "🔍",
        }
        icon = icons.get(level, "📝")
        print(f"{icon} {message}")

    async def run_all_tests(self):
        """Run the enabled checks in order, then print the summary."""
        self.log("🚀 Starting MinerU Parser Tests", "INFO")
        print("=" * 60)

        # Test 1: Environment Check
        await self.test_environment()
        # Test 2: Configuration Validation
        await self.test_configuration()
        # Test 3: API Connectivity
        await self.test_api_connectivity()
        # Test 4: Real File Processing (only when a file was configured)
        if self.config.test_file_path:
            await self.test_file_processing()
        # Test 5 (test_batch_processing) is currently disabled and not run here.

        self.show_test_summary()

    async def test_environment(self):
        """Check the Python version, required packages, PyMuPDF and LibreOffice.

        The check fails when any package in ``required_packages`` cannot be
        imported. A missing LibreOffice only produces a warning.
        """
        self.log("🔧 Testing Environment Setup")
        try:
            version = sys.version_info
            self.log(f"Python version: {version.major}.{version.minor}.{version.micro}")

            required_packages = ['aiohttp', 'fitz', 'loguru', 'langchain']
            missing = []
            for package in required_packages:
                try:
                    __import__(package)
                except ImportError:
                    missing.append(package)
                    self.log(f"Package {package}: Missing", "ERROR")
                else:
                    self.log(f"Package {package}: Available", "SUCCESS")

            # PyMuPDF is imported under the name "fitz"; report its version.
            try:
                import fitz
                self.log(f"PyMuPDF version: {fitz.version[0]}", "SUCCESS")
            except Exception as e:
                self.log(f"PyMuPDF error: {e}", "ERROR")

            # LibreOffice is used for PPT conversion; absence is not fatal.
            try:
                import subprocess
                result = subprocess.run(
                    ['libreoffice', '--version'],
                    capture_output=True, text=True, timeout=5,
                )
                if result.returncode == 0:
                    self.log(f"LibreOffice: {result.stdout.strip()}", "SUCCESS")
                else:
                    self.log("LibreOffice: Not available", "WARNING")
            except Exception:
                self.log("LibreOffice: Not available (PPT conversion will use fallback)", "WARNING")

            if missing:
                self.results.append(
                    ("Environment Check", False, f"Missing packages: {', '.join(missing)}")
                )
            else:
                self.results.append(("Environment Check", True, "Environment validated"))
        except Exception as e:
            self.log(f"Environment check failed: {e}", "ERROR")
            self.results.append(("Environment Check", False, str(e)))

    def _build_mineru_config(self, max_concurrent: int):
        """Return a MinerUConfig populated from the test configuration."""
        config = MinerUConfig()
        config.mineru_api_type = self.config.api_type
        config.mineru_api_url = self.config.api_url
        config.mineru_api_key = self.config.api_key
        config.max_concurrent_api_calls = max_concurrent
        return config

    async def test_configuration(self):
        """Build a MinerUConfig from the test settings and validate it."""
        self.log("⚙️ Testing Configuration")
        try:
            config = self._build_mineru_config(self.config.max_concurrent)

            self.log(f"API Type: {config.mineru_api_type}")
            self.log(f"API URL: {config.mineru_api_url}")
            # Never print the key itself.
            self.log(f"API Key: {'***' if config.mineru_api_key else 'Not set'}")
            self.log(f"Max Concurrent: {config.max_concurrent_api_calls}")
            self.log(f"Max File Size: {config.max_file_size / (1024*1024):.1f}MB")
            self.log(f"Cache Enabled: {config.enable_cache}")

            if config.validate():
                self.log("Configuration validation: PASSED", "SUCCESS")
                self.results.append(("Configuration", True, "Configuration is valid"))
            else:
                self.log("Configuration validation: FAILED", "ERROR")
                self.results.append(("Configuration", False, "Invalid configuration"))
        except Exception as e:
            self.log(f"Configuration test failed: {e}", "ERROR")
            self.results.append(("Configuration", False, str(e)))

    async def test_api_connectivity(self):
        """Probe the configured MinerU endpoint and record whether it answers."""
        self.log(f"🌐 Testing API Connectivity ({self.config.api_type})")
        try:
            import aiohttp

            # One timeout for every request so no probe can hang the run.
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession() as session:
                if self.config.api_type == "cloud":
                    await self._check_cloud_api(session, timeout)
                elif self.config.api_type == "self_hosted":
                    await self._check_self_hosted_api(session, timeout)
                else:
                    self.log(f"Unknown API type: {self.config.api_type}", "ERROR")
                    self.results.append(
                        ("API Connectivity", False, f"Unknown API type: {self.config.api_type}")
                    )
        except Exception as e:
            self.log(f"API connectivity test failed: {e}", "ERROR")
            self.results.append(("API Connectivity", False, str(e)))

    async def _check_cloud_api(self, session, timeout):
        """POST a dummy task to the cloud API; 200/400/401 count as reachable."""
        test_url = f"{self.config.api_url}/api/v4/extract/task"
        headers = {'Content-Type': 'application/json'}
        if self.config.api_key:
            # Skip the header entirely rather than sending "Bearer None".
            headers['Authorization'] = f'Bearer {self.config.api_key}'
        try:
            async with session.post(test_url, headers=headers,
                                    json={"test": "connectivity"},
                                    timeout=timeout) as resp:
                self.log(f"Cloud API status: {resp.status}")
                if resp.status in (200, 400, 401):  # 400/401 expected for test request
                    self.log("Cloud API connectivity: OK", "SUCCESS")
                    self.results.append(("API Connectivity", True, "Cloud API accessible"))
                else:
                    self.log("Cloud API connectivity: Issues detected", "WARNING")
                    self.results.append(("API Connectivity", False, f"HTTP {resp.status}"))
        except Exception as e:
            self.log(f"Cloud API connectivity failed: {e}", "ERROR")
            self.results.append(("API Connectivity", False, str(e)))

    async def _check_self_hosted_api(self, session, timeout):
        """GET ``/docs``; if that is not 200, fall back to ``/file_parse``."""
        docs_url = f"{self.config.api_url}/docs"
        try:
            async with session.get(docs_url, timeout=timeout) as resp:
                self.log(f"Self-hosted API status: {resp.status}")
                docs_ok = resp.status == 200
            if docs_ok:
                self.log("Self-hosted API connectivity: OK", "SUCCESS")
                self.results.append(("API Connectivity", True, "Self-hosted API accessible"))
                return

            # Try the actual endpoint
            parse_url = f"{self.config.api_url}/file_parse"
            async with session.get(parse_url, timeout=timeout) as parse_resp:
                if parse_resp.status in (200, 422):  # 422 expected without proper request
                    self.log("Self-hosted API connectivity: OK", "SUCCESS")
                    self.results.append(("API Connectivity", True, "Self-hosted API accessible"))
                else:
                    self.log("Self-hosted API connectivity: Issues detected", "WARNING")
                    self.results.append(("API Connectivity", False, f"HTTP {parse_resp.status}"))
        except Exception as e:
            self.log(f"Self-hosted API connectivity failed: {e}", "ERROR")
            self.results.append(("API Connectivity", False, str(e)))

    async def test_file_processing(self):
        """Run the configured file through MinerUExtractor and report the result.

        Skips (without recording a result) when no file is configured or the
        path does not exist.
        """
        path = self.config.test_file_path
        if not path:
            self.log("Skipping file processing test: No test file provided", "WARNING")
            return
        if not os.path.exists(path):
            self.log(f"Skipping file processing test: File not found: {path}", "WARNING")
            return

        self.log(f"📄 Testing File Processing: {os.path.basename(path)}")
        try:
            extractor = MinerUExtractor(learn_type=self.config.learn_type)
            extractor.config = self._build_mineru_config(self.config.max_concurrent)

            # Setup upload options
            upload_options = (
                {"region": "", "id": "", "key": "", "name": ""},
                "",
                "local"
            )

            self.log(f"Processing file: {path}")
            loop = asyncio.get_running_loop()
            start_time = loop.time()
            documents = await extractor.process_file(
                filepath=path,
                upload_options=upload_options
            )
            # Stop the clock here so schema extraction is not counted.
            processing_time = loop.time() - start_time

            file_hash = get_file_hash(path)
            temp_dir = get_temp_dir(file_hash, self.config.learn_type, extractor.config.cache_version)
            schema_results = await extract_schemas_from_docs(
                documents, self.config.learn_type, "schema_config_ftime.json",
                output_dir=temp_dir, file_hash=file_hash,
            )
            # Print the schema extraction results
            self.log(f" Schema: {json.dumps(schema_results, indent=2, ensure_ascii=False)}")

            if not documents:
                self.log("File processing: No documents returned", "ERROR")
                self.results.append(("File Processing", False, "No documents returned"))
                return

            for doc in documents:
                advanced_parser = json.loads(doc.metadata.get('advanced_parser', '{}'))
                self.log("📊 Processing Results:")
                self.log(f" Processing time: {processing_time:.2f}s")
                self.log(f" Content length: {len(doc.page_content)} characters")
                self.log(f" Parser type: {doc.metadata.get('parser_type', 'unknown')}")
                self.log(f" API type: {advanced_parser.get('api_type', 'unknown')}")
                self.log(f" Processing mode: {advanced_parser.get('processing_mode', 'unknown')}")
                self.log(f" Total pages: {advanced_parser.get('total_pages', 0)}")
                self.log(f" Successful pages: {advanced_parser.get('successful_pages', 0)}")
                self.log(f" Images found: {advanced_parser.get('images_found', 0)}")

                # Show a short preview, calling out empty/whitespace-only content.
                content = doc.page_content
                if not content:
                    self.log("📄 Content: [EMPTY]")
                elif not content.strip():
                    self.log(f"📄 Content: [WHITESPACE ONLY - {repr(content[:50])}]")
                else:
                    preview = content[:200].strip()
                    if len(content) > 200:
                        preview += "..."
                    self.log(f"📄 Content preview: {preview}")

            self.log("File processing: SUCCESS", "SUCCESS")
            self.results.append(
                ("File Processing", True, f"Processed successfully in {processing_time:.2f}s")
            )
        except Exception as e:
            self.log(f"File processing test failed: {e}", "ERROR")
            self.results.append(("File Processing", False, str(e)))

    async def test_batch_processing(self):
        """Process three generated PDFs one after another and report totals.

        The generated files are removed even when the test fails part-way.
        """
        self.log("📚 Testing Batch Processing")
        test_files = []
        try:
            # Create multiple test PDFs
            for i in range(3):
                test_pdf = await self.create_test_pdf(
                    content=f"Test Document {i+1}\nThis is page content for document {i+1}."
                )
                test_files.append(test_pdf)

            extractor = MinerUExtractor(learn_type=self.config.learn_type)
            extractor.config = self._build_mineru_config(2)
            upload_options = (
                {"region": "", "id": "", "key": "", "name": ""},
                "",
                "local"
            )

            results = []
            loop = asyncio.get_running_loop()
            start_time = loop.time()
            for i, test_file in enumerate(test_files):
                try:
                    self.log(f"Processing file {i+1}/{len(test_files)}")
                    documents = await extractor.process_file(
                        filepath=test_file,
                        upload_options=upload_options
                    )
                    results.append((test_file, documents))
                except Exception as e:
                    self.log(f"Failed to process file {i+1}: {e}", "ERROR")
                    results.append((test_file, None))
            total_time = loop.time() - start_time

            successful = sum(1 for _, docs in results if docs is not None)
            self.log("Batch processing results:")
            self.log(f" Total files: {len(test_files)}")
            self.log(f" Successful: {successful}")
            self.log(f" Failed: {len(test_files) - successful}")
            self.log(f" Total time: {total_time:.2f}s")
            self.log(f" Average time per file: {total_time/len(test_files):.2f}s")

            if successful > 0:
                self.log("Batch processing: SUCCESS", "SUCCESS")
                self.results.append(
                    ("Batch Processing", True, f"{successful}/{len(test_files)} files processed")
                )
            else:
                self.log("Batch processing: FAILED", "ERROR")
                self.results.append(("Batch Processing", False, "No files processed successfully"))
        except Exception as e:
            self.log(f"Batch processing test failed: {e}", "ERROR")
            self.results.append(("Batch Processing", False, str(e)))
        finally:
            for test_file in test_files:
                try:
                    os.unlink(test_file)
                except OSError:
                    # Already gone or not removable; nothing useful to do here.
                    pass

    async def create_test_pdf(self, content: str = "Test Document\nThis is a test PDF for MinerU processing.") -> str:
        """Write a small PDF to a temporary file and return its path.

        With reportlab installed, each line of ``content`` is drawn on the
        page. Without it, a fixed blank one-page PDF is written and
        ``content`` is ignored. The caller is responsible for deleting the
        file.
        """
        try:
            try:
                from reportlab.pdfgen import canvas
                from reportlab.lib.pagesizes import letter
            except ImportError:
                # Fallback: create a minimal PDF manually
                pdf_content = b"""%PDF-1.4
1 0 obj
<<
/Type /Catalog
/Pages 2 0 R
>>
endobj
2 0 obj
<<
/Type /Pages
/Kids [3 0 R]
/Count 1
>>
endobj
3 0 obj
<<
/Type /Page
/Parent 2 0 R
/MediaBox [0 0 612 792]
>>
endobj
xref
0 4
0000000000 65535 f
0000000009 00000 n
0000000058 00000 n
0000000115 00000 n
trailer
<<
/Size 4
/Root 1 0 R
>>
startxref
190
%%EOF"""
                with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
                    temp_file.write(pdf_content)
                return temp_file.name

            # Reserve a path first; reportlab then writes to it by name.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
                temp_path = temp_file.name
            c = canvas.Canvas(temp_path, pagesize=letter)
            y_position = 750
            for line in content.split('\n'):
                c.drawString(100, y_position, line)
                y_position -= 20
            c.save()
            return temp_path
        except Exception as e:
            self.log(f"Failed to create test PDF: {e}", "ERROR")
            raise

    def show_test_summary(self):
        """Print one line per recorded result followed by pass/fail totals."""
        print("\n" + "=" * 60)
        self.log("📋 Test Results Summary")
        print("=" * 60)

        passed = 0
        failed = 0
        for test_name, success, message in self.results:
            status = "PASS" if success else "FAIL"
            icon = "✅" if success else "❌"
            print(f"{icon} {test_name:20} {status:6} {message}")
            if success:
                passed += 1
            else:
                failed += 1

        print("=" * 60)
        print(f"Total Tests: {len(self.results)}")
        print(f"Passed: {passed}")
        print(f"Failed: {failed}")
        if failed == 0:
            self.log("🎉 All tests passed!", "SUCCESS")
        else:
            self.log(f"⚠️ {failed} test(s) failed. Check the logs above for details.", "WARNING")
def _env_int(name: str, default: int) -> int:
    """Read an integer environment variable, using ``default`` when unusable."""
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        # Unset, or present but blank (e.g. "KEY=" in a .env file).
        return default
    try:
        return int(raw)
    except ValueError:
        print(f"⚠️ Ignoring non-integer value {raw!r} for {name}; using default {default}")
        return default


def load_config_from_env() -> "TestConfig":
    """Build a TestConfig from environment variables and print what was loaded.

    Reads MINERU_TEST_FILE, MINERU_API_URL, MINERU_API_TYPE, MINERU_API_KEY,
    MINERU_LEARN_TYPE, MAX_CONCURRENT_API_CALLS and MINERU_VERBOSE. Blank or
    non-numeric integer settings fall back to their defaults instead of
    raising. Secrets are only reported as present ("***"), never printed.
    """
    print("\n📋 Loading configuration...")

    # Report which source the values are expected to come from.
    env_file_path = project_root / '.env'
    if env_file_path.exists():
        print(f" Using .env file: {env_file_path}")
    else:
        print(" Using system environment variables")

    config = TestConfig(
        test_file_path=os.getenv('MINERU_TEST_FILE'),
        api_url=os.getenv('MINERU_API_URL', 'http://mineru:8000'),
        api_type=os.getenv('MINERU_API_TYPE', 'self_hosted'),
        api_key=os.getenv('MINERU_API_KEY'),
        learn_type=_env_int('MINERU_LEARN_TYPE', 9),
        max_concurrent=_env_int('MAX_CONCURRENT_API_CALLS', 2),
        verbose=os.getenv('MINERU_VERBOSE', 'true').lower() == 'true'
    )

    # Display loaded configuration (mask sensitive data)
    print("\n Loaded configuration:")
    print(f" - API Type: {config.api_type}")
    print(f" - API URL: {config.api_url}")
    print(f" - API Key: {'***' if config.api_key else 'Not set'}")
    print(f" - Learn Type: {config.learn_type}")
    print(f" - Max Concurrent: {config.max_concurrent}")
    print(f" - Verbose: {config.verbose}")
    if config.test_file_path:
        print(f" - Test File: {config.test_file_path}")

    # Check for LLM keys
    openai_key = os.getenv('ADVANCED_PARSER_KEY_OPENAI')
    claude_key = os.getenv('ADVANCED_PARSER_KEY_CLAUDE')
    gemini_key = os.getenv('ADVANCED_PARSER_KEY_GEMINI')
    if openai_key or claude_key or gemini_key:
        print("\n LLM Keys detected:")
        if openai_key:
            print(" - OpenAI: ***")
        if claude_key:
            print(" - Claude: ***")
        if gemini_key:
            print(" - Gemini: ***")

    # Check for MaxKB configuration
    maxkb_llm = os.getenv('MINERU_LLM_MODEL_ID')
    maxkb_vision = os.getenv('MINERU_VISION_MODEL_ID')
    if maxkb_llm or maxkb_vision:
        print("\n MaxKB Models configured:")
        if maxkb_llm:
            print(f" - LLM Model: {maxkb_llm}")
        if maxkb_vision:
            print(f" - Vision Model: {maxkb_vision}")

    return config
class MaxKBAdapterTester:
    """Run the MaxKB adapter checks and collect their outcomes.

    Every ``test_*`` coroutine appends a ``(name, passed, message)`` tuple to
    ``self.results``; ``show_test_summary`` prints them as a table.
    """

    def __init__(self, test_config: "TestConfig"):
        self.config = test_config
        # (test name, passed flag, human-readable message)
        self.results = []

    def log(self, message: str, level: str = "INFO"):
        """Print ``message`` prefixed with the icon for ``level``.

        Unknown levels fall back to a generic note icon.
        """
        icons = {
            "INFO": "ℹ️",
            "SUCCESS": "✅",
            "WARNING": "⚠️",
            "ERROR": "❌",
            "DEBUG": "🔍",
        }
        icon = icons.get(level, "📝")
        print(f"{icon} {message}")

    async def run_all_tests(self):
        """Run every adapter check in order, then print the summary."""
        self.log("🚀 Starting MaxKB Adapter Tests", "INFO")
        print("=" * 60)

        # Test 1: MaxKB Adapter Initialization
        await self.test_adapter_initialization()
        # Test 2: File Storage Client
        await self.test_file_storage_client()
        # Test 3: Model Client
        await self.test_model_client()
        # Test 4: MinerU Extractor with MaxKB
        await self.test_mineru_extractor()
        # Test 5: Document Processing
        await self.test_document_processing()
        # Test 6: Image Processing
        await self.test_image_processing()

        self.show_test_summary()

    async def test_adapter_initialization(self):
        """Instantiate MaxKBAdapter and check its basic collaborators."""
        self.log("🔧 Testing MaxKB Adapter Initialization")
        try:
            adapter = MaxKBAdapter()

            assert adapter.file_storage is not None, "File storage not initialized"
            assert adapter.model_client is not None, "Model client not initialized"

            logger = adapter.get_logger()
            assert logger is not None, "Logger not available"

            settings = adapter.get_settings()
            assert isinstance(settings, dict), "Settings not a dictionary"
            self.log(f"Settings keys: {list(settings.keys())}")

            learn_type = adapter.get_learn_type({'llm_model_id': 'test_model'})
            assert isinstance(learn_type, int), "Learn type not an integer"
            self.log(f"Learn type: {learn_type}")

            self.log("MaxKB Adapter initialization: SUCCESS", "SUCCESS")
            self.results.append(("Adapter Init", True, "Adapter initialized successfully"))
        except Exception as e:
            self.log(f"Adapter initialization failed: {e}", "ERROR")
            self.results.append(("Adapter Init", False, str(e)))

    async def test_file_storage_client(self):
        """Exercise FileStorageClient uploads and cleanup with the File model mocked.

        Upload failures are only logged as warnings; the overall result is
        decided by the cleanup check and by unexpected exceptions.
        """
        self.log("📁 Testing File Storage Client")
        try:
            # Mock the Django models
            with patch('common.handle.impl.mineru.maxkb_adapter.file_storage_client.File') as MockFile:
                mock_file_instance = Mock()
                mock_file_instance.id = "test_file_id"
                mock_file_instance.save = Mock()
                MockFile.return_value = mock_file_instance

                client = FileStorageClient(knowledge_id="test_knowledge")

                # Test image upload
                test_image_path = await self.create_test_image()
                try:
                    url = await client.upload_image(test_image_path, "test_image.png")
                    assert url == "/api/file/test_file_id", f"Unexpected URL: {url}"
                    self.log(f"Image upload test: SUCCESS (URL: {url})", "SUCCESS")
                except Exception as e:
                    self.log(f"Image upload failed: {e}", "WARNING")
                finally:
                    if os.path.exists(test_image_path):
                        os.unlink(test_image_path)

                # Test file upload
                test_file_path = await self.create_test_file()
                try:
                    url = await client.upload_file(test_file_path, "test_file.txt")
                    assert url == "/api/file/test_file_id", f"Unexpected URL: {url}"
                    self.log(f"File upload test: SUCCESS (URL: {url})", "SUCCESS")
                except Exception as e:
                    self.log(f"File upload failed: {e}", "WARNING")
                finally:
                    if os.path.exists(test_file_path):
                        os.unlink(test_file_path)

                # Test cleanup
                temp_dir = tempfile.mkdtemp()
                try:
                    client.cleanup_temp_files(temp_dir)
                    assert not os.path.exists(temp_dir), "Temp directory not cleaned"
                finally:
                    # Do not leave the directory behind if the client failed.
                    if os.path.isdir(temp_dir):
                        os.rmdir(temp_dir)
                self.log("Cleanup test: SUCCESS", "SUCCESS")

            self.results.append(("File Storage", True, "File storage client works"))
        except Exception as e:
            self.log(f"File storage client test failed: {e}", "ERROR")
            self.results.append(("File Storage", False, str(e)))

    async def test_model_client(self):
        """Exercise MaxKBModelClient with QuerySet and get_model mocked."""
        self.log("🤖 Testing Model Client")
        try:
            # Mock the Django models and providers
            with patch('common.handle.impl.mineru.maxkb_adapter.maxkb_model_client.QuerySet') as MockQuerySet, \
                    patch('common.handle.impl.mineru.maxkb_adapter.maxkb_model_client.get_model') as MockGetModel:
                mock_model = Mock()
                mock_model.id = "test_model_id"
                MockQuerySet.return_value.filter.return_value.first.return_value = mock_model

                mock_llm = Mock()
                mock_llm.invoke = Mock(return_value=Mock(content="Test response"))
                MockGetModel.return_value = mock_llm

                client = MaxKBModelClient()

                llm_model = client.get_llm_model("test_model_id")
                assert llm_model is not None, "LLM model not retrieved"
                self.log("LLM model retrieval: SUCCESS", "SUCCESS")

                vision_model = client.get_vision_model("test_model_id")
                assert vision_model is not None, "Vision model not retrieved"
                self.log("Vision model retrieval: SUCCESS", "SUCCESS")

                response = await client.chat_completion(
                    "test_model_id",
                    [{"role": "user", "content": "Hello"}]
                )
                assert response == "Test response", f"Unexpected response: {response}"
                self.log(f"Chat completion: SUCCESS (Response: {response})", "SUCCESS")

                is_valid = client.validate_model("test_model_id")
                assert is_valid, "Model validation failed"
                self.log("Model validation: SUCCESS", "SUCCESS")

            self.results.append(("Model Client", True, "Model client works"))
        except Exception as e:
            self.log(f"Model client test failed: {e}", "ERROR")
            self.results.append(("Model Client", False, str(e)))

    async def test_mineru_extractor(self):
        """Construct the MaxKB MinerU extractor and verify its attributes."""
        self.log("📄 Testing MinerU Extractor with MaxKB")
        try:
            extractor = MaxKBMinerUExtractor(
                llm_model_id="test_llm_model",
                vision_model_id="test_vision_model"
            )

            assert extractor.llm_model_id == "test_llm_model", "LLM model ID not set"
            assert extractor.vision_model_id == "test_vision_model", "Vision model ID not set"
            assert extractor.adapter is not None, "Adapter not initialized"
            assert isinstance(extractor.adapter, MaxKBAdapter), "Wrong adapter type"
            self.log("Extractor initialization: SUCCESS", "SUCCESS")

            assert extractor.config is not None, "Config not initialized"
            self.log(f"Config type: {type(extractor.config).__name__}")

            self.results.append(("MinerU Extractor", True, "Extractor initialized"))
        except Exception as e:
            self.log(f"MinerU extractor test failed: {e}", "ERROR")
            self.results.append(("MinerU Extractor", False, str(e)))

    async def test_document_processing(self):
        """Check the expected result shape of ``MinerUAdapter.process_document``.

        NOTE(review): MinerUAdapter is fully mocked here, so this validates
        the canned return value rather than real processing.
        """
        self.log("📚 Testing Document Processing")
        test_pdf = None
        try:
            test_pdf = await self.create_test_pdf("Test Document\nPage 1 Content\nPage 2 Content")
            with open(test_pdf, 'rb') as f:
                file_content = f.read()

            # Mock the entire MinerUAdapter to avoid event loop issues
            with patch('common.handle.impl.mineru.maxkb_adapter.adapter.MinerUAdapter') as MockAdapter:
                mock_adapter = Mock()
                MockAdapter.return_value = mock_adapter
                mock_adapter.process_document.return_value = {
                    'sections': [
                        {
                            'content': 'Page 1 content',
                            'title': 'Page 1',
                            'images': []
                        },
                        {
                            'content': 'Page 2 content',
                            'title': 'Page 2',
                            'images': []
                        }
                    ]
                }

                adapter = MockAdapter()
                result = adapter.process_document(
                    file_content,
                    "test.pdf",
                    save_image_func=None
                )

                assert 'sections' in result, "No sections in result"
                assert len(result['sections']) == 2, f"Expected 2 sections, got {len(result['sections'])}"
                for i, section in enumerate(result['sections']):
                    assert 'content' in section, f"No content in section {i}"
                    assert 'title' in section, f"No title in section {i}"
                    assert 'images' in section, f"No images in section {i}"
                    self.log(f"Section {i}: {section['title'][:20]}...")

            self.log("Document processing: SUCCESS", "SUCCESS")
            self.results.append(("Document Processing", True, "Document processed successfully"))
        except Exception as e:
            self.log(f"Document processing test failed: {e}", "ERROR")
            self.results.append(("Document Processing", False, str(e)))
        finally:
            # Remove the generated PDF whether or not the checks passed.
            if test_pdf and os.path.exists(test_pdf):
                os.unlink(test_pdf)

    async def test_image_processing(self):
        """Run ImageOptimizer on a generated PNG when the optimizer is importable.

        An ImportError is treated as "optimizer not available" and only
        logged. Temporary images are removed even when optimisation raises.
        """
        self.log("🖼️ Testing Image Processing")
        try:
            test_image = None
            optimized = None
            try:
                from common.handle.impl.mineru.maxkb_adapter.image_optimizer import ImageOptimizer
                optimizer = ImageOptimizer()
                test_image = await self.create_test_image()

                # The method name differs between versions; use whichever exists.
                if hasattr(optimizer, 'optimize_image'):
                    optimized = optimizer.optimize_image(test_image)
                    assert optimized is not None, "Image optimization failed"
                    self.log("Image optimization method: SUCCESS", "SUCCESS")
                elif hasattr(optimizer, 'optimize'):
                    optimized = optimizer.optimize(test_image)
                    assert optimized is not None, "Image optimization failed"
                    self.log("Image optimize method: SUCCESS", "SUCCESS")
                else:
                    # Just verify the optimizer was created
                    self.log("ImageOptimizer created successfully", "SUCCESS")
            except ImportError as e:
                self.log(f"Image optimizer not available: {e}", "WARNING")
            finally:
                # Only paths can be unlinked; the optimizer may return
                # something else (or the input path itself).
                for path in (test_image, optimized):
                    if isinstance(path, (str, os.PathLike)) and os.path.exists(path):
                        os.unlink(path)

            self.results.append(("Image Processing", True, "Image processing tested"))
        except Exception as e:
            self.log(f"Image processing test failed: {e}", "ERROR")
            self.results.append(("Image Processing", False, str(e)))

    async def create_test_pdf(self, content: str = "Test Document") -> str:
        """Write a one-page PDF showing ``content`` and return its path.

        Each line of ``content`` becomes one text line. Stream length and
        cross-reference offsets are computed from the bytes actually written.
        The caller is responsible for deleting the file.
        """
        # Backslash and parentheses are special inside a PDF literal string.
        escaped = content.replace("\\", "\\\\").replace("(", "\\(").replace(")", "\\)")
        text_lines = escaped.splitlines() or [""]

        ops = ["BT", "/F1 12 Tf", "14 TL", "100 700 Td"]
        ops.extend(f"({line}) Tj T*" for line in text_lines)
        ops.append("ET")
        # Characters outside Latin-1 are replaced rather than failing.
        stream = "\n".join(ops).encode("latin-1", "replace")

        objects = [
            b"<</Type/Catalog/Pages 2 0 R>>",
            b"<</Type/Pages/Kids[3 0 R]/Count 1>>",
            b"<</Type/Page/Parent 2 0 R/MediaBox[0 0 612 792]"
            b"/Contents 4 0 R/Resources<</Font<</F1 5 0 R>>>>>>",
            b"<</Length %d>>\nstream\n" % len(stream) + stream + b"\nendstream",
            b"<</Type/Font/Subtype/Type1/BaseFont/Helvetica>>",
        ]

        pdf = bytearray(b"%PDF-1.4\n")
        offsets = []
        for number, body in enumerate(objects, start=1):
            offsets.append(len(pdf))
            pdf += b"%d 0 obj\n" % number + body + b"\nendobj\n"

        xref_position = len(pdf)
        entry_count = len(objects) + 1  # +1 for the mandatory free entry 0
        pdf += b"xref\n0 %d\n" % entry_count
        # Every xref entry is exactly 20 bytes, including its end-of-line.
        pdf += b"0000000000 65535 f \n"
        for offset in offsets:
            pdf += b"%010d 00000 n \n" % offset
        pdf += b"trailer\n<</Size %d/Root 1 0 R>>\nstartxref\n%d\n%%%%EOF" % (
            entry_count, xref_position
        )

        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
            temp_file.write(bytes(pdf))
        return temp_file.name

    async def create_test_image(self) -> str:
        """Write a tiny fixed PNG to a temporary file and return its path."""
        # Minimal PNG data (1x1 red pixel)
        png_data = bytes.fromhex(
            '89504e470d0a1a0a0000000d49484452000000010000000108060000001f15c489'
            '0000000d49444154789c62f8cfc00000000103010112d2dd790000000049454e44ae426082'
        )
        with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
            temp_file.write(png_data)
        return temp_file.name

    async def create_test_file(self) -> str:
        """Write a three-line text file to a temporary path and return it."""
        with tempfile.NamedTemporaryFile(delete=False, suffix='.txt', mode='w',
                                         encoding='utf-8') as temp_file:
            temp_file.write("Test file content\nLine 2\nLine 3")
        return temp_file.name

    def show_test_summary(self):
        """Print one line per recorded result followed by pass/fail totals."""
        print("\n" + "=" * 60)
        self.log("📋 MaxKB Adapter Test Results Summary")
        print("=" * 60)

        passed = 0
        failed = 0
        for test_name, success, message in self.results:
            status = "PASS" if success else "FAIL"
            icon = "✅" if success else "❌"
            print(f"{icon} {test_name:20} {status:6} {message}")
            if success:
                passed += 1
            else:
                failed += 1

        print("=" * 60)
        print(f"Total Tests: {len(self.results)}")
        print(f"Passed: {passed}")
        print(f"Failed: {failed}")
        if failed == 0:
            self.log("🎉 All MaxKB adapter tests passed!", "SUCCESS")
        else:
            self.log(f"⚠️ {failed} test(s) failed. Check the logs above for details.", "WARNING")
def print_usage():
    """Print the command-line help text.

    The option descriptions and examples reflect what ``main`` does: with no
    option the MaxKB adapter tests run, and the MinerU API / test-file
    settings are only exercised by the GPTBase tests (``--gptbase``).
    """
    usage_text = """
🧪 MinerU Parser Test Script
This script tests the MinerU PDF/PPT parsing system with various configurations.
USAGE:
python test_mineru.py [OPTIONS]
OPTIONS:
--gptbase Run GPTBase MinerU tests
--maxkb Run MaxKB adapter tests (default if available)
--all Run all available tests
-h, --help Show this help message
CONFIGURATION:
The script reads configuration from (in order of priority):
1. .env file in the project root (if exists)
2. System environment variables
To use a .env file:
1. Copy .env.example to .env
2. Edit .env with your configuration
3. Run the test script
Quick start:
cp .env.test .env # Use minimal test configuration
python test_mineru.py --maxkb
ENVIRONMENT VARIABLES:
🏠 For Self-Hosted MinerU:
export MINERU_API_TYPE=self_hosted
export MINERU_API_URL=http://mineru:8000
☁️ For Cloud MinerU:
export MINERU_API_TYPE=cloud
export MINERU_API_URL=https://mineru.net
export MINERU_API_KEY=your_api_key_here
🔧 Optional Configuration:
export MINERU_TEST_FILE=/path/to/test.pdf # Test file path
export MINERU_LEARN_TYPE=9 # AI model type
export MAX_CONCURRENT_API_CALLS=2 # Concurrent processing
export MINERU_VERBOSE=true # Verbose output
📚 LLM Configuration (for image processing):
export ADVANCED_PARSER_KEY_OPENAI=your_openai_key
export ADVANCED_PARSER_KEY_CLAUDE=your_claude_key
export ADVANCED_PARSER_KEY_GEMINI=your_gemini_key
🤖 MaxKB Configuration:
export MINERU_LLM_MODEL_ID=your_llm_model_id
export MINERU_VISION_MODEL_ID=your_vision_model_id
export MAXKB_API_KEY=your_maxkb_api_key
export MAXKB_API_URL=https://api.maxkb.com
EXAMPLES:
# Test with self-hosted MinerU
export MINERU_API_TYPE=self_hosted
export MINERU_API_URL=http://localhost:30001
python test_mineru.py --gptbase
# Test with cloud MinerU
export MINERU_API_TYPE=cloud
export MINERU_API_KEY=your_api_key
python test_mineru.py --gptbase
# Test with a specific file
export MINERU_TEST_FILE=/path/to/your/document.pdf
python test_mineru.py --gptbase
# Test MaxKB adapter
python test_mineru.py --maxkb
# Run all tests
python test_mineru.py --all
TEST COVERAGE:
📦 GPTBase MinerU Tests:
✅ Environment and dependencies check
✅ Configuration validation
✅ API connectivity testing
✅ Real file processing (if file provided)
⏸️ Batch processing capabilities (currently disabled)
🚀 MaxKB Adapter Tests:
✅ Adapter initialization
✅ File storage client functionality
✅ Model client integration
✅ MinerU extractor with MaxKB
✅ Document processing pipeline
✅ Image processing capabilities
"""
    print(usage_text)
async def main() -> int:
    """Parse the first command-line option, run the selected suites, return an exit code.

    Only ``sys.argv[1]`` is inspected. With no option, the MaxKB adapter
    tests run when their modules were imported successfully.

    Returns:
        0 when everything that ran passed (or help was shown), 1 when a
        requested suite is unavailable or any recorded result failed, and 2
        for an unknown option.
    """
    run_gptbase = False
    run_maxkb = False

    if len(sys.argv) > 1:
        option = sys.argv[1]
        if option in ('-h', '--help', 'help'):
            print_usage()
            return 0
        if option == '--gptbase':
            run_gptbase = True
        elif option == '--maxkb':
            run_maxkb = True
        elif option == '--all':
            # "All" means every suite whose modules are actually available.
            run_gptbase = mineru_modules_loaded
            run_maxkb = maxkb_adapter_loaded
        else:
            print(f"Unknown option: {option}")
            print("Use -h or --help for usage information")
            return 2
    else:
        # Default: run MaxKB tests only
        run_maxkb = maxkb_adapter_loaded

    config = load_config_from_env()
    exit_code = 0

    # Run GPTBase tests if requested and available
    if run_gptbase:
        if mineru_modules_loaded:
            print("\n" + "="*60)
            print("Running GPTBase MinerU Tests")
            print("="*60)
            tester = MinerUTester(config)
            await tester.run_all_tests()
            if any(not result[1] for result in tester.results):
                exit_code = 1
        else:
            print("❌ GPTBase modules not available, cannot run GPTBase tests")
            exit_code = 1

    # Run MaxKB adapter tests if requested and available
    if run_maxkb:
        if maxkb_adapter_loaded:
            print("\n" + "="*60)
            print("Running MaxKB Adapter Tests")
            print("="*60)
            tester = MaxKBAdapterTester(config)
            await tester.run_all_tests()
            if any(not result[1] for result in tester.results):
                exit_code = 1
        else:
            print("❌ MaxKB adapter modules not available, cannot run MaxKB tests")
            exit_code = 1

    # If nothing was run
    if not (run_gptbase or run_maxkb):
        print(" No tests were run. Use --help for usage information.")

    return exit_code


if __name__ == "__main__":
    try:
        # Propagate the result so shells and CI can detect failures.
        sys.exit(asyncio.run(main()))
    except KeyboardInterrupt:
        print("\n🛑 Test interrupted by user")
        sys.exit(130)  # conventional exit status for SIGINT
    except Exception as e:
        print(f"❌ Test script error: {e}")
        sys.exit(1)