googleapis / java-bigquerystorage

BigQuery: Multiple rows with same primary key in single CDC-enabled Storage Write API request #2404

Closed C0urante closed 8 months ago

C0urante commented 9 months ago

I've noticed some interesting behavior when multiple rows with the same primary key are written in a single CDC-enabled Storage Write API request: the first row for each primary key seems to be given precedence. It's debatable whether this is a bug, but it'd be nice to get some clarification on this behavior: is it intentional, and can it be relied upon?

I ask because when consuming from, e.g., a stream of Kafka records generated by Debezium, this scenario may arise, and it seems like there are three options to deal with it:

  1. BigQuery client and/or backend behavior changes to give the last row in each request precedence, instead of the first (which IIUC would mirror the behavior if each row were written in its own request)
  2. This behavior is intentional, will not be changed, and can be relied upon, in which case we might work with it by reversing the order of rows in each request we send
  3. This behavior cannot be relied upon, in which case we'll probably have to split up each batch we write to only include one row for each primary key value
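To make option 3 concrete, here is a minimal sketch of splitting one upstream batch into consecutive sub-batches that each contain at most one row per primary key. The class name `UniqueKeyBatches` and the row representation (`Map<String, Object>` keyed by column name) are assumptions for illustration, not part of the BigQuery client.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of any BigQuery library.
class UniqueKeyBatches {

  // Splits rows into consecutive sub-batches in which each primary key
  // appears at most once, preserving the original order of the rows.
  static List<List<Map<String, Object>>> split(
      List<Map<String, Object>> rows, String pkField) {
    List<List<Map<String, Object>>> batches = new ArrayList<>();
    List<Map<String, Object>> current = new ArrayList<>();
    Set<Object> seen = new HashSet<>();
    for (Map<String, Object> row : rows) {
      Object key = row.get(pkField);
      // Set.add returns false when the key is already in the current
      // sub-batch, so close that sub-batch and start a new one.
      if (!seen.add(key)) {
        batches.add(current);
        current = new ArrayList<>();
        seen.clear();
        seen.add(key);
      }
      current.add(row);
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

Each sub-batch would then go out as its own write request, in order, so the number of requests grows with the number of repeated keys in the upstream batch.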

Environment details

  1. BigQuery
  2. OS type and version: MacOS Sonoma 14.0
  3. Java version: 11

Steps to reproduce

  1. Follow steps similar to the CDC Json writer example, but provide multiple rows with the same primary key in a single write request (see code example)
  2. Observe that the first row with a given primary key in each write request appears to "win". In the code example, the end state of the table is that the "data" column has the value "CDC row 1" since that was the value in the first row, while every other row in the request had the value "CDC row 2".

Code example

package io.aiven.kafka.connect.bigquery;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.PrimaryKey;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableConstraints;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import org.json.JSONArray;
import org.json.JSONObject;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CdcBugRepro {

  private static final String PK_FIELD = "pk";
  private static final String DATA_FIELD = "data";
  private static final String CDC_FIELD = "_CHANGE_TYPE";

  public static void main(String[] args) throws Throwable {
    if (args.length != 3) {
      System.err.println("Usage: CdcBugRepro <project> <dataset> <table>");
      System.exit(1);
    }

    String project = args[0];
    String dataset = args[1];
    String table = args[2];

    TableName tableName = TableName.of(project, dataset, table);
    TableId tableId = TableId.of(project, dataset, table);

    BigQuery bigQuery = BigQueryOptions.newBuilder()
        .setProjectId(project)
        .setCredentials(GoogleCredentials.getApplicationDefault())
        .build()
        .getService();

    if (bigQuery.getTable(tableId) == null) {
      createTable(bigQuery, tableId);
      // Wait for table creation to propagate
      Thread.sleep(5_000L);
    } else {
      System.out.println("Table already exists");
    }

    writeToTable(tableName);
  }

  private static void createTable(BigQuery bigQuery, TableId tableId) {
    System.out.println("Creating table");
    Schema createdTableSchema = Schema.of(
        Field.newBuilder(PK_FIELD, StandardSQLTypeName.INT64)
            .setMode(Field.Mode.REQUIRED)
            .build(),
        Field.of(DATA_FIELD, StandardSQLTypeName.STRING)
    );
    TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
        .setSchema(createdTableSchema)
        .setClustering(Clustering
            .newBuilder()
            .setFields(Collections.singletonList(PK_FIELD))
            .build()
        )
        .setTableConstraints(
            TableConstraints
                .newBuilder()
                .setPrimaryKey(PrimaryKey.newBuilder()
                    .setColumns(Collections.singletonList(PK_FIELD))
                    .build()
                )
                .build()
        )
        .build();
    TableInfo tableInfo = TableInfo.of(tableId, tableDefinition);
    bigQuery.create(tableInfo);
    System.out.println("Created table");
  }

  private static void writeToTable(TableName tableName) throws Throwable {
    TableSchema writtenTableSchema = TableSchema.newBuilder()
        .addFields(TableFieldSchema.newBuilder()
            .setName(PK_FIELD)
            .setType(TableFieldSchema.Type.INT64)
            .setMode(TableFieldSchema.Mode.REQUIRED)
            .build()
        ).addFields(TableFieldSchema.newBuilder()
            .setName(DATA_FIELD)
            .setType(TableFieldSchema.Type.STRING)
            .setMode(TableFieldSchema.Mode.NULLABLE)
            .build()
        ).addFields(TableFieldSchema.newBuilder()
            .setName(CDC_FIELD)
            .setType(TableFieldSchema.Type.STRING)
            .setMode(TableFieldSchema.Mode.NULLABLE)
            .build()
        ).build();

    Map<String, Object> cdcRow1 = new HashMap<String, Object>() {
      {
        put(PK_FIELD, 1);
        put(DATA_FIELD, "CDC row 1");
        put(CDC_FIELD, "UPSERT");
      }
    };
    Map<String, Object> cdcRow2 = new HashMap<String, Object>() {
      {
        put(PK_FIELD, 1);
        put(DATA_FIELD, "CDC row 2");
        put(CDC_FIELD, "UPSERT");
      }
    };

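    // Build one large request filled with copies of "CDC row 2", then overwrite
    // the first element so that only the first row carries "CDC row 1".
    List<JSONObject> values = new ArrayList<>(
        Collections.nCopies(1024 * 255, new JSONObject(cdcRow2))
    );
    values.set(0, new JSONObject(cdcRow1));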

    try (JsonStreamWriter streamWriter = JsonStreamWriter
        .newBuilder(tableName.toString(), writtenTableSchema)
        .build()) {
      streamWriter.append(new JSONArray(values)).get();
    }
  }

}

C0urante commented 9 months ago

Hi @Neenu1995! Sorry for the direct ping, but is there any chance you or someone else on this project could take a look?

anahan0369 commented 8 months ago

Hi Chris,

Thanks for the question!

  1. Is it intentional? No, please see the explanation below.
  2. Can it be relied on? No.

Detailed explanation:

The current API sorts records by I/O system ingestion time, and the row with the largest I/O ingestion time 'wins'. In the example you presented, all of the rows appended in the same request have the same I/O ingestion time, so in theory the system just picks one of them at random.

You might ask how to ensure the ordering from the client side.

Option 1: issue a separate append for each row that shares a key, for example streamWriter.append("key1", row1) followed by streamWriter.append("key1", row2). The later append will have the larger I/O ingestion time.
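A rough sketch of what Option 1 could look like on the client: send the requests one at a time and wait for each to be acknowledged before sending the next. The `append` parameter below is a hypothetical stand-in for a call such as `streamWriter.append(new JSONArray(batch))` from the repro code, and the sketch assumes that waiting for each acknowledgement is enough to give the next request a later ingestion time, which is not confirmed here.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper, not part of any BigQuery library.
class SequentialAppends {

  // Sends each batch as its own request, strictly in order.
  // `append` is an assumed stand-in for the real writer call; it must
  // return a future that completes when the request is acknowledged.
  static void appendInOrder(
      List<List<Map<String, Object>>> batches,
      Function<List<Map<String, Object>>, Future<?>> append) throws Exception {
    for (List<Map<String, Object>> batch : batches) {
      // Block until this request completes before issuing the next one.
      append.apply(batch).get();
    }
  }
}
```

Blocking on every request trades throughput for ordering, so this only seems reasonable when rows sharing a key are rare.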

Option 2: (not available yet, stay tuned) Clients can specify an ordering number on the client side, and the system will utilize the provided ordering to sort the records.

Please let me know if you have further questions.

C0urante commented 8 months ago

Ah, neat! Thanks for the insight, this is really helpful.

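Option 1 does seem feasible--some follow-up questions:

  1. Is there a potential impact on throughput if we take the naive approach and issue a single StreamWriter::append call for each individual row?
  2. What's the granularity of the ingestion time?
  3. Does this change the atomicity of insertions?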

Option 2 would be lovely, please let me know if/when it lands! If you'd accept a small suggestion--it might be nice to have the stream writer automatically number rows in ascending order.

I should also note that pre-compaction (i.e., only sending the latest-available record for each primary key in each upstream batch to BigQuery) seems like a viable option too--thoughts?
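For reference, a minimal sketch of the pre-compaction idea: within one upstream batch, keep only the last row seen for each primary key, so the request sent to BigQuery never contains two rows with the same key. The class name `PreCompaction` and the `Map<String, Object>` row representation are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of any BigQuery library.
class PreCompaction {

  // Returns one row per primary key: the last row seen for that key.
  // Surviving rows are ordered by the position of their last occurrence.
  static List<Map<String, Object>> compact(
      List<Map<String, Object>> rows, String pkField) {
    Map<Object, Map<String, Object>> latest = new LinkedHashMap<>();
    for (Map<String, Object> row : rows) {
      Object key = row.get(pkField);
      // Remove before re-inserting so the key moves to the end of the
      // insertion order instead of keeping its first position.
      latest.remove(key);
      latest.put(key, row);
    }
    return new ArrayList<>(latest.values());
  }
}
```

The last row wins regardless of its change type, so a trailing delete for a key replaces any earlier upserts for that key in the same batch.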

anahan0369 commented 8 months ago

  1. Is there a potential impact on throughput if we take the naive approach and issue a single StreamWriter::append call for each individual row? Yes, this is not preferred for performance reasons.

  2. What's the granularity of the ingestion time? Milliseconds.

  3. Does this change the atomicity of insertions? It will, since it is a single request per row.

  4. Pre-compaction would work perfectly for your case, but we should do that work for you if you just specify which row you want to pick per key by assigning the ordering number mentioned in option 2.

C0urante commented 8 months ago

Wonderful, thanks @anahan0369! I think because of the performance limitations and potential gotchas with timestamp granularity we'll use pre-compaction for now, but option 2 would definitely be fantastic if/when it's available.