rdfjs / N3.js

Lightning fast, spec-compatible, streaming RDF for JavaScript
http://rdf.js.org/N3.js/

Slow operation inside Stream Parser onData #294

Closed marcelomachado closed 2 years ago

marcelomachado commented 2 years ago

I am trying to read a big RDF file in parts, so I am using a stream. I would like to collect CHUNK_SIZE statements in an array, run a slow operation on that array, and then free it before collecting the next CHUNK_SIZE statements.

The following code shows what I am trying to achieve:

const streamParser = new StreamParser();
const inputStream = Readable.from(file.data);
inputStream.pipe(streamParser);
const CHUNK_SIZE = 100000;
let counter = 1;
let storage = [];
streamParser.on("data", async (statement) =>
{
    storage.push(statement);
    if(counter === CHUNK_SIZE)
    {            
        await slowOperation(storage);
        storage = []; // free storage
        counter = 0;        
    };
     counter++;
});

streamParser.on('end', async () => 
{
     await slowOperation(storage);              
});

However, the data handler fires for each statement as soon as it is read, without waiting for the pending slowOperation, so the counter goes out of sync and this strategy does not work.

Anyway, my question is how to solve this problem.

RubenVerborgh commented 2 years ago

Hi @marcelomachado,

Encapsulate your logic in a Node.js Transform stream and pipe the parsed stream into it. Call the callback after you've stored a triple, or once processing of the chunk is done; that pauses the stream until processing has finished. (Calling streamParser.pause() and .resume() before and after your await might achieve the same thing, but that's more of a hack.)
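
A minimal sketch of that idea, under a few assumptions: it uses an object-mode Writable rather than a Transform (nothing is passed downstream here, so a sink is enough), `createBatchingSink` and `demo` are hypothetical names, and plain numbers stand in for parsed quads. The key point is that the `write` callback is withheld until the slow operation settles, which is what applies backpressure to the upstream stream.

```javascript
const { Readable, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical helper: an object-mode Writable that collects items into
// batches and waits for `slowOperation` on each full batch.
function createBatchingSink(batchSize, slowOperation) {
  let batch = [];
  return new Writable({
    objectMode: true, // accept arbitrary objects (e.g. quads), not only strings/Buffers
    write(item, _encoding, callback) {
      batch.push(item);
      if (batch.length < batchSize) {
        callback(); // ready for the next item right away
        return;
      }
      const full = batch;
      batch = [];
      // Not calling `callback` until the promise settles pauses the upstream.
      slowOperation(full).then(() => callback(), callback);
    },
    final(callback) {
      // Runs once the source has ended: process the leftover partial batch.
      if (batch.length === 0) {
        callback();
        return;
      }
      slowOperation(batch).then(() => callback(), callback);
    },
  });
}

// Demo with stand-in data (numbers instead of quads, a timer instead of real work).
async function demo() {
  const batches = [];
  const slowOperation = async (items) => {
    await new Promise((resolve) => setTimeout(resolve, 10));
    batches.push(items);
  };
  await pipeline(Readable.from([1, 2, 3, 4, 5, 6, 7]), createBatchingSink(3, slowOperation));
  return batches;
}

demo().then((batches) => console.log(batches));
```

With a real parser, the source passed to `pipeline` would presumably be the parsed stream instead of `Readable.from(...)`; the promise returned by `pipeline` resolves only after `final` has completed, so the last partial batch is not lost.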

marcelomachado commented 2 years ago

Hi @RubenVerborgh thank you for your answer.

Could you please help me with some code?

Here is what I've just tried:

const CHUNK_SIZE = 100000;

const streamParser = new StreamParser();
const inputStream = Readable.from(file.data);
inputStream.pipe(streamParser);
let storage = [];
const slowConsumer = new Transform({
    async transform(chunk, encoding, callback) {
        storage.push(chunk);
        if(storage.length === CHUNK_SIZE)
        {            
                await slowOperation(storage);
                storage = []; // free storage   
        };
        callback();
    },
});

streamParser.pipe(slowConsumer);

However, it gives me the error:

The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Quad

Also, how do I get the end event? I need to know when all the chunks have been read, since storage may still hold elements that have not been flushed.

I also tried:


const CHUNK_SIZE = 100000;

const streamParser = new StreamParser();
const inputStream = Readable.from(file.data);
inputStream.pipe(streamParser);

let storage = [];
streamParser.pipe(slowConsumer(storage));
streamParser.on("end", async () => {
    await slowOperation(storage);
});

function slowConsumer(storage) {
    const writer = new Writable({ objectMode: true });
    writer._write = async (chunk, encoding, done) => {
        graphToImport.graph.addQuad(chunk);
        if(storage.length === CHUNK_SIZE)
        {   
            writer.cork();  
            await slowOperation(storage);
            storage = []; // free storage   
            process.nextTick(() => {
                writer.uncork();
            });
        }
        done();
    };

    return writer;
}

However, the end handler doesn't wait until the last chunk has been read and processed by slowOperation.

jeswr commented 2 years ago

Regarding "gives me the error":

In your first code snippet, you need to set objectMode: true in the Transform options (https://nodejs.org/api/stream.html#object-mode).


Another option entirely is to do everything pull-based using stream.read().
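
One related pull-based pattern is async iteration: Node.js readable streams are async iterables, so a `for await` loop only asks for the next item once the loop body has finished. The sketch below is an illustration of that alternative, not the explicit `stream.read()` loop mentioned above; `processInBatches` is a hypothetical helper, and it assumes the parsed stream behaves like a standard Node.js readable stream.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: pulls items one at a time and awaits `slowOperation`
// on every full batch before pulling more.
async function processInBatches(stream, batchSize, slowOperation) {
  let batch = [];
  for await (const item of stream) {
    batch.push(item);
    if (batch.length === batchSize) {
      await slowOperation(batch); // nothing more is pulled while this is pending
      batch = [];
    }
  }
  // The loop ends when the stream ends: process whatever is left over.
  if (batch.length > 0) {
    await slowOperation(batch);
  }
}

// Usage with stand-in data (strings instead of parsed quads).
processInBatches(Readable.from(['a', 'b', 'c', 'd', 'e']), 2, async (items) => {
  console.log('processing', items);
});
```

Because the leftover batch is handled after the loop, no separate 'end' listener is needed in this variant.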

marcelomachado commented 2 years ago

After @jeswr's answer, the code looks like this:


const CHUNK_SIZE = 100000;

const streamParser = new StreamParser();
const inputStream = Readable.from(file.data);
inputStream.pipe(streamParser);
let storage = [];
const slowConsumer = new Transform({
    objectMode: true,
    async transform(chunk, encoding, callback) {
        storage.push(chunk);
        if(storage.length === CHUNK_SIZE)
        {            
                await slowOperation(storage);
                storage = []; // free storage   
        };
        callback();
    },
    async flush(callback)
    {       
      await slowOperation(storage);
      callback();
    }
});

streamParser.pipe(slowConsumer);

However, it is not working as I would like.

I tested it with a small file containing only 6 statements and set CHUNK_SIZE = 3, so the statements were processed 3 at a time. It worked as I wanted.

But when I tested with a large file, the process stopped suddenly after counting a few statements. Strangely, whenever the counter reaches 16 (for any file with more than 16 statements), the reading of chunks stops. No matter what I do in the write method, reading always stops at chunk 16. Screenshots follow.

[Screenshots: Screen Shot 2022-05-06 at 04 20 57, Screen Shot 2022-05-06 at 04 21 46]

Do you have any idea what could be going on?

marcelomachado commented 2 years ago

Thanks to @jeswr's idea of using stream.read(), everything is now working as expected. Here is the final code:

const streamParser = new StreamParser();
const inputStream = Readable.from(file.data);
inputStream.pipe(streamParser);

let storage = [];
streamParser.on('readable', async () => {
    let chunk;
    while (null !== (chunk = streamParser.read())) {
        storage.push(chunk);
        if( storage.length === CHUNK_SIZE)
        {   
            await slowOperation(storage);   
            storage = [];
        }
    }
});

streamParser.on('end', async () => {
    if(storage.length > 0)
    {
        await slowOperation(storage);                       
    }
});

However, I am still curious about what is happening in the case presented in my last comment =D

Again, thank you @RubenVerborgh and @jeswr.

jeswr commented 2 years ago

However, I am still curious about what is happening in the case presented in https://github.com/rdfjs/N3.js/issues/294#issuecomment-1119314861 =D

I can't see you attaching any listeners to the stream or calling resume, so as far as I can tell it's not actually in flowing mode. I assume this means it just buffers 16 elements through the transformer and then, before processing any more data, waits for the data to be .read() or for the stream to be put into flowing mode, which happens when you attach a 'data' event listener or call .resume().

RubenVerborgh commented 2 years ago

Indeed; you were creating the stream but not consuming it. The 16 iterations are the 16 internal buffer spaces filling up.
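
The stall can be reproduced in isolation with a rough sketch like the one below. It assumes a pass-through Transform that pushes every item to its readable side (unlike the snippet in the thread, which only calls `callback()`), and `countTransformed` is a hypothetical helper. In object mode the default `highWaterMark` is 16 objects, so an unconsumed Transform stops accepting input at about that point, while calling `.resume()` lets everything flow through.

```javascript
const { Readable, Transform } = require('node:stream');

// Hypothetical helper: counts how many items reach `transform` within `waitMs`.
function countTransformed(consume, total = 100, waitMs = 100) {
  let count = 0;
  const passThrough = new Transform({
    objectMode: true,
    transform(item, _encoding, callback) {
      count++;
      callback(null, item); // pushes the item into the readable-side buffer
    },
  });

  Readable.from(Array.from({ length: total }, (_, i) => i)).pipe(passThrough);

  if (consume) {
    passThrough.resume(); // flowing mode: output is read (and discarded here)
  }

  return new Promise((resolve) => {
    setTimeout(
      () => resolve({ count, highWaterMark: passThrough.readableHighWaterMark }),
      waitMs
    );
  });
}

(async () => {
  const stalled = await countTransformed(false);
  const flowing = await countTransformed(true);
  console.log('without a consumer:', stalled);
  console.log('with resume():', flowing);
})();
```

Without a consumer the count should stop at roughly the reported high-water mark; with `resume()` all items are expected to pass through.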