
There may be some issues with the connection pool in the documentation #646

Open
Smawexi opened this issue Oct 27, 2024 · 0 comments

When a pool is created with max_size, closing a connection or channel does not decrease the pool's created count, and the closed connection or channel is not removed from the pool's queue. In a long-running process, for example with max_size=2, the connections or channels obtained from the pool may therefore be unusable.

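import aio_pika
from aio_pika.abc import AbstractRobustConnection
from aio_pika.pool import Pool


async def main() -> None:
    # Factory the pool calls whenever it needs a new connection.
    async def get_connection() -> AbstractRobustConnection:
        return await aio_pika.connect_robust("amqp://guest:guest@localhost/")

    # The pool creates at most two connections.
    connection_pool: Pool = Pool(get_connection, max_size=2)

    # Factory for channels; borrows a pooled connection to open each one.
    async def get_channel() -> aio_pika.Channel:
        async with connection_pool.acquire() as connection:
            return await connection.channel()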

I tried to work around this in my own code, but once max_size is set, acquiring a channel blocks, and I don't know how to fix it.
Here is the relevant part of my code:

self.connection_pool: Pool = Pool(self._get_connection, max_size=self.pool_size, loop=self.loop)
self.channel_pool: Pool = Pool(self._get_channel, max_size=self.channel_size)
self._channel_pool_item_ctx_manager = self.channel_pool.acquire()
self._pool_item_ctx_manager = self.connection_pool.acquire()

    async def _get_connection(self) -> aio_pika.RobustConnection:
        while True:
            try:
                return await aio_pika.connect_robust(self.url)
            except aiormq.exceptions.AMQPConnectionError:
                self.logger.info('Attempting to reconnect..., start sleep 2s.')
                await asyncio.sleep(2)

    async def _get_channel(self) -> aio_pika.Channel:
        while True:
            async with self._pool_item_ctx_manager as connection:
                try:
                    return await connection.channel()
                except (RuntimeError, aiormq.exceptions.AMQPError) as e:
                    self.logger.error('Attempting to reacquire channel..., start sleep 2s.')
                    await connection.close()
                    self._pool_item_ctx_manager.pool._Pool__item_set.discard(connection)
                    self._pool_item_ctx_manager.pool._Pool__created -= 1
                    self._pool_item_ctx_manager.item = None
                    await asyncio.sleep(2)

    async def acquire_channel(self) -> aio_pika.RobustChannel:
        while True:
            channel: aio_pika.RobustChannel = await self._channel_pool_item_ctx_manager.pool._get()
            if not channel.is_closed:
                return channel
            else:
                self._channel_pool_item_ctx_manager.pool._Pool__item_set.discard(channel)
                self._channel_pool_item_ctx_manager.pool._Pool__created -= 1

    async def release_channel(self, channel: aio_pika.Channel):
        if not channel.is_closed:
            self._channel_pool_item_ctx_manager.pool.put(channel)
        else:
            self.logger.debug('-------------channel closed-----------------')
            self._channel_pool_item_ctx_manager.pool._Pool__item_set.discard(channel)
            self._channel_pool_item_ctx_manager.pool._Pool__created -= 1
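
The behavior asked for above (closed items leave the pool and free a slot) can be sketched without touching the pool's private attributes. The following is a minimal standalone sketch, not aio-pika's implementation or API: `CheckedPool`, `FakeChannel`, and `make_channel` are hypothetical names, and the only assumption about pooled items is that they expose `is_closed`. Releasing always puts the item back in the queue, closed or not, so that a task waiting in `acquire()` wakes up, discards the closed item, and creates a replacement instead of blocking.

```python
import asyncio
from typing import Awaitable, Callable, Generic, Protocol, Tuple, TypeVar


class HasIsClosed(Protocol):
    @property
    def is_closed(self) -> bool: ...


T = TypeVar("T", bound=HasIsClosed)


class FakeChannel:
    """Hypothetical stand-in for a channel; models only is_closed/close()."""

    def __init__(self) -> None:
        self.is_closed = False

    async def close(self) -> None:
        self.is_closed = True


class CheckedPool(Generic[T]):
    """Hypothetical pool that discards closed items instead of reusing them."""

    def __init__(self, constructor: Callable[[], Awaitable[T]], max_size: int) -> None:
        self._constructor = constructor
        self._max_size = max_size
        self._idle: "asyncio.Queue[T]" = asyncio.Queue()
        self._created = 0

    @property
    def created(self) -> int:
        return self._created

    async def acquire(self) -> T:
        while True:
            # Prefer idle items, dropping any that were closed meanwhile.
            while not self._idle.empty():
                item = self._idle.get_nowait()
                if not item.is_closed:
                    return item
                self._created -= 1  # a closed item frees its slot
            if self._created < self._max_size:
                # Reserve the slot before awaiting so concurrent callers
                # cannot exceed max_size.
                self._created += 1
                try:
                    return await self._constructor()
                except BaseException:
                    self._created -= 1
                    raise
            # At capacity: wait until some item is released.
            item = await self._idle.get()
            if not item.is_closed:
                return item
            self._created -= 1  # discard and retry; a slot is now free

    def release(self, item: T) -> None:
        # Closed items are queued too, so a waiter in acquire() wakes up
        # and replaces them rather than blocking forever.
        self._idle.put_nowait(item)


async def make_channel() -> FakeChannel:
    return FakeChannel()


async def demo() -> Tuple[bool, bool, int]:
    pool: CheckedPool[FakeChannel] = CheckedPool(make_channel, max_size=2)
    first = await pool.acquire()
    second = await pool.acquire()
    await first.close()           # simulate a channel dying while in use
    pool.release(first)
    pool.release(second)
    third = await pool.acquire()  # skips the closed item
    fourth = await pool.acquire() # the freed slot allows a fresh item
    return third is second, fourth.is_closed, pool.created


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With real aio-pika objects the same check would rely on the `is_closed` property that the code above already uses on channels; whether something like this belongs inside `aio_pika.pool.Pool` itself is a question for the maintainers.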