neo4j / neo4j-javascript-driver

Neo4j Bolt driver for JavaScript
https://neo4j.com/docs/javascript-manual/current/
Apache License 2.0
853 stars 148 forks source link

How can I update parent data in a stream and retrieve the updated data #1204

Open bekkazy-k opened 3 months ago

bekkazy-k commented 3 months ago

Here is a method used nodejs-driver:

public async getReviewsInStream(importId: string, handleData: Handler) {
  const rxSession = driver.rxSession({ defaultAccessMode: Neo4jSessionMode.WRITE, database });
  try {
    await new Promise<void>((resolve, reject) => {
      rxSession
        .run(
          `MATCH (i:Import)-[PART_OF]-(p:Product)-[REVIEWS]-(r:Review)-[WROTE]-(c:Customer)
                        WHERE elementId(i) = $importId
                        RETURN r, p, c`,
          { importId },
        )
        .records()
        .pipe(
          map((record) => {
            const review = record.get("r");
            const product = record.get("p");
            const customer = record.get("c");
            return { review, product, customer };
          }),
          concatMap((data) => handleData(importId, data.review, data.product, data.customer).then(data)),
          concatWith(rxSession.close()),
        )
        .subscribe({
          next: (data) => {
            // console.log(data)
          },
          complete: () => {
            resolve();
          },
          error: (err) => {
            rxSession.close();
            reject(err);
          },
        });
    });
  } catch (error) {
    console.log("stream eror:", error);
  }
}

In the handleData method, I update the statuses of Product and Customer, but in the subsequent records, they are received as not updated. Could you please advise on what I can do? Currently, even if the parent has already been updated in subsequent review nodes, I still get the old data.

bigmontz commented 3 months ago

When you call session.run(), a transaction is created in the database. So, the data in the stream are from the point where the transaction was created, no changes in future transactions will affect this stream.

A workaround you can use is keeping track of what did you changed in a local state, this way you avoid to update twice the same Node.

bekkazy-k commented 3 months ago

@bigmontz I would prefer not to store states in memory. I'm new to Neo4j; is it possible that APOC can help me with streaming reads and updates?

bigmontz commented 3 months ago

So,

if the changes are based in the content which are being processed, then you can use cypher+apoc for doing it.

If it depends on an external input, than no, unless that can be expressed with parameters to the query.