googleapis / nodejs-bigquery

Node.js client for Google Cloud BigQuery: A fast, economical and fully-managed enterprise data warehouse for large-scale data analytics.
https://cloud.google.com/bigquery/
Apache License 2.0

Memory Issue with bigQuery.createQueryStream in Node.js #1392

Closed: gayathri-mandala closed this 1 day ago

gayathri-mandala commented 1 month ago

The bigQuery.createQueryStream loads the entire result set into memory. When we try to retrieve the data chunk by chunk, it causes a memory issue. Inspecting the heap profiles shows that most of the data is kept in _cachedResponse, _cachedRows, and rows.

Environment details

Steps to reproduce

Here is the sample script:


const {BigQuery} = require('@google-cloud/bigquery');

const query = `SELECT * FROM table`;

async function queryBigQuery(query) {
  // `creds` holds the client options/credentials (omitted here).
  const bigquery = new BigQuery(creds);

  const queryStream = bigquery.createQueryStream(query);

  console.log('Query started.');

  let recordsBuffer = [];
  const batchSize = 100;

  // Process the stream
  queryStream
    .on('data', row => {
      recordsBuffer.push(row);
      if (recordsBuffer.length >= batchSize) {
        // Process the batch of records and reset the buffer
        processBatch(recordsBuffer);
        recordsBuffer = [];
      }
    })
    .on('end', () => {
      // Process any remaining records in the buffer
      if (recordsBuffer.length > 0) {
        processBatch(recordsBuffer);
      }
      console.log('Query completed.');
    })
    .on('error', err => {
      console.error('Error during query execution:', err);
    });
}

// Function to process a batch of records
function processBatch(batch) {
  console.log(`Processing batch of ${batch.length} records.`);
}

queryBigQuery(query).catch(console.error);

When there are multiple connections, the data for every connection's request gets loaded into memory, causing memory usage to grow.

Issue with autoPaginate

I tried using the autoPaginate field: const queryStream = bigquery.createQueryStream(query, { autoPaginate: true }); However, it still behaves as if autoPaginate were set to false. Is there a way, or a field, that allows us to retrieve the data in chunks rather than loading the entire data set into memory?

Reference: here it is mentioned that we need to end the stream after a certain amount of data. However, that approach could lead to data loss. How can we implement this correctly? Please provide a sample.

alvarowolfx commented 1 month ago

Hey @gayathri-mandala, thanks for the report. Unfortunately, with the BigQuery REST API we can't control the number of rows returned per page, so even when using the createQueryStream method we still fetch the query results in pages and keep every page in memory. As explained in https://github.com/googleapis/nodejs-bigquery/issues/1073#issuecomment-1073295267, createQueryStream just creates a streamified version of the paginated calls to the getQueryResults method.
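
Roughly, driving those paginated calls by hand looks like the following. This is only a minimal sketch, not the library internals; processPage is a placeholder handler and the page size is whatever the API decides to return.

const {BigQuery} = require('@google-cloud/bigquery');

async function queryPageByPage(query, processPage) {
  const bigquery = new BigQuery();
  const [job] = await bigquery.createQueryJob({query});

  // With autoPaginate disabled, each call resolves with
  // [rows, nextPageOptions, apiResponse]; nextPageOptions is null
  // once the last page has been fetched.
  let options = {autoPaginate: false};
  while (options) {
    const [rows, nextQuery] = await job.getQueryResults(options);
    processPage(rows); // only the current page's rows are in scope here
    options = nextQuery;
  }
}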

I'm doing some work integrating the BigQuery Storage Read API, which allows reading a table over gRPC using the Arrow data format. That API works in a true streaming fashion. The plan is to wrap up this work by the end of the month; users will then be able to use it transparently in this package, or use the BigQuery Storage Read wrapper to fetch tables with it more explicitly. You can keep track here:

You can already use the BigQuery Storage Read API, but there is no client wrapper yet, which makes the developer experience not super nice. Here is a basic sample of how to use it with the AVRO format and the raw client (though I advise using the ARROW format; I can provide a sample if you want to go down that route):

https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/quickstart.js

Also, I've just opened PR #1393 to reduce the memory footprint by clearing _cachedRows, _cachedResponse, and rows whenever they are no longer needed.

gayathri-mandala commented 1 month ago

Thanks for the information @alvarowolfx. Could you please provide a sample for the ARROW format? Please update us when the open PR gets merged and released.

alvarowolfx commented 1 month ago

@gayathri-mandala here is a sample for reading data with the BigQuery Storage API in Arrow format. It shows that you can either read a full table directly with the BQ Storage API, or run a query, obtain the destination table holding the results, and then read that table with the BQ Storage API.

async function main() {
  // The read stream contains blocks of Arrow-encoded bytes.
  const {RecordBatchReader} = require('apache-arrow');
  const {Readable} = require('stream');

  // See reference documentation at
  // https://cloud.google.com/bigquery/docs/reference/storage
  const {BigQuery} = require('@google-cloud/bigquery');
  const {BigQueryReadClient} = require('@google-cloud/bigquery-storage');

  const client = new BigQueryReadClient();

  async function bigqueryStorageQuickstart() {
    // Get current project ID. The read session is created in this project.
    // This project can be different from that which contains the table.
    const myProjectId = await client.getProjectId();

    // We can run a query job and use its destination table to read the results from it.
    const bigquery = new BigQuery();
    const sqlQuery = 'SELECT name, number, state from `bigquery-public-data.usa_names.usa_1910_current` where state = "CA"';
    const [job] = await bigquery.createQueryJob({
      query: sqlQuery,
      location: 'US',
    });
    const [metadata] = await job.getMetadata();
    const qconfig = metadata.configuration.query;
    const dstTableRef = qconfig.destinationTable;
    const projectId = dstTableRef.projectId;
    const datasetId = dstTableRef.datasetId;
    const tableId = dstTableRef.tableId;

    // Alternatively, we can use a fixed table.
    // This example reads baby name data from the public datasets.
    /*const projectId = 'bigquery-public-data';
    const datasetId = 'usa_names';
    const tableId = 'usa_1910_current';*/

    const tableReference = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;

    const parent = `projects/${myProjectId}`;

    /* We can limit the output columns to a subset of the table's columns and
     * set a simple row filter, e.g. to only report names from the state of
     * Washington (WA). Both options are left commented out below.
     */
    const readOptions = {
      selectedFields: [],
      // selectedFields: ['name', 'number', 'state'],
      // rowRestriction: 'state = "WA"',
    };

    // API request.
    const request = {
      parent,
      readSession: {
        table: tableReference,
        // This API can deliver data serialized in Apache Avro or Apache Arrow format.
        // This example uses Apache Arrow.
        dataFormat: 'ARROW',
        readOptions,
      },
    };

    const [session] = await client.createReadSession(request);

    console.log(`session ${session.name} with ${session.streams.length} streams`);
    const streams = [];
    for (const readStream of session.streams) {      
      const readRowsRequest = {
        // Required stream name and optional offset. Offset requested must be less than
        // the last row read from readRows(). Requesting a larger offset is undefined.
        readStream: readStream.name,
        offset: 0,
      };
      const stream = client.readRows(readRowsRequest);
      streams.push(stream);
    }

    // Merge the per-stream readables into one stream by consuming them sequentially.
    async function* mergeStreams(readables) {
      for (const readable of readables) {
        for await (const chunk of readable) {
          yield chunk;
        }
      }
    }

    const joined = Readable.from(mergeStreams(streams));

    const rows = [];

    joined
      .on('error', console.error)
      .on('data', data => {
        try {
          const buf = Buffer.concat([
            session.arrowSchema.serializedSchema,
            data.arrowRecordBatch.serializedRecordBatch,
          ]);
          const reader = RecordBatchReader.from(buf);
          const batches = reader.readAll();
          for (const batch of batches) {
            for (const row of batch.toArray()) {
              rows.push(row);
            }
          }
        } catch (error) {
          console.log(error);
        }
      })
      .on('end', () => {
        console.log(`Got ${rows.length} rows`);
      });
  }
  await bigqueryStorageQuickstart();
}

main(...process.argv.slice(2)).catch(console.error);
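
As a variation on the sample above, here is a rough sketch (untested) that consumes the read streams concurrently instead of sequentially; each stream can be decoded independently, which can shorten wall-clock time at the cost of holding more decoded rows in memory at once.

// Decode all read streams concurrently. Assumes `session` and `streams`
// are the objects created in the sample above.
async function readStreamsConcurrently(session, streams) {
  const {RecordBatchReader} = require('apache-arrow');
  const rows = [];
  await Promise.all(
    streams.map(async stream => {
      // gRPC server streams are async iterable in Node.js.
      for await (const data of stream) {
        const buf = Buffer.concat([
          session.arrowSchema.serializedSchema,
          data.arrowRecordBatch.serializedRecordBatch,
        ]);
        for (const batch of RecordBatchReader.from(buf).readAll()) {
          for (const row of batch.toArray()) {
            rows.push(row);
          }
        }
      }
    })
  );
  return rows;
}
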
jcbowman commented 3 weeks ago

We are having the same memory issues using createQueryStream.

Is there an update on this to make the data truly streamable?

alvarowolfx commented 3 weeks ago

> We are having the same memory issues using createQueryStream.
>
> Is there an update on this to make the data truly streamable?

@jcbowman with PR https://github.com/googleapis/nodejs-bigquery/pull/1393, memory usage is reduced, but the call still isn't 100% streamable. As explained in https://github.com/googleapis/nodejs-bigquery/issues/1392#issuecomment-2223187876, that's not possible when using the BigQuery v2 REST API.

You can use the BQ Storage Read API, for which I added an example here: https://github.com/googleapis/nodejs-bigquery/issues/1392#issuecomment-2233724986. We are also close to landing a wrapper that makes the BQ Storage Read API easier to use, with PR https://github.com/googleapis/nodejs-bigquery-storage/pull/431

alvarowolfx commented 1 day ago

As we can't make createQueryStream truly streamable without the BQ Storage Read API, I'll close this for now; it's not something we can fix, given the nature of the BigQuery v2 API.