z3z1ma / target-bigquery

target-bigquery is a Singer target for BigQuery. It supports storage write, GCS, streaming, and batch load methods. Built with the Meltano SDK.
MIT License

Discussion: Parallelism in target-bigquery #60

Closed · mattsax closed this issue 4 months ago

mattsax commented 1 year ago

Parallelism in target-bigquery

I'm opening this issue to discuss the current implementation of parallelism.

Current Workflow

The target currently supports both CPU parallelism and thread parallelism. When a sink reaches batch_size rows, it is considered full and is drained, creating a Job object that contains all the rows of the batch. These jobs are enqueued in a FIFO Queue.
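A minimal sketch of that producer side, assuming a plain queue.Queue and an illustrative Job dataclass rather than the actual classes in this package:

```python
# Minimal sketch of the producer side described above; the real class and
# attribute names in target-bigquery may differ.
import queue
from dataclasses import dataclass, field


@dataclass
class Job:
    """A drained batch of records destined for one BigQuery table."""
    table: str
    records: list = field(default_factory=list)


job_queue: "queue.Queue[Job]" = queue.Queue()  # FIFO queue shared by all workers


def maybe_drain(table: str, pending: list, batch_size: int) -> None:
    # Once a sink has accumulated batch_size rows it is considered full:
    # its rows are wrapped in a Job and enqueued for a worker to pick up.
    if len(pending) >= batch_size:
        job_queue.put(Job(table=table, records=pending.copy()))
        pending.clear()
```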

The Target manages a pool of workers. Each time a sink is drained, the Target resizes the worker pool, reaping dead workers in the process.

Each worker pulls jobs from the queue and sends the batch to a GCP API based on the chosen ingestion mode.
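A similarly simplified sketch of the worker side, reusing the Job and job_queue names from the sketch above; the real pool management is more involved than this:

```python
# Simplified worker loop and pool resize; reuses Job and job_queue from the
# previous sketch and is not the actual target-bigquery implementation.
import threading


def upload_batch(job: Job) -> None:
    # Placeholder for the real dispatch to the chosen ingestion mode
    # (storage write, GCS staging, streaming inserts, or batch load jobs).
    ...


def worker_loop() -> None:
    while True:
        job = job_queue.get()   # block until a drained batch arrives
        if job is None:         # sentinel value used to stop a worker
            break
        upload_batch(job)
        job_queue.task_done()


def resize_pool(workers: list, max_workers: int) -> None:
    # Reap workers that have exited, then top the pool back up while there
    # is still work waiting in the queue.
    workers[:] = [w for w in workers if w.is_alive()]
    while len(workers) < max_workers and not job_queue.empty():
        worker = threading.Thread(target=worker_loop, daemon=True)
        worker.start()
        workers.append(worker)
```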

Concerns about Parallelism

I have some concerns about using parallelism as it seems to conflict with the spirit of Meltano:

  1. Increased Complexity: Introducing parallelism adds a significant amount of complexity to this package, making it harder to maintain. While trying to address issue #43, it became evident that we would need to introduce a Lock mechanism between the Sink and the Workers (see the sketch after this list), further reducing code readability.

  2. Inconsistency in the Data Pipeline: It becomes challenging to handle cases where a single worker fails to send a batch, particularly in storage_write_api mode. A connection may be lost before its stream is committed, while other workers commit their streams and a new state is applied, resulting in data loss.

  3. Effectiveness of Parallelism: Based on local tests, the target doesn't scale beyond 1 worker, except when encountering the bug described in issue #43.

  4. Modifications to singer_sdk logic: To make the worker pool logic work, we need to override singer_sdk methods that are marked as final, such as drain_one() and drain_all().
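To make the first concern concrete, here is a rough sketch of the kind of shared-state locking a Sink/worker handoff would require (illustrative names only, not code from this repository):

```python
# Rough sketch of the locking that point 1 alludes to; none of these names
# come from target-bigquery, they only illustrate the shape of the problem.
import threading

sink_lock = threading.Lock()
pending_records: list = []   # buffer shared between the Sink and the workers


def sink_process_record(record: dict) -> None:
    # Every touch of the shared buffer now has to happen under the lock.
    with sink_lock:
        pending_records.append(record)


def worker_take_batch(batch_size: int) -> list:
    # A worker atomically claims up to batch_size rows from the buffer.
    with sink_lock:
        batch = pending_records[:batch_size]
        del pending_records[:batch_size]
    return batch
```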

Let's discuss these concerns and evaluate whether parallelism is the best approach. Any suggestions or alternative solutions are welcome.

z3z1ma commented 1 year ago

The problem is compounded by the fact that the Storage Write method is already inherently much more complex than all the other methods. Looking at the batch, legacy streaming, and GCS methods -- the code is actually very simple, readable, and consistently works.

The most bug-prone parts of the package are by far storage write, which I think is in a much better state after the last PR (internally we primarily leveraged batch job / GCS), and the denormalization / view generation.

If we flagged storage write as less stable and batch as the most stable (and, in the vast majority of cases in my experience, the fastest on a single node), we would probably not have as many issues. As a best practice we should also flag denormalization: true with convoluted schemas as less resilient than denormalization: false, which was originally the only way to load.

Given these two pieces of information, I think most users could self-serve the right usage pattern for their input data. It was also a requirement for us to be able to scale out loads on a single node because of the volume of data we were dealing with (billions of rows). I can attest that the autoscaling pool not only works, it works very well. I think the storage write method is just a rough point (and was not a feature we utilized beyond my developing it).
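For illustration only, a conservative configuration along those lines might look like the sketch below; the key names are assumptions drawn from this thread and should be checked against the README:

```python
# Illustrative Singer config, written out as the dict a config.json would
# hold. Key names (method, denormalized, batch_size) are assumptions based on
# this thread; verify them against the project's README before relying on them.
import json

conservative_config = {
    "project": "my-gcp-project",   # hypothetical project and dataset
    "dataset": "raw_singer",
    "method": "batch_job",         # the batch load path flagged above as most stable
    "denormalized": False,         # the more resilient load shape per the comment above
    "batch_size": 500_000,         # illustrative value only
}

with open("config.json", "w") as f:
    json.dump(conservative_config, f, indent=2)
```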

Thoughts @mattsax ?