wepay / kafka-connect-bigquery

DEPRECATED. PLEASE USE https://github.com/confluentinc/kafka-connect-bigquery. A Kafka Connect BigQuery sink connector
Apache License 2.0

Add upsert/delete support via periodic MERGE statements #264

Open C0urante opened 4 years ago

C0urante commented 4 years ago

Although it's not a common use case for BigQuery, it's come up that it'd be nice to support upserts and deletes with this connector. I'd like to propose the following way to allow the connector to optionally support that functionality, without negatively impacting existing users of the connector in append-only mode:

  1. Derive the key fields for the upsert and/or delete from the key of the message in Kafka, which will be required to be either a map or a struct. This will allow us to use the field names from that key and map them to columns in the destination BigQuery table.
  2. Insert all records received by the connector into an intermediate table, possibly named something like ${table}_tmp, where ${table} is the name of the table that the records are destined for.
  3. Periodically perform a MERGE operation that uses the intermediate table as its source and includes logic for upsert (if the value of the record is non-null) and/or delete (if the value of the record is null) based on the columns present in the record key.

This should only take place if the user enables upserts or deletes; otherwise, the connector will continue streaming directly to the intended table (or GCS bucket) as it does now.
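
To make steps 2 and 3 a bit more concrete, here's a rough sketch of the kind of MERGE the connector might issue. Everything in it is illustrative rather than an actual implementation: it assumes a single key column id, an intermediate table named ${table}_tmp, and that the intermediate table stores each record's value as a nullable STRUCT column value with made-up fields name and amount.

```java
// Hypothetical sketch only; none of these names come from the connector itself.
public class UpsertDeleteMergeSketch {

  /** Builds a MERGE that upserts non-null record values and deletes on null (tombstone) values. */
  static String mergeStatement(String dataset, String table) {
    String destination = String.format("`%s.%s`", dataset, table);
    String intermediate = String.format("`%s.%s_tmp`", dataset, table);
    return "MERGE " + destination + " dst\n"
        + "USING " + intermediate + " src\n"
        + "ON dst.id = src.id\n"
        // A null record value is a tombstone: delete the existing row.
        + "WHEN MATCHED AND src.value IS NULL THEN\n"
        + "  DELETE\n"
        // A non-null value for an existing key: update in place (the upsert case).
        + "WHEN MATCHED THEN\n"
        + "  UPDATE SET dst.name = src.value.name, dst.amount = src.value.amount\n"
        // A non-null value for a brand-new key: plain insert.
        + "WHEN NOT MATCHED AND src.value IS NOT NULL THEN\n"
        + "  INSERT (id, name, amount) VALUES (src.id, src.value.name, src.value.amount)";
  }

  public static void main(String[] args) {
    System.out.println(mergeStatement("my_dataset", "my_table"));
  }
}
```

A real implementation would also need to collapse multiple intermediate-table rows for the same key (e.g. keep only the one with the highest offset) before merging, since BigQuery rejects a MERGE where more than one source row matches the same target row; the sketch glosses over that.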

There are definitely some details that need to be ironed out here, such as what happens if the fields in the record key change over time, how this would interact with table partitioning, and how/where the MERGE would be performed, but this is the overall idea.

@mtagle, @criccomini, what are your thoughts on this?

C0urante commented 4 years ago

@mtagle @criccomini bump :) Would you guys be interested in this and willing to review a PR for it?

criccomini commented 4 years ago

🧠 (thinking)

whynick1 commented 4 years ago

If this feature is meant to prevent duplicate records, it is not very helpful, because we can add a view on top of the BQ table with deduplication logic.
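
For illustration only, the kind of deduplication view I have in mind could be built from SQL like the one below; the key column, the ordering column, and every other name here are made up for the example, and it assumes each row records the Kafka offset it came from so "newest" is well defined.

```java
// Hypothetical sketch of a deduplication view over an append-only table.
public class DedupViewSketch {

  /** Keeps only the newest row per key, using the Kafka offset to decide which is newest. */
  static String dedupViewSql(String dataset, String table) {
    return String.format(
        "CREATE OR REPLACE VIEW `%1$s.%2$s_deduped` AS\n"
            + "SELECT * EXCEPT (rowNum) FROM (\n"
            + "  SELECT *, ROW_NUMBER() OVER (\n"
            // A given key always lands in one Kafka partition, so offset order is enough.
            + "    PARTITION BY id ORDER BY kafkaOffset DESC\n"
            + "  ) AS rowNum\n"
            + "  FROM `%1$s.%2$s`\n"
            + ") WHERE rowNum = 1",
        dataset, table);
  }
}
```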

If this feature is meant to help with table compaction, then I see the value of it. (If too many duplicates exist in BQ, querying the table without compaction might be costly in both time and money.)

But I also have some concerns about MERGE. As one of the DML statements Google provides, it is subject to a quota limit of 1,500 to 4,000 per day.

Even if we take full advantage of that quota, it still means the KCBQ connector (in upsert/delete mode) will no longer be real-time streaming. Meanwhile, I question whether we want to do this at all, because BQ (OLAP) is designed to be append-only. Though DML statements are available, it's something of a misuse to run them frequently.

Last but not least, as the intermediate table grows, MERGE becomes more expensive. Are we going to clear it at any point, such as after each MERGE completes? If so, would that affect data streaming?

criccomini commented 4 years ago

In general, I'm concerned about the benefit that this feature would provide vs. the complexity it introduces. Do you have a concrete need for this, or is it more of a hypothetical use case?

mtagle commented 4 years ago

I'm actually working on something like this for our own BQ data, but outside of the connector.

My concern with putting this within the connector is that it just becomes incredibly complex (and very separate from the "normal" data flow). The connector isn't actually just doing stuff per message anymore; it's doing stuff per message and then also occasionally scheduling some other task that has nothing to do with incoming messages. Even if you put all this logic into the connector, you're already going to need some externally created view to squish these two tables together if you actually want a real-time view of the data that takes advantage of the sparser table.
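
For what it's worth, the kind of "squish" view I mean would be something along these lines; this is purely illustrative, borrows the made-up id/value column layout from the MERGE sketch above, and assumes the intermediate table holds at most one pending row per key.

```java
// Hypothetical sketch of a view overlaying pending intermediate rows on the destination table.
public class RealtimeViewSketch {

  static String realtimeViewSql(String dataset, String table) {
    return String.format(
        "CREATE OR REPLACE VIEW `%1$s.%2$s_realtime` AS\n"
            // Destination rows whose keys have no pending change...
            + "SELECT d.id, d.name, d.amount FROM `%1$s.%2$s` d\n"
            + "WHERE NOT EXISTS (SELECT 1 FROM `%1$s.%2$s_tmp` t WHERE t.id = d.id)\n"
            + "UNION ALL\n"
            // ...plus pending upserts; pending deletes (null values) simply drop out.
            + "SELECT t.id, t.value.name, t.value.amount FROM `%1$s.%2$s_tmp` t\n"
            + "WHERE t.value IS NOT NULL",
        dataset, table);
  }
}
```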

I'm also curious about the intent/benefit of this change.

C0urante commented 4 years ago

Thanks guys! Responses for each of you:

@whynick1 -

@criccomini -

@mtagle -

mtagle commented 4 years ago

I agree the GCS batch loading is similar, but the GCS batch loading process from GCS -> BQ doesn't have a schedule; it just runs asynchronously from the Kafka-to-GCS load job. As a result, I have a better understanding of what happens if the connector fails in the GCS loading case than I do with this. Is the schedule some sort of cron-like schedule? What if the connector is down during that time? If instead it's on some sort of internal timer, what happens if the connector is taken down before that internal timer has elapsed? In general, how often do we expect these merges to happen?

I'm not opposed to this change, but I feel like there are a lot of clarifications that need to be made with regards to implementation.

C0urante commented 4 years ago

Thanks, Moira. I agree that there are a ton of details that need to be worked out for how to implement this and would be happy to draw up and collaborate on a design doc on how we might go about it.

With this issue I just wanted to gauge interest and see if you guys would be open to this kind of feature at all. Is it safe to say that, if it can be implemented reasonably, you'd be amenable to seeing this in the connector?

With regards to your specific questions about timing: I was thinking of periodically triggering merges from the connector itself on a configurable interval; users could adjust the interval based on what kind of throughput vs. latency they'd like to see from the connector. A lower interval would provide lower latency but, if too low, may cause issues where merges start tripping over each other. A high interval might provide higher throughput but would delay the write of new rows into the destination table. Depending on how many intermediate tables we would want to use (1, 2, or n), we could define different expectations for synchrony. If there's only one intermediate table, the connector would have to cease consuming from Kafka until a pending merge has completed; if there's more than one table, the connector could keep consuming and switch from the to-be-merged intermediate table to a fresh one, letting the merge take place in the background.
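
As a very rough sketch of the trigger mechanism I'm describing (the interval parameter, class name, and mergeIntoDestination callback below are all hypothetical, not actual connector configuration):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: fire the merge flow on a fixed, user-configurable interval.
public class MergeScheduler {

  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();

  /**
   * Runs mergeIntoDestination every intervalMs milliseconds. A lower interval means lower
   * end-to-end latency but risks merges tripping over each other; a higher interval batches
   * more rows per MERGE (and stays further under DML quotas) at the cost of freshness.
   */
  public void start(long intervalMs, Runnable mergeIntoDestination) {
    executor.scheduleAtFixedRate(mergeIntoDestination, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
  }

  public void stop() {
    executor.shutdown();
  }
}
```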

As far as failure goes, since upsert and delete are idempotent, I think we can just assume that all writes have failed until we've managed to finish a merge and commit the offsets for those records to Kafka. If the connector dies anywhere in the middle of that, on restart we can just pick up from where we last committed offsets and try again; reprocessing already-written data shouldn't affect the contents of the destination table.
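
In Kafka Connect terms, one way to express that "assume everything failed until a merge completes" stance is through SinkTask.preCommit, only ever reporting offsets that a finished MERGE has made durable. The bookkeeping field below is hypothetical; only the preCommit hook itself is existing Connect API.

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical sketch of commit handling for upsert/delete mode.
public abstract class UpsertDeleteSinkTaskSketch extends SinkTask {

  // Updated whenever a MERGE finishes successfully; everything newer is still "unwritten".
  private volatile Map<TopicPartition, OffsetAndMetadata> offsetsOfLastCompletedMerge = Map.of();

  @Override
  public Map<TopicPartition, OffsetAndMetadata> preCommit(
      Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    // Only let Connect commit offsets covered by a completed MERGE. Anything newer is
    // re-read and re-merged after a restart, which is safe because upsert and delete
    // are idempotent with respect to the destination table.
    return offsetsOfLastCompletedMerge;
  }
}
```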

criccomini commented 4 years ago

Is it safe to say that, if it can be implemented reasonably, you'd be amenable to seeing this in the connector?

Yep. I suggest a more detailed design document as a next step. That way, we can all get on the same page about implementation details.

C0urante commented 4 years ago

Sounds good! I'll see if I can put a Google doc together sometime in the next few days.

C0urante commented 4 years ago

Alright, design doc is up and anyone can comment on it here: https://docs.google.com/document/d/1p8_rLQqR9GIALIruB3-MjqR8EgYdaEw2rlFF1fxRJf0/edit

brankoiliccc commented 3 years ago

@C0urante Thank you for your effort in making this happen. Do you have any info on when this will be merged to master? I can see it has already been merged to 2.x. Or should we just use 2.x?