real-logic / aeron

Efficient reliable UDP unicast, UDP multicast, and IPC message transport
Apache License 2.0

Wondering if this use case is currently optimally supported in Aeron #43

Closed. RajivKurian closed this issue 9 years ago.

RajivKurian commented 9 years ago

I have the following use case: a fairly small number of sender client processes, say N processes, each on its own host, and a similar or slightly larger number of receiver processes, say M processes, again each on its own host. I have 1024 topics (numbered 0-1023), and an external arbiter assigns these topics to the M receiver processes (M << 1024) in some out-of-band way. This arbiter is part of the control plane, and these assignments change infrequently. I already have logic to detect these changes and act on them. All the data-plane processing (which I hope to use Aeron for) is also idempotent. My message rate through the system is moderately high (several million messages per second in total, split among the N sender processes).

My current plan is to still have the 1024 topics in my application process, but create a stream per receiver process (i.e., M streams). I then plan to use the arbiter-assigned (topic -> receiver host) mapping to assign each message to one of the M streams. As the arbiter changes the (topic -> receiver host) mappings, my receivers can update their subscriptions with each of the N sender processes. Each sender process thus has a stream per receiver process, and each receiver is subscribed to its corresponding stream on every sender process. Does that seem like a reasonable model? Any suggestions?
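A minimal sketch of the sender-side routing this plan implies, assuming receiver i is reached on streamId BASE_STREAM_ID + i (a hypothetical convention, as are the class and method names). The arbiter's (topic -> receiver) assignment is swapped in as a whole new array, so the send path never sees a half-updated table. The actual send would go through an Aeron Publication for the returned streamId, which is left out here.

```java
public final class TopicRouter {
    public static final int TOPIC_COUNT = 1024; // topics 0..1023

    // Hypothetical convention: receiver i is reached on streamId BASE_STREAM_ID + i.
    public static final int BASE_STREAM_ID = 1000;

    // Copy-on-write table: readers see either the old or the new array, never a mix.
    private volatile int[] topicToReceiver = new int[TOPIC_COUNT];

    /** Replaces the whole mapping at once, e.g. when the arbiter reassigns topics. */
    public void applyAssignment(int[] newTopicToReceiver) {
        if (newTopicToReceiver.length != TOPIC_COUNT) {
            throw new IllegalArgumentException("expected " + TOPIC_COUNT + " entries");
        }
        topicToReceiver = newTopicToReceiver.clone(); // publish a private copy
    }

    /** Returns the streamId of the receiver currently assigned to this topic. */
    public int streamIdFor(int topic) {
        return BASE_STREAM_ID + topicToReceiver[topic];
    }

    public static void main(String[] args) {
        TopicRouter router = new TopicRouter();
        int[] assignment = new int[TOPIC_COUNT];
        for (int t = 0; t < TOPIC_COUNT; t++) {
            assignment[t] = t % 4; // hypothetical: M = 4 receivers, round-robin
        }
        router.applyAssignment(assignment);
        System.out.println(router.streamIdFor(5)); // prints 1001
    }
}
```

With this layout each sender holds M publications, so the whole cluster has N x M (sender, stream) pairs, and each receiver subscribes to one streamId on each of the N senders.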

I was also wondering what I could do when the number of receiver processes M grows much larger, say 100. Multiplexing these 100 receivers onto a lower number of topics would be wasteful, since each of these 100 receivers would then receive a lot of data it does not need. Given my cluster-wide rate of multiple millions of messages per second, this could be a lot of waste.
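To put a rough number on that waste, assume traffic is spread evenly across topics and the M receivers are multiplexed evenly onto S < M shared streams (S is an illustrative parameter, not from the thread). Each stream then carries the traffic of M/S receivers, and a receiver keeps only its own share of it:

```latex
\text{discarded fraction per receiver} = 1 - \frac{1}{M/S} = 1 - \frac{S}{M},
\qquad M = 100,\ S = 10 \;\Rightarrow\; 1 - \frac{10}{100} = 0.9
```

So with 100 receivers sharing 10 streams, roughly 90% of what each receiver reads would be dropped on arrival.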

mjpt777 commented 9 years ago

Does your application receive all 1024 topics on each host? I think you are saying this is not the case but "My current plan is to still have the 1024 topics in my application process" suggests otherwise.

RajivSignal commented 9 years ago

Sorry for the confusing wording. Not every subscriber host will receive all 1024 topics. In fact, the subscribers divide the 1024 topics among themselves and consume them based on an arbiter-assigned subscriber -> List<Topic> mapping. But every producer produces to all 1024 logical topics: every message a producer outputs is hashed onto one of these 1024 topics and put on that topic. Each consumer consumes from a disjoint set of these topics based on the arbiter-assigned subscriber -> List<Topic> mapping. This ensures (more or less) that messages with the same id are always processed by the same subscriber process. This is required because the subscribers hold in-memory state, and processing a message requires prior state and mutates that state. So in a typical brokered system this would look like the following:
1) Many producers put messages on the same logical topic, so every producer writes to every topic.
2) Consumers consume from a disjoint set of topics and hence read messages from every producer that published on those topics. No two consumers, however, consume the same topic.
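A minimal sketch of the producer-side hashing described above, assuming message ids are strings (the class name and id format are hypothetical). The important property is that the hash is stable across processes and restarts, so the same id always lands on the same topic and therefore on the same subscriber. CRC32 is used here as one such hash; String.hashCode() would also be stable because its formula is fixed by the Java specification.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public final class TopicHashing {
    public static final int TOPIC_COUNT = 1024; // topics 0..1023

    /** Maps a message id to a topic with a hash that does not vary between JVMs or runs. */
    public static int topicFor(String messageId) {
        CRC32 crc = new CRC32();
        crc.update(messageId.getBytes(StandardCharsets.UTF_8));
        // getValue() is an unsigned 32-bit value in a long, so the remainder is 0..1023.
        return (int) (crc.getValue() % TOPIC_COUNT);
    }

    public static void main(String[] args) {
        String id = "order-42"; // hypothetical message id
        System.out.println(id + " -> topic " + topicFor(id));
    }
}
```

The consumer side then only needs the arbiter's subscriber -> List<Topic> mapping to know which topic numbers it owns.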

tmontgomery commented 9 years ago

For situations like this, where it is not possible to know the optimal mapping a priori, it is fairly useful to do simple striping across whatever boundaries the messaging system provides. In Aeron, that is the channel (assuming multicast), then the streamId. My recommendation is to take something similar to what you have (hash(topic) -> channel + streamId) and control the hash function so that it spreads the topics evenly over a range of channels and streamIds that is a property of the hash. As long as the hash is stable, you should be fine. For the channel, consider using the hash to generate multicast group address ranges or (for unicast) port ranges.
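A sketch of that striping, assuming 8 channels x 16 streamIds = 128 slots, a base streamId of 1000, and placeholder multicast addresses and ports (all hypothetical choices). Because the topics are already the contiguous integers 0..1023, a plain modulo is a stable hash that puts exactly 8 topics in each slot. The channel strings use Aeron's aeron:udp?endpoint=host:port URI form.

```java
public final class TopicStriping {
    // Hypothetical layout: 8 channels x 16 streamIds = 128 (channel, streamId) slots.
    static final int CHANNEL_COUNT = 8;
    static final int STREAMS_PER_CHANNEL = 16;
    static final int SLOT_COUNT = CHANNEL_COUNT * STREAMS_PER_CHANNEL;
    static final int BASE_STREAM_ID = 1000;

    /** Stable hash of a topic onto a slot; modulo spreads topics 0..1023 evenly. */
    static int slotFor(int topic) {
        return Math.floorMod(topic, SLOT_COUNT);
    }

    /** Multicast: one group address per channel index (placeholder address range). */
    static String multicastChannelFor(int topic) {
        int channelIndex = slotFor(topic) / STREAMS_PER_CHANNEL;
        return "aeron:udp?endpoint=224.0.1." + (1 + channelIndex) + ":40456";
    }

    /** Unicast alternative: one port per channel index on the given host. */
    static String unicastChannelFor(int topic, String host, int basePort) {
        int channelIndex = slotFor(topic) / STREAMS_PER_CHANNEL;
        return "aeron:udp?endpoint=" + host + ":" + (basePort + channelIndex);
    }

    /** streamId within the chosen channel. */
    static int streamIdFor(int topic) {
        return BASE_STREAM_ID + slotFor(topic) % STREAMS_PER_CHANNEL;
    }

    public static void main(String[] args) {
        int topic = 300; // slot 44 -> channel index 2, stream index 12
        // prints "aeron:udp?endpoint=224.0.1.3:40456 stream 1012"
        System.out.println(multicastChannelFor(topic) + " stream " + streamIdFor(topic));
    }
}
```

Under this scheme a receiver subscribes to the (channel, streamId) slots that cover its assigned topics. A slot can still carry topics owned by other receivers, so some filtering by topic may remain; the slot count trades that overhead against the number of channels and streams to manage.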

RajivKurian commented 9 years ago

Thanks Todd. That makes sense.