liftbridge-io / liftbridge-api

Protobuf definitions for the Liftbridge gRPC API. https://github.com/liftbridge-io/liftbridge
Apache License 2.0

Add support for Optimistic Concurrency Control #46

Closed LaPetiteSouris closed 3 years ago

LaPetiteSouris commented 3 years ago

To support https://github.com/liftbridge-io/liftbridge/issues/54, my idea is to simply perform the LastKnownOffset check on the partition's leader. If the published message includes LastKnownOffset, it should be equal to the NewestOffset known on the partition leader.

In case of rejection, I intend to let the client know about the error via the existing AckInbox.


I am thinking about a few options:

  1. Somehow make the partition leader's NewestOffset known to all followers via Raft. In that case, even before the message is published to the NATS topic, a partition follower could perform the check on-site and return an explicit AsyncError on the client stream. However, this seems rather complex and may involve a lot of changes. It also adds extra latency to the publish path.

  2. Use a dedicated ErrorInbox, separate from AckInbox. That also requires changes to most clients, since a client would have to handle not only errors on the async publish stream and Ack messages, but also an extra error channel.

Thus I propose sending the error directly to AckInbox; the client would then have to check for an error in the AckMessage explicitly.

Any thoughts on that?

tylertreat commented 3 years ago

A few thoughts...

Instead of LastKnownOffset, I think we should use ExpectedOffset. The reason is if you have multiple publishers, they will race on LastKnownOffset, whereas ExpectedOffset would be synchronized at the commit log level. If the ExpectedOffset sent on the publish differs from the assigned offset, reject the publish.
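A minimal sketch of the ExpectedOffset idea, assuming a toy in-memory log in place of the real commit log. The names `commitLog`, `ErrIncorrectOffset`, `NoExpectedOffset`, and `racePublishers` are hypothetical and not part of the Liftbridge API. Because the offset is assigned and compared under the same lock, several publishers racing with the same expected offset cannot all succeed.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrIncorrectOffset is a hypothetical error for a failed ExpectedOffset check.
var ErrIncorrectOffset = errors.New("expected offset does not match assigned offset")

// NoExpectedOffset is a hypothetical sentinel meaning "skip the check".
const NoExpectedOffset int64 = -1

// commitLog is a toy in-memory log standing in for the partition leader's log.
type commitLog struct {
	mu       sync.Mutex
	messages [][]byte
}

// Append computes the offset the message would be assigned and rejects the
// message when the caller's expected offset differs from it. The comparison
// and the append happen under one lock, so they are atomic.
func (l *commitLog) Append(msg []byte, expectedOffset int64) (int64, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	next := int64(len(l.messages))
	if expectedOffset != NoExpectedOffset && expectedOffset != next {
		return -1, ErrIncorrectOffset
	}
	l.messages = append(l.messages, msg)
	return next, nil
}

// racePublishers has n publishers append concurrently with the same expected
// offset and returns how many of them were accepted.
func racePublishers(l *commitLog, n int, expected int64) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	accepted := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if _, err := l.Append([]byte(fmt.Sprintf("msg-%d", id)), expected); err == nil {
				mu.Lock()
				accepted++
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	return accepted
}

func main() {
	l := &commitLog{}
	// Five publishers all expect offset 0; exactly one wins.
	fmt.Println("accepted:", racePublishers(l, 5, 0)) // prints "accepted: 1"
}
```

Which publisher wins is not deterministic, but the number of accepted messages is always one.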

Regarding ack errors, I think we probably should model it after how async errors are sent on PublishResponse. The trick here, from an API perspective, is how to reconcile an error sent on Ack with the PublishAsyncError sent on PublishResponse. Since PublishResponse also includes the Ack, this would mean the client would need to check for errors in two locations, which isn't ideal.

LaPetiteSouris commented 3 years ago

> A few thoughts...
>
> Instead of LastKnownOffset, I think we should use ExpectedOffset. The reason is if you have multiple publishers, they will race on LastKnownOffset, whereas ExpectedOffset would be synchronized at the commit log level. If the ExpectedOffset sent on the publish differs from the assigned offset, reject the publish.

Does this mean that the publisher should get the NewestOffset from FetchPartitionMetadata and calculate the ExpectedOffset before publishing?

> Regarding ack errors, I think we probably should model it after how async errors are sent on PublishResponse. The trick here, from an API perspective, is how to reconcile an error sent on Ack with the PublishAsyncError sent on PublishResponse. Since PublishResponse also includes the Ack, this would mean the client would need to check for errors in two locations, which isn't ideal.

I am still thinking about this part. The reason is that PublishAsyncError is produced right on the server that receives the publish request. Normally that is the leader, but it can be another server as well. If the receiving server is not the leader, it has to somehow learn the leader's NewestOffset beforehand.

As you mentioned above, the check should be done at the commit level (or the log append level?), and this happens only on the partition's leader. This makes it a bit tricky.

Or should we have a separate PublishAsyncWithAck method, which not only publishes asynchronously but also explicitly waits for an AckError and returns a PublishAsyncError if one is detected?

tylertreat commented 3 years ago

> Does this mean that the publisher should get the NewestOffset from FetchPartitionMetadata and calculate the ExpectedOffset before publishing?

Right.
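The fetch, compute, publish cycle might look like the sketch below. It assumes the next assigned offset is NewestOffset + 1 and that an empty partition reports a NewestOffset of -1. The `partition` type, `publishWithOCC`, and the `interfere` hook are hypothetical stand-ins used only to simulate a competing publisher, not Liftbridge client calls.

```go
package main

import (
	"errors"
	"fmt"
)

// errOffsetMismatch is a hypothetical error for a rejected publish.
var errOffsetMismatch = errors.New("expected offset does not match")

// partition is a hypothetical stand-in for a partition leader.
type partition struct {
	messages []string
}

// newestOffset mimics reading NewestOffset from partition metadata.
// Assumption: an empty partition reports -1.
func (p *partition) newestOffset() int64 { return int64(len(p.messages)) - 1 }

// publish appends msg only if expectedOffset is the offset it would receive.
func (p *partition) publish(msg string, expectedOffset int64) error {
	if expectedOffset != p.newestOffset()+1 {
		return errOffsetMismatch
	}
	p.messages = append(p.messages, msg)
	return nil
}

// publishWithOCC fetches the newest offset, derives the expected offset, and
// publishes. On a mismatch it refetches and retries, up to maxAttempts times.
// interfere runs between the fetch and the publish to simulate another
// publisher; it may be nil.
func publishWithOCC(p *partition, msg string, maxAttempts int, interfere func(attempt int)) (int64, error) {
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		expected := p.newestOffset() + 1
		if interfere != nil {
			interfere(attempt)
		}
		err := p.publish(msg, expected)
		if err == nil {
			return expected, nil
		}
		if !errors.Is(err, errOffsetMismatch) {
			return -1, err
		}
	}
	return -1, errOffsetMismatch
}

func main() {
	p := &partition{}
	// A competing publisher sneaks in a message during the first attempt only.
	offset, err := publishWithOCC(p, "mine", 3, func(attempt int) {
		if attempt == 1 {
			p.messages = append(p.messages, "theirs")
		}
	})
	fmt.Println(offset, err) // prints "1 <nil>"
}
```

A real publisher would usually re-read the stream state and decide whether the message is still valid before retrying, rather than resending it blindly.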

> I am still thinking about this part. The reason is that PublishAsyncError is produced right on the server that receives the publish request. Normally that is the leader, but it can be another server as well. If the receiving server is not the leader, it has to somehow learn the leader's NewestOffset beforehand.

Yes, one way of thinking about it is the PublishAsyncError behaves as a "failed precondition" check, i.e. error out before the message is actually published, while the error on an Ack would be more of an "after the fact" error, e.g. "expected offset does not match."

I'm leaning towards just putting the error on the Ack.
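As a rough illustration of an error carried on the Ack, the sketch below uses a simplified `Ack` struct with an error field. The `AckError` type, its values, and `checkAck` are assumptions made for this example and do not reflect the actual protobuf definitions.

```go
package main

import "fmt"

// AckError is a hypothetical enum describing why a publish was rejected.
type AckError int

const (
	AckOK AckError = iota
	AckIncorrectOffset
)

// Ack is a simplified, hypothetical ack message; the real Ack has more fields.
type Ack struct {
	Stream string
	Offset int64
	Error  AckError
}

// checkAck turns the error field of an ack into a Go error, so a client has a
// single place to look after receiving the ack.
func checkAck(ack Ack) error {
	switch ack.Error {
	case AckOK:
		return nil
	case AckIncorrectOffset:
		return fmt.Errorf("publish to %q rejected: expected offset does not match", ack.Stream)
	default:
		return fmt.Errorf("publish to %q failed with unknown ack error %d", ack.Stream, ack.Error)
	}
}

func main() {
	acks := []Ack{
		{Stream: "orders", Offset: 7},
		{Stream: "orders", Offset: -1, Error: AckIncorrectOffset},
	}
	for _, ack := range acks {
		if err := checkAck(ack); err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("acked at offset", ack.Offset)
	}
}
```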

LaPetiteSouris commented 3 years ago

For the ExpectedOffset, should the check be at the log append level instead of the log commit level? I guess the partition leader should check that the offsets match even before appending to the log; otherwise, once the message is appended, the other replicas may start replicating it.

If the error is put directly on the Ack, should we make the partition leader also subscribe to the Ack? That way we could generate a PublishAsyncError. It is possible, but I am wondering whether it would be good practice.

tylertreat commented 3 years ago

> For the ExpectedOffset, should the check be at the log append level instead of the log commit level? I guess the partition leader should check that the offsets match even before appending to the log; otherwise, once the message is appended, the other replicas may start replicating it.

That's right. The offset is assigned on leader append, so this is where the check would need to happen.

> If the error is put directly on the Ack, should we make the partition leader also subscribe to the Ack? That way we could generate a PublishAsyncError. It is possible, but I am wondering whether it would be good practice.

Not sure I follow. The server doing the publish already subscribes to the Ack to dispatch it back to the client. See here.

LaPetiteSouris commented 3 years ago

> > For the ExpectedOffset, should the check be at the log append level instead of the log commit level? I guess the partition leader should check that the offsets match even before appending to the log; otherwise, once the message is appended, the other replicas may start replicating it.
>
> That's right. The offset is assigned on leader append, so this is where the check would need to happen.

I will make the changes for this part.

> > If the error is put directly on the Ack, should we make the partition leader also subscribe to the Ack? That way we could generate a PublishAsyncError. It is possible, but I am wondering whether it would be good practice.
>
> Not sure I follow. The server doing the publish already subscribes to the Ack to dispatch it back to the client. See here.

Indeed, I missed that part. That looks great, actually: the server can then easily reconcile an error found on the Ack with PublishAsyncError.
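One way to picture that reconciliation on the server handling the publish is sketched below. All types are simplified, hypothetical versions of the messages discussed in this thread, and `responseFromAck` and the `INCORRECT_OFFSET` code are illustrative names only. The point is that the client receives either an Ack or an async error, never both, so it checks for errors in one place.

```go
package main

import "fmt"

// Ack is a simplified, hypothetical ack as received by the publishing server.
type Ack struct {
	Offset          int64
	IncorrectOffset bool // hypothetical flag set by the leader on a failed check
}

// PublishAsyncError is a simplified, hypothetical async error for the client.
type PublishAsyncError struct {
	Code    string
	Message string
}

// PublishResponse is a simplified, hypothetical response on the publish stream.
type PublishResponse struct {
	Ack        *Ack
	AsyncError *PublishAsyncError
}

// responseFromAck converts an ack error into an async error, so that a failed
// publish never reaches the client as an Ack.
func responseFromAck(ack *Ack) PublishResponse {
	if ack.IncorrectOffset {
		return PublishResponse{AsyncError: &PublishAsyncError{
			Code:    "INCORRECT_OFFSET",
			Message: "expected offset does not match",
		}}
	}
	return PublishResponse{Ack: ack}
}

func main() {
	for _, ack := range []*Ack{{Offset: 4}, {Offset: -1, IncorrectOffset: true}} {
		resp := responseFromAck(ack)
		if resp.AsyncError != nil {
			fmt.Println("async error:", resp.AsyncError.Code)
			continue
		}
		fmt.Println("ack offset:", resp.Ack.Offset)
	}
}
```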

LaPetiteSouris commented 3 years ago

I added in the latest commit:

Still wondering whether I have taken everything into account.