googleapis / nodejs-bigquery-storage

BigQuery Storage Node.js client
Apache License 2.0

How to handle and safely ignore errors when using committed type stream #467

Closed convers39 closed 1 month ago

convers39 commented 4 months ago

What you're trying to do

I am migrating my code from the tabledata.insertAll API to the Storage Write API, because I found that tabledata.insertAll occasionally inserts the same data twice.

I implemented it with CommittedStream, following the documentation and code examples, but I am having trouble handling errors.

What I am trying to do is ignore the errors the documentation says are safe to ignore, while still catching errors that should not be ignored. The relevant passage from the documentation:

When you specify an offset, the write operation is idempotent, which makes it safe to retry due to network errors or unresponsiveness from the server. Handle the following errors related to offsets:

  • ALREADY_EXISTS (StorageErrorCode.OFFSET_ALREADY_EXISTS): The row was already written. You can safely ignore this error.
  • OUT_OF_RANGE (StorageErrorCode.OFFSET_OUT_OF_RANGE): A previous write operation failed. Retry from the last successful write.

What code you've already tried

Here is the code of my implementation.

    const writeStream = await writeClient.createWriteStreamFullResponse({
      streamType,
      destinationTable,
    });
    if (writeStream.name == null) {
      throw new Error('writeStream undefined');
    }
    const streamId = writeStream.name;
    logger.info(`Stream created: ${streamId}`);
    if (writeStream.tableSchema == null) {
      throw new Error('table schema undefined');
    }

    const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
      writeStream.tableSchema,
      'root',
    );

    const connection = await writeClient.createStreamConnection({
      streamId,
    });
    const writer = new managedwriter.JSONWriter({
      connection,
      protoDescriptor,
    });

    let currentOffset = 0;
    while (currentOffset < data.length) {
      const dataChunk = data.slice(
        currentOffset,
        currentOffset + BQ_INSERT_DATA_CHUNCK_COUNT,
      );
      const pw = writer.appendRows(dataChunk as JSONList, currentOffset);
      const result = await pw.getResult();
      currentOffset = Number.parseInt(
        result.appendResult?.offset?.value?.toString() ?? '0',
        10,
      );
      currentOffset += BQ_INSERT_DATA_CHUNCK_COUNT;
      // TODO: error handling
      logger.info('pending write pushed', {
        result,
        currentOffset,
      });
    }

    logger.info('data inserted');
    await connection.finalize();

Any error messages you're getting

The result of PendingWrite.getResult contains rowErrors and error properties; the two documented errors arrive in the error property. Here is a screenshot of the error I produced intentionally by reusing the same offset value.

image

I tried to decode the Buffer in error.details with toString, and indeed I found the ALREADY_EXISTS keyword.

image

However, error.code is 6, which differs from what I found in the source code:

    // Offset already exists.
    OFFSET_ALREADY_EXISTS = 8;

    // Offset out of range.
    OFFSET_OUT_OF_RANGE = 9;

Now I have no idea what the error code will be for the OUT_OF_RANGE error, or where to find the correct list of error codes.

Additional questions

Apart from the error code mismatch issue above, I am also not sure about the error handling implementation, and the offset manipulation due to the lack of sample code or documentation.

convers39 commented 4 months ago

BTW, Google support responded with 'ask our sales or account team', so now I can only rely on GitHub :(

Hopefully someone can follow up on this question 🙏

image

LumaKernel commented 4 months ago

The mapping in the Python library looks the same: https://github.com/googleapis/python-bigquery-storage/blob/54f9d21db873db50b505b97f46019ba89e709b00/google/cloud/bigquery_storage_v1/types/storage.py#L827-L828

larssn commented 2 months ago

I wonder if this repo has a product owner or any dedicated team at all, because people seem to be entirely on their own in here. And all the APIs feel pretty "alpha" and WIP.

It's weird.

alvarowolfx commented 2 months ago

@convers39 Sorry for the late reply on this. I was out when this issue landed and ended up missing the follow-up here, and I'm very sorry.

To answer some of your questions:

I tried to decode the Buffer in error.details with toString, and indeed I found the ALREADY_EXISTS keyword. Now I don't have any idea what will the error code be for OUT_OF_RANGE error, or where to find the correct error code list.

This is indeed confusing, so let me explain. The error code 6 that you got is the gRPC status code ALREADY_EXISTS, which is returned when the user provides an offset that has already been written to. The gRPC status code 11, OUT_OF_RANGE, is returned when an attempt is made to append at an offset beyond the current end of the stream. Those gRPC errors can in turn carry Storage Error codes, which you can parse using this utility here; those are the other error codes (OFFSET_ALREADY_EXISTS = 8, OFFSET_OUT_OF_RANGE = 9) that you already found in the source.

To clarify: If offset is specified, the offset is checked against the end of stream. You can retry with adjusted offset within the same StreamConnection. If offset is not specified, append happens at the end of the stream.
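To make the offset check above concrete, here is a minimal in-memory sketch of the behavior as described in this thread. It is not the real service or library: `ModelStream` and `AppendOutcome` are hypothetical names invented for illustration, and the only facts taken from the discussion are the gRPC status codes 6 (ALREADY_EXISTS) and 11 (OUT_OF_RANGE).

```typescript
// Simplified model of how a stream checks a caller-supplied offset.
// Hypothetical illustration only; this is not the BigQuery service.
const ALREADY_EXISTS = 6; // gRPC status code
const OUT_OF_RANGE = 11; // gRPC status code

type AppendOutcome = {offset: number} | {errorCode: number};

class ModelStream {
  private rows: unknown[] = [];

  append(batch: unknown[], offset?: number): AppendOutcome {
    const end = this.rows.length;
    if (offset !== undefined && offset < end) {
      // The offset was already written to.
      return {errorCode: ALREADY_EXISTS};
    }
    if (offset !== undefined && offset > end) {
      // The offset is beyond the current end of the stream.
      return {errorCode: OUT_OF_RANGE};
    }
    // Offset matches the end of the stream, or no offset was given.
    this.rows.push(...batch);
    return {offset: end};
  }
}

const modelStream = new ModelStream();
const firstWrite = modelStream.append(['a', 'b'], 0); // accepted at offset 0
const duplicateWrite = modelStream.append(['a', 'b'], 0); // ALREADY_EXISTS
const gapWrite = modelStream.append(['x'], 5); // OUT_OF_RANGE
const noOffsetWrite = modelStream.append(['c']); // appended at end (offset 2)
```

In this model a retried write with the same offset never adds rows twice, which is the property that makes offsets useful for exactly-once semantics.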

Will the error like OFFSET_ALREADY_EXISTS be thrown somewhere? Or do I have to check the result for each PendingWrite?

The OFFSET_ALREADY_EXISTS error is an AppendRowsResponse level error, so it should come only as a part of the response that you get from PendingWrite.getResult (which is in the end an AppendRowsResponse) and it will be on the error attribute of it.

Do I need to control the offset on my application side instead of an empty parameter? As the doc said I should manage stream offset to achieve exactly-once semantics. Meanwhile, I also tried with an empty offset parameter and cannot find any difference in the behavior, I am not sure if an empty offset will produce duplicate insertion.

The answer depends on your application's needs. Since you mentioned that you need exactly-once semantics, it's better to use offsets. This page contains a guide on which WriteStream type is best for each use case: https://cloud.google.com/bigquery/docs/write-api. But yes, if you use a Default stream, for example, there is no guarantee that you won't get duplicate insertions.
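As a rough sketch of what managing offsets on the application side could look like, the loop below appends chunks one at a time and advances the offset only after a chunk is confirmed written or reported as ALREADY_EXISTS. The `append` parameter is a hypothetical wrapper (for example around `writer.appendRows(chunk, offset).getResult()`), and the `AppendResponse` shape is a simplified assumption, not the library's full type.

```typescript
// Sketch of client-managed offsets. All names here are illustrative.
const GRPC_ALREADY_EXISTS = 6; // gRPC status code for an already-written offset

interface AppendResponse {
  error?: {code: number; message?: string} | null;
}

// Hypothetical wrapper around the real append call.
type AppendFn = (rows: unknown[], offset: number) => Promise<AppendResponse>;

async function appendInOrder(
  rows: unknown[],
  chunkSize: number,
  append: AppendFn,
): Promise<number> {
  let offset = 0;
  while (offset < rows.length) {
    const chunk = rows.slice(offset, offset + chunkSize);
    const {error} = await append(chunk, offset);
    // ALREADY_EXISTS means this chunk was written earlier, so move on.
    if (error != null && error.code !== GRPC_ALREADY_EXISTS) {
      throw new Error(
        `append at offset ${offset} failed with gRPC code ${error.code}`,
      );
    }
    offset += chunk.length;
  }
  return offset; // total number of rows accounted for
}
```

Awaiting each chunk before sending the next is slower than pipelining, but it keeps the "last successful offset" unambiguous when something fails.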

alvarowolfx commented 2 months ago

@larssn sorry that you feel that way. We do have a team, and I'm currently the owner of this repo. I have been trying to keep all issues answered and resolved here, and the issue count is much lower than in the past. The open issues that we have are mostly internal feature requests/improvements, which we can't work on right now due to other work streams.

We also have ongoing work on the BigQuery Storage Read API, which is going to make that API easier to use and let you fetch results much faster with less memory. That is similar to this Write veneer, which didn't even exist a year ago and makes users' lives easier, as we already have other customer success stories.

Again, sorry for the super late reply here.

convers39 commented 2 months ago

@alvarowolfx thank you for your follow-up

I have proceeded with my implementation based on what I got from the pendingWrite.getResult response.

And thanks for clarifying the OUT_OF_RANGE and ALREADY_EXISTS error codes. I could reproduce OUT_OF_RANGE by passing a bigger offset than expected, which does respond with error code 11.

So, as you said, I need to manage the offset value on the client side so that the OUT_OF_RANGE error never happens, and simply ignore ALREADY_EXISTS if it occurs.

Currently, I have my implementation as below. I may adjust the error handling based on the grpc status code instead.

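import {adapt, managedwriter} from '@google-cloud/bigquery-storage';

export const streamDataToBq = async <T extends Array<Record<string, unknown>>>({
  destinationTable,
  data,
  logger,
}: {
  destinationTable: string;
  data: T;
  logger: Logger;
}) => {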
  const streamType = managedwriter.CommittedStream;
  const writeClient = new managedwriter.WriterClient();
  logger.info('preparing write stream');
  const writeStream = await writeClient.createWriteStreamFullResponse({
    streamType,
    destinationTable,
  });

  if (writeStream.tableSchema == null) {
    throw new Error(`tableSchema for table '${destinationTable}' is undefined`);
  }
  const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
    writeStream.tableSchema,
    'root',
  );

  if (writeStream.name == null) {
    throw new Error(`writeStream for table '${destinationTable}' is undefined`);
  }
  const streamId = writeStream.name;
  const connection = await writeClient.createStreamConnection({
    streamId,
  });
  logger.info(`Stream connection created: ${streamId}`);

  const writer = new managedwriter.JSONWriter({
    connection,
    protoDescriptor,
  });

  try {
    logger.info('appending data to write stream');
    let currentOffset = 0;
    const pendingWrites = [];
    while (currentOffset < data.length) {
      const dataChunk = data.slice(
        currentOffset,
        currentOffset + BQ_INSERT_DATA_CHUNCK_SIZE,
      );
      const pw = writer.appendRows(dataChunk as JSONList, currentOffset);
      pendingWrites.push(pw);
      currentOffset += dataChunk.length;
      logger.info('pending write pushed', {
        currentOffset,
      });
    }
    const results = await Promise.all(pendingWrites.map(pw => handleAppendResult(pw, logger)));
    logger.info('data inserted');
    return results;
  } catch (e: unknown) {
    // ...
  } finally {
    logger.info('close stream connection');
    await connection.finalize();
  }
};

const handleAppendResult = async (pw: PendingWrite, logger: LoggerProtocol) => {
  const { appendResult, error, rowErrors } = await pw.getResult();
  if (appendResult?.offset?.value == null) {
    logger.warn('No offset returned in appendResult');
  }
  if (error != null) {
    const errorDetails =
      error.details?.map((b) => ({
        ...b,
        value: b.value?.toString(),
      })) ?? [];
    // NOTE: https://cloud.google.com/bigquery/docs/write-api-best-practices#manage_stream_offsets_to_achieve_exactly-once_semantics
    // ALREADY_EXISTS means the rows were already written, so it is safe to ignore
    if (errorDetails.some((e) => e.value?.includes('ALREADY_EXISTS'))) {
      logger.warn('error "OFFSET_ALREADY_EXISTS" occurred, safely ignored');
      return appendResult;
    }
    logger.error('error occurred while writing data', {
      ...error,
      errorDetails,
    });
    throw new GrpcStatusError(error);
  }

  if (rowErrors != null && rowErrors.length > 0) {
    const errMessages = rowErrors.map((e) => e.message);
    logger.error('rowErrors occurred', { errMessages });
    throw new AggregateError(
      rowErrors,
      'RowErrors occurred while inserting data',
    );
  }
  return appendResult;
};
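
For the gRPC-status-based handling mentioned above, a minimal sketch might look like the following. It only uses the two status codes confirmed in this thread (6 for ALREADY_EXISTS, 11 for OUT_OF_RANGE); `classifyAppendError` and the `AppendAction` labels are hypothetical names, and the `error` parameter is a simplified stand-in for the error attribute returned by PendingWrite.getResult.

```typescript
// Classify the error on an append response by its gRPC status code.
// Hypothetical helper; the names and action labels are illustrative.
const STATUS_ALREADY_EXISTS = 6; // offset was already written
const STATUS_OUT_OF_RANGE = 11; // offset is beyond the end of the stream

type AppendAction = 'ok' | 'ignore' | 'retry-from-last-success' | 'fail';

function classifyAppendError(
  error: {code?: number | null} | null | undefined,
): AppendAction {
  if (error == null) {
    return 'ok';
  }
  switch (error.code) {
    case STATUS_ALREADY_EXISTS:
      // The rows were already written; safe to ignore.
      return 'ignore';
    case STATUS_OUT_OF_RANGE:
      // A previous write failed; retry from the last successful write.
      return 'retry-from-last-success';
    default:
      return 'fail';
  }
}
```

Compared with searching the decoded error.details for the string 'ALREADY_EXISTS', comparing numeric codes avoids depending on how the detail bytes happen to decode.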

larssn commented 2 months ago

Super glad to hear it, because in the past there was little to no response in this repo, and we've finally started using the Storage Write API, now that proto files are no longer needed.

Anyway, I don't want to sidetrack the issue. Thanks!