ironSource / parquetjs

fully asynchronous, pure JavaScript implementation of the Parquet file format
MIT License

Extra rows are written when used with readline and small rowGroupSize #101

Closed · nicwaller closed this 4 years ago

nicwaller commented 4 years ago

I encountered a very strange bug when using this library to generate Parquet files: the output files contained duplicate rows and their sizes were massively inflated. Sometimes my Parquet output file was 50x larger than my text input file!

I can reproduce this bug with a simple test case: read lines of text from a text file and add them to a parquet file. But I can only reproduce it when two conditions are met simultaneously:

- rows are appended from a readline 'line' event handler, and
- the row group size is small (here, setRowGroupSize(1))

Here's the program output:

/usr/local/bin/node /Users/nicwaller/workspace/random.surf/data/src/parquetify.js
Line> apples
Line> oranges
Line> bananas
Line> grapes
Finished writing file
{ key: 'apples' }
{ key: 'apples' }
{ key: 'oranges' }
{ key: 'apples' }
{ key: 'oranges' }
{ key: 'bananas' }
{ key: 'apples' }
{ key: 'oranges' }
{ key: 'bananas' }
{ key: 'grapes' }
{ key: 'apples' }
{ key: 'oranges' }
{ key: 'bananas' }
{ key: 'grapes' }
Finished reading file.

Process finished with exit code 0

Some additional info from parquet-tools. Note in the dump below that the five row groups contain 1, 2, 3, 4, and 4 values respectively (14 in total): each row group repeats all of the rows appended up to that point, which is where the duplicates come from.

Antares:data nicwaller$ parquet-tools rowcount fruits.parquet
Total RowCount: 14
Antares:data nicwaller$ parquet-tools dump fruits.parquet
row group 0
--------------------------------------------------------------------------------
key:  BINARY UNCOMPRESSED DO:0 FPO:4 SZ:32/32/1.00 VC:1 ENC:PLAIN,RLE  [more]...

    key TV=1 RL=0 DL=0
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:10 VC:1

row group 1
--------------------------------------------------------------------------------
key:  BINARY UNCOMPRESSED DO:0 FPO:59 SZ:43/43/1.00 VC:2 ENC:PLAIN,RLE [more]...

    key TV=2 RL=0 DL=0
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:21 VC:2

row group 2
--------------------------------------------------------------------------------
key:  BINARY UNCOMPRESSED DO:0 FPO:125 SZ:54/54/1.00 VC:3 ENC:PLAIN,RLE [more]...

    key TV=3 RL=0 DL=0
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:32 VC:3

row group 3
--------------------------------------------------------------------------------
key:  BINARY UNCOMPRESSED DO:0 FPO:203 SZ:64/64/1.00 VC:4 ENC:PLAIN,RLE [more]...

    key TV=4 RL=0 DL=0
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:42 VC:4

row group 4
--------------------------------------------------------------------------------
key:  BINARY UNCOMPRESSED DO:0 FPO:293 SZ:64/64/1.00 VC:4 ENC:PLAIN,RLE [more]...

    key TV=4 RL=0 DL=0
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:42 VC:4

BINARY key
--------------------------------------------------------------------------------
*** row group 1 of 5, values 1 to 1 ***
value 1:  R:0 D:0 V:apples
*** row group 2 of 5, values 2 to 3 ***
value 2:  R:0 D:0 V:apples
value 3:  R:0 D:0 V:oranges
*** row group 3 of 5, values 4 to 6 ***
value 4:  R:0 D:0 V:apples
value 5:  R:0 D:0 V:oranges
value 6:  R:0 D:0 V:bananas
*** row group 4 of 5, values 7 to 10 ***
value 7:  R:0 D:0 V:apples
value 8:  R:0 D:0 V:oranges
value 9:  R:0 D:0 V:bananas
value 10: R:0 D:0 V:grapes
*** row group 5 of 5, values 11 to 14 ***
value 11: R:0 D:0 V:apples
value 12: R:0 D:0 V:oranges
value 13: R:0 D:0 V:bananas
value 14: R:0 D:0 V:grapes

And here's the test case.

const fs = require('fs');
const readline = require('readline');
const parquet = require('parquetjs');

const schema = new parquet.ParquetSchema({
    key: { type: 'UTF8' },
});

async function main() {
    return new Promise(async (resolve, reject) => {
        const lines = readline.createInterface({
            input: fs.createReadStream('fruits.txt'),
        });
        const writer = await parquet.ParquetWriter.openFile(schema, 'fruits.parquet');
        writer.setRowGroupSize(1); // flush a row group after every appended row
        lines.on('line', async (line) => {
            console.log(`Line> ${line}`);
            // awaiting here does not stop readline from emitting further
            // 'line' events, so appendRow can be invoked again before the
            // previous call has resolved
            await writer.appendRow({key: line});
        });
        lines.on('close', async () => {
            await writer.close();
            resolve();
        });
    });
}

async function verify() {
    return new Promise(async (resolve, reject) => {
        let reader = await parquet.ParquetReader.openFile('fruits.parquet');
        let cursor = reader.getCursor();
        let record = null;
        while (record = await cursor.next()) {
            console.log(record);
        }
        resolve();
    });
}

main().then(() => {
    console.log('Finished writing file');
    verify().then(() => {
        console.log('Finished reading file.')
    });
});
arnabguptadev commented 4 years ago

This is similar to the issue I described here: https://github.com/ironSource/parquetjs/issues/60 (check the last comment). I also included a local fix that I tested and that may work here as well. Waiting for feedback.

asmuth commented 4 years ago

Thanks for reaching out about this and providing a repro case! I think this problem is caused by concurrent calls to appendRow, which are currently not supported. The best workaround for now is to ensure appendRow is not called concurrently, i.e. ensure that the previous call to appendRow has returned (using await) before issuing a new one.

Please also see the comments in https://github.com/ironSource/parquetjs/issues/60#issuecomment-641975079 and https://github.com/ironSource/parquetjs/pull/105#issuecomment-641991636
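
For example, one way to keep the event-based code while following this advice is to chain every appendRow call onto the previous one, so that the writes are serialized even though the 'line' events fire independently. This is only a sketch: the pending variable is illustrative and the resolve() call assumes the same Promise wrapper used in main() in the repro above.

let pending = Promise.resolve();

lines.on('line', (line) => {
    // Each write starts only after the previous one has finished.
    pending = pending.then(() => writer.appendRow({key: line}));
});

lines.on('close', async () => {
    await pending;         // wait for the last appendRow to complete
    await writer.close();
    resolve();
});

Note that a failed appendRow rejects the whole chain, so real code would also want to handle that rejection.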

nicwaller commented 4 years ago

Thanks @asmuth, your evaluation was correct. Even though I was using await, my use of event emitters still meant that it was possible for multiple invocations to occur simultaneously. Although appendRow is not reentrant, I was able to work around it using the for await construct instead:

for await (const line of lines) {
    await writer.appendRow({key: line});
}