slact / nchan

Fast, horizontally scalable, multiprocess pub/sub queuing server and proxy for HTTP, long-polling, Websockets and EventSource (SSE), powered by Nginx.
https://nchan.io/

notify subscribers of expiry events #303

Closed seddonm1 closed 7 years ago

seddonm1 commented 7 years ago

I have been looking to implement a custom expiry function so I can notify subscribers when a message expires.

I cannot figure out an easy way to do this, so I was planning to:

  1. Periodically loop through the channel messages from oldest to newest and check whether ttl < threshold.
  2. If ttl < threshold, deserialise the contents (costly) and apply custom expiry rules. If the custom expiry rules match, publish a new message to the queue to notify clients.

Is there a better pattern for notifying subscribers of expiry that you can think of, such as somehow using Redis keyspace events?

slact commented 7 years ago

What's your use case here? Do you need exact timings, or is lazy and on-demand expiry evaluation enough? Nchan uses the latter method, and it would be quite easy to add a message expiry to a channel-events channel. If you want precisely timed message expiry events, that would require adding some more indexing of messages by sticking them into an expiry rbtree or timing wheel.
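To illustrate what "indexing messages by expiry" could look like, here is a minimal sketch that uses a binary min-heap in place of the rbtree or timing wheel mentioned above. It is not Nchan's implementation; the class name `ExpiryIndex` and its methods are hypothetical and exist only for this example.

```python
import heapq
import itertools


class ExpiryIndex:
    """Min-heap of (expires_at, seq, msg_id); a stand-in for an expiry rbtree."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker for equal expiry times

    def add(self, msg_id, expires_at):
        """Index a message by its absolute expiry time (seconds)."""
        heapq.heappush(self._heap, (expires_at, next(self._seq), msg_id))

    def pop_expired(self, now):
        """Remove and return ids of messages with expires_at <= now, soonest first."""
        expired = []
        while self._heap and self._heap[0][0] <= now:
            _, _, msg_id = heapq.heappop(self._heap)
            expired.append(msg_id)
        return expired

    def next_expiry(self):
        """Time at which the next timer should fire, or None if nothing is indexed."""
        return self._heap[0][0] if self._heap else None
```

A server would arm a single timer for `next_expiry()`, call `pop_expired(now)` when it fires, and emit one expiry event per returned id. The lazy approach skips the index entirely and only notices expiry when the channel is next touched.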

seddonm1 commented 7 years ago

Hi slact, sorry for the delayed reply.

My use case is that I have subscribers that run a local database (offline-first mobile apps), and I want to notify them of expiry using the Nchan internal mechanism.

I have used an interval job (ngx.timer.at) in OpenResty to monitor the Nchan :messages lists and push a new message to the Nchan publisher when a message has expired. It has worked for a few days now, so I think you can close this issue.

slact commented 7 years ago

Ok, I see. That's workable, but the list is only updated when the channel is accessed by Nchan. To be precise, you should check whether the corresponding message key in Redis exists. You can check the Lua scripts used in Nchan for the key format. (It's <namespace:>{channel:<channel_id>}:msg:<msgid>.)
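As a rough sketch of that existence check, the helper below builds a message key from the format quoted above and asks a Redis client whether it exists. The function names, the example channel id and message id, and the convention that the namespace is passed without its trailing colon are all assumptions made for illustration. `client` is any object with an `exists(key)` method that returns the number of matching keys, as redis-py's `Redis.exists` does.

```python
def message_key(channel_id, msg_id, namespace=""):
    """Build <namespace:>{channel:<channel_id>}:msg:<msgid>.

    Assumption: `namespace` is given without the trailing colon,
    and an empty namespace means no prefix at all.
    """
    prefix = namespace + ":" if namespace else ""
    return prefix + "{channel:" + channel_id + "}:msg:" + msg_id


def message_still_exists(client, channel_id, msg_id, namespace=""):
    """True if the per-message key is still present in Redis."""
    return bool(client.exists(message_key(channel_id, msg_id, namespace)))
```

Checking the per-message key rather than the list entry avoids treating a message as live when its key has already expired but the list has not yet been trimmed.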

seddonm1 commented 7 years ago

Thanks.

My process is:

  1. Scan Redis keys for the Nchan "*}" pattern.
  2. Get the last 100 elements from the ":messages" list of each match.
  3. Loop through them oldest first and run Redis TTL on each.
  4. Ignore TTL = -2 (i.e. the message has been removed, but Nchan is unaware until the next update).
  5. If the TTL is less than the threshold (mine is 120 sec), get the message and broadcast it with a deleted flag.
  6. If the TTL is greater than the threshold, exit the loop. Because the list is ordered, we don't need to check past the first element with TTL > threshold.
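Steps 3 to 6 can be sketched roughly as follows. This is not the OpenResty job described above, just a Python illustration of the same loop. `find_expiring` and `ttl_of` are hypothetical names; `ttl_of` is any callable that returns a key's TTL in seconds with Redis semantics (-2 for a missing key, -1 for a key with no expiry). Skipping -1 and treating a TTL equal to the threshold as "not yet expiring" are assumptions of this sketch.

```python
def find_expiring(message_keys, ttl_of, threshold=120):
    """Return keys whose TTL is below `threshold` seconds.

    message_keys must be ordered oldest first, so the scan can stop
    at the first key that is still comfortably alive.
    """
    expiring = []
    for key in message_keys:
        ttl = ttl_of(key)
        if ttl == -2:
            continue  # key already gone; the list has not caught up yet
        if ttl == -1:
            continue  # assumption: a key without expiry never needs a notice
        if ttl < threshold:
            expiring.append(key)  # caller fetches it and broadcasts a deleted flag
        else:
            break  # ordered list: every later message expires even later
    return expiring
```

With redis-py, `ttl_of` could simply be the client's `ttl` method. The early `break` keeps the number of TTL calls proportional to the number of messages near expiry rather than to the full 100 fetched.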