Description:
We have encountered a memory leak issue when using PubNubAsyncio in our FastAPI application. The issue seems to be related to the creation and failure to release event loops after they are no longer needed.
Steps to Reproduce:
- Create a
PubNubAsyncio instance using asyncio.new_event_loop().
- Use the
PubNubAsyncio instance to publish messages.
- Stop the
PubNubAsyncio instance using await client.stop().
- Observe that the created event loop is not released, leading to a memory leak.
Code Example:
import os
from contextlib import asynccontextmanager
from typing import AsyncGenerator, List
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub_asyncio import PubNubAsyncio
from sqlalchemy.ext.asyncio import AsyncSession
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from pubnub.exceptions import PubNubException
from pubnub.models.consumer.v3.channel import Channel
class Pubnub:
GRANT_TTL = 1440 # minutes
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(PubNubException))
def __build_client(self, *, user_id: str) -> PubNubAsyncio:
config = PNConfiguration()
config.subscribe_key = "your_subscribe_key"
config.publish_key = "your_publish_key"
config.secret_key = "your_secret_key"
config.user_id = user_id
config.ssl = True
return PubNubAsyncio(config)
@asynccontextmanager
async def pubnub_client_context(self, user_id: str) -> AsyncGenerator[PubNubAsyncio, None]:
client = self.__build_client(user_id=user_id)
try:
yield client
finally:
await client.stop()
async def publish_message(self, session: AsyncSession, *, input_: dict):
channel = f"id.{input_['to_user_uuid']}.event"
async with self.pubnub_client_context(user_id=f"server:{os.getenv('HOSTNAME')}") as client:
token = await self.__grant_token(client=client, authorized_channels=[channel])
client.set_token(token=token)
await client.publish(channel=channel, message=input_).future()
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(PubNubException))
async def __grant_token(self, *, client: PubNubAsyncio, authorized_channels: List[str]) -> str:
channels = [Channel.id(channel).read().write() for channel in authorized_channels]
envelope = await client.grant_token(channels=channels, ttl=self.GRANT_TTL).future()
return envelope.result.token
Observed Behavior:
After stopping the PubNubAsyncio instance using await client.stop(), the event loop created by asyncio.new_event_loop() is not released, causing a memory leak. Over time, this results in increased memory consumption.
Expected Behavior:
The event loop created by asyncio.new_event_loop() should be properly released when the PubNubAsyncio instance is stopped to prevent memory leaks.
Environment:
- Python version: 3.12
- PubNub version: 9.1.0
- FastAPI version: 0.109.2
Additional Information:
We have observed this issue consistently in our production environment, leading to increased memory usage over time. Proper release of the event loop after stopping the PubNubAsyncio instance would help mitigate this issue.
Description:
We have encountered a memory leak issue when using
PubNubAsyncioin our FastAPI application. The issue seems to be related to the creation and failure to release event loops after they are no longer needed.Steps to Reproduce:
PubNubAsyncioinstance usingasyncio.new_event_loop().PubNubAsyncioinstance to publish messages.PubNubAsyncioinstance usingawait client.stop().Code Example:
Observed Behavior:
After stopping the
PubNubAsyncioinstance usingawait client.stop(), the event loop created byasyncio.new_event_loop()is not released, causing a memory leak. Over time, this results in increased memory consumption.Expected Behavior:
The event loop created by
asyncio.new_event_loop()should be properly released when thePubNubAsyncioinstance is stopped to prevent memory leaks.Environment:
Additional Information:
We have observed this issue consistently in our production environment, leading to increased memory usage over time. Proper release of the event loop after stopping the
PubNubAsyncioinstance would help mitigate this issue.