A new API to make it easier to work with the sender asynchronously with true parallelism.
from questdb.ingress.pool import SenderPool

async def main():
    with SenderPool('http::addr=localhost:9000;') as pool:
        # Buffers can be safely constructed independently,
        # also on separate threads.
        buf1 = pool.transaction('tbl1')
        buf1.row(...)
        buf2 = pool.transaction('tbl2')
        buf2.dataframe(...)

        # Parallelism: both commits are submitted before either is
        # awaited, so the two network operations overlap.
        fut1 = buf1.commit()
        fut2 = buf2.commit()
        await fut1
        await fut2
Details
The buffer can only accumulate rows for a given table.
Each flush represents an atomic database transaction.
Flush operations can happen in parallel (network operations release the GIL).
The ownership of the buffer is "moved" to the pool.
Introducing parallelism alleviates the main performance penalty of using ILP/HTTP: network round-trip times.
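To illustrate the round-trip point, here is a minimal sketch that does not use the QuestDB client at all. `fake_flush` is a hypothetical stand-in that sleeps to simulate one HTTP round trip, `ROUND_TRIP_S` is an assumed latency, and a plain `ThreadPoolExecutor` plays the role of the pool's worker threads.

```python
import time
from concurrent.futures import ThreadPoolExecutor

ROUND_TRIP_S = 0.05  # assumed, simulated network latency per flush


def fake_flush(table: str) -> str:
    """Hypothetical stand-in for one ILP/HTTP flush; not a questdb API."""
    time.sleep(ROUND_TRIP_S)  # sleeping releases the GIL, like real network I/O
    return table


def flush_sequentially(tables):
    # Each flush waits for the previous round trip to finish.
    return [fake_flush(t) for t in tables]


def flush_in_parallel(tables):
    # All flushes are submitted first, so their round trips overlap.
    with ThreadPoolExecutor(max_workers=len(tables)) as executor:
        futures = [executor.submit(fake_flush, t) for t in tables]
        return [f.result() for f in futures]


def timed(fn, *args):
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


tables = ['tbl1', 'tbl2', 'tbl3', 'tbl4']
seq_result, seq_elapsed = timed(flush_sequentially, tables)
par_result, par_elapsed = timed(flush_in_parallel, tables)
print(f'sequential: {seq_elapsed:.3f}s, parallel: {par_elapsed:.3f}s')
```

With four simulated flushes, the sequential path pays roughly four round trips while the parallel path pays roughly one, which is the effect this PR is after.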
API downsides
This is a new "parallel" API for more advanced use cases. It creates an API split:
Server-style applications written in Python would use this new API.
Simpler "Jupyter notebook" style code would continue using the existing API.
Both APIs would continue being supported (since this new one is just a wrapper around the other anyway).
Shoe-horning these features into the regular API would be a struggle.
This API drops auto-flushing completely, because auto-flushing introduces silent, network-blocking operations into the API.
Thread safety and Parallelism
Once a pool object is created, it can be shared between threads.
The pool.next_buffer() and pool.flush() methods are thread-safe.
This allows for N:M concurrency:
N buffer writer threads
M threads responsible for concurrently writing to the database (inside the pool).
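The N:M arrangement can be sketched with the standard library alone. `ToyPool` below is a hypothetical stand-in, not the implementation in this PR: it borrows the `next_buffer()` and `flush()` names from the text above, uses plain lists as buffers, and "writes to the database" by appending to an in-memory list. The locking strategy is an assumption made for the illustration.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class ToyPool:
    """Hypothetical stand-in for the proposed pool; NOT the questdb code."""

    def __init__(self, max_workers: int):
        # The M side: worker threads that perform the flushes.
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._lock = threading.Lock()
        self._free = []    # recycled buffers, guarded by _lock
        self.flushed = []  # rows "written to the database", guarded by _lock

    def next_buffer(self) -> list:
        with self._lock:
            return self._free.pop() if self._free else []

    def flush(self, buf: list) -> Future:
        # Ownership of `buf` moves to the pool: the caller must not reuse it.
        return self._executor.submit(self._send, buf)

    def _send(self, buf: list) -> int:
        count = len(buf)
        with self._lock:
            self.flushed.extend(buf)
            buf.clear()
            self._free.append(buf)  # recycle the buffer for a later writer
        return count

    def close(self):
        self._executor.shutdown(wait=True)


def writer(pool: ToyPool, writer_id: int, results: list):
    # The N side: each writer thread fills its own buffer, then hands it over.
    buf = pool.next_buffer()
    for i in range(100):
        buf.append((writer_id, i))
    results[writer_id] = pool.flush(buf)  # each writer owns one result slot


N_WRITERS, M_WORKERS = 8, 3
pool = ToyPool(max_workers=M_WORKERS)
pending = [None] * N_WRITERS
threads = [
    threading.Thread(target=writer, args=(pool, n, pending))
    for n in range(N_WRITERS)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
total = sum(f.result() for f in pending)
pool.close()
print(total)
```

Only `next_buffer()` and the hand-off in `flush()` touch shared state, so those are the only places that need synchronisation. Filling a buffer stays lock-free because each buffer has a single owner at a time.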
Tasks
[ ] Review the API. Is this even a good idea?
[ ] Split out SenderPool into new questdb.ingress.pool module.
[ ] Improve test coverage, including TransactionalBuffer.dataframe(..).
[ ] Triple-check thread safety of pool.next_buffer() and pool.flush() implementations.
[ ] Use asyncio.wrap_future in def flush() directly (since that's how it's implemented anyway): https://github.com/python/cpython/blob/8ad88984200b2ccddc0a08229dd2f4c14d1a71fc/Lib/asyncio/base_events.py#L896 - this allows implementing .flush() in terms of .flush_to_future() and cuts code duplication.
Closes https://github.com/questdb/py-questdb-client/issues/64
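For the asyncio.wrap_future task, a rough sketch of the intended shape might look like the following. `ToyPool` is again a hypothetical stand-in with a simulated round trip. Only the relationship between `flush()` and `flush_to_future()` is the point, and the signatures shown are assumptions rather than the ones in this PR.

```python
import asyncio
import time
from concurrent.futures import Future, ThreadPoolExecutor


class ToyPool:
    """Hypothetical stand-in; only the flush()/flush_to_future() split matters."""

    def __init__(self):
        self._executor = ThreadPoolExecutor(max_workers=2)

    def _send(self, rows: list) -> int:
        time.sleep(0.01)  # simulated network round trip
        return len(rows)

    def flush_to_future(self, rows: list) -> Future:
        """Thread-based entry point: returns a concurrent.futures.Future."""
        return self._executor.submit(self._send, rows)

    def flush(self, rows: list) -> asyncio.Future:
        """asyncio entry point, implemented in terms of flush_to_future().

        Must be called while an event loop is running in the current thread.
        """
        return asyncio.wrap_future(self.flush_to_future(rows))

    def close(self):
        self._executor.shutdown(wait=True)


async def main() -> list:
    pool = ToyPool()
    try:
        # Both flushes are in flight before either result is awaited.
        fut1 = pool.flush([1, 2, 3])
        fut2 = pool.flush([4, 5])
        return await asyncio.gather(fut1, fut2)
    finally:
        pool.close()


results = asyncio.run(main())
print(results)
```

With this split, the thread-facing and asyncio-facing methods share one code path, and the only asyncio-specific line is the `asyncio.wrap_future` call.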