liftbridge-io / liftbridge

Lightweight, fault-tolerant message streams.
https://liftbridge.io
Apache License 2.0

Single-stream fanout enhancements #103

Closed: tylertreat closed this issue 4 years ago

tylertreat commented 4 years ago

In addition to stream partitioning and load-balance groups, there are a couple other enhancements that could help with fanning out a stream to a large number of consumers.

Opt-in ISR replica reads

Allow consumers to read from ISR followers rather than having to read from the leader. This behavior should be opt-in since there are consistency implications, but in cases where this isn't an issue it would allow distributing load amongst ISR replicas.
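As a rough illustration from the consumer side, here is a minimal Go sketch of what the opt-in could look like. The `lift.ReadISRReplica()` option is hypothetical (it is the opt-in this issue proposes, not something that exists today); the rest follows the go-liftbridge client API.

```go
package consumer

import (
	"context"
	"log"

	lift "github.com/liftbridge-io/go-liftbridge/v2"
)

// subscribeFromFollower sketches an opt-in ISR replica read. The
// lift.ReadISRReplica() option is hypothetical and only illustrates the
// proposed opt-in; everything else is a standard Subscribe call.
func subscribeFromFollower(ctx context.Context, client lift.Client) error {
	return client.Subscribe(ctx, "foo-stream",
		func(msg *lift.Message, err error) {
			if err != nil {
				log.Println("subscription error:", err)
				return
			}
			log.Printf("offset=%d value=%s", msg.Offset(), msg.Value())
		},
		lift.ReadISRReplica(), // hypothetical opt-in: OK to be served by an in-sync follower
	)
}
```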

Read-replica support

Allow Liftbridge servers to act as read replicas for stream partitions without participating in the ISR. This would allow fanout without impacting the ISR. This should also be opt-in behavior.

LaPetiteSouris commented 4 years ago

Hi, I have some thoughts to share:

For opt-in ISR replica reads, isn't that already the case? Currently the client can subscribe directly to any server, which may be a replica, and still read streams.

LaPetiteSouris commented 4 years ago

Also, for the read-only replica server, if I understand correctly, we may want to start a Liftbridge server that opts in to act as a follower only and does not have the ability to elect itself as partition leader? And it should also not join any ISR operations?

tylertreat commented 4 years ago

> For opt-in ISR replica reads, isn't that already the case? Currently the client can subscribe directly to any server, which may be a replica, and still read streams.

No, this is not the case. The client must subscribe to the partition leader to read the partition. Client libraries, such as go-liftbridge, should handle resolving and connecting to the leader behind the scenes.
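For reference, this is roughly what application code looks like today: it only names seed addresses and a stream, and the go-liftbridge client looks up the partition leader from cluster metadata and connects to it. Addresses and stream names below are placeholders.

```go
package main

import (
	"context"
	"fmt"

	lift "github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
	// Any reachable servers work as seeds; the client fetches cluster
	// metadata from them and dials the partition leader on its own.
	client, err := lift.Connect([]string{"localhost:9292", "localhost:9293"})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Reads are served by the partition leader; the application never has
	// to know which server that is.
	err = client.Subscribe(context.Background(), "foo-stream",
		func(msg *lift.Message, err error) {
			if err != nil {
				panic(err)
			}
			fmt.Println(msg.Offset(), string(msg.Value()))
		},
		lift.StartAtEarliestReceived())
	if err != nil {
		panic(err)
	}

	select {} // block while messages are delivered asynchronously
}
```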

> Also, for the read-only replica server, if I understand correctly, we may want to start a Liftbridge server that opts in to act as a follower only and does not have the ability to elect itself as partition leader? And it should also not join any ISR operations?

That is roughly correct. However, I'm not sure if this should be a "global" setting on the server or if it should be specific to a stream (or both). I.e. it's possible we'd want to have a server that acts as a full participant of stream A but only as a read replica of stream B. In this case, we wouldn't be starting the server as a read replica but rather specifying that our stream needs X number of read replicas and the cluster handles assignments. Make sense?
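A rough sketch of the per-stream flavor, purely to illustrate the idea: the stream declares how many read replicas it wants and the cluster handles the assignments. The `lift.ReadReplicas()` option is hypothetical; `lift.ReplicationFactor()` is the existing option controlling ISR replication.

```go
package admin

import (
	"context"

	lift "github.com/liftbridge-io/go-liftbridge/v2"
)

// createStreamWithReadReplicas sketches the per-stream variant discussed
// above: the stream asks for N read replicas that sit outside the ISR and
// exist purely for fanout. lift.ReadReplicas is hypothetical; ReplicationFactor
// is the existing option for ISR replication.
func createStreamWithReadReplicas(ctx context.Context, client lift.Client) error {
	return client.CreateStream(ctx, "foo", "foo-stream",
		lift.ReplicationFactor(3), // three ISR replicas as usual
		lift.ReadReplicas(2),      // hypothetical: two non-ISR read replicas for fanout
	)
}
```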

LaPetiteSouris commented 4 years ago

> No, this is not the case. The client must subscribe to the partition leader to read the partition. Client libraries, such as go-liftbridge, should handle resolving and connecting to the leader behind the scenes.

Indeed. It is the go-liftbridge client that handles resolving the partition leader's address behind the scenes.

> That is roughly correct. However, I'm not sure if this should be a "global" setting on the server or if it should be specific to a stream (or both). I.e. it's possible we'd want to have a server that acts as a full participant of stream A but only as a read replica of stream B. In this case, we wouldn't be starting the server as a read replica but rather specifying that our stream needs X number of read replicas and the cluster handles assignments. Make sense?

Option 2 looks much better. It gives a lot of flexibility, and it may actually avoid the bottleneck of having a dedicated "global" replica.

tylertreat commented 4 years ago

FYI here is a good reference for ISR replica reads: https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica

Liftbridge's replication protocol will prevent inconsistent reads from a follower. The issue is more around latency and spurious out of range errors.

A great future enhancement would be allowing reads from the replica closest to the client.
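To make the "spurious out of range errors" point concrete, here is a hedged sketch of how a consumer reading from a lagging follower might treat them as transient rather than fatal. Both `fetchFromFollower` and `errOffsetOutOfRange` are hypothetical placeholders, not Liftbridge APIs.

```go
package consumer

import (
	"context"
	"errors"
	"time"
)

// Hypothetical placeholders used only for illustration.
var errOffsetOutOfRange = errors.New("offset out of range")

func fetchFromFollower(ctx context.Context, offset int64) ([]byte, error) {
	// A real client would issue a read against a follower replica here.
	return nil, errOffsetOutOfRange
}

// readAt treats an out-of-range error from a follower as transient: the
// follower may simply not have replicated up to the requested offset yet,
// so the consumer backs off and retries instead of failing.
func readAt(ctx context.Context, offset int64) ([]byte, error) {
	for {
		msg, err := fetchFromFollower(ctx, offset)
		if err == nil {
			return msg, nil
		}
		if !errors.Is(err, errOffsetOutOfRange) {
			return nil, err
		}
		select {
		case <-time.After(100 * time.Millisecond): // follower lag: wait and retry
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}
```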

LaPetiteSouris commented 4 years ago

> FYI here is a good reference for ISR replica reads: https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
>
> Liftbridge's replication protocol will prevent inconsistent reads from a follower. The issue is more around latency and spurious out of range errors.
>
> A great future enhancement would be allowing reads from the replica closest to the client.

Thanks for sharing. The article looks really useful.

In the case of an ISR replica, I see that a regular tick is sent out periodically, and if a replica fails it will be removed from the partition's ISR set. But it seems that these replicas, once removed from the ISR set, are still kept in the partition's replica set. Is that correct?

As a future enhancement: since the replicas for a partition are currently chosen pretty much at random, would it be nice to have a general kind of load balancing to replace this random mechanism?

tylertreat commented 4 years ago

> In the case of an ISR replica, I see that a regular tick is sent out periodically, and if a replica fails it will be removed from the partition's ISR set. But it seems that these replicas, once removed from the ISR set, are still kept in the partition's replica set. Is that correct?

That's correct. The replica set is simply the set of replicas participating in replication for the partition. The ISR, or in-sync replica set, is the subset of the replica set that is up-to-date with the leader.
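An illustrative Go-style view of that relationship (these are not Liftbridge's actual types):

```go
package main

import "fmt"

// partitionView is illustrative only. The ISR is always a subset of the
// replica set; a replica that fails or falls behind leaves the ISR but
// remains in the replica set.
type partitionView struct {
	Leader   string   // current partition leader
	Replicas []string // all replicas participating in replication
	ISR      []string // subset of Replicas currently caught up with the leader
}

func main() {
	p := partitionView{
		Leader:   "server-a",
		Replicas: []string{"server-a", "server-b", "server-c"},
		ISR:      []string{"server-a", "server-b"}, // server-c fell behind and left the ISR
	}
	fmt.Printf("replicas=%v isr=%v\n", p.Replicas, p.ISR)
}
```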

> As a future enhancement: since the replicas for a partition are currently chosen pretty much at random, would it be nice to have a general kind of load balancing to replace this random mechanism?

What I'm wondering is how to keep the replica selection extensible. Kafka does the selection server-side using a ReplicaSelector plugin, which is a Java class loaded into the server at runtime:

> The problem for a consumer is figuring out which replica is preferred. The two options are to either let the consumer find the preferred replica itself using metadata from the brokers (i.e. rackId, host information), or to let the broker decide the preferred replica based on information from the client. We propose here to do the latter.
>
> The benefit of letting the broker decide which replica is preferred for a client is that it can take load into account. For example, the broker can round robin between nearby replicas in order to spread the load. There are many such considerations that are possible, so we propose to allow users to provide a ReplicaSelector plugin to the broker in order to handle the logic. This will be exposed to the broker with the replica.selector.class configuration. In order to make use of this plugin, we will extend the Fetch API so that clients can provide their own location information. In the response, the broker will indicate a preferred replica to fetch from.
>
> KIP-392: Allow consumers to fetch from closest replica

I would like to stay away from that approach for now, so maybe we just defer that decision?
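For context only, here is a rough Go sketch of what a Kafka-style server-side selector hook could look like if Liftbridge ever went that route. The interface and types are entirely hypothetical and not part of Liftbridge.

```go
package server

// ClientMetadata is what a client could send along with a subscribe/fetch
// request, e.g. rack or availability-zone information. Hypothetical.
type ClientMetadata struct {
	ClientID string
	Rack     string
}

// ReplicaInfo describes one candidate replica for a partition. Hypothetical.
type ReplicaInfo struct {
	ServerID string
	Rack     string
	InSync   bool
}

// ReplicaSelector would be the pluggable policy: given the client's location
// and the candidate replicas, pick the one the client should read from
// (round robin, rack-aware, load-aware, and so on). Hypothetical.
type ReplicaSelector interface {
	SelectReplica(client ClientMetadata, replicas []ReplicaInfo) ReplicaInfo
}

// rackAwareSelector is a trivial example policy: prefer an in-sync replica
// in the client's rack, otherwise fall back to any in-sync replica.
type rackAwareSelector struct{}

func (rackAwareSelector) SelectReplica(client ClientMetadata, replicas []ReplicaInfo) ReplicaInfo {
	for _, r := range replicas {
		if r.InSync && r.Rack == client.Rack {
			return r
		}
	}
	for _, r := range replicas {
		if r.InSync {
			return r
		}
	}
	return replicas[0] // degenerate fallback; assumes at least one replica exists
}
```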

LaPetiteSouris commented 4 years ago

> I would like to stay away from that approach for now, so maybe we just defer that decision?

The other approach would be to let the client decide. However, it would be too much to achieve that all at once. I have no proposals at the moment, so yes, let's just close our eyes and look away for now :)

tylertreat commented 4 years ago

Closing this since #181 was merged. We can add ISR read-replica support in the future if needed.