liftbridge-io / liftbridge

Lightweight, fault-tolerant message streams.
https://liftbridge.io
Apache License 2.0

Add higher-level stream partitioning #81

Closed tylertreat closed 5 years ago

tylertreat commented 5 years ago

Liftbridge currently has a couple of mechanisms for horizontal scaling of message consumption:

  1. Creating multiple streams bound to the same NATS subject. This allows adding more brokers so that the streams are spread across them, which in turn allows adding more consumers. The downside is redundant processing: the consumers of each stream all process the same set of messages. The other issue is that the streams are independent of one another, so they have separate guarantees. Separate leaders and followers, separate ISRs, and separate acking mean the logs for each stream are not guaranteed to be identical, even though they are bound to the same NATS subject.

  2. Joining streams to a load-balance group, which load balances a NATS subject among the streams in the group. This effectively partitions a NATS subject across a set of streams. We can add more streams to the group, which are distributed across the cluster, and consumers can independently consume different parts of the subject. The downside is that the partitioning is random, since load-balance groups rely on NATS queue groups, so the client has no control over which stream receives a given message.

I would like to add support for higher-level stream partitioning, similar to Kafka's topic partitioning. The idea is to map Liftbridge stream partitions to NATS subjects, each of which maps to its own Liftbridge stream. Here is how this would work:

  1. Client creates a "partitioned" stream using a new StreamOption which specifies the number of partitions to create.
client.CreateStream(ctx, "foo.bar", "my-stream", lift.Partitioned(3))

Internally, this would create three streams in Liftbridge mapped to the following NATS subjects: foo.bar.0, foo.bar.1, and foo.bar.2.
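
To make the naming scheme concrete, here is a minimal sketch of how the per-partition NATS subjects could be derived from the base subject and the partition count. The function name partitionSubjects is hypothetical and is not part of the Liftbridge API; it only illustrates the <subject>.<partition> convention described above.

```go
package main

import "fmt"

// partitionSubjects returns the NATS subjects a partitioned stream would map
// to under the <subject>.<partition> convention, e.g. foo.bar.0, foo.bar.1.
// Hypothetical helper for illustration only; not part of the Liftbridge API.
func partitionSubjects(subject string, partitions int) []string {
	if partitions <= 0 {
		return nil
	}
	subjects := make([]string, 0, partitions)
	for i := 0; i < partitions; i++ {
		subjects = append(subjects, fmt.Sprintf("%s.%d", subject, i))
	}
	return subjects
}
```

Calling partitionSubjects("foo.bar", 3) yields foo.bar.0, foo.bar.1, and foo.bar.2, matching the example above.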

  2. Client publishes messages to the NATS subject using a new MessageOption which specifies a partition mapping policy. This may be based on the message key, round robin, or even a custom mapping policy. Alternatively, the client could explicitly specify which partition to publish to. Internally, the client will need to fetch the number of partitions for a subject from the metadata leader. If no partition mapping policy is specified, the existing publish behavior is used, i.e. publish to the subject literal foo.bar instead of foo.bar.0, foo.bar.1, or foo.bar.2.
client.Publish(ctx, "foo.bar", []byte("hello world!"),
        lift.Key([]byte("foo")),
        lift.PartitionKey(),
)
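
As a rough sketch of what the key-based and round-robin mapping policies could look like on the client side, the following shows two implementations behind a common interface. Everything here is an assumption for illustration: the Partitioner interface, the type names, and the choice of FNV-1a as the key hash are not specified by this proposal. Both implementations expect partitions to be greater than zero.

```go
package main

import (
	"hash/fnv"
	"sync/atomic"
)

// Partitioner picks a partition in [0, partitions) for a message.
// Hypothetical interface; a custom mapping policy would implement it.
type Partitioner interface {
	Partition(key []byte, partitions int) int
}

// keyPartitioner maps a message to a partition by hashing its key, so that
// messages with the same key always land on the same partition.
// FNV-1a is an assumed hash choice, not something the proposal mandates.
type keyPartitioner struct{}

func (keyPartitioner) Partition(key []byte, partitions int) int {
	h := fnv.New32a()
	_, _ = h.Write(key) // hash.Hash writes never return an error
	return int(h.Sum32() % uint32(partitions))
}

// roundRobinPartitioner ignores the key and cycles through the partitions.
// The counter is updated atomically so it is safe for concurrent publishers.
type roundRobinPartitioner struct {
	next uint64
}

func (r *roundRobinPartitioner) Partition(_ []byte, partitions int) int {
	n := atomic.AddUint64(&r.next, 1) - 1
	return int(n % uint64(partitions))
}
```

The client would then append the chosen partition to the subject and publish to, say, foo.bar.2 rather than foo.bar. Keeping the policy behind an interface leaves room for the custom mapping policies mentioned above.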
  3. Client subscribes to stream partition using a new SubscriptionOption which specifies the partition to consume from. Additionally, we will implement consumer groups similar to Kafka to consume an entire stream such that partitions are balanced among consumers.
client.Subscribe(ctx, "foo.bar", "my-stream", func(msg *proto.Message, err error) {
        // ...
}, lift.Partition(1))
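
The consumer group behavior is not designed yet, but to illustrate what "partitions are balanced among consumers" could mean, here is one simple possibility: assign partitions to group members in round-robin order over the sorted member IDs. The function name assignPartitions and the assignment strategy are assumptions for the sake of the sketch, not a committed design.

```go
package main

import "sort"

// assignPartitions spreads partitions 0..partitions-1 across the consumers in
// a group, round-robin over the sorted consumer IDs so that every member
// computes the same result. Hypothetical sketch; not part of Liftbridge.
func assignPartitions(consumers []string, partitions int) map[string][]int {
	assignment := make(map[string][]int, len(consumers))
	if len(consumers) == 0 {
		return assignment
	}
	// Copy before sorting so the caller's slice is left untouched.
	members := append([]string(nil), consumers...)
	sort.Strings(members)
	for p := 0; p < partitions; p++ {
		m := members[p%len(members)]
		assignment[m] = append(assignment[m], p)
	}
	return assignment
}
```

With three partitions and consumers "a" and "b", this gives partitions 0 and 2 to "a" and partition 1 to "b". A real implementation would also need to handle members joining and leaving the group.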

tylertreat commented 5 years ago

Addressed by #93.