Open balena opened 4 years ago
Yes, the messages in the recv_queue have to be consumed manually. If we don't do that, the messages accumulate and the process is eventually killed by the Erlang runtime. Could you give me some advice? I'm working on a forked repo of chumak which is focused on being more stable and more compatible with Erlang's design principles.
Feel free to hack on this repo too. I'll gladly add you as a maintainer.
Has this problem been solved?
No. I eventually started using the inner parts of this code so that the process management is handled entirely by an external module.
Chumak is being used just as a ZeroMQ serializer ATM.
Are you saying the high-water mark is not implemented for PUBSUB?
As far as I remember, no.
At the same time, I haven't worked with it for a long while, so there is a chance I'm wrong.
But if I'm right, and you decide to implement it, keep in mind that these losses should have proper metrics in place, so the user knows when the app is losing events.
The code I've mentioned above wraps the inner functions in a particular GenServer that blocks the publisher. So the events are lost at the publisher side, which didn't have proper metrics, so we never knew whether we were losing events or how many we were losing.
In terms of modeling, I would say that adopting a behavior is better than having a separate process to receive events. Then you leave the choice between enqueueing (up to a limit) and blocking the publisher to the user, instead of choosing one of them in advance. There are use cases for both.
The ZeroMQ pubsub protocol says there should be a queue per peer, and messages are dropped only for the peer that reaches its high-water mark (`hwm`), not for the others. But the way it is implemented now, the queue lives in the socket, each peer's messages are sent to the socket, and there is no way of setting `hwm` for a peer.
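To make the per-peer idea concrete, here is a minimal sketch in plain Erlang. It is not chumak code: the module name `peer_hwm` and all of its functions are hypothetical, and it only models the bookkeeping (one bounded queue and one drop counter per peer), assuming that a peer at its high-water mark drops the newest message.

```erlang
-module(peer_hwm).
-export([new/1, enqueue/3, dequeue/2, dropped/2]).

%% Hypothetical sketch, not part of chumak.
%% State is a map holding the high-water mark and, per peer,
%% a tuple {Queue, Length, DroppedCount}.
new(Hwm) when is_integer(Hwm), Hwm > 0 ->
    #{hwm => Hwm, peers => #{}}.

%% Enqueue Msg for Peer. A peer that has reached the high-water mark
%% drops the new message and bumps its own counter; other peers are
%% not affected.
enqueue(Peer, Msg, #{hwm := Hwm, peers := Peers} = State) ->
    {Q, Len, Dropped} = maps:get(Peer, Peers, {queue:new(), 0, 0}),
    case Len >= Hwm of
        true ->
            Entry = {Q, Len, Dropped + 1},
            {dropped, State#{peers := Peers#{Peer => Entry}}};
        false ->
            Entry = {queue:in(Msg, Q), Len + 1, Dropped},
            {ok, State#{peers := Peers#{Peer => Entry}}}
    end.

%% Take the oldest queued message for Peer, if there is one.
dequeue(Peer, #{peers := Peers} = State) ->
    case maps:get(Peer, Peers, undefined) of
        undefined ->
            {empty, State};
        {Q, Len, Dropped} ->
            case queue:out(Q) of
                {{value, Msg}, Q1} ->
                    Entry = {Q1, Len - 1, Dropped},
                    {{ok, Msg}, State#{peers := Peers#{Peer => Entry}}};
                {empty, _} ->
                    {empty, State}
            end
    end.

%% Number of messages dropped so far for Peer; this is the value
%% one would export as a metric.
dropped(Peer, #{peers := Peers}) ->
    {_Q, _Len, Dropped} = maps:get(Peer, Peers, {queue:new(), 0, 0}),
    Dropped.
```

With `peer_hwm:new(2)`, a third `enqueue` for the same peer returns `dropped` while a different peer can still enqueue, and `dropped/2` gives the per-peer loss count that the metrics discussion above asks for.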
Here is chumak's process tree:
`chumak_peer` reads frames from `gen_tcp` in active mode (more specifically `{active, once}`), eventually enqueueing messages in `recv_queue`, expecting that `{your process}` calls `chumak:recv` fast enough to consume all of that.

If your consumer isn't capable of handling all messages as they arrive at `chumak_socket`, they will accumulate indefinitely!
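For reference, the consuming side described above can be sketched roughly as follows. The module name `sub_consumer` and the `Handle` callback are hypothetical; the `chumak` calls follow the subscriber example in the project README as I remember it, so treat the exact return values as an assumption.

```erlang
-module(sub_consumer).
-export([start/3]).

%% Hypothetical sketch: Host, Port and Handle are supplied by the caller.
%% Handle is a fun of arity 1 and must return quickly, otherwise
%% recv_queue grows as described above.
start(Host, Port, Handle) when is_function(Handle, 1) ->
    {ok, _Started} = application:ensure_all_started(chumak),
    {ok, Socket} = chumak:socket(sub),
    %% An empty prefix subscribes to every topic.
    chumak:subscribe(Socket, <<>>),
    {ok, _PeerPid} = chumak:connect(Socket, tcp, Host, Port),
    loop(Socket, Handle).

%% Tight receive loop: nothing else happens between two recv calls.
loop(Socket, Handle) ->
    {ok, Data} = chumak:recv(Socket),
    Handle(Data),
    loop(Socket, Handle).
```

The loop only keeps up while `Handle` is faster than the publisher. It does nothing to bound `recv_queue` itself, which is the gap this issue is about.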