maxkb/apps/common/handle/impl/mineru/parallel_processor_pool.py
朱潮 51f436d7f7
Some checks are pending
sync2gitee / repo-sync (push) Waiting to run
Typos Check / Spell Check with Typos (push) Waiting to run
midyf model_id
2025-08-26 14:10:15 +08:00

117 lines
4.6 KiB
Python

"""
Global parallel processor pool for MinerU.
This module provides a singleton pool of parallel processors to avoid
creating multiple thread pools when processing multiple files.
"""
import threading
from typing import Optional
from .logger import get_module_logger
logger = get_module_logger('parallel_processor_pool')
from .parallel_processor import ParallelMinerUProcessor
from .config_base import MinerUConfig
class ParallelProcessorPool:
"""Singleton pool for managing parallel processors"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self._processors = {}
self._pool_lock = threading.Lock()
self.logger = logger
def get_processor(self, learn_type: int, platform_adapter=None, config=None) -> ParallelMinerUProcessor:
"""
Get or create a parallel processor for the given learn_type.
Args:
learn_type: Model type for AI processing
platform_adapter: Platform-specific adapter for operations
config: Configuration instance to use (optional)
Returns:
ParallelMinerUProcessor instance
"""
with self._pool_lock:
# Create a cache key that includes config identifiers if available
cache_key = learn_type
if config and hasattr(config, 'llm_model_id') and hasattr(config, 'vision_model_id'):
# Include model IDs in cache key to ensure different configs get different processors
cache_key = f"{learn_type}_{config.llm_model_id}_{config.vision_model_id}"
self.logger.info(f"Cache key for processor: {cache_key}")
if cache_key not in self._processors:
self.logger.info(f"Creating new parallel processor for cache_key={cache_key}, learn_type={learn_type}")
# Use provided config or create default
if config is None:
config = MinerUConfig()
# Log the config being used
if hasattr(config, 'llm_model_id') and hasattr(config, 'vision_model_id'):
self.logger.info(f"Using config with LLM={getattr(config, 'llm_model_id', 'N/A')}, Vision={getattr(config, 'vision_model_id', 'N/A')}")
processor = ParallelMinerUProcessor(config, learn_type, platform_adapter)
self._processors[cache_key] = processor
else:
self.logger.info(f"Reusing cached processor for cache_key={cache_key}")
# Verify cached processor has expected config
cached_processor = self._processors[cache_key]
if hasattr(cached_processor, 'config') and hasattr(cached_processor.config, 'llm_model_id'):
self.logger.info(f"Cached processor config: LLM={cached_processor.config.llm_model_id}, Vision={cached_processor.config.vision_model_id}")
return self._processors[cache_key]
async def shutdown_all(self):
"""Shutdown all processors in the pool"""
self.logger.info("Shutting down all parallel processors...")
with self._pool_lock:
for cache_key, processor in self._processors.items():
try:
await processor.shutdown()
self.logger.info(f"Shutdown processor for cache_key={cache_key}")
except Exception as e:
self.logger.error(f"Error shutting down processor {cache_key}: {e}")
self._processors.clear()
self.logger.info("All processors shutdown complete")
# Global instance
_processor_pool = ParallelProcessorPool()
def get_parallel_processor(learn_type: int, platform_adapter=None, config=None) -> ParallelMinerUProcessor:
"""
Get a parallel processor from the global pool.
Args:
learn_type: Model type for AI processing
platform_adapter: Platform-specific adapter for operations
config: Configuration instance to use (optional)
Returns:
ParallelMinerUProcessor instance
"""
return _processor_pool.get_processor(learn_type, platform_adapter, config)
async def shutdown_processor_pool():
"""Shutdown the global processor pool"""
await _processor_pool.shutdown_all()