brianc / node-pg-copy-streams

COPY FROM / COPY TO for node-postgres. Stream from one database to another, and stuff.

`COPY FROM` fails when upgrading from v6.0.1 to v6.0.2+ #143

Closed basselworkforce closed 1 year ago

basselworkforce commented 1 year ago

We encountered an issue when updating from v6.0.1 to v6.0.2+ and executing the COPY FROM command:

import { Client } from 'pg';
import { from } from 'pg-copy-streams';
import { Readable, Writable } from 'stream';
...
const writableStream: Writable = pgClient.query(
    from(`COPY "${tableName}" FROM STDIN DELIMITER ','  NULL ''`)
);

The following error is thrown in Postgres: ERROR: COPY from stdin failed: No source stream defined

and writableStream above is undefined.

We use Sequelize to create the connection, which leverages the installed 'pg' package (https://sequelize.org/docs/v6/other-topics/dialect-specific-things/#postgresql).

One important note: we were unable to reproduce this locally (running a PostgreSQL Docker image); the issue only occurs on the Cloud SQL instance.

Environment:

Cloud SQL (Google Cloud) PostgreSQL 12.11
Node 18.12.1
Sequelize 6.25.8
pg 8.8.0
pg-copy-streams 6.0.1 -> 6.0.2+

Non-default Client options set:

statement_timeout 10,000 ms

Please let me know if I can provide any more detail; we would love to resolve this and keep the library up to date.

Thank you!

jeromew commented 1 year ago

Hello,

Thanks for the report. As you know, it is hard to fix a bug that cannot be reproduced.

The error message you are getting seems to come from pg itself, so at least this could be a starting point (I upgraded to the latest pg, 8.8.0).

In node_modules/pg/lib/query.js, connection.sendCopyFail('No source stream defined') is the default implementation of

handleCopyInResponse(connection) {
  connection.sendCopyFail('No source stream defined')
}

probably used when the query in pg has not been set up to send data to postgres.

At first glance this is weird because this method is overridden in https://github.com/brianc/node-pg-copy-streams/blob/master/copy-from.js#L138
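To illustrate the override mechanism with stub objects (not pg's actual source): a Submittable like CopyStreamQuery can replace the default copy-fail behaviour by defining its own handleCopyInResponse, which is roughly what copy-from.js does.

```javascript
// Stub standing in for pg's Connection; it only records what is sent.
class StubConnection {
  constructor() {
    this.messages = [];
  }
  sendCopyFail(msg) {
    this.messages.push(msg);
  }
}

// Default behaviour, as in pg's plain Query: abort the COPY immediately.
class PlainQuery {
  handleCopyInResponse(connection) {
    connection.sendCopyFail('No source stream defined');
  }
}

// A Submittable-like object that overrides the handler so it can stream
// data instead of failing (the real override lives in copy-from.js).
class CopyLikeQuery {
  handleCopyInResponse(connection) {
    this.ready = true; // would start sending CopyData messages here
  }
}

const conn = new StubConnection();
new PlainQuery().handleCopyInResponse(conn);
console.log(conn.messages); // [ 'No source stream defined' ]

const conn2 = new StubConnection();
const q = new CopyLikeQuery();
q.handleCopyInResponse(conn2);
console.log(conn2.messages, q.ready); // [] true
```

So whenever the error appears, the object handling the CopyInResponse must have been a plain Query, not the override.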

If any of this rings a bell to you on things you may try to observe on your side to narrow the bug tracking, tell me.

jeromew commented 1 year ago

how do you extract the pgClient from sequelize ?

jeromew commented 1 year ago

do you observe the same problem when you do not use the statement_timeout option ?

jeromew commented 1 year ago

Since 6.0.2 seems to break things for you and this version was trying to have copy-from behave correctly with the pg timeout mechanism, the problem could be coming from the timeout option & the way it is implemented inside sequelize / pg / copy-from. The implementation in https://github.com/brianc/node-pg-copy-streams/commit/b6c5c19bd3a9e5783c0da92dd39a7936221992e1 is using internal behaviour of pg.

what is the timing of the ERROR that you receive ? is Cloud SQL (Google Cloud) using a custom version of postgres ?

jeromew commented 1 year ago

The thing is that when pg.query(from) is called, from is used as if it were a Query object because it has a submit method. At this stage I do not see how connection.sendCopyFail('No source stream defined') could be called, except if you are using a sequelize-wrapped version of pg that has its own custom implementation of query and does not know about this submittable mechanism.

Are there any other messages that are coming from postgres ?

fokopratik commented 1 year ago

how do you extract the pgClient from sequelize ?

We're using the pgClient from the Sequelize transaction object, so something like const pgClient = transaction.connection, which returns a PGConnection object that extends the pg client. It doesn't seem to override the query method.

fokopratik commented 1 year ago

do you observe the same problem when you do not use the statement_timeout option ?

We can try that, but the error is instant which leads me to believe it might not be due to the timeout.

is Cloud SQL (Google Cloud) using a custom version of postgres ?

Don't think so, it says it's using postgres 12.11.

fokopratik commented 1 year ago

The thing is that when pg.query(from) is called, from is used as if it were a Query object because it has a submit method. At this stage I do not see how connection.sendCopyFail('No source stream defined') could be called, except if you are using a sequelize-wrapped version of pg that has its own custom implementation of query and does not know about this submittable mechanism.

Are there any other messages that are coming from postgres ?

So we did look at the Sequelize code and it doesn't seem to override the query method, so it's likely using the postgres one. And no we didn't see any more messages from Postgres, just that one

jeromew commented 1 year ago

Do you have the possibility to do a stripped-down test without sequelize in your different environments? A strange thing is the difference you observe between Google Cloud (not working) and Docker (working).

jeromew commented 1 year ago

I am not familiar with sequelize, but maybe the code path from running query is

which means that sequelize seems to be adding a callback to what it calls connection.query, which could be the equivalent of the pg client.query

This needs to be analyzed, but that could maybe explain a non-expected interaction with the code introduced in 6.0.2, which needs to interact with the pg callback mechanism to handle timeouts.

jeromew commented 1 year ago

@fokopratik Could you try with the following code

import { Client } from 'pg';
import { from } from 'pg-copy-streams';
import { Readable, Writable } from 'stream';
...
const writableStream: Writable = from(`COPY "${tableName}" FROM STDIN DELIMITER ','  NULL ''`)
writableStream.callback = null
pgClient.query(
    writableStream
);

BTW, are you sure the code on Google Cloud is correct and the same as what you copy-pasted?

I ask because for me a test with sequelize works with

const writableStream: Writable = pgClient.query(
    from(`COPY "${tableName}" FROM STDIN DELIMITER ','  NULL ''`)
);

but outputs an ERROR: COPY from stdin failed: No source stream defined when using

const writableStream: Writable = pgClient.query(
    (`COPY "${tableName}" FROM STDIN DELIMITER ','  NULL ''`)
);
fokopratik commented 1 year ago

Do you have the possibility to do a stripped-down test without sequelize in your different environments? A strange thing is the difference you observe between Google Cloud (not working) and Docker (working).

Yeah, the weird part is this only seems to happen on our servers on Google Cloud; we can't reproduce it locally. We did run a test removing Sequelize for this query: instead of getting pg from the Sequelize transaction, we initialized a new pg client and used that to run the COPY command, but it still gives us the same exact error.

fokopratik commented 1 year ago

We tried setting the callback to null as well, but we got an error on line 156 in copy-from, where it tried to use that callback.

We can check whether the code running is the same. For the test you did with Sequelize, is the difference passing a string vs a CopyStreamQuery object? When we pass a string it gives us that same error, but our logs show that the object returned from from, which is being passed as an arg to query, is a CopyStreamQuery object. It seems like something weird happens in the query method.

jeromew commented 1 year ago

Indeed, when calling pgClient.query(something), if something does not have a submit method, a regular pg.Query object is created, and that object is what sets up the ERROR: COPY from stdin failed: No source stream defined for when a COPY operation is started.

In pgClient.query(from("COPY ...")), the CopyStreamQuery has submit and handles everything. In pgClient.query("COPY..."), pg starts a COPY operation, the database sends the CopyIn message, and then pg reacts with a CopyFail "No source stream defined" message, which makes the database complain with the error you see.

So the question is how the code on Google Cloud ends up with a regular pg.Query object when it receives the CopyIn message, instead of the CopyStreamQuery that was created.

My 2 cents in the realm of weird ideas, but could there be a process / code rewriter somewhere in your setup on Google Cloud that thinks that "from" is a reserved keyword and removes it from the call?

marcosbaroni commented 1 year ago

So the question is how the code on Google Cloud ends up with a regular pg.Query object when it receives the CopyIn message, instead of the CopyStreamQuery that was created.

We confirmed that we do send pgClient.query() a CopyStreamQuery with its .submit function defined, which makes it unclear why the error being triggered is No source stream defined, which comes from pg.Query. One hypothesis I had is that by the time the pgClient receives the copyInResponse event from the postgres backend, for some unknown reason the activeQuery is not the CopyStreamQuery but a regular pg.Query (pg/lib/client.js L.384); something that doesn't happen without the this.callback/timeout mechanism.

jeromew commented 1 year ago

You could maybe print the query.text attribute of the Query object that triggers the copyFail, to get a hint of which query is active (by "patching" handleCopyInResponse in node_modules/pg/lib/query.js).
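One way to shape that debug patch, sketched here against a stub Query class (the real patch would wrap Query.prototype.handleCopyInResponse in node_modules/pg/lib/query.js):

```javascript
// Stub Query standing in for pg's Query class, for illustration only.
class Query {
  constructor(text) {
    this.text = text;
  }
  handleCopyInResponse(connection) {
    connection.sendCopyFail('No source stream defined');
  }
}

// The debug patch: log which query is active before delegating.
const logged = [];
const original = Query.prototype.handleCopyInResponse;
Query.prototype.handleCopyInResponse = function (connection) {
  logged.push(this.text); // reveals the .text of the query that copyFails
  return original.call(this, connection);
};

// Simulate a CopyInResponse arriving for a plain Query.
const sent = [];
const fakeConnection = { sendCopyFail: (msg) => sent.push(msg) };
new Query('COPY "t" FROM STDIN').handleCopyInResponse(fakeConnection);
console.log(logged); // [ 'COPY "t" FROM STDIN' ]
console.log(sent);   // [ 'No source stream defined' ]
```

This reveals the .text of whichever query object actually handled the CopyInResponse, which is the open question here.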

Do you have a stripped-down code block that triggers the error? It could be related to a subtle difference in the sequence of messages between pg and postgres on Google Cloud, but at this stage I don't know.

Is it simple to set up and reproduce the problem in a fresh Google Cloud test environment?

jeromew commented 1 year ago

@fokopratik, @marcosbaroni do you confirm that 6.0.2+ works when the timeout option is not set ? This is to confirm that the problem lies around the callback/timeout mechanism.

jeromew commented 1 year ago

Maybe you could also add an error handler on writableStream to see if an unexpected error is happening. An error could maybe be the reason for a switch of activeQuery inside pg.

fokopratik commented 1 year ago

Is it simple to set up and reproduce the problem in a fresh Google Cloud test environment?

We'll have to look into whether we can reproduce this in a simple env on Google Cloud; right now we're trying to reproduce it locally but it's not happening.

Do you have a stripped down code block that triggers the error ? It could be related to a subtle difference in the sequence of messages between pg and postgres in Google Cloud but at this stage I don't know.

Can you elaborate on this? How does the sequence of messages impact this?

fokopratik commented 1 year ago

maybe you could also add an error handler on writableStream to see if an unexpected error could be happening. An error could maybe be the reason for a switch of activeQuery inside pg

So we tried doing that, but the object returned by .query is undefined, so writable.on('error') fails.

This is what's bugging me a little bit: we verified that we're able to create a proper CopyStreamQuery object that has a submit function, which is then passed as an argument to the query method. So shouldn't it return a CopyStreamQuery (based on the if statement on line 521) instead of undefined? I don't see anything else that could set result to undefined here.

fokopratik commented 1 year ago

@fokopratik, @marcosbaroni do you confirm that 6.0.2+ works when the timeout option is not set ? This is to confirm that the problem lies around the callback/timeout mechanism.

Just to clarify are you explicitly talking about query_timeout config? We tried removing the statement timeout but that didn't do anything. We don't set a query_timeout config right now

jeromew commented 1 year ago

@fokopratik I am only trying to find ideas for you to test, as I am totally blind here. If you have a box I can log into, I can connect to it.

Regarding the timeout, 6.0.2 only differs from 6.0.1 in this callback/timeout mechanism, so it can be worth zooming in on L536, which in my mind is where a difference could happen depending on whether a timeout is set.

I agree that result should be the CopyStreamQuery object. It is in my local sequelize test.

Can you elaborate on this? How does the sequence of messages impact this?

By stripped-down version, I mean a minimal test that highlights the bug and can be reproduced. At this stage I have no clue what is happening. It could be a bug in your code, a bug in pg, a bug in node-pg-copy-streams, or a patch by Google in the postgres version that they use. pg has a lot of internal mechanisms to "make it happen" and "keep the lights on" when unexpected things occur, and a single "non-standard" emission of a postgres protocol message like ReadyForQuery could explain the drift from CopyStreamQuery to Query. There is something fishy going on if you cannot reproduce it locally. Now we just need to understand what it is.

The result = undefined angle could be interesting to follow; it does not seem possible if the code path is the one we think it is in query.js.

jeromew commented 1 year ago

How do you think we could solve your issue?

Here is the small sequelize snippet that I use locally to show that no error occurs.

const { Sequelize } = require('sequelize');
const from = require('./copy-from.js');
const sequelize = new Sequelize('postgres://localhost:5432/postgres', { dialectOptions: { query_timeout: 10000 }});
(async function () {
  await sequelize.authenticate();
  //await sequelize.query(`CREATE TABLE testnumber (num int)`);
  const writableStream = from(`COPY "testnumber" FROM STDIN DELIMITER ','  NULL ''`);
  const t = await sequelize.transaction();
  const pgClient = t.connection;
  pgClient.query(writableStream);
  writableStream.on('finish', async function () {
    await t.commit();
    await sequelize.close();
  });
  writableStream.end();
})();

can you test it in your different environments to see if/when you can reproduce the issue you mentioned ?

fokopratik commented 1 year ago

Yeah, we're looking into how we can replicate it in an isolated environment, but it's not as straightforward: we'd need to involve our prod eng team to help set up/replicate this env and grant access, and then hope we can reproduce the bug there. If needed we can go down this route, but I'd like to exhaust other options first. I'll try out the code you pasted above; ours is very similar to it.

FYI, here's the code we're using

const pgClient = new Client({
    host: config.db.hostMaster,
    port: config.db.port,
    user: config.db.user,
    password: config.db.password,
    database: config.db.name,
    ssl: config.db.ssl
});
await pgClient.connect();

await new Promise((resolve, reject) => {
    const writableStream: Writable = pgClient.query(
        from(`COPY "${tableName}" FROM STDIN DELIMITER ','  NULL ''`)
    );

    const readableStream = new Readable({ read() {} }); // no-op read(): data is pushed manually below

    readableStream.on('error', err => {
        reject(err);
    });

    writableStream.on('error', err => {
        reject(err);
    });

    writableStream.on('finish', () => {
        readableStream.destroy();

        resolve(undefined);
    });

    readableStream.pipe(writableStream, { end: true });

    data.forEach(d => readableStream.push(d));
    readableStream.push(null);
});
fokopratik commented 1 year ago

How do you think we could solve your issue?

Right now we're looking into the .query method to see if there's something that can affect the result var or mess with the callback, but nothing jumps out. We tried this with and without read_timeout and the behaviour remained the same, so maybe the error could be somewhere else? I'm also trying to see why result gets set to undefined, but it's hard to see/log what's going on inside that method.

One thing we're also trying is to override the two methods that use the .callback and see if removing it from one of those places fixes it.

jeromew commented 1 year ago

The difference between 6.0.1 and 6.0.2 is very small; maybe you could confirm that patching 6.0.1 manually leads to the problem? https://github.com/brianc/node-pg-copy-streams/commit/b6c5c19bd3a9e5783c0da92dd39a7936221992e1

Do you have the same typescript version / pipeline on Google Cloud and locally? I ask because I saw that "from" is a somewhat reserved keyword in typescript and wondered whether that could lead to a bug. Of course, the probability of such a bug existing is very low.

What seems weird to me is that the COPY operation does seem to be sent to postgres, since postgres answers with a copyIn message. How the CopyStreamQuery can morph into a Query and then into undefined is somewhat of a mystery to me when I read the code.

fokopratik commented 1 year ago

We added logging in the callback and now we're able to see a better error object with more data (screenshot attached: "Screen Shot 2023-01-30 at 3 09 43 PM").

We've noticed that in the error it's referencing to the parser.ts file, is that expected? I thought it'd be looking at the compiled js file

jeromew commented 1 year ago

For the reference to .ts files, I don't know. It could be because of some sourcemap feature of typescript, so that node shows the source line in the original ts file. The parser code is simply there to parse the postgres protocol message that corresponds to the error thrown by postgres.

So this is the stack when node receives the error from postgres. What is "TaskRoundParticipantGroup"? Is that the name of your table in COPY "${tableName}" FROM STDIN DELIMITER ',' NULL ''?

It does not seem to say much more.

fokopratik commented 1 year ago

So this is the stack when node receives the error from postgres. What is "TaskRoundParticipantGroup"? Is that the name of your table in COPY "${tableName}" FROM STDIN DELIMITER ',' NULL ''?

Yes, TaskRoundParticipantGroup is the table name. Could the error code 57014 tell us where to look next? Upon googling, it looks like the query was cancelled, but there's not much indication of why it would be cancelled.

jeromew commented 1 year ago

No, I don't think so. "[ERROR: 57014: canceling statement due to user request]": I think you receive this because the client sent a CopyFail to the server. And the client sent a CopyFail to the server because the server sent a CopyIn to the client. And the server sent a CopyIn to the client because it started to accept data from the client during a COPY operation. And it started to accept data during a COPY operation because it received a COPY request.

Now the whole question is why the client object that handles the CopyIn does not seem to be the CopyStreamQuery that was prepared.
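The chain of cause and effect above can be written down as a toy trace (the message names follow the Postgres wire protocol; everything else is invented for the illustration):

```javascript
// Simulate the two possible reactions to a CopyInResponse: a query with a
// source stream starts copying; one without replies CopyFail, which the
// server then reports back as error 57014.
function copyExchange(activeQueryHasStream) {
  const trace = ['server -> client: CopyInResponse'];
  if (activeQueryHasStream) {
    trace.push('client -> server: CopyData ...');
    trace.push('client -> server: CopyDone');
  } else {
    trace.push("client -> server: CopyFail 'No source stream defined'");
    trace.push('server -> client: ErrorResponse 57014');
  }
  return trace;
}

console.log(copyExchange(true));
console.log(copyExchange(false));
```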

jeromew commented 1 year ago

an idea could be to patch the query with something like

const originalQuery = pgClient.query.bind(pgClient);
pgClient.query = function (config, values, callback) {
  const result = originalQuery(config, values, callback);
  console.log(this.activeQuery.text);
  console.log(this.activeQuery);
  return result;
};

and see what it outputs.

jeromew commented 1 year ago

you probably need to understand

Regarding the .ts file, I don't know if there is something to dig into that could explain the issue. Locally, in node_modules/pg/lib I only have .js files, so I don't know where a source map would come from. Are you using node or deno? Or maybe some custom typescript stack?

fokopratik commented 1 year ago

Yeah, we're using the patch you suggested to get some info on those 2 things. We're using node and typescript, and we do have sourcemap support in our stack, so maybe that's where it comes from; I wasn't expecting it in prod, so I figured I'd mention it. I'm not sure what you mean by custom typescript stack.

jeromew commented 1 year ago

Did you find any clue ?

Can you post what is logged on your Google Cloud instance with console.log(pgClient.query.toString()) to see what code is run when query is called ?

I am wondering whether you are using pg-native, because the code paths and implementations are different. Are you using pg-native, or could it be in some way installed on your Google Cloud instance?
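Why the .toString() suggestion helps: a monkey-patched method reads back as the wrapper's source, not the original's, so logging it exposes any instrumentation. A minimal demonstration with a stub client (the class and names here are invented for the demo):

```javascript
// Stub client with an observable query method.
class Client {
  query(config) {
    return config;
  }
}

const client = new Client();
const originalSource = client.query.toString();

// Simulate what an instrumentation library might do: replace query.
const original = client.query.bind(client);
client.query = function patchedQuery(config) {
  return original(config);
};

// The wrapper's source is now what .toString() returns.
console.log(client.query.toString().includes('patchedQuery')); // true
console.log(client.query.toString() === originalSource);       // false
```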

basselworkforce commented 1 year ago

These are the logs we got by adding those statements:

this.activeQuery.text

COPY "TaskRoundParticipantGroup" FROM STDIN DELIMITER ',' NULL ''

this.activeQuery

Query {
    _events: [Object: null prototype]{},
    _eventsCount: 0,
    _maxListeners: undefined,
    text: `COPY "TaskRoundParticipantGroup" FROM STDIN DELIMITER ',' NULL ''`,
    values: undefined,
    rows: undefined,
    types: undefined,
    name: undefined,
    binary: undefined,
    portal: '',
    callback: [Function: contextWrapper],
    _rowMode: undefined,
    _result: Result {
        command: null,
        rowCount: null,
        oid: null,
        rows: [],
        fields: [],
        _parsers: undefined,
        _types: TypeOverrides {
            _types: [Object],
            text: {},
            binary: {}
        },
        RowCtor: null,
        rowAsArray: false
    },
    _results: Result {
        command: null,
        rowCount: null,
        oid: null,
        rows: [],
        fields: [],
        _parsers: undefined,
        _types: TypeOverrides {
            _types: [Object],
            text: {},
            binary: {}
        },
        RowCtor: null,
        rowAsArray: false
    },
    isPreparedStatement: false,
    _canceledDueToError: false,
    _promise: null,
    [Symbol(kCapture)]: false
}

callback error

error: COPY from stdin failed: No source stream defined
    at Parser.parseErrorMessage (/app/node_modules/pg-protocol/src/parser.ts:369:69)
    at Parser.handlePacket (/app/node_modules/pg-protocol/src/parser.ts:188:21)
    at Parser.parse (/app/node_modules/pg-protocol/src/parser.ts:103:30)
    at Socket.<anonymous> (/app/node_modules/pg-protocol/src/index.ts:7:48)
    at Socket.emit (node:events:513:28)
    at addChunk (node:internal/streams/readable:324:12)
    at readableAddChunk (node:internal/streams/readable:297:9)
    at Socket.Readable.push (node:internal/streams/readable:234:10)
    at TCP.onStreamRead (node:internal/stream_base_commons:190:23)
    at TCP.callbackTrampoline (node:internal/async_hooks:130:17) {
    length: 142,
    severity: 'ERROR',
    code: '57014',
    detail: undefined,
    hint: undefined,
    position: undefined,
    internalPosition: undefined,
    internalQuery: undefined,
    where: 'COPY TaskRoundParticipantGroup, line 1',
    schema: undefined,
    table: undefined,
    column: undefined,
    dataType: undefined,
    constraint: undefined,
    file: 'copy.c',
    line: '680',
    routine: 'CopyGetData'
}
basselworkforce commented 1 year ago

For reference this is the code block:

    const fromQuery = from(`COPY "${tableName}" FROM STDIN DELIMITER ','  NULL ''`);
    fromQuery['callback'] = (e?: any) => {
        console.log('---callback---');
        console.log('callback ran');
        console.log(e);
    };
    console.log(fromQuery);

    const originalQuery = pgClient.query.bind(pgClient);
    (pgClient as any).query = function (config: any, values: any, callback: any) {
        const result = originalQuery(config, values, callback);
        console.log('---activeQuery---');
        console.log(this.activeQuery.text);
        console.log(this.activeQuery);
        console.log('---end of activeQuery---');
        return result;
    };

    const writableStream = pgClient.query(fromQuery);
    console.log('---writableStream---');
    console.log(writableStream);
jeromew commented 1 year ago

As we already observed, the strange thing is the Query object in this.activeQuery. It has the same .text as the CopyStreamQuery, so somehow the CopyStreamQuery is morphed into a Query, or the query text is copied from the CopyStreamQuery to another Query object.

Maybe you could log console.log(config) at the entry of query to make sure that fromQuery is a CopyStreamQuery, and console.log(result) right after the call to originalQuery to see what result we get.

And more importantly, log console.log(pgClient.query.toString()) to maybe get a clue about what code is executed inside pgClient.query, and compare it to what can be seen in node_modules/pg/lib/client.js.

basselworkforce commented 1 year ago

So maybe this condition returns false on the pg client for some reason: else if (typeof config.submit === 'function') { https://github.com/brianc/node-postgres/blob/master/packages/pg/lib/client.js#L520 which leads to query = new Query(config, values, callback) being called (creating a regular Query instead of using the CopyStreamQuery): https://github.com/brianc/node-postgres/blob/master/packages/pg/lib/client.js#L528

will add some more logging to validate that theory in addition to what you mentioned above
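That dispatch branch can be sketched as follows (modeled on the client.js code linked above, not copied from it); if the first check fails, a streaming query silently becomes a plain one:

```javascript
// Simplified dispatch: a config with a submit function is treated as a
// Submittable and reused; anything else is wrapped in a plain query.
function dispatch(config) {
  if (typeof config.submit === 'function') {
    return { kind: 'submittable', query: config };
  }
  return { kind: 'regular', query: { text: String(config.text || config) } };
}

console.log(dispatch({ text: 'COPY "t" FROM STDIN', submit() {} }).kind); // submittable
console.log(dispatch('COPY "t" FROM STDIN').kind);                        // regular
```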

jeromew commented 1 year ago

Indeed, that could maybe explain what you observe.

Maybe you can also log typeof config.submit, but I am mostly interested in what console.log(config) shows.

Another option is that the code is different from what we see on github (hence the need to log pgClient.query.toString()).

jeromew commented 1 year ago

could be a problem with the typescript typeguard system maybe ? https://basarat.gitbook.io/typescript/type-system/typeguard

this is related to

I'm not sure what you mean by custom typescript stack

I am not familiar with the impact that typescript can have at runtime. I hope it is not some kind of typescript runtime that has this kind of side-effect

basselworkforce commented 1 year ago

So guess what, query is modified.. 🤦‍♂️

We use a tracing library with google cloud that overrides the query function: https://github.com/open-telemetry/opentelemetry-js-contrib/blob/main/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts#L170

Thank you for suggesting to check that! And for all your help in tracking this down
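For readers landing here later, the failure mode can be reproduced with a toy wrapper (everything below is invented for the demo; see the linked instrumentation code for the real thing): a wrapper that forwards only the query text, rather than the original object with its submit method, downgrades the CopyStreamQuery to a plain query.

```javascript
// Stub client: the Submittable path returns the query object (a stream in
// real pg); the plain-text path of this stub has nothing to return.
const client = {
  query(config) {
    if (config && typeof config.submit === 'function') {
      return config;
    }
    return undefined;
  },
};

const copyQuery = { text: 'COPY "t" FROM STDIN', submit() {} };

// Unwrapped: the Submittable survives.
const direct = client.query(copyQuery);
console.log(direct === copyQuery); // true

// A tracing wrapper that forwards only config.text loses the Submittable,
// matching the symptoms in this thread (undefined result, plain Query active).
const original = client.query.bind(client);
client.query = (config) =>
  original(config && typeof config === 'object' ? config.text : config);
const wrapped = client.query(copyQuery);
console.log(wrapped); // undefined
```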

basselworkforce commented 1 year ago

There is an open issue for this on that package: https://github.com/open-telemetry/opentelemetry-js-contrib/issues/991

jeromew commented 1 year ago

Quantum telemetry, where the observer modifies the system it observes! At least we have an explanation for what is happening.