aditya-r-m opened 4 years ago
cc @jihoonson @mgill25
Hi @aditya-r-m, thank you for the proposal!
> The proposed extension will work in a manner very similar to Seekable Stream Supervisors & Tasks that we currently have & will be a simplified version of the same in most cases.
I think this explanation is quite vague, and I'm not sure how the Pub/Sub indexing service would work, especially given how different Pub/Sub's architecture is from Kafka's and Kinesis's. Would you please add more details, including the below?
Hi @jihoonson
We are proposing a 2 step approach:
- Make a naive Pub/Sub indexing service that provides all the guarantees a regular Pub/Sub consumer would, that is, at-least-once message semantics. This would be in line with how any normal Pub/Sub consumer works.
- Do some basic research into how systems such as Dataflow achieve exactly-once processing with Pub/Sub. It is clearly possible to achieve, since Dataflow does it with Pub/Sub (although the details of precisely how are not yet clear to us). This will be more of an exploratory effort.
> Description on the overall algorithm including what the supervisor and its tasks do, respectively.
The supervisor looks pretty similar to the KafkaStreamSupervisor's basic functions: creation and management of tasks.
If more tasks are required to maintain active task count, it submits new tasks.
A single task would pull batches from the subscription, hand the messages off for persistence, and acknowledge each batch once it has been persisted.
> Does the proposed indexing service provide linear scalability? If so, how does it provide it?
The service can keep launching new tasks to process data from subscriptions, as needed. The supervisor can do periodic checks for pubsub metrics, and if the rate of message consumption is falling behind compared to the production rate, it can launch new tasks across the cluster.
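As a rough illustration, the supervisor's periodic scaling check can be reduced to a backlog calculation. This is only a sketch under assumed names: `desired_task_count` and its parameters are hypothetical, and in practice the backlog signal would come from Pub/Sub monitoring metrics such as the subscription's undelivered-message count.

```python
import math

def desired_task_count(backlog, per_task_rate, target_drain_seconds,
                       min_tasks=1, max_tasks=10):
    """How many tasks are needed so their combined consumption rate
    drains the current backlog within the target window.

    backlog: undelivered messages reported for the subscription.
    per_task_rate: observed messages/second a single task consumes.
    """
    if per_task_rate <= 0:
        # No consumption data yet; be conservative and scale out fully.
        return max_tasks
    needed = math.ceil(backlog / (per_task_rate * target_drain_seconds))
    # Clamp to the configured bounds of the supervisor.
    return max(min_tasks, min(max_tasks, needed))
```

Because any number of subscribers can pull from one subscription, the supervisor can act on this number directly by submitting or stopping tasks, with no partition-rebalancing step.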
> How does it handle transient failures such as task failures?
Exactly-once case: I think it's fair to say we currently don't have an extremely clear understanding of how to make exactly-once work, but we know other systems claim to provide those guarantees. I'm interested in seeing whether we can achieve the same with Druid, but for that to happen, the foundation described above needs to be built first, IMHO.
There are unanswered questions here that we haven't fleshed out yet. Would be happy to brainstorm. :)
Hi @jihoonson, I have updated the description with the additional details that @mgill25 mentioned above. Please review whether anything else should be added or updated. We can add more details as we move forward with the implementation & testing.
GCP has recently introduced a Pub/Sub variant called Pub/Sub Lite that roughly operates like Kafka: you have a topic with a number of partitions, and you consume from the partitions at a given offset.
https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite
> Lite topics consist of partitions; a partition is a log of messages. Publishers assign each message to a partition, and the Pub/Sub Lite service appends each message to the log. The Pub/Sub Lite service delivers the messages from a partition in order.
Given this very close model to Kafka, I'm thinking that a PubSub Lite indexing service would actually be fairly straightforward to implement.
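The reason the Lite model maps so cleanly onto Druid's seekable-stream abstraction is that consumption state collapses to a per-partition offset. A toy sketch of the state such a task would checkpoint; the class and method names are hypothetical and are not the Pub/Sub Lite client API:

```python
class OffsetCheckpointer:
    """Tracks per-partition read positions, the only state a
    seekable-stream style task needs to persist to resume exactly
    where it left off."""

    def __init__(self):
        self.committed = {}  # partition id -> next offset to read

    def record(self, partition, offset):
        # Messages within a partition arrive in order, so the resume
        # position is simply one past the last processed offset.
        self.committed[partition] = offset + 1

    def resume_position(self, partition):
        # A partition we have never read from starts at offset 0.
        return self.committed.get(partition, 0)
```

Standard (non-Lite) Pub/Sub exposes no such offsets, which is exactly why the proposal above falls back to ack-based at-least-once semantics there.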
Discord would like to fund these efforts, if anyone wants to take this on. Feel free to e-mail me jake@discord.com (or reply here if that's OK) if you're interested.
Motivation
For streaming ingestion, Kafka and Kinesis are the two primary options for Druid as of now. The proposal is to write an extension that allows data ingestion from Google Cloud Pub/Sub.
Proposed changes
The proposed extension will work in a manner very similar to Seekable Stream Supervisors & Tasks that we currently have & will be a simplified version of the same in most cases.
The key differences between Pub/Sub & Kafka in the context of this implementation are that Pub/Sub has no ordered partitions or consumer-managed offsets, and that it provides at-least-once rather than exactly-once delivery.
One key design decision that we are suggesting is to have a completely independent extension that does not share any logic with the Seekable Stream Ingestion extensions.
One key challenge to note is that Pub/Sub does not provide exactly-once semantics the way Kafka's partition offsets do. This means that in cases of high lag or failures, consumers may pull the same message from the subscription more than once. One reasonable approach is best-effort deduplication: there are techniques that minimize, though do not eliminate, duplication.
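As an illustration of best-effort deduplication, one common technique is to keep a bounded window of recently seen message IDs and drop repeats; a duplicate that arrives after its ID has been evicted still gets through, which is why this is best effort rather than perfect. A sketch, with all names hypothetical:

```python
from collections import OrderedDict

class BestEffortDeduper:
    """Drops messages whose IDs were seen within a bounded window.
    Memory is capped, so duplicates older than the window can still
    slip through -- best effort, not exactly-once."""

    def __init__(self, capacity=100_000):
        self.capacity = capacity
        self.seen = OrderedDict()  # message_id -> None, insertion-ordered

    def is_duplicate(self, message_id):
        if message_id in self.seen:
            return True
        self.seen[message_id] = None
        if len(self.seen) > self.capacity:
            # Evict the oldest ID to stay within the memory bound.
            self.seen.popitem(last=False)
        return False
```

Perfect deduplication would require this "seen" state to be shared and durable across all tasks, which is the shared key-value store idea noted below as out of scope for the first version.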
It could be possible to provide perfect deduplication using a shared key-value store but that would be out of scope for the first version of the extension.
Additional details are as follows:
At-least-once semantics: The tasks will send acknowledgements to Pub/Sub only when a batch persist succeeds. Thus, in case of failures, the unacknowledged messages will be redelivered by Pub/Sub & at-least-once semantics will be guaranteed.
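The at-least-once argument can be simulated directly: if the ack is withheld whenever a persist fails, the broker's redelivery of unacked messages guarantees every message is persisted at least once. A minimal sketch, with all names hypothetical and redelivery modelled by re-queuing:

```python
def deliver(in_flight, persist, acked):
    """One delivery attempt per in-flight message. A message is acked
    only after persist succeeds; a failed persist leaves it unacked,
    so the broker would redeliver it (modelled by the returned list)."""
    redeliver = []
    for msg in in_flight:
        try:
            persist(msg)
            acked.add(msg)         # ack only on successful persist
        except Exception:
            redeliver.append(msg)  # unacked -> redelivered next round
    return redeliver
```

A transient persist failure then only delays a message rather than losing it; the duplicate side of the same coin (a persist that succeeds but whose ack is lost) is what the best-effort deduplication above addresses.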
Basic description of the algorithms: The extension consists of supervisors & tasks, which work as follows:
The supervisor runs in an infinite loop that handles the creation and management of tasks. If more tasks are required to maintain the active task count, it submits new tasks, and it constantly polls the state of previously started tasks. Supervisors themselves can be started & stopped.
A single task pulls a batch from Pub/Sub (with the relevant tuning parameters available in config), hands the messages off for persistence, and sends an acknowledgement to Pub/Sub for the batch on a successful persist.
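Putting the task's responsibilities together, its core loop is just pull, persist, ack, in that order. A sketch with hypothetical callables standing in for the Pub/Sub pull, Druid's persistence path, and the batch acknowledgement:

```python
def run_task(pull_batch, persist_batch, ack_batch, max_batches):
    """Core loop of a single indexing task. The whole batch is acked
    only after persist_batch returns, preserving at-least-once
    semantics at batch granularity."""
    for _ in range(max_batches):
        batch = pull_batch()      # batch size / timeout from task tuning config
        if not batch:
            break                 # nothing pending; let the task wind down
        persist_batch(batch)      # hand off to the ingestion/persist path
        ack_batch([m["ack_id"] for m in batch])  # hypothetical ack IDs
```

If the task dies between `persist_batch` and `ack_batch`, the batch is redelivered and persisted again, which is the duplicate case that best-effort deduplication is meant to reduce.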
Linear scalability: The system can be scaled simply by adding more tasks, distributed across whichever Druid nodes are discovered. Since multiple consumer instances can pull from the same subscription, we can distribute as many tasks across different machines as needed to keep the lag low.
Failure handling: In case of task failures, any unacknowledged messages are redelivered by Pub/Sub, and the supervisor submits replacement tasks to maintain the active task count.
Rationale
There were 2 approaches that were under consideration for this feature.
1) Consume from Pub/Sub in a manner similar to Kafka: https://github.com/apache/druid/pull/9116
2) Consume via Apache Beam (GCP Dataflow), which can pull from Pub/Sub & push to Druid. Since this is a push-based approach while Druid works much more nicely with pull-based ingestion, it was discarded for now. This approach opens up a lot of possibilities, but at the cost of running intermediate workers for the Apache Beam pipelines.
Operational impact
Since the extension is a completely new feature with separate code paths, it has no operational impact on running Druid clusters. The dependencies being updated globally are the Guava & Guice versions, which are very outdated and do not work with the latest Pub/Sub libraries. Any possible regression from this should be caught by automated tests.
Test plan
TODO
Future work
Configurable options for perfect deduplication strategies are among the possibilities for future improvements.