python-trio / triopg

PostgreSQL client for Trio based on asyncpg

Trio-style .listen() helper #13

Closed shamrin closed 3 years ago

shamrin commented 3 years ago

This is an attempt to implement a friendly Trio-style conn.listen() helper. It's surprisingly complex, because asyncpg does not seem to support applying backpressure when we can't keep up with notifications. Without backpressure we only have two options:

  1. Unbounded buffer, which Trio manual warns against.
  2. Limited buffer, with a way to signal that we are not keeping up.

This PR implements both options: .listen(..., max_buffer_size=inf) vs .listen(..., max_buffer_size=1). However, handling the "not-keeping-up" error proved to be complicated, because asyncpg uses a callback for its LISTEN implementation.

Update: I've simplified the solution using NOTIFY_OVERFLOW marker, suggested by @njsmith .

codecov[bot] commented 3 years ago

Codecov Report

Merging #13 (231dc3b) into master (cca3a4c) will increase coverage by 0.39%. The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master      #13      +/-   ##
==========================================
+ Coverage   98.20%   98.60%   +0.39%     
==========================================
  Files           6        6              
  Lines         279      358      +79     
  Branches       13       14       +1     
==========================================
+ Hits          274      353      +79     
  Misses          2        2              
  Partials        3        3              
| Impacted Files | Coverage Δ |
| --- | --- |
| `triopg/__init__.py` | 100.00% <100.00%> (ø) |
| `triopg/_tests/test_triopg.py` | 100.00% <100.00%> (ø) |
| `triopg/_triopg.py` | 96.73% <100.00%> (+0.46%) :arrow_up: |
shamrin commented 3 years ago

I've marked the PR as draft again. I would like to give it more time. The current implementation is complex and does not even solve the backpressure problem. Yes, open_memory_channel(inf) is a lot simpler, but Trio manual does warn against using unbounded buffers.

njsmith commented 3 years ago

Backpressure is good, but (a) if asyncpg simply doesn't support it, then I don't feel like asyncpg wrappers should worry about supporting it until that gets fixed, (b) there's an inherent challenge here b/c postgres (like most broadcast systems) doesn't really have a good way to communicate backpressure further upstream to the actual source. So like, a client can refuse to read from the postgres socket and push back to the postgres server, but then the postgres server doesn't have a way to push back on the clients that are calling NOTIFY. (From a quick search, it looks like the way they handle this is that if there are >8 GiB of notifications outstanding, then NOTIFY starts failing? not super helpful really.)

So hmm. I guess maybe I've argued myself into thinking that the ideal way of handling this in a postgres client library is

The advantages are:


shamrin commented 3 years ago

@njsmith Thank you, your summary makes total sense. I especially like your idea about "notify overflow" record.

How could a client implement a real-time pattern? Say the client is only interested in recent notifications, and it's better to drop old records rather than new ones.

shamrin commented 3 years ago

@njsmith And most importantly, how could I insert "notify overflow" record when the channel is already full? :)

shamrin commented 3 years ago

@njsmith Hmm, I think I could do something like this:

stats = send_channel.statistics()
if len(stats.current_buffer_used) >= stats.max_buffer_size - 1:
    send_channel.send_nowait(Overflow())
njsmith commented 3 years ago

yeah, that's probably the simplest option. (Note that current_buffer_used is an int, not a sequence)


shamrin commented 3 years ago

@njsmith I've implemented your suggestion. It's very simple indeed. I've also taken the liberty of reusing a few sentences from your explanation in the README. I hope you don't mind. Please let me know if I should have asked you first.

(It would be nice if there were a way for NOTIFY_OVERFLOW to jump the memory channel queue and appear ahead of ordinary values. That would let clients learn about overflows as soon as possible. It could be a trio.PriorityQueue, or a mechanism to wait on two queues at the same time. I did emulate it at first, using three memory channels: a listen callback channel, an overflow error channel, and an output channel, plus a task to join the first two and produce the last one. But it was too complicated, so I removed it.)