apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

Cloning subscriptions #6161

Closed bezmax closed 4 years ago

bezmax commented 4 years ago

Is your feature request related to a problem? Please describe. We are designing a stateful stream processing system with periodic snapshots of the state. Only after the state is snapshotted do we acknowledge all pending messages in the Pulsar subscription, which lets us process the stream in a fault-tolerant way. If processing fails, we can always redeliver the unacknowledged messages and restart from the previous state snapshot.

However, currently we are constrained to restoring only the last snapshot. There's no way to go back to the state before that, as the subscription has already advanced. The easiest solution would be a way to clone a subscription at the time of each snapshot, thereby retaining a version of it for every earlier snapshot. Then, if we need to recover two versions back, we would just use the subscription correlated with that snapshot.
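To illustrate the idea (a toy sketch in plain Python, not Pulsar client code; `Subscription`, `snapshot`, and the positions are all hypothetical), cloning the subscription cursor at each snapshot would leave independent restore points:

```python
# Toy model: a subscription is just a named cursor; cloning it at
# snapshot time freezes a restore point. Not a Pulsar API.
import copy

class Subscription:
    """Stand-in for a Pulsar subscription cursor."""
    def __init__(self, name, position=0):
        self.name = name
        self.position = position      # mark-delete position

    def ack_up_to(self, position):
        self.position = position

subs = {"main": Subscription("main")}

def snapshot(n):
    """Clone the live subscription, as the proposed copy-subscription command would."""
    clone = copy.deepcopy(subs["main"])
    clone.name = f"snapshot-{n}"
    subs[clone.name] = clone

# process some messages, snapshotting twice
subs["main"].ack_up_to(100); snapshot(1)
subs["main"].ack_up_to(250); snapshot(2)

print(subs["snapshot-1"].position)   # 100
print(subs["main"].position)         # 250
```

Recovering two versions back would then just mean attaching to the older clone instead of the live subscription.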

Describe the solution you'd like A Pulsar Admin command that copies a subscription under a new name. For example: `topics copy-subscription -s existingsub -c newsub persistent://..../topicname`

Describe alternatives you've considered Probably the only alternative would be to abandon the subscription model and track the watermark manually using a Reader.

codelipenghui commented 4 years ago

Reader supports a seek method: https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java#L133 . Would that achieve your requirement?

bezmax commented 4 years ago

Yes, as I mentioned in my last paragraph, a Reader is the only alternative that would work. I would like to avoid it if possible, as we would need to reimplement the full functionality of subscriptions on the client side, since we need out-of-order acknowledgements as well.

Maybe explaining the full flow of the system (Apache Flink based) would be useful:

  1. A message is read from Pulsar and sent asynchronously down the processing pipeline.
  2. Since some messages traverse the pipeline faster than others, ordering may be lost.
  3. At some point, a synchronization signal tells all processors to "checkpoint" their state changes, similar to a transaction "commit".
  4. At that moment our reader needs to ack the messages that have reached the end of the processing pipeline (which, again, might miss a few late ones).
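The checkpoint step above relies on out-of-order acknowledgements: the durable position can only advance to the highest contiguously acked message. A minimal sketch of that rule (plain Python; `mark_delete_position` is an illustrative helper, not a Pulsar API):

```python
# Out-of-order acks: individual acks accumulate, but the durable
# (mark-delete) position only moves past a contiguous acked prefix.
def mark_delete_position(acked, start=0):
    """Highest contiguous position acked, starting from `start`."""
    pos = start
    while pos + 1 in acked:
        pos += 1
    return pos

# messages 1..6 in flight; 2, 3 and 5 finish the pipeline first
acked = {2, 3, 5}
print(mark_delete_position(acked))   # 0: message 1 is still pending

# the late message 1 completes by checkpoint time
acked.add(1)
print(mark_delete_position(acked))   # 3: message 4 still blocks 5
```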

Every "commit" the system has made is, in theory, restorable. That is, for every other component we can say "restore your state from 3 commits ago". The Pulsar subscriber is the exception: there, the only "commit" I can go back to is the last ack position, by sending a redeliverUnacked command.

That's where this feature request comes in. By copying a subscription, we would essentially be able to take a "snapshot" of the subscription's state that we could go back to later if needed.

Thanks, hope that clarifies the feature request.

codelipenghui commented 4 years ago

But essentially, the copied subscription is just a copy of a starting point of the original subscription, isn't it? We would only need to store the last N committed message IDs so that we can re-consume messages from any of those points. Let me know if I'm missing something here, thanks.
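The "store the last N committed message IDs" idea could be sketched like this (plain Python; `CommitLog` and its methods are hypothetical names, not a Pulsar API):

```python
# Ring buffer of the last N committed message IDs; restoring k versions
# back just picks the corresponding ID to seek to.
from collections import deque

class CommitLog:
    def __init__(self, n):
        self.commits = deque(maxlen=n)   # last N committed message IDs

    def commit(self, message_id):
        self.commits.append(message_id)

    def restore(self, versions_back):
        """Message ID to seek back to, `versions_back` commits ago."""
        return self.commits[-1 - versions_back]

log = CommitLog(n=3)
for mid in [10, 20, 30, 40]:
    log.commit(mid)

print(log.restore(0))   # 40: latest commit
print(log.restore(2))   # 20: two versions back
```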

I'm not familiar with Apache Flink, so I would like to add @yjshen to the discussion, since he has done a lot of work on the Pulsar and Flink integration.

jiazhai commented 4 years ago

@bezmax Would you please help answer @codelipenghui's comments above?

yjshen commented 4 years ago

If I understand correctly, by cloning a subscription you want to "bookmark" a reading position for a partition so you can start from it later on? If so, I suggest using the Reader API instead of a Consumer in your tasks and maintaining the read position for each commit internally in your app.

Here is what I've done in flink-pulsar-connector:

  1. An app-wide durable subscription for each partition, used to prevent Pulsar from deleting messages eagerly.
  2. For each partition you are going to read, use the Reader API for message processing: seek to a position (either restored from a checkpoint or an initial position), and report the reading position whenever a checkpoint is triggered by the JobManager.
  3. When a checkpoint completes, reset the cursor of the durable subscription to the new position, notifying Pulsar that the app has consumed that batch of messages successfully and that it is free to delete them.
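The three steps above can be simulated in miniature (plain Python, no connector code; `Partition` and its fields are illustrative): the reader's position moves freely, while the durable cursor, which controls retention, only advances once a checkpoint completes:

```python
# Toy simulation of the durable-subscription + reader pattern.
class Partition:
    def __init__(self):
        self.durable_cursor = 0      # step 1: retention guard
        self.reader_pos = 0          # step 2: reader's private position

    def read(self, n):
        self.reader_pos += n         # reading does not move the durable cursor

    def checkpoint_complete(self):
        # step 3: only now may Pulsar delete messages up to the reader position
        self.durable_cursor = self.reader_pos

p = Partition()
p.read(100)
print(p.durable_cursor)   # 0: messages still retained
p.checkpoint_complete()
print(p.durable_cursor)   # 100: retention released up to the checkpoint
```

On failure before a checkpoint completes, the task simply seeks its reader back to `durable_cursor`, and every message since then is still retained.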

You could check https://github.com/streamnative/pulsar-flink to see if it fulfills your requirement for using Flink over Pulsar. A more detailed description of the connector can be found here: https://medium.com/streamnative/use-apache-pulsar-as-streaming-table-with-8-lines-of-code-39033a93947f

bezmax commented 4 years ago

@yjshen Yes, that would work; however, unless I'm missing something, it will result in a lot of duplicates being reprocessed. I'm OK with duplicates, but in my case messages can have completely different processing times (some need enrichment, some do not), and this could result in hundreds of thousands of duplicates being reprocessed.

For example, suppose message 1 needs a second-long enrichment (with an async operator), and in that time 100k messages get processed. If we snapshot before the first message has finished being enriched, the reader position will still be before it, as we cannot durably advance the cursor while that message is stuck in front. So in case of failure, instead of reprocessing just one message, all 100k will need to be processed again.
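A back-of-envelope check of that scenario (plain Python; `replay_count` is an illustrative helper): since the durable position cannot pass the earliest unfinished message, one slow message forces a replay of everything behind it:

```python
# Replay size on failure = everything at or after the earliest
# unfinished message, regardless of how much later work completed.
def replay_count(finished, total):
    """Messages 1..total are in flight; return how many replay on failure."""
    earliest_pending = min(set(range(1, total + 1)) - set(finished))
    return total - earliest_pending + 1

# message 1 is stuck in enrichment while 2..100000 complete
finished = range(2, 100_001)
print(replay_count(finished, 100_000))   # 100000
```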

Again, maybe my understanding of Flink/Pulsar is incomplete, but I feel the only way to avoid this behavior is to copy the state of the whole subscription (its real backlog plus watermark).

bezmax commented 4 years ago

I have reread how asynchronous operators work in Flink, and yes, the Reader approach would suffice in the way @yjshen described. I'll close this feature request.

I was under the assumption that async operators are inherently stateless, meaning that if the asynchronous processing of a message fails, the source would be responsible for redelivering the original on recovery. I've now found that async operators actually store the full set of unprocessed messages and redeliver them on failure, effectively implementing what I was asking for here, but on the Flink side. Probably the only downside is that I cannot have multiple sources for the same partition, but with a high enough partition count that is not a big deal.

Thanks for the help, and sorry for taking your time.