thijsmie / pocketbase

An async Python 3.11+ PocketBase SDK
MIT License
40 stars 6 forks source link

Bug: Realtime connection timeout after inactivity in PocketBase #7

Closed matthieuEv closed 8 months ago

matthieuEv commented 8 months ago

Description

When I use the subscribe method of the package and run it in an infinite loop, if no modifications are made to the specified Collection/record_id within 10 minutes, I encounter the following error: httpx.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read).

Steps to reproduce

Using this test code:

from pocketbase import PocketBase
from pocketbase.services.realtime import RealtimeEvent
from datetime import datetime
import asyncio

CONNECTION_URL = "CONNECTION_URL"
ADMIN_EMAIL = "ADMIN_EMAIL"
ADMIN_PASSWORD = "ADMIN_PASSWORD"
COLLECTION_NAME = "COLLECTION_NAME"

async def callback(event: RealtimeEvent) -> None:
    """Callback function for handling Realtime events.

    Args:
        event (RealtimeEvent): The event object containing information about the record change.
    """

    at = datetime.now().isoformat()
    print(f"[{at}] {event['action'].upper()}: {event['record']}")

async def realtime_updates():
    """Establishes a PocketBase connection, authenticates, and subscribes to Realtime events."""

    try:
        # Instantiate the PocketBase connector
        pb = PocketBase(CONNECTION_URL)

        # Authenticate as an admin
        await pb.admins.auth.with_password(ADMIN_EMAIL, ADMIN_PASSWORD)

        # Get the collection object
        col = pb.collection(COLLECTION_NAME)

        # Subscribe to Realtime events for the specific record ID
        unsub = await col.subscribe(
            callback=callback,
            record_id="*"
            )

        # Infinite loop to wait for events (adjusted from the second snippet)
        while True:
            await asyncio.sleep(10)  # Sleep for an hour to avoid hitting PocketBase's rate limits

    except Exception as e:
        print(f"Error: {e}")

    finally:
        # Unsubscribe if still active
        if unsub:
            try:
                await unsub()
            except Exception as e:
                print(f"Error unsubscribing: {e}")

if __name__ == "__main__":
    asyncio.run(realtime_updates())

I run the code and i wait for 5-10 mins.

Expected behavior

It should not Close the connection

[Optional] Stacktrace

Task exception was never retrieved
future: <Task finished name='Task-3' coro=<RealtimeService._make_connection() done, defined at /mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/pocketbase/services/realtime.py:48> exception=RemoteProtocolError('peer closed connection without sending complete message body (incomplete chunked read)')>
Traceback (most recent call last):
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 69, in map_httpcore_exceptions
    yield
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 254, in __aiter__
    async for part in self._httpcore_stream:
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 367, in __aiter__
    raise exc from None
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 363, in __aiter__
    async for part in self._stream:
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/http11.py", line 349, in __aiter__
    raise exc
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/http11.py", line 341, in __aiter__
    async for chunk in self._connection._receive_response_body(**kwargs):
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/http11.py", line 210, in _receive_response_body
    event = await self._receive_event(timeout=timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/http11.py", line 220, in _receive_event
    with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
  File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc) from exc
httpcore.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_client.py", line 1624, in stream
    yield response
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx_sse/_api.py", line 70, in aconnect_sse
    yield EventSource(response)
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/pocketbase/services/realtime.py", line 51, in _make_connection
    async for message in sse.aiter_sse():
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx_sse/_api.py", line 39, in aiter_sse
    async for line in self._response.aiter_lines():
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_models.py", line 963, in aiter_lines
    async for text in self.aiter_text():
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_models.py", line 950, in aiter_text
    async for byte_content in self.aiter_bytes():
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_models.py", line 929, in aiter_bytes
    async for raw_bytes in self.aiter_raw():
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_models.py", line 987, in aiter_raw
    async for raw_stream_bytes in self.stream:
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_client.py", line 149, in __aiter__
    async for chunk in self._stream:
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 253, in __aiter__
    with map_httpcore_exceptions():
  File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 86, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)

Environment

thijsmie commented 8 months ago

Right, missed that part in the spec, I'll have to add a reconnect mechanism.

matthieuEv commented 8 months ago

I think i've seen the same error here with a possible solution

https://github.com/vaphes/pocketbase/issues/78

thijsmie commented 8 months ago

Hmm I've been thinkering with this but it is really hard to guarantee "exactly-once" delivery without pocketbase providing any message ids or allowing a new connection to reuse an old client id... I might have to give up on that and do it in a "best effort" kind of way.

matthieuEv commented 8 months ago

Yeah i know. I looked at the js-sdk to see how they make the subscribe function but the way the python code works is not the same:

For the python code, I can see the error appening in:

#realtime.py Line 54
            async with aconnect_sse(self._in.client, "GET", self.__base_sub_path__, timeout=900) as sse:
                async for message in sse.aiter_sse():
                    ...