scramjetorg / scramjet

Public tracker for Scramjet Cloud Platform, a platform that brings data from many environments together.
https://www.scramjet.org

DataStream.batch() streaming data from BigQuery #90

Closed · chapmanjacobd closed this issue 4 years ago

chapmanjacobd commented 4 years ago

If I remove the .batch() or .timeBatch() line then it works fine.

With it I get this error:

Cannot read property 'value' of undefined

      at node_modules/.pnpm/scramjet-core@4.28.2/node_modules/scramjet-core/lib/util/mk-transform.js:59:44
      at processTicksAndRejections (internal/process/task_queues.js:97:5)
        caused by:
      at DataStream.<anonymous> (src/bq-to-mssql.ts:99:55)
        --- raised in DataStream(15) constructed ---
      at new PromiseTransformStream (node_modules/.pnpm/scramjet-core@4.28.2/node_modules/scramjet-core/lib/util/promise-transform-stream.js:65:27)
      at new DataStream (node_modules/.pnpm/scramjet-core@4.28.2/node_modules/scramjet-core/lib/data-stream.js:43:9)    
      at DataStream.map (node_modules/.pnpm/scramjet-core@4.28.2/node_modules/scramjet-core/lib/data-stream.js:186:26) 

To Reproduce

import { BigQuery } from '@google-cloud/bigquery';
import { DataStream } from 'scramjet';
import type { Knex } from 'knex';

const bq = new BigQuery();

async function bqStreamToMSSQL(
  trx: Knex.Transaction<any, any>,
  table: string,
  query: string
) {

  // BigQuery.createQueryStream: (options?: Query) => ResourceStream<any>
  return await bq
    .createQueryStream(query)
    .pipe(new DataStream({ maxParallel: 1 }))
    .timeBatch(7000, 10000)
    .map((row) => ({ ...row, at_ingest: row.at_ingest.value }))
    .do(async (row) => {
      await trx
        .delete()
        .from(table)
        .where('pk', '=', row.pk)
        .andWhere('at_ingest', '<=', row.at_ingest);

      await trx.insert(row).into(table);
    })
    .run();
}

maybe it is a limitation of the BigQuery method or maybe I'm doing this wrong

MichalCz commented 4 years ago

Ok, batch and timeBatch actually turn these rows into arrays of rows, so I'm guessing that's what causes the error.

You'd need to put the timeBatch after the map there, and in do either use Promise.all or, better yet, convert this to delete a number of records at once with IN.
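For illustration, a minimal sketch of that reordering (it reuses trx, table, pk and at_ingest from the snippet above; the whereIn-based batched delete is just one way to do it, and it drops the per-row at_ingest comparison, which would need separate handling):

  return await bq
    .createQueryStream(query)
    .pipe(new DataStream({ maxParallel: 1 }))
    // map each row while it is still a single row
    .map((row) => ({ ...row, at_ingest: row.at_ingest.value }))
    // only then group the mapped rows into batches
    .timeBatch(7000, 10000)
    .do(async (rows) => {
      // one batched delete per time window instead of one delete per row
      await trx
        .delete()
        .from(table)
        .whereIn('pk', rows.map((r) => r.pk));

      // one batched insert per time window
      await trx.insert(rows).into(table);
    })
    .run();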

Reach out to me on slack directly I can take a look at this. :)

MichalCz commented 4 years ago

BTW, you may want to look at the nagle method - this may work more like you expected, but it wouldn't be as efficient as what you could achieve using the above timeBatch with actual batched deletes and inserts.

chapmanjacobd commented 4 years ago

Yeah, I decided to change my approach. I'm going with batched inserts into a temporary table outside of a transaction, and then a separate transaction that runs some fast queries to delete matching rows and insert from the temp table into the destination table.

MichalCz commented 4 years ago

That's cool. So I guess DELETE FROM x WHERE id IN (SELECT id FROM temp_x)?

For others, would you be so kind as to post the code here when you're done?

chapmanjacobd commented 4 years ago

yeah I just do something like this:

await Promise.all(
  sourceTables.map((table) =>
    insertBQToMSSQLTempTable({
      query: genSinceSQL(table),
      tempTableName: `temp_${table}`,
    })
  )
);

await mssql.transaction(async function (trx) {
  for await (const table of sourceTables) {
    await atomicMoveIntoTable(trx, sourceColumns, `temp_${table}`, table);
  }
});

async function insertBQToMSSQLTempTable({
  query,
  tempTableName,
}) {
  return await bq
    .createQueryStream(query)
    .pipe(new DataStream({ maxParallel: 16 }))
    .map((row) => {
      delete row.id;

      return { ...row, ['at_bq']: row['at_bq'].value };
    })
    .timeBatch(2000, 5000)
    .do(async (rows: any[]) => {
      await mssql.raw('SET NOCOUNT ON');
      await mssql.batchInsert(tempTableName, rows, 300).catch((err) => {
        console.log(err);
        console.log(err[0]);
        console.log(err.originalError);
        console.log(err.message);
        process.exit(2);
      });
    })
    .run();
}

async function atomicMoveIntoTable(
  trx: Knex.Transaction<any, any>,
  sourceColumns: SourceColumn[],
  tempTable: string,
  destinationTable: string
) {
  await trx.raw(
    `DELETE FROM ${destinationTable} WHERE pk IN
    (select pk FROM ${tempTable})`
  );

  // specify all columns if you have any IDENTITY columns, if not you could just use `select *`
  await trx.raw(`INSERT into ${destinationTable}(${sourceColumns}) select ${sourceColumns} from ${tempTable}`);

  await trx.schema.dropTable(tempTable);
}

how do I access the originalError from within the do method?

MichalCz commented 4 years ago

You caught the error as err so your originalError is just err.

If you don't catch it (just leave it as await), then a scramjet-wrapped error can be caught using a catch after the whole do, or simply by catching the error after run, which returns the promise.

It's a fantastic example BTW. :)

MichalCz commented 4 years ago

    .do(async (rows: any[]) => {
      await mssql.raw('SET NOCOUNT ON');
      await mssql.batchInsert(tempTableName, rows, 300).catch((err) => {
        console.log(err);
        console.log(err[0]);
        console.log(err.originalError);
        console.log(err.message);
        process.exit(2);
      });
    })
    // here .catch(err => err.cause) // then you can still run the queries, but seems you want to fail fast
    .run()
    .catch(err => err.cause /* maybe rollback the tx rather than process.exit? */)
;
chapmanjacobd commented 4 years ago

If you don't catch it (just leave it as await) then a scramjet wrapped error can be caught

ohh I see... so that's how it works

maybe rollback

ahh yes. I am worried that if I do too many inserts into SQL Server then the transaction log will run out of space. I'm not sure how it works internally so I'm being extra careful.

awaited knex.batchInsert() will actually do an implicit transaction so it will never insert only some "chunks" but not others. It has an implicit commit/rollback.

awaited mssql.transaction(async function (trx) {}); also is implicit commit/rollback.
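For anyone following along, a small sketch of what that implicit commit/rollback means in knex terms (the table name, helper name and the DELETE statement are made up for the example; mssql is the same knex instance as in the snippets above):

import { Knex } from 'knex';

async function moveRows(mssql: Knex, rows: Record<string, unknown>[]) {
  // knex.transaction commits when the callback resolves
  // and rolls back automatically if it throws or rejects
  await mssql.transaction(async (trx) => {
    // batchInsert can also be tied to the surrounding transaction explicitly
    await mssql.batchInsert('some_table', rows, 300).transacting(trx);

    // if this throws, the batchInsert above is rolled back as well
    await trx.raw('DELETE FROM some_table WHERE pk IS NULL');
  });
}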

the only leaking part here is that if you use this code you should:

1) make sure only one copy is running at a time and

2) truncate the holding temp tables first thing, in case there was an error previously (a sketch of that is below).
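A minimal sketch of that safeguard, assuming the same temp_${table} naming and the mssql knex instance from the snippets above (the helper name is just illustrative):

// clear any leftovers from a previously failed run before re-inserting
async function truncateHoldingTables(sourceTables: string[]) {
  for (const table of sourceTables) {
    // knex's truncate() issues a TRUNCATE TABLE for the given table
    await mssql(`temp_${table}`).truncate();
  }
}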