googleapis / nodejs-bigquery-storage

BigQuery Storage Node.js client
Apache License 2.0

Add sample and utilities for doing Change Data Capture with BigQuery Write API #470

Closed alvarowolfx closed 2 months ago

alvarowolfx commented 4 months ago

Document how users can get started with Change Data Capture: https://cloud.google.com/bigquery/docs/change-data-capture.

Solves the issue reported in https://github.com/googleapis/nodejs-bigquery/issues/1386

synchrone commented 3 months ago

@alvarowolfx any hints on how to make CDC work?

There are other people out there struggling with this, e.g. https://github.com/googleapis/googleapis/discussions/891 and SO#78330527.

hieumdd commented 3 months ago
- Naively adding `_CHANGE_TYPE` to rows in `writer.appendRows()` in the example silently pushes duplicate rows to the table.
- After adding a `_CHANGE_TYPE` column definition to the protobuf schema, `appendRows()` throws an RPC error: `The given value is not a valid CHANGE_TYPE in insert mode: UPSERT`.

> @alvarowolfx any hints on how to make CDC work?
>
> There are other people out there struggling with this, e.g. googleapis/googleapis#891 and SO#78330527.

This is what I've been doing.

1. Adjust the table schema to "include" `_CHANGE_TYPE`:

```ts
const writeStream = await writeClient.getWriteStream({
  streamId: `${destinationTable}/streams/_default`,
  view: protos.google.cloud.bigquery.storage.v1.WriteStreamView.FULL,
});

const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
  {
    fields: [
      ...writeStream.tableSchema!.fields!,
      {
        name: '_CHANGE_TYPE',
        type: protos.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING,
      },
    ],
  },
  'root',
);
```

2. Then it is just a matter of adding the column when appending rows:

```ts
writer.appendRows([{...row, _CHANGE_TYPE: 'UPSERT'}]);
```
synchrone commented 3 months ago

@hieumdd thank you for sharing the snippet.

This is exactly what I've been doing. In my experience, `The given value is not a valid CHANGE_TYPE in insert mode: UPSERT` does not get thrown as a JS error; instead it's silently available in the operation result.

Can you check the return value after the `.appendRows().getResult()` promise resolves?

```ts
writer
  .appendRows([{...row, _CHANGE_TYPE: 'UPSERT'}])
  .getResult().then(r => {
    console.log(r.appendResult);
  });
```
hieumdd commented 3 months ago

> @hieumdd thank you for sharing the snippet.
>
> This is exactly what I've been doing. In my experience, `The given value is not a valid CHANGE_TYPE in insert mode: UPSERT` does not get thrown as a JS error; instead it's silently available in the operation result.
>
> Can you check the return value after the `.appendRows().getResult()` promise resolves?
>
> ```ts
> writer
>   .appendRows([{...row, _CHANGE_TYPE: 'UPSERT'}])
>   .getResult().then(r => {
>     console.log(r.appendResult);
>   });
> ```

I believe that's how it works: it returns a result regardless of whether the append was successful. I use this to check for errors (if there are any):

```ts
const writeResult = await writer.appendRows([]).getResult();
if (writeResult.error) {
  throw new Error('xyz');
}
```
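
If it helps, a slightly fuller check along these lines should also surface per-row errors. This is just a sketch: the field names assume the v1 `AppendRowsResponse` shape (`error` plus `rowErrors`), and `rows` stands in for whatever you pass to `appendRows`:

```ts
// Sketch only: check the top-level status and the per-row errors on the
// response returned by getResult(). Field names assume the v1
// AppendRowsResponse proto (error, rowErrors with index/message).
const result = await writer.appendRows(rows).getResult();
if (result.error) {
  throw new Error(`append failed: ${result.error.message}`);
}
for (const rowError of result.rowErrors ?? []) {
  console.error(`row ${rowError.index} failed: ${rowError.message}`);
}
```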

For `The given value is not a valid CHANGE_TYPE in insert mode: UPSERT`, can you validate whether your destination table has a primary key constraint added?

synchrone commented 3 months ago

@hieumdd yes, I ran `ALTER TABLE project.dataset.mytable ADD PRIMARY KEY(id) not enforced;` and I can see the PK label in the UI as well as `Primary key(s): id` in the Details tab.

Anything else the table needs? It appears a BQ reservation is optional for this feature, and upserts should work without it?

hieumdd commented 3 months ago

> @hieumdd yes, I ran `ALTER TABLE project.dataset.mytable ADD PRIMARY KEY(id) not enforced;` and I can see the PK label in the UI as well as `Primary key(s): id` in the Details tab.
>
> Anything else the table needs? It appears a BQ reservation is optional for this feature, and upserts should work without it?

IIRC there is nothing more to be added to the table other than the PK. A BQ reservation is not needed here. Here is my full implementation, if that is of any assistance:

```ts
const projectId = await writeClient.getClient().getProjectId();
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
const writeStream = await writeClient.getWriteStream({
    streamId: `${destinationTable}/streams/_default`,
    view: protos.google.cloud.bigquery.storage.v1.WriteStreamView.FULL,
});

const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
    {
        fields: [
            ...writeStream.tableSchema!.fields!,
            {
                name: '_CHANGE_TYPE',
                type: protos.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING,
            },
        ],
    },
    'root',
);
const connection = await writeClient.createStreamConnection({
    streamId: managedwriter.DefaultStream,
    destinationTable,
});
const writer = new managedwriter.JSONWriter({ connection, protoDescriptor });
```
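
Using it then looks roughly like this (the row values are illustrative; the table is assumed to have an `id` primary key):

```ts
// Rough usage sketch for the writer created above (row shape is illustrative).
const writeResult = await writer
  .appendRows([{id: 'abc', _CHANGE_TYPE: 'UPSERT'}])
  .getResult();
if (writeResult.error) {
  throw new Error(writeResult.error.message ?? 'append failed');
}
```
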
synchrone commented 3 months ago

OK, it seems the error is thrown if you mix non-CDC rows (without `_CHANGE_TYPE`) and CDC rows within the same connection.

Here's a working example:

```ts
import {adapt, managedwriter, protos} from "@google-cloud/bigquery-storage";

const projectId = 'project';
const datasetId = 'dataset';
const tableId = 'table';

/**
 * CREATE TABLE `project.dataset.table`
 * (
 *   id STRING NOT NULL,
 *   createdAt TIMESTAMP NOT NULL,
 *   updatedAt TIMESTAMP NOT NULL,
 *   PRIMARY KEY (id) NOT ENFORCED
 * );
 */

const writeClient = new managedwriter.WriterClient({projectId});
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
const writeStream = await writeClient.getWriteStream({
  streamId: `${destinationTable}/streams/_default`,
  view: protos.google.cloud.bigquery.storage.v1.WriteStreamView.FULL,
});

const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
  {
    fields: [
      ...writeStream.tableSchema.fields,
      {
        name: '_CHANGE_TYPE',
        type: protos.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING,
      },
    ],
  },
  'root',
);

const connection = await writeClient.createStreamConnection({
  streamId: managedwriter.DefaultStream,
  destinationTable,
});
const writer = new managedwriter.JSONWriter({ connection, protoDescriptor });

const upsertConnection = await writeClient.createStreamConnection({
  streamId: managedwriter.DefaultStream,
  destinationTable,
});
const upsertWriter = new managedwriter.JSONWriter({ connection: upsertConnection, protoDescriptor });

Promise.resolve().then(async () => {
  const createdAt = (+new Date) * 1000;
  const id = `id${createdAt}`;

  writer
    .appendRows([{id, createdAt, updatedAt: createdAt }])
    .getResult().then(r => console.log(new Date, r))

  await new Promise(r => setTimeout(r, 500));

  upsertWriter
    .appendRows([{id, createdAt, updatedAt: (+new Date)*1000,  _CHANGE_TYPE: 'UPSERT'}])
    .getResult().then(r => console.log(new Date, r))
})
```

Importantly, in my example there's a potential duplication of rows: `writer` will insert a row, but `upsertWriter` might also insert one in some cases (observed in a larger deployment running on several pods).

I would recommend avoiding mixing non-CDC and CDC writers within one application, and using the less-documented `_CHANGE_TYPE: 'INSERT'` if necessary.
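
As a rough sketch of that approach, reusing the `upsertWriter` from the example above (row values are illustrative):

```ts
// Sketch: route plain inserts through the same CDC writer by tagging them
// with _CHANGE_TYPE: 'INSERT', so non-CDC and CDC rows never share a connection.
const now = (+new Date) * 1000;
const rowId = `id${now}`;

await upsertWriter
  .appendRows([{id: rowId, createdAt: now, updatedAt: now, _CHANGE_TYPE: 'INSERT'}])
  .getResult();

// Later changes to the same row go through UPSERTs keyed on the primary key.
await upsertWriter
  .appendRows([{id: rowId, createdAt: now, updatedAt: (+new Date) * 1000, _CHANGE_TYPE: 'UPSERT'}])
  .getResult();
```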