questdb / py-questdb-client

Python client for QuestDB InfluxDB Line Protocol
https://py-questdb-client.readthedocs.io
Apache License 2.0

Should flush on `SystemExit` exception #18

Closed. sklarsa closed this issue 1 year ago

sklarsa commented 1 year ago

See the following lines:

def __exit__(self, exc_type, _exc_val, _exc_tb):
    """
    Flush pending and disconnect at the end of a ``with`` block.
    If the ``with`` block raises an exception, any pending data will
    *NOT* be flushed.
    This is implemented by calling :func:`Sender.close`.
    """
    self.close(not exc_type)

amunra commented 1 year ago

Abandoning change: Will not implement.

The idea of this change was to flush one last time at application exit when the process is killed via SIGTERM. My recollection was that running threads are halted during shutdown via the SystemExit exception, but that recollection was wrong: this is not actually the case.

Whilst we could implement a signal handler that works in many cases, it's difficult to do this in a way that works reliably across threads and without disrupting the logic of our existing library users.

If you're a user reading this and you want to flush one last time before application shutdown, we recommend you implement this yourself: import signal, register a signal handler for SIGTERM, and implement custom shutdown logic for the whole application. Depending on how your application is structured, the handler might need to set a flag or raise an exception, and your shutdown logic might need to join some threads.
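A minimal sketch of the do-it-yourself approach described above, using only the standard library. The names `shutdown_requested`, `handle_sigterm`, `install_sigterm_handler`, `worker` and `main` are hypothetical application code, not part of questdb.ingress, and the worker body is a placeholder for real logic.

```python
import signal
import threading

# Hypothetical application-wide shutdown flag (not part of questdb.ingress).
shutdown_requested = threading.Event()


def handle_sigterm(signum, frame):
    """Signal handler: only record the request; cleanup happens elsewhere."""
    shutdown_requested.set()


def install_sigterm_handler():
    # signal.signal() may only be called from the main thread.
    return signal.signal(signal.SIGTERM, handle_sigterm)


def worker(results):
    # Placeholder for application logic: poll until shutdown is requested.
    while not shutdown_requested.wait(timeout=0.01):
        pass
    results.append("worker stopped cleanly")


def main():
    install_sigterm_handler()
    results = []
    thread = threading.Thread(target=worker, args=(results,))
    thread.start()
    # Wait with a timeout so the main thread keeps running Python code
    # and can execute the signal handler when SIGTERM arrives.
    while not shutdown_requested.wait(timeout=0.5):
        pass
    thread.join()  # let the worker finish before the process exits
    return results
```

Calling `main()` from the main thread blocks until the process receives SIGTERM (for example via `kill <pid>`), then joins the worker. Keeping the handler down to setting a flag is a design choice in this sketch: the handler interrupts whatever the main thread was doing, so heavier work is left to the normal control flow.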

For shutdown, the important thing to understand is the semantics of the questdb.ingress library's sender with block:

with Sender(...) as sender:
    logic()

will call sender.flush() after logic() when the with block exits without an exception. If the block raises an exception, flush() is not called.
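These semantics can be checked with a small stand-in class. `FakeSender`, `buffer_line` and `run` are hypothetical and exist only for illustration; the one thing taken from the library is the `__exit__` body quoted earlier in this issue (`self.close(not exc_type)`). The sketch assumes that `close(flush)` flushes only when its argument is true.

```python
class FakeSender:
    """Hypothetical stand-in; only __exit__ mirrors the code quoted above."""

    def __init__(self):
        self.pending = []  # buffered, not yet sent
        self.sent = []     # what a flush would have transmitted

    def buffer_line(self, line):
        self.pending.append(line)

    def flush(self):
        self.sent.extend(self.pending)
        self.pending.clear()

    def close(self, flush=True):
        # Assumption: close() flushes only when asked to.
        if flush:
            self.flush()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, _exc_val, _exc_tb):
        # exc_type is None on a clean exit, an exception class otherwise.
        self.close(not exc_type)


def run(exc_class=None):
    """Buffer one line, optionally raise, and report what was flushed."""
    sender = FakeSender()
    try:
        with sender:
            sender.buffer_line("a")
            if exc_class is not None:
                raise exc_class("boom")
    except (Exception, SystemExit):
        pass  # swallowed for the demo only
    return sender.sent
```

Because `SystemExit` reaches `__exit__` like any other exception, calling `sys.exit()` inside the block also skips the flush in this sketch.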

If your logic is an application send loop, e.g.:

with Sender(...) as sender:
    while True:
        logic()

You might want to use a control variable for your loop:

with Sender(...) as sender:
    while self._running:
        logic()
    else:
        sender.flush()   # send pending buffered messages before exiting.

Then set the self._running flag to False, from a signal handler or by other means, so that the loop ends cleanly and pending messages are sent.
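One way the control-variable loop could be wired to a signal handler is sketched below. `SendLoop`, `RecordingSender` and `install_sigterm` are hypothetical names; `RecordingSender` is a stand-in so the sketch runs without a QuestDB server, and a real application would pass its own sender instead.

```python
import signal


class RecordingSender:
    """Hypothetical stand-in sender that records what would be flushed."""

    def __init__(self):
        self.pending = []
        self.sent = []

    def flush(self):
        self.sent.extend(self.pending)
        self.pending.clear()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, _exc_val, _exc_tb):
        if not exc_type:
            self.flush()


class SendLoop:
    def __init__(self, sender, logic):
        self._sender = sender
        self._logic = logic      # callable taking the sender
        self._running = True

    def stop(self, signum=None, frame=None):
        """Usable directly as a signal handler: it only flips the flag."""
        self._running = False

    def run(self):
        with self._sender as sender:
            while self._running:
                self._logic(sender)
            # Reached only when the flag went False without an exception.
            sender.flush()


def install_sigterm(loop):
    # Must be called from the main thread.
    signal.signal(signal.SIGTERM, loop.stop)
```

The flag is only checked between iterations, so if `logic()` blocks for a long time the shutdown is delayed until it returns.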