import asyncio from typing import List, Optional import logging logger = logging.getLogger('app') class AgentPool: """Assistant instance pool manager.""" def __init__(self, pool_size: int = 5): """ Initialize the assistant instance pool. Args: pool_size: Number of instances in the pool, defaults to 5 """ self.pool_size = pool_size self.pool: asyncio.Queue = asyncio.Queue(maxsize=pool_size) self.semaphore = asyncio.Semaphore(pool_size) self.agents = [] # Keep references to all created instances async def initialize(self, agent_factory): """ Initialize the instance pool using the assistant factory function. Args: agent_factory: Factory function for creating assistant instances """ logger.info(f"Initializing assistant instance pool, size: {self.pool_size}") for i in range(self.pool_size): try: agent = agent_factory() await self.pool.put(agent) self.agents.append(agent) logger.info(f"Assistant instance {i+1}/{self.pool_size} created successfully") except Exception as e: logger.error(f"Failed to create assistant instance {i+1}: {e}") raise logger.info("Assistant instance pool initialization completed") async def get_agent(self, timeout: Optional[float] = 30.0): """ Get an available assistant instance. Args: timeout: Timeout in seconds, defaults to 30 Returns: Assistant instance Raises: asyncio.TimeoutError: Raised when acquisition times out """ try: # Use the semaphore to control concurrency await asyncio.wait_for(self.semaphore.acquire(), timeout=timeout) # Get an instance from the pool agent = await asyncio.wait_for(self.pool.get(), timeout=timeout) logger.debug(f"Successfully acquired assistant instance, remaining pool size: {self.pool.qsize()}") return agent except asyncio.TimeoutError: logger.error(f"Timed out while acquiring assistant instance ({timeout}s)") raise async def release_agent(self, agent): """ Return an assistant instance to the pool. Args: agent: Assistant instance to release """ try: await self.pool.put(agent) self.semaphore.release() logger.debug(f"Released assistant instance, current pool size: {self.pool.qsize()}") except Exception as e: logger.error(f"Failed to release assistant instance: {e}") # Release the semaphore even if putting back fails self.semaphore.release() def get_pool_stats(self) -> dict: """ Get pool status statistics. Returns: Dictionary containing pool status information """ return { "pool_size": self.pool_size, "available_agents": self.pool.qsize(), "total_agents": len(self.agents), "in_use_agents": len(self.agents) - self.pool.qsize() } async def shutdown(self): """Shut down the instance pool and clean up resources.""" logger.info("Shutting down assistant instance pool...") # Clear the queue while not self.pool.empty(): try: agent = self.pool.get_nowait() # Call cleanup if the instance provides it if hasattr(agent, 'cleanup'): await agent.cleanup() except asyncio.QueueEmpty: break logger.info("Assistant instance pool has been shut down") # Global instance pool singleton _global_agent_pool: Optional[AgentPool] = None def get_agent_pool() -> Optional[AgentPool]: """Get the global assistant instance pool.""" return _global_agent_pool def set_agent_pool(pool: AgentPool): """Set the global assistant instance pool.""" global _global_agent_pool _global_agent_pool = pool async def init_global_agent_pool(pool_size: int = 5, agent_factory=None): """ Initialize the global assistant instance pool. Args: pool_size: Pool size agent_factory: Instance factory function """ global _global_agent_pool if _global_agent_pool is not None: logger.warning("Global assistant instance pool already exists, skipping initialization") return if agent_factory is None: raise ValueError("The agent_factory argument is required") _global_agent_pool = AgentPool(pool_size=pool_size) await _global_agent_pool.initialize(agent_factory) logger.info("Global assistant instance pool initialization completed") async def get_agent_from_pool(timeout: Optional[float] = 30.0): """ Get an assistant instance from the global pool. Args: timeout: Acquisition timeout in seconds Returns: Assistant instance """ if _global_agent_pool is None: raise RuntimeError("Global assistant instance pool has not been initialized") return await _global_agent_pool.get_agent(timeout) async def release_agent_to_pool(agent): """ Release an assistant instance back to the global pool. Args: agent: Assistant instance to release """ if _global_agent_pool is None: raise RuntimeError("Global assistant instance pool has not been initialized") await _global_agent_pool.release_agent(agent)