liftbridge-io / liftbridge

Lightweight, fault-tolerant message streams.
https://liftbridge.io
Apache License 2.0

Cursor persistence for consumer offset checkpointing #214

Closed Jmgr closed 4 years ago

Jmgr commented 4 years ago

This is a feature request and API proposal to allow cursor persistence for consumer offset checkpointing.

Use-case

Ably stores messages that are sent by customers. Those messages are published to a Liftbridge cluster for reliable storage, and each one must be processed by one or more consumers (processors). A consumer can be interrupted for various reasons; after a crash, a new instance (or an older instance picking the work back up) must be able to continue processing from the position where the previous consumer stopped. We therefore need a way to reliably store per-consumer cursors, each pointing to a position within a stream.

Rather than re-implementing a reliable storage system for cursors we would like to use Liftbridge to store a certain number of cursors per stream. Those cursors would then be replicated across the Liftbridge cluster and would be stored and retrieved by consumers. A cursor's offset should only be allowed to monotonically increase, so that if multiple consumers try to set the offset of the same cursor only the latest one is taken into account.

Processed messages are no longer required and can be truncated. We would also like a feature that truncates all messages in a stream that are older than the earliest cursor, i.e. messages that have been processed by all consumers.
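As a rough illustration of that truncation rule, here is a minimal in-memory sketch in Go. The `Message` type and the `truncateBeforeEarliest` helper are hypothetical and not part of Liftbridge, and the sketch assumes a cursor holds the offset of the next message its consumer will process, which the proposal does not pin down.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for a stream entry.
type Message struct {
	Offset int64
	Value  string
}

// truncateBeforeEarliest drops every message whose offset is lower than the
// smallest cursor offset. With no cursors it returns the log unchanged.
// Assumption: a cursor holds the offset of the next message to process.
func truncateBeforeEarliest(log []Message, cursors map[string]int64) []Message {
	if len(cursors) == 0 {
		return log
	}
	first := true
	var earliest int64
	for _, off := range cursors {
		if first || off < earliest {
			earliest = off
			first = false
		}
	}
	kept := make([]Message, 0, len(log))
	for _, m := range log {
		if m.Offset >= earliest {
			kept = append(kept, m)
		}
	}
	return kept
}

func main() {
	log := []Message{{0, "a"}, {1, "b"}, {2, "c"}, {3, "d"}}
	cursors := map[string]int64{"billing": 2, "audit": 3}
	// Only offsets 2 and 3 survive, because "billing" still needs offset 2.
	fmt.Println(truncateBeforeEarliest(log, cursors))
}
```

If a cursor instead recorded the last processed offset, the comparison would become `m.Offset > earliest`.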

API proposal

New RPCs to the API service:

// SetCursor places a cursor on a stream using an offset to the beginning of
// the stream. This cursor is identified by a string. Returns a NoSuchStream
// error code if the given stream does not exist, and an InvalidCursorOffset
// error code if the provided offset is out of bounds.
rpc SetCursor(SetCursorRequest) returns (SetCursorResponse) {}

// GetCursor returns a cursor's offset to the beginning of the stream.
// Returns a NoSuchStream error code if the given stream does not exist, and
// an InvalidCursorOffset error code if the provided offset is out of bounds.
rpc GetCursor(GetCursorRequest) returns (GetCursorResponse) {}

// QueryEarliestCursor queries for the earliest cursor in a stream and
// returns its id and offset to the beginning of the stream. An empty id and
// an offset of -1 is returned if there is no cursor on this stream. Returns
// a NoSuchStream error code if the given stream does not exist.
rpc QueryEarliestCursor(QueryEarliestCursorRequest) returns (QueryEarliestCursorResponse) {}

// TruncateFromEarliestCursor truncates a stream, starting at the earliest
// cursor. Returns a NoSuchStream error code if the given stream does not
// exist. Does nothing if there is no cursor on that stream.
rpc TruncateFromEarliestCursor(TruncateFromEarliestCursorRequest) returns (TruncateFromEarliestCursorResponse) {}

New messages:

// SetCursorRequest is sent to set a stream cursor to an offset.
message SetCursorRequest {
    string stream   = 1; // Stream name
    string cursorId = 2; // User-supplied value to uniquely identify this cursor
    int64  offset   = 3; // Offset where the cursor should be placed
}

// SetCursorResponse is sent by server after setting a stream cursor.
message SetCursorResponse {
    // Intentionally empty.
}

// GetCursorRequest is sent to retrieve a stream cursor's offset.
message GetCursorRequest {
    string stream   = 1; // Stream name
    string cursorId = 2; // User-supplied value to uniquely identify this cursor
}

// GetCursorResponse contains the requested stream cursor offset.
message GetCursorResponse {
    int64 offset = 1; // Offset where the cursor is located
}

// QueryEarliestCursorRequest is sent to query for the earliest cursor in a
// stream.
message QueryEarliestCursorRequest {
    string stream   = 1; // Stream name
}

// QueryEarliestCursorResponse contains the queried stream cursor offset.
message QueryEarliestCursorResponse {
    string cursorId = 1; // User-supplied value to uniquely identify this cursor
    int64  offset   = 2; // Offset where the cursor is located
}

// TruncateFromEarliestCursorRequest is sent to truncate messages from a stream,
// starting at the earliest cursor.
message TruncateFromEarliestCursorRequest {
    string stream   = 1; // Stream name
}

// TruncateFromEarliestCursorResponse is sent by the server after truncating
// messages from a stream, starting at the earliest cursor.
message TruncateFromEarliestCursorResponse {
    // Intentionally empty.
}

Go API

New functions to the Client interface:

// SetCursor places a cursor on a stream using an offset to the beginning of the stream.
// The cursor is identified by a string. Returns ErrNoSuchStream if the given stream does not
// exist, and ErrInvalidCursorOffset if the provided offset is out of bounds.
SetCursor(ctx context.Context, stream, id string, offset int64) error
// GetCursor returns a cursor's offset to the beginning of the stream. Returns ErrNoSuchStream
// if the given stream does not exist, and ErrInvalidCursorOffset if the provided offset is out of bounds.
GetCursor(ctx context.Context, stream, id string) (int64, error)
// QueryEarliestCursor queries for the earliest cursor in a stream and returns its id and
// offset to the beginning of the stream. An empty id and an offset of -1 is returned if
// there is no cursor on this stream. Returns ErrNoSuchStream if the given stream does not exist.
QueryEarliestCursor(ctx context.Context, stream string) (string, int64, error)
// TruncateFromEarliestCursor truncates a stream, starting at the earliest cursor.
// Returns ErrNoSuchStream if the given stream does not exist. Does nothing if there is no
// cursor on that stream.
TruncateFromEarliestCursor(ctx context.Context, stream string) error
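To make the intended usage concrete, here is a minimal sketch of a consumer that resumes from its cursor after a restart. `CursorStore` copies only the proposed `SetCursor` and `GetCursor` signatures; `memStore`, `processFrom` and `runDemo` are hypothetical stand-ins rather than Liftbridge code. The sketch assumes that the cursor holds the next offset to process and that an unknown cursor reads as 0, neither of which the proposal specifies.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// CursorStore mirrors the two proposed Client methods used in this sketch.
type CursorStore interface {
	SetCursor(ctx context.Context, stream, id string, offset int64) error
	GetCursor(ctx context.Context, stream, id string) (int64, error)
}

// memStore is a hypothetical in-memory stand-in for a Liftbridge client.
type memStore struct {
	mu      sync.Mutex
	cursors map[string]int64
}

func newMemStore() *memStore { return &memStore{cursors: map[string]int64{}} }

func (s *memStore) SetCursor(_ context.Context, stream, id string, offset int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.cursors[stream+"/"+id] = offset
	return nil
}

// GetCursor returns 0 for an unknown cursor (an assumption of this sketch).
func (s *memStore) GetCursor(_ context.Context, stream, id string) (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.cursors[stream+"/"+id], nil
}

// processFrom handles up to limit messages starting at the stored cursor and
// checkpoints after each one. The cursor holds the next offset to process.
func processFrom(ctx context.Context, cs CursorStore, stream, id string, msgs []string, limit int) ([]string, error) {
	start, err := cs.GetCursor(ctx, stream, id)
	if err != nil {
		return nil, err
	}
	var handled []string
	for off := start; off < int64(len(msgs)) && len(handled) < limit; off++ {
		handled = append(handled, msgs[off])
		if err := cs.SetCursor(ctx, stream, id, off+1); err != nil {
			return handled, err
		}
	}
	return handled, nil
}

// runDemo simulates a consumer that stops after two messages and a
// replacement that picks up where it left off.
func runDemo() (first, second []string) {
	ctx := context.Background()
	store := newMemStore()
	msgs := []string{"m0", "m1", "m2", "m3"}
	first, _ = processFrom(ctx, store, "orders", "worker", msgs, 2)
	second, _ = processFrom(ctx, store, "orders", "worker", msgs, 10)
	return first, second
}

func main() {
	first, second := runDemo()
	fmt.Println(first, second)
}
```

Checkpointing after every message keeps the replay window small at the cost of one write per message; a real consumer might batch checkpoints instead.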

Related to #46.

tylertreat commented 4 years ago

Thanks for the proposal. I'll need some time to think it through. A few questions that immediately come to mind:

  1. Is coordination of cursors out of scope with this? E.g. if you have a "primary" consumer and a "backup" consumer, how do you handle the case of failing over to the backup? What happens if two consumers are updating the same cursor concurrently? Is the intention here that something external would handle this coordination?

  2. Related to the above question, how would you see this fitting in to the vision of consumer groups in Liftbridge? Would this API serve as the primitives on which consumer groups are implemented?

  3. I think it's OK to expose these primitives directly through the client API interface, but I imagine we also would want to provide higher-level functionality that can abstract cursor management away from the user? E.g. subscribe functionality that handles checkpointing the cursor automatically.

  4. TruncateFromEarliestCursor is probably the piece I feel most unsure about since it seems closely related to stream compaction and retention. Is the problem this is attempting to solve not solvable using retention policies? I suppose the issue is guaranteeing that all consumers have processed rather than relying on time- or space-based retention rules? I would definitely like to explore this addition a bit more. For example, could Liftbridge be smarter around retention and do some kind of "interest-based" retention without users having to manage that themselves?

  5. Have you put any thought into implementation? My plan for cursor storage was to use internal streams that are compacted based on the stream/partition/consumer. My thought was this internal stream would be partitioned, so auto-pausing of partitions could be useful in the case where cursor partitions are idle. Curious if you have thoughts here.

spenczar commented 4 years ago

Should cursors be associated with a stream, or with a partition of a stream? I thought partitions were the ones with a defined ordering.

How long do cursors live for? It sounds like they never get deleted. I think that seems too long.

What is QueryEarliestCursor for? What about a ListCursors instead, and let the client sort the result?

Is there a way to guarantee that SetCursor has propagated throughout the Liftbridge cluster before returning? I'd be interested in what sort of transactionality claims can be made.

tylertreat commented 4 years ago

Should cursors be associated with a stream, or with a partition of a stream? I thought partitions were the ones with a defined ordering.

Good catch @spenczar. That is correct in my mind.

How long do cursors live for? It sounds like they never get deleted. I think that seems too long.

My thought would be to store them in a compacted stream, so only the newest cursor position is stored for each consumer. Beyond that, I'm not sure you can make any assumptions when they should be destroyed, other than perhaps in the case of a consumer group being deleted/closed? Curious what @Jmgr's thoughts are.

Is there a way to guarantee that SetCursor has propagated throughout the Liftbridge cluster before returning? I'd be interested in what sort of transactionality claims can be made.

This should be solved if we rely on Liftbridge streams to store the cursors.

Jmgr commented 4 years ago

Thanks for the feedback!

1.

Is coordination of cursors out of scope with this? E.g. if you have a "primary" consumer and a "backup" consumer, how do you handle the case of failing over to the backup?

Yes, we were not planning to solve that in this issue.

What happens if two consumers are updating the same cursor concurrently?

One solution could be that SetCursor (or maybe UpsertCursor) is idempotent and only moves the cursor forward (so maybe AdvanceCursor could be a better name, even if it does create a cursor if it doesn't exist).

2.

Related to the above question, how would you see this fitting in to the vision of consumer groups in Liftbridge? Would this API serve as the primitives on which consumer groups are implemented?

This could be used as such a primitive, yes. The idea for this proposal is to have a low-level API first.

3.

I think it's OK to expose these primitives directly through the client API interface, but I imagine we also would want to provide higher-level functionality that can abstract cursor management away from the user? E.g. subscribe functionality that handles checkpointing the cursor automatically.

That would be another option, yes. But then each client/consumer would have to provide some sort of id to be uniquely identifiable?

4.

TruncateFromEarliestCursor is probably the piece I feel most unsure about since it seems closely related to stream compaction and retention. Is the problem this is attempting to solve not solvable using retention policies? I suppose the issue is guaranteeing that all consumers have processed rather than relying on time- or space-based retention rules? I would definitely like to explore this addition a bit more. For example, could Liftbridge be smarter around retention and do some kind of "interest-based" retention without users having to manage that themselves?

Yes, there could be a flag that automatically performs this truncation. This could also be part of another API, more related to compaction and retention.

5.

Have you put any thought into implementation? My plan for cursor storage was to use internal streams that are compacted based on the stream/partition/consumer. My thought was this internal stream would be partitioned, so auto-pausing of partitions could be useful in the case where cursor partitions are idle. Curious if you have thoughts here.

Not really, but that looks like a good solution.

Should cursors be associated with a stream, or with a partition of a stream? I thought partitions were the ones with a defined ordering.

Yes, they should be associated with a stream partition. That would require adding a partition parameter to the requests.

My thought would be to store them in a compacted stream, so only the newest cursor position is stored for each consumer.

That would make sense.

Beyond that, I'm not sure you can make any assumptions when they should be destroyed, other than perhaps in the case of a consumer group being deleted/closed? Curious what @Jmgr's thoughts are.

That would be one case. There could also be a stream flag that enables removing cursors when truncating.

What is QueryEarliestCursor for? What about a ListCursors instead, and let the client sort the result?

That would also work.
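For illustration, a client-side replacement for QueryEarliestCursor could look like the sketch below. The `Cursor` type and the idea that a ListCursors call returns a slice of them are assumptions, since no such RPC is defined in this thread; the empty id and -1 for "no cursor" follow the original proposal.

```go
package main

import (
	"fmt"
	"sort"
)

// Cursor is a hypothetical entry returned by a ListCursors-style call.
type Cursor struct {
	ID     string
	Offset int64
}

// earliest returns the cursor with the lowest offset, breaking ties by ID.
// It returns an empty ID and -1 when there are no cursors.
func earliest(cursors []Cursor) (string, int64) {
	if len(cursors) == 0 {
		return "", -1
	}
	// Sort a copy so the caller's slice keeps its order.
	sorted := append([]Cursor(nil), cursors...)
	sort.Slice(sorted, func(i, j int) bool {
		if sorted[i].Offset != sorted[j].Offset {
			return sorted[i].Offset < sorted[j].Offset
		}
		return sorted[i].ID < sorted[j].ID
	})
	return sorted[0].ID, sorted[0].Offset
}

func main() {
	id, off := earliest([]Cursor{{"audit", 40}, {"billing", 17}, {"search", 17}})
	fmt.Println(id, off)
}
```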

Is there a way to guarantee that SetCursor has propagated throughout the Liftbridge cluster before returning? I'd be interested in what sort of transactionality claims can be made.

I suppose that would be solved if we make SetCursor idempotent (see above).

spenczar commented 4 years ago

Ah, I thought idempotency was already specified. This line from the original post seemed to ensure it:

A cursor's offset should only be allowed to monotonically increase, so that if multiple consumers try to set the offset of the same cursor only the latest one is taken into account.

I strongly feel that idempotency is required for this design. I assume that "latest" refers to "highest offset," and is not a reference to a wall-time clock.

Jmgr commented 4 years ago

I assume that "latest" refers to "highest offset," and is not a reference to a wall-time clock.

Yes, exactly.

tylertreat commented 4 years ago

I propose we set aside the truncation piece to a separate proposal in order to focus this discussion and narrow the scope. IMO that is a separate concern. Specifically, I think TruncateFromEarliestCursor and QueryEarliestCursor can come in a follow-up proposal based on how we decide to proceed with cursor persistence. I think there is also more to discuss there, such as possible alternatives like "interest-based retention", etc.

tylertreat commented 4 years ago

Just an update on this: I am in the process of implementing automatic partition pausing (#218), which I consider to be a requisite for cursor persistence.

ryan-stinson-elembio commented 4 years ago

Newcomer to this project and LOVE it!

Shouldn't consumer offset be automatically handled via message ACK?

When a consumer ACKs a message, that consumer's offset should automatically be updated. For the case where messages are not ACKed, the consumer's offset should be incremented upon delivery. This would greatly reduce client complexity, which is one of the benefits of this tech over something like Kafka.

tylertreat commented 4 years ago

Shouldn't consumer offset be automatically handled via message ACK?

Liftbridge doesn't have consumer acks. In that sense, it's the same type of model as Kafka—the server doesn't do any bookkeeping as such.

I do believe client libraries should provide higher-level consumer functionality to checkpoint offsets (similar to Kafka's high-level consumer), but I'm open to hearing people's thoughts/ideas around this.

LaPetiteSouris commented 4 years ago

5.

Have you put any thought into implementation? My plan for cursor storage was to use internal streams that are compacted based on the stream/partition/consumer. My thought was this internal stream would be partitioned, so auto-pausing of partitions could be useful in the case where cursor partitions are idle. Curious if you have thoughts here.

Do you mean you would have a single internal stream to store the cursors of all existing streams?

Let's say that, in order to define a cursor, you need Stream, Partition and CursorID:

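// SetCursorRequest is sent to set a stream partition cursor to an offset.
message SetCursorRequest {
    string stream    = 1; // Stream name
    string cursorId  = 2; // User-supplied value to uniquely identify this cursor
    int64  offset    = 3; // Offset where the cursor should be placed
    int32  partition = 4; // Partition (int32, since protobuf has no plain int type)
}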

// GetCursorRequest is sent to retrieve a stream cursor's offset.
message GetCursorRequest {
    string stream    = 1; // Stream name
    string cursorId  = 2; // User-supplied value to uniquely identify this cursor
    int32  partition = 3; // Partition (int32, since protobuf has no plain int type)
}

Then on the internal stream, let's call it InternalCursorStorage for example, the key for each cursor would be something like stream_partition_cursorID, and InternalCursorStorage would use that key for stream compaction.

Would that make sense?
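A small sketch of how that keying and compaction might behave, with an in-memory slice standing in for the internal stream. `cursorRecord`, `cursorKey` and `compact` are hypothetical names, and the "_" separator is taken from the key format above; stream names or cursor ids that themselves contain "_" could collide, so a real implementation would likely need a safer encoding.

```go
package main

import "fmt"

// cursorRecord is a hypothetical entry in the internal cursor stream.
type cursorRecord struct {
	Key    string
	Offset int64
}

// cursorKey builds the compaction key as stream_partition_cursorID.
// Assumption: "_" is the separator; names containing "_" may collide.
func cursorKey(stream string, partition int32, cursorID string) string {
	return fmt.Sprintf("%s_%d_%s", stream, partition, cursorID)
}

// compact keeps only the last record written for each key, preserving the
// relative order of the surviving records.
func compact(log []cursorRecord) []cursorRecord {
	last := make(map[string]int, len(log))
	for i, r := range log {
		last[r.Key] = i
	}
	out := make([]cursorRecord, 0, len(last))
	for i, r := range log {
		if last[r.Key] == i {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	log := []cursorRecord{
		{cursorKey("orders", 0, "worker"), 3},
		{cursorKey("orders", 1, "worker"), 8},
		{cursorKey("orders", 0, "worker"), 9},
	}
	// After compaction only the newest position per key remains.
	for _, r := range compact(log) {
		fmt.Println(r.Key, r.Offset)
	}
}
```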

LaPetiteSouris commented 4 years ago

I strongly feel that idempotency is required for this design. I assume that "latest" refers to "highest offset," and is not a reference to a wall-time clock.

I guess that the cursor would be an AdvanceOnlyCursor and the offset would be set to the highest offset then.

tylertreat commented 4 years ago

Would that make sense ?

@LaPetiteSouris Yes, that is roughly what I am thinking. I think there is also opportunity for the broker to cache cursors for faster lookups, perhaps with an LRU or some such, but the data would be backed by a partitioned stream.
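As one possible shape for such a cache, here is a minimal LRU sketch built on Go's `container/list`. `cursorCache` and its methods are hypothetical, it is not safe for concurrent use, and it leaves out the part where a miss falls back to reading the backing stream.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry is the value stored in each list element.
type entry struct {
	key    string
	offset int64
}

// cursorCache is a hypothetical fixed-size LRU placed in front of the
// durable cursor storage. It is not safe for concurrent use.
type cursorCache struct {
	capacity int
	order    *list.List // front = most recently used
	items    map[string]*list.Element
}

func newCursorCache(capacity int) *cursorCache {
	return &cursorCache{
		capacity: capacity,
		order:    list.New(),
		items:    map[string]*list.Element{},
	}
}

// Get returns the cached offset and marks the key as recently used.
func (c *cursorCache) Get(key string) (int64, bool) {
	el, ok := c.items[key]
	if !ok {
		return 0, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).offset, true
}

// Put stores an offset, evicting the least recently used key when full.
func (c *cursorCache) Put(key string, offset int64) {
	if el, ok := c.items[key]; ok {
		el.Value.(*entry).offset = offset
		c.order.MoveToFront(el)
		return
	}
	if c.capacity <= 0 {
		return // a zero-capacity cache stores nothing
	}
	if c.order.Len() >= c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
	c.items[key] = c.order.PushFront(&entry{key: key, offset: offset})
}

func main() {
	cache := newCursorCache(2)
	cache.Put("orders_0_a", 1)
	cache.Put("orders_0_b", 2)
	cache.Get("orders_0_a")    // touch a, so b becomes the eviction candidate
	cache.Put("orders_0_c", 3) // evicts b
	_, ok := cache.Get("orders_0_b")
	fmt.Println(ok)
}
```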

tylertreat commented 4 years ago

FYI, I'm planning to start working on this soon.

tylertreat commented 4 years ago

Addressed in #268