apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Feature Request]: Support draining Spanner Change Stream connectors #30167

Open ianb-pomelo opened 9 months ago

ianb-pomelo commented 9 months ago

What would you like to happen?

Right now, one of the known limitations of the Spanner change stream source is that it can't be drained [1]. Is there a way to allow draining this connector?

Our current use case is a job that consumes change stream values, but the structure of this job changes frequently. To handle this, we try an in-place update and, if that fails, drain the job and start a new one. This works with Pub/Sub sources, but to get around the fact that change streams can't be drained, we run an intermediate job that converts the Spanner changes into Pub/Sub messages, which the changing job then consumes. However, this has caused a huge increase in latency: the commit time -> change stream read latency is pretty consistently 200ms, but with the added Pub/Sub layer it rises to ~5s.
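The deploy flow described above (attempt an in-place update, fall back to drain and relaunch) could be sketched roughly as follows. This is only an illustration: `try_update`, `drain`, and `launch` are hypothetical callables standing in for whatever tooling submits and stops the jobs; none of them is a Beam or Dataflow API.

```python
from typing import Callable


def redeploy(
    try_update: Callable[[], bool],
    drain: Callable[[], None],
    launch: Callable[[], None],
) -> str:
    """Attempt an in-place update; on failure, drain the old job and start a new one.

    All three callables are hypothetical placeholders for real deployment tooling.
    Returns which path was taken: "updated" or "drained_and_relaunched".
    """
    if try_update():
        return "updated"
    # The update was rejected (e.g. an incompatible pipeline change), so let the
    # old job finish its in-flight work before starting the replacement.
    drain()
    launch()
    return "drained_and_relaunched"


if __name__ == "__main__":
    calls = []
    result = redeploy(
        try_update=lambda: False,
        drain=lambda: calls.append("drain"),
        launch=lambda: calls.append("launch"),
    )
    print(result, calls)
```

With a Pub/Sub source the `drain` step is available; with the change stream source it is exactly the step that is missing, which is why the intermediate job exists.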

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

liferoad commented 9 months ago

cc @nielm

Abacn commented 9 months ago

Draining a streaming pipeline should work in general. If it's not draining, something is missing in the SDF; a fix similar to #25716 may work.

nielm commented 9 months ago

cc @thiagotnunes (change streams author) for comment, but I believe ChangeStreams does not use SDFs, which is probably why the drain is not working. #

The partitions are generated by Spanner itself and are read by a normal DoFn. (SpannerIO:1751)
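For context, a rough illustration of what SDF-based draining relies on: as I understand it, when a pipeline drains, the runner asks a splittable DoFn to truncate its remaining, possibly unbounded, restriction into a finite one so the source can finish. The toy model below is plain Python, not Beam code; `OffsetRestriction` and `truncate_for_drain` are made-up names, and it says nothing about how the change stream connector is actually implemented.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class OffsetRestriction:
    """Work remaining for a source: positions [start, end); end=None means unbounded."""

    start: int
    end: Optional[int] = None

    def is_bounded(self) -> bool:
        return self.end is not None


def truncate_for_drain(restriction: OffsetRestriction, position: int) -> OffsetRestriction:
    """Return a finite restriction to finish processing during a drain.

    Hypothetical helper modelling the 'truncate on drain' idea: an unbounded
    restriction is cut off at the current read position, while a bounded one
    is left unchanged so its remaining work is still processed.
    """
    if position < restriction.start:
        raise ValueError("position is before the start of the restriction")
    if restriction.is_bounded():
        return restriction
    return OffsetRestriction(restriction.start, position)
```

A source that has no such hook has no way to tell the runner "this is the last of my work", which would match the behaviour reported in this issue.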

thiagotnunes commented 9 months ago

cc @nancyxu123, the current owner here

efalkenberg commented 9 months ago

Hey @ianb-pomelo

Thanks for the feedback! Draining is in our backlog but has not been prioritized yet. I really appreciate the context that you provided; I will add it to our internal ticket, and we'll update here when this gets prioritized.

Thanks!

Eike

ianb-pomelo commented 9 months ago

Thanks for the update, looking forward to seeing it prioritized!

bangau1 commented 1 month ago

Hi all,

I also recently experimented a bit with the SpannerIO change streams to Cloud Storage pipeline (using the Google-provided template: https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-cloud-storage). I dug through all the documentation I could find and realized that the drain operation isn't supported. I can confirm, however, that in-place update works.

The other thing that I found is that while cancelling the job works, submitting another job with the same jobname and same metadata's table name doesn't work. I expected it to continue ingesting the change stream from the previous checkpoint (that's what the metadata table is for, CMIIW?).

I posted the details on Stack Overflow: https://stackoverflow.com/questions/79027920/restarting-the-spannerios-changestream-to-gcs-text-json-pipeline-got-error

Abacn commented 1 month ago

> submitting another job with the same jobname and same metadata's table name doesn't work.

This is working as intended. Dataflow cannot have two jobs with the same job name unless one is in Done status (not running, cancelling, draining, etc)

bangau1 commented 1 month ago

> > submitting another job with the same jobname and same metadata's table name doesn't work.
>
> This is working as intended. Dataflow cannot have two jobs with the same job name unless one is in Done status (not running, cancelling, draining, etc)

@Abacn I meant that I cancelled (stopped) the job, then submitted a new pipeline with the same jobName and metadata table, but it returned an error.

Abacn commented 1 week ago

> @Abacn I meant that I cancelled (stopped) the job, then submitted a new pipeline with the same jobName and metadata table, but it returned an error.

It takes time for a job to move to the "Done" status, usually minutes or longer.
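A minimal sketch of waiting for the old job to settle before resubmitting under the same name, in case it helps. `get_state` is a hypothetical callable wrapping whatever job status lookup you use, and the set of states treated as terminal here is an assumption based on the Dataflow job state names, not something confirmed in this thread.

```python
import time
from typing import Callable

# Assumption: states treated as terminal, named after Dataflow's job state enum.
TERMINAL_STATES = frozenset(
    {"JOB_STATE_DONE", "JOB_STATE_CANCELLED", "JOB_STATE_DRAINED", "JOB_STATE_FAILED"}
)


def wait_until_terminal(
    get_state: Callable[[], str],
    timeout_s: float = 900.0,
    poll_interval_s: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() until it returns a terminal state, then return that state.

    get_state is a hypothetical callable supplied by the caller.
    Raises TimeoutError if no terminal state is observed before the deadline.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still in state {state!r} after {timeout_s}s")
        sleep(poll_interval_s)
```

Only once this returns would the new job be submitted with the reused name; the default timeout and poll interval are arbitrary placeholders.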

bangau1 commented 1 week ago

> > @Abacn I meant that I cancelled (stopped) the job, then submitted a new pipeline with the same jobName and metadata table, but it returned an error.
>
> It takes time for a job to move to the "Done" status, usually minutes or longer.

@Abacn just to clarify, in case my comment wasn't clear: I submitted the second job with the same jobName once the previous job had completely stopped (cancelled). The second job got an error.

Should I file a separate issue for this? I already asked on Stack Overflow, where I describe which error shows up, etc.