rdfjs / N3.js

Lightning fast, spec-compatible, streaming RDF for JavaScript
http://rdf.js.org/N3.js/

Cannot catch parse error when using streams #389

Closed: flyon closed this issue 1 month ago

flyon commented 1 month ago

I'm using N3.StreamParser to parse and N3.Writer to write (with a write stream). At some point my process happened to crash during writing, causing an incomplete n3 file. Upon restarting this caused a parse error:

Error: Expected entity but got . on line 29.
    at N3Parser._error (/Users/rene/web/titan/node_modules/n3/lib/N3Parser.js:843:17)
    at N3Parser._readEntity (/Users/rene/web/titan/node_modules/n3/lib/N3Parser.js:187:21)
    at N3Parser._readObject (/Users/rene/web/titan/node_modules/n3/lib/N3Parser.js:318:34)
    at /Users/rene/web/titan/node_modules/n3/lib/N3Parser.js:975:127
    at emitToken (/Users/rene/web/titan/node_modules/n3/lib/N3Lexer.js:375:7)

The problem is that the error goes straight into this last resort handler:

process.on('uncaughtException', (err) => {
  console.error(chalk.red('Asynchronous error caught.'));
});

I can't seem to catch the error myself though.

Here is my code; I'm using my own DataFactory. I added several things to try to catch the error (.on('error'), try/catch, and .catch() on the promise), but none of them catches it.

async loadContents(): Promise<QuadSet> {
    let readStream = await this.getReadStream();

    this.contents = new QuadSet();

    let factory = new Datafactory();
    const parser = new N3.StreamParser({
      factory: factory,
    });

    function Consumer(contents) {
      const writer = new Writable({objectMode: true});
      writer._write = (quad, encoding, done) => {
        contents.add(quad);
        done();
      };
      return writer;
    }
    readStream.pipe(parser);
    parser.pipe(new (Consumer as any)(this.contents));

    return new Promise((resolve, reject) => {
      try {
        let _this = this;
        readStream.on('error',function(err) {
          console.error(err);
          reject(false);
        });
        readStream.on('end',() => {
          resolve(_this.contents);
        });
      } catch (err) {
        console.error('Could not load quads from file:', err);
        reject(false);
      }
    }).catch((err) => {
      console.error('Could not load quads from file:', err);
      return false;
    }) as Promise<any>;
  }

RubenVerborgh commented 1 month ago

No error listener attached to parser?

flyon commented 1 month ago

ah yes. I didn't realize I can listen for events on the parser. That works, thank you

      // arrow function, so that `this` still refers to the enclosing class instance
      parser.on('error', (err) => {
        console.error(`Could not load quads from ${this.path}: ${err}`);
        reject(false);
      });

flyon commented 1 month ago

solved

RubenVerborgh commented 1 month ago

Yes, stream errors propagate forwards. So rather than listening to readStream, listen to parser—and the latter will pass on any errors from the former, so no need to listen to both.
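
To illustrate why the listener on readStream never fired: an 'error' event is delivered to listeners on the stream that emits it, so a failure raised inside the parser is only visible on the parser. The following is a minimal sketch using Node's built-in streams only. `makeFailingParser` and `whoSeesTheError` are hypothetical names, and the transform is a stand-in for N3.StreamParser, not its actual implementation.

```typescript
import { Readable, Transform } from "stream";

// Hypothetical stand-in for a parser: fails on any chunk containing "BAD".
function makeFailingParser(): Transform {
  return new Transform({
    readableObjectMode: true,
    transform(chunk, _encoding, done) {
      const text = chunk.toString();
      if (text.includes("BAD")) {
        // Passing an error to the callback makes this transform emit 'error'.
        done(new Error(`Unexpected input: ${text}`));
      } else {
        done(null, text);
      }
    },
  });
}

// Resolves with the name of the stream whose 'error' listener fired.
export function whoSeesTheError(): Promise<string> {
  return new Promise((resolve) => {
    const source = Readable.from(["ok ", "BAD"]);
    const parser = makeFailingParser();
    source.on("error", () => resolve("source"));
    parser.on("error", () => resolve("parser"));
    source.pipe(parser);
    parser.resume(); // drain the parser's output so it keeps consuming input
  });
}
```

In this sketch the promise resolves via the listener on the transform; the listener on the source is never called, which matches the behaviour described in the original report.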

flyon commented 1 month ago

gotcha, good to know. simplified it a bit now.

has some app specific code, but posting as an example of turning read stream into a write stream and using promises and error handling:

  /**
   * Reads the contents of an N3 file and loads them into an in-memory QuadSet in this store
   */
async loadContents(): Promise<QuadSet> {
    this.contents = new QuadSet();

    let factory = new Datafactory();
    const parser = new N3.StreamParser({
      factory: factory,
    });

    //takes a read stream and turns it into a write stream that adds the quads to the contents of this store
    function Consumer(contents) {
      const writer = new Writable({objectMode: true});
      writer._write = (quad, encoding, done) => {
        contents.add(quad);
        done();
      };
      writer.on("error", (err) => {
        console.error('Error writing quad:', err);
      });
      return writer;
    }
    let readStream = await this.getReadStream();
    readStream.pipe(parser);
    parser.pipe(new (Consumer as any)(this.contents));

    return new Promise((resolve, reject) => {
      let _this = this;
      //stream errors propagate forwards, so we listen to the parser for errors
      parser.on('error',(err) => {
        console.error(`Could not load quads from ${this.path}: ${err}`);
        reject(false);
      });
      //when done reading, resolve the promise
      readStream.on('end',() => {
        console.log('Loaded ' + factory.quads.size + ' quads');
        resolve(_this.contents);
      });
    });
  }

RubenVerborgh commented 1 month ago

Thanks for sharing. Still a bug there: readStream.on('end', ...) should be parser.on('end', ...). (When the reader is done, the parser might still need to process the final bits that the reader just read.)

Simply said: whenever you pipe a first stream into a second one, the first one is dead to you. Don't use it. Only use the second one.
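
One possible way to apply this advice without deciding which stream to attach each listener to is Node's `pipeline` from `stream/promises`. It resolves only once the last stream in the chain has finished and rejects if any stage fails, so it does not rely on one stream re-emitting another's errors. The sketch below is an illustration under stated assumptions, not the thread author's final code: `makeLineParser` and `loadLines` are hypothetical names, and the line-splitting transform stands in for N3.StreamParser to show that the last item is only produced after the source has ended.

```typescript
import { Readable, Transform, Writable } from "stream";
import { pipeline } from "stream/promises";

// Hypothetical stand-in for a parser: buffers input and only emits
// its last item when flushed, i.e. after the source has already ended.
function makeLineParser(): Transform {
  let pending = "";
  return new Transform({
    readableObjectMode: true,
    transform(chunk, _encoding, done) {
      pending += chunk.toString();
      const lines = pending.split("\n");
      pending = lines.pop() ?? ""; // keep the incomplete tail for later
      for (const line of lines) this.push(line);
      done();
    },
    flush(done) {
      if (pending.length > 0) this.push(pending);
      done();
    },
  });
}

// Collects every parsed item. The returned promise resolves once the whole
// chain has finished and rejects if any stage emits an error.
export async function loadLines(source: Readable): Promise<string[]> {
  const items: string[] = [];
  const consumer = new Writable({
    objectMode: true,
    write(item, _encoding, done) {
      items.push(item);
      done();
    },
  });
  await pipeline(source, makeLineParser(), consumer);
  return items;
}
```

Resolving on the source's 'end' event instead would risk returning before the flushed tail has reached the consumer, which is the bug pointed out above.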