TimonKK / clickhouse

NodeJS client for ClickHouse
Apache License 2.0

Pipe readable stream across writable stream #131

Open SudhakarSA opened 1 year ago

SudhakarSA commented 1 year ago

Here is my table schema:


eventDate      Date
eventDateTime  DateTime
numericValue   UInt32

I am trying to write data from one table to another. Below is my code:

let sql  = "select toDate(now()) as eventDate, now() as eventDateTime, 1 as numericValue";

const rs = connection.query(sql).stream();

const tf = new stream.Transform({
    objectMode : true,
    transform  : function (chunk, enc, cb) {

        cb(null, JSON.stringify(chunk));
    }
});

const ws = connection.insert('INSERT INTO demo.testing_table').stream();
const result = await rs.pipe(tf).pipe(ws).exec();

I am getting this error:

Error: Cannot parse input: expected '\t' before: 'ate":"2022-08-16","eventDateTime":"2022-08-16 13:42:35","numericValue":1}': (at row 1)
    at getErrorObj (/home/ubuntu/development/git/atatus/atatus-ch-migration/node_modules/clickhouse/index.js:230:14)
    at IncomingMessage.<anonymous> (/home/ubuntu/development/git/atatus/atatus-ch-migration/node_modules/clickhouse/index.js:341:9)
    at IncomingMessage.emit (node:events:402:35)
    at endReadableNT (node:internal/streams/readable:1343:12)
    at processTicksAndRejections (node:internal/process/task_queues:83:21) {
  code: 27
}

TimonKK commented 1 year ago

In this line:

cb(null, JSON.stringify(chunk));

you pass the data as JSON, but the internal format is TSV. If you pass it like this instead:

cb(null, [chunk.eventDate, chunk.eventDateTime, chunk.numericValue].join('\t'));

it should work.

SudhakarSA commented 1 year ago

Yes, this works for me. However, the docs show JSON.stringify rather than tab-separated values, even though the internal format is TSV.

Mauzzz0 commented 1 year ago

cb(null, [chunk.eventDate, chunk.eventDateTime, chunk.numericValue].join('\t'));

You forgot the trailing '\n' that ends each row. The full working code looks like this:

cb(null, [chunk.eventDate, chunk.eventDateTime, chunk.numericValue].join('\t') + '\n');
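
For anyone who wants to reuse this, here is a minimal sketch of the same idea as a standalone transform. The helper names (escapeTsv, toTsvLine, createTsvTransform) are hypothetical and not part of the clickhouse package. The escaping assumes ClickHouse's TabSeparated conventions (backslash-escaped tab, newline and backslash, with \N for NULL), and objectMode is kept as in the original snippet. Treat it as a starting point, not the library's own implementation.

```javascript
const { Transform } = require('stream');

// Hypothetical helper: escape a single value for a tab-separated row.
// Assumes TabSeparated rules: \N for NULL, backslash-escaped \, tab and newline.
function escapeTsv(value) {
    if (value === null || value === undefined) {
        return '\\N';
    }
    return String(value)
        .replace(/\\/g, '\\\\') // escape backslashes first
        .replace(/\t/g, '\\t')
        .replace(/\n/g, '\\n');
}

// Hypothetical helper: build one TSV line, with columns in the given order
// and a trailing '\n' so that each row ends properly.
function toTsvLine(row, columns) {
    return columns.map((name) => escapeTsv(row[name])).join('\t') + '\n';
}

// Hypothetical factory: a Transform that turns row objects into TSV lines.
function createTsvTransform(columns) {
    return new Transform({
        objectMode: true,
        transform(row, enc, cb) {
            cb(null, toTsvLine(row, columns));
        },
    });
}
```

It would replace tf in the original snippet, for example rs.pipe(createTsvTransform(['eventDate', 'eventDateTime', 'numericValue'])).pipe(ws). The column order has to match the column order of the target table.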