neo4j / neo4j-javascript-driver

Neo4j Bolt driver for JavaScript
https://neo4j.com/docs/javascript-manual/current/
Apache License 2.0

[Bug] Neo4jError: Ignored either because of an error or RESET #1080

Open gchicoye opened 1 year ago

gchicoye commented 1 year ago

Bug Report

While updating my Neo4j database (Community Edition) with the following script:

const upsertPromiseArrayCreator = (dbEntries:DbEntry[], tx:ManagedTransaction)
:Promise<QueryResult>[] => dbEntries
  .filter((dbEntry) => dbEntry.action === 'UPSERT')
  .map((dbEntry) => new Promise((resolve, reject) => {
    tx.run(
      `
        MERGE (user:User{first_id: $userId })
        ON CREATE SET user.created_at = timestamp()
        ON MATCH SET user.updated_at = timestamp()
        MERGE (audience:Audience{mediarithmics_id:  $segmentId })
        ON CREATE SET audience.created_at = timestamp()
        ON MATCH SET audience.updated_at = timestamp()
        MERGE (user)-[rel:BELONGS_TO]->(audience)
        ON CREATE SET rel.created_at = timestamp()
      `,
      dbEntry,
    )
      .then((result) => resolve(result))
      .catch((e) => {
        // eslint-disable-next-line no-console
        console.log('upsertPromiseArrayCreator error');
        reject((e as Error).message);
      });
  }));

const deletePromiseArrayCreator = (dbEntries:DbEntry[], tx:ManagedTransaction)
:Promise<QueryResult>[] => dbEntries
  .filter((dbEntry) => dbEntry.action === 'DELETE')
  .map((dbEntry) => new Promise((resolve, reject) => {
    tx.run(
      `MATCH (user:User{first_id:$userId})-[belong:BELONGS_TO]->(audience:Audience{mediarithmics_id: $segmentId})
        DELETE belong`,
      dbEntry,
    )
      .then((result) => resolve(result))
      .catch((e) => {
        // eslint-disable-next-line no-console
        console.log('deletePromiseCreator error');
        reject((e as Error).message);
      });
  }));

export const registerChunk = (dbEntries:DbEntry[])
:Promise<true> => new Promise((resolve, reject) => {
  try {
    const session:Session = driver.session();
    session.executeWrite((tx) => {
      const promises = [
        ...upsertPromiseArrayCreator(dbEntries, tx),
        ...deletePromiseArrayCreator(dbEntries, tx),
      ];
      Promise.all(promises)
        .then(() => {
          session.close();
          mediarithmicsLinesCounter.inc(dbEntries.length);
          resolve(true);
        })
        .catch((e) => {
          session.close();
          // eslint-disable-next-line no-console
          console.log('Error in registerChunk Element', e.message);
          reject((e as Error).message);
        });
    });
  } catch (e) {
    // eslint-disable-next-line no-console
    console.log('Error in registerChunk', (e as Error).message);
    reject((e as Error).message);
  }
});

I get the following uncatchable error.

node:internal/process/promises:288
            triggerUncaughtException(err, true /* fromPromise */);
            ^

Neo4jError: Ignored either because of an error or RESET

    at captureStacktrace (/app/node_modules/neo4j-driver-core/lib/result.js:611:17)
    at new Result (/app/node_modules/neo4j-driver-core/lib/result.js:105:23)
    at finishTransaction (/app/node_modules/neo4j-driver-core/lib/transaction.js:488:12)
    at Object.commit (/app/node_modules/neo4j-driver-core/lib/transaction.js:301:25)
    at Transaction.commit (/app/node_modules/neo4j-driver-core/lib/transaction.js:202:37)
    at TransactionExecutor._handleTransactionWorkSuccess (/app/node_modules/neo4j-driver-core/lib/internal/transaction-executor.js:155:16)
    at /app/node_modules/neo4j-driver-core/lib/internal/transaction-executor.js:131:42
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  constructor: [Function: Neo4jError] { isRetriable: [Function (anonymous)] },
  code: 'N/A',
  retriable: false
}

Node.js v18.16.0

On the server side, I get the following log:

2023-04-20 14:21:47.041+0000 ERROR [bolt-1789] Terminating connection due to unexpected error
org.neo4j.bolt.protocol.error.streaming.BoltStreamingWriteException: Failed to finalize batch: Cannot write result response
        at org.neo4j.bolt.protocol.common.transaction.result.ResultHandler.onFinish(ResultHandler.java:116) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at org.neo4j.bolt.protocol.common.fsm.AbstractStateMachine.after(AbstractStateMachine.java:126) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at org.neo4j.bolt.protocol.common.fsm.AbstractStateMachine.process(AbstractStateMachine.java:99) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.lambda$submit$4(AtomicSchedulingConnection.java:113) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.executeJob(AtomicSchedulingConnection.java:336) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.doExecuteJobs(AtomicSchedulingConnection.java:270) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.executeJobs(AtomicSchedulingConnection.java:212) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
        at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: io.netty.channel.unix.Errors$NativeIoException: writevAddresses(..) failed: Broken pipe

My Environment

Javascript Runtime Version: 18.16.0
Driver Version: 5.7.0
Neo4j Version and Edition: docker Neo4j:latest - 5.6 Community edition
Operating System: Debian 10 for Server, node:lts-alpine for Client (18.16.0)

gchicoye commented 1 year ago

Hey, any news on that? Thanks!

gchicoye commented 1 year ago

Hi, would it be possible to get some insight on this? Thanks!

bigmontz commented 1 year ago

Hi @gchicoye, do you have some logs or a minimal example?

You can enable the logs by creating the driver with:

const driver = neo4j.driver(<your url>, <your credentials>, {
  // ...other configurations
  logging: neo4j.logging.console("debug")
})

See: https://neo4j.com/docs/api/javascript-driver/5.9/function/index.html#static-function-driver

gchicoye commented 1 year ago

I get a TypeScript error here:

Property 'logging' does not exist on type '{ driver: (url: string, authToken?: AuthToken | undefined, config?: Config | undefined) => Driver; hasReachableServer: (url: string, config?: Pick<Config, "logging"> | undefined) => Promise<...>; ... 68 more ...; notificationFilterMinimumSeverityLevel: EnumRecord<...>; }'.ts(2339)

gchicoye commented 1 year ago

Hi, any idea on this?

bigmontz commented 1 year ago

Can you try to ignore the ts definition?

const driver = neo4j.driver(<your url>, <your credentials>, {
  // ...other configurations
  // @ts-ignore
  logging: neo4j.logging.console("debug")
})

I will investigate the ts type error.

gchicoye commented 1 year ago

Hey,

Done! I've made a lot of changes to my code; it now looks like this:

const upsertDbEntry = async (dbEntry:DbEntry, txc:Transaction)
:Promise<QueryResult> => txc.run(
  `
      MERGE (user:User{first_id: $userId })
      ON CREATE SET user.created_at = timestamp()
      ON MATCH SET user.updated_at = timestamp()
      MERGE (audience:Audience{mediarithmics_id:  $segmentId })
      ON CREATE SET audience.created_at = timestamp()
      ON MATCH SET audience.updated_at = timestamp()
      MERGE (user)-[rel:BELONGS_TO]->(audience)
      ON CREATE SET rel.created_at = timestamp()
    `,
  dbEntry,
);

const deleteDbEntry = async (dbEntry:DbEntry, txc:Transaction)
:Promise<QueryResult> => txc.run(
  `MATCH (user:User{first_id:$userId})-[belong:BELONGS_TO]->(audience:Audience{mediarithmics_id: $segmentId})
      DELETE belong`,
  dbEntry,
);

export const saveChunkData = async (dbEntries:DbEntry[]):Promise<boolean> => {
  const session = driver.session();
  const txc = session.beginTransaction();
  try {
    for (let i = 0; i < dbEntries.length; i += 1) {
      const dbEntry = dbEntries[i];
      if (dbEntry.action === 'UPSERT') upsertDbEntry(dbEntry, txc);
      if (dbEntry.action === 'DELETE') deleteDbEntry(dbEntry, txc);
      mediarithmicsLinesCounter.inc();
    }
    await txc.commit();
    return true;
  } catch (error) {
    // eslint-disable-next-line no-console
    console.log(error);
    await txc.rollback();
    // eslint-disable-next-line no-console
    console.log('rolled back');
    return false;
  } finally {
    await session.close();
  }
};

export const saveChunkDataWithRace = async (dbEntries:DbEntry[]):Promise<void> => {
  const endPromise:Promise<boolean> = new Promise((resolve) => {
    setTimeout(() => {
      resolve(false);
    }, 60000);
  });
  const promises:Promise<boolean>[] = [
    saveChunkData(dbEntries),
    endPromise,
  ];
  const result = await Promise.race(promises);
  if (!result) {
    // eslint-disable-next-line no-console
    console.log(`${new Date().toString()} - Commit timeout`);
    process.exit(1);
  }
};

I get the following logs from debug (the ts-ignore trick did the job :))

1687880367930 DEBUG Connection [0][bolt-2411] C: RUN 
      MERGE (user:User{first_id: $userId })
      ON CREATE SET user.created_at = timestamp()
      ON MATCH SET user.updated_at = timestamp()
      MERGE (audience:Audience{mediarithmics_id:  $segmentId })
      ON CREATE SET audience.created_at = timestamp()
      ON MATCH SET audience.updated_at = timestamp()
      MERGE (user)-[rel:BELONGS_TO]->(audience)
      ON CREATE SET rel.created_at = timestamp()
     {"action":"UPSERT","userId":"7c5c2e7fb4cb4eba77e2daa02a4ce301","segmentId":"36595"} {}
1687880367930 DEBUG Connection [0][bolt-2411] C: PULL {"n":{"low":1000,"high":0}}
1687880367932 DEBUG Connection [0][bolt-2411] S: SUCCESS {"signature":112,"fields":[{}]}

However, the commit usually never completes: nothing happens for minutes, so my timeout function fires and the container restarts...

bigmontz commented 1 year ago

You should await the promise returned by tx.run. You can do so with:

if (dbEntry.action === 'UPSERT') await upsertDbEntry(dbEntry, txc);
if (dbEntry.action === 'DELETE') await deleteDbEntry(dbEntry, txc);
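
For context, here is a minimal sketch of what the whole loop might look like once every statement is awaited before the commit. It is not the driver's own code. TxLike is a hypothetical interface covering only the three calls used in this thread (run, commit, rollback), saveChunkSequentially is a made-up name, and the upsert query is abbreviated from the one posted above.

```typescript
// Hypothetical minimal shape of a transaction: only the calls used in this thread.
interface TxLike {
  run(query: string, params: object): Promise<unknown>;
  commit(): Promise<void>;
  rollback(): Promise<void>;
}

interface DbEntry {
  action: 'UPSERT' | 'DELETE';
  userId: string;
  segmentId: string;
}

// Abbreviated version of the upsert statement posted above (timestamps omitted).
const UPSERT_QUERY = `
  MERGE (user:User{first_id: $userId})
  MERGE (audience:Audience{mediarithmics_id: $segmentId})
  MERGE (user)-[rel:BELONGS_TO]->(audience)
`;

const DELETE_QUERY = `
  MATCH (user:User{first_id: $userId})-[belong:BELONGS_TO]->(audience:Audience{mediarithmics_id: $segmentId})
  DELETE belong
`;

const queryFor = (entry: DbEntry): string =>
  (entry.action === 'UPSERT' ? UPSERT_QUERY : DELETE_QUERY);

// Runs the statements one at a time, awaiting each before sending the next,
// and commits only after all of them have settled successfully.
// If a statement or the commit fails, it rolls back and returns false.
async function saveChunkSequentially(entries: DbEntry[], tx: TxLike): Promise<boolean> {
  try {
    for (const entry of entries) {
      await tx.run(queryFor(entry), entry);
    }
    await tx.commit();
    return true;
  } catch (error) {
    console.error('saveChunkSequentially failed:', error);
    await tx.rollback();
    return false;
  }
}
```

With the real driver, the object returned by session.beginTransaction() is assumed to fit this shape, and the session still needs to be closed afterwards, as in the finally block above. Because each run is awaited, a failing statement surfaces in the catch block instead of as an unhandled rejection at commit time.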