apache/parquet-java

DictionaryFilter.canDrop may return false positive result when dict size exceeds 8k #3040

Closed pan3793 closed 2 weeks ago

pan3793 commented 2 weeks ago

Describe the bug, including details regarding any error messages, version, and platform.

Background

I received some data-loss reports after upgrading our internal Spark's Parquet dependency from 1.13.1 to 1.14.3. After some experiments, I believe this is a bug on the Parquet side, and it can be worked around by disabling spark.sql.parquet.filterPushdown.
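
For anyone needing the workaround, here is a minimal sketch of applying it programmatically (assuming an existing SparkSession; the same setting can also be passed as --conf spark.sql.parquet.filterPushdown=false at submit time):

    import org.apache.spark.sql.SparkSession;

    public class DisableParquetPushdown {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().getOrCreate();
            // Workaround: stop Spark from pushing predicates into the Parquet
            // reader, which avoids the broken DictionaryFilter.canDrop path.
            spark.conf().set("spark.sql.parquet.filterPushdown", "false");
        }
    }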

Analysis

With some debugging, I think the issue was introduced by PARQUET-2432 (https://github.com/apache/parquet-java/pull/1278).

The issue is that during the evaluation of DictionaryFilter.canDrop (which happens when reading a column encoded with PLAIN_DICTIONARY while predicates are pushed down), only the first 8 KiB of the dictionary page gets copied when its size exceeds 8 KiB:

https://github.com/apache/parquet-java/blob/274dc51bc9e5cc880ba3c77c3db826d2a4943965/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java#L113

The correct data:

...
|00001fd0| 34 32 35 34 30 39 0a 00 00 00 37 32 39 33 39 38 |425409....729398|
|00001fe0| 39 35 33 30 0a 00 00 00 39 38 32 30 31 36 39 36 |9530....98201696|
|00001ff0| 34 34 0a 00 00 00 37 33 39 31 32 34 37 38 30 36 |44....7391247806|
|00002000| 0b 00 00 00 31 32 35 32 35 31 36 31 34 31 36 0a |....12525161416.|
|00002010| 00 00 00 38 37 38 35 35 34 34 36 34 35 0b 00 00 |...8785544645...|
|00002020| 00 31 32 32 38 30 38 37 35 39 30 32 0b 00 00 00 |.12280875902....|
...

The copied data (note that everything from offset 0x2000, i.e. 8192 bytes, onward is zeroed):

...
|00001fd0| 34 32 35 34 30 39 0a 00 00 00 37 32 39 33 39 38 |425409....729398|
|00001fe0| 39 35 33 30 0a 00 00 00 39 38 32 30 31 36 39 36 |9530....98201696|
|00001ff0| 34 34 0a 00 00 00 37 33 39 31 32 34 37 38 30 36 |44....7391247806|
|00002000| 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
|00002010| 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
|00002020| 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
...

The root cause is https://github.com/apache/parquet-java/blob/274dc51bc9e5cc880ba3c77c3db826d2a4943965/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java#L379

which may not read fully when the underlying InputStream's available() method always returns 0. The channel involved is the JDK's java.nio.channels.Channels$ReadableByteChannelImpl (shown below): after the first successful read it breaks out of its loop as soon as available() reports no more data, so a single read(ByteBuffer) call returns at most TRANSFER_SIZE (8192) bytes in that case:

    private static class ReadableByteChannelImpl
        extends AbstractInterruptibleChannel    // Not really interruptible
        implements ReadableByteChannel
    {
     ...
        private static final int TRANSFER_SIZE = 8192;

        public int read(ByteBuffer dst) throws IOException {
            if (!isOpen()) {
                throw new ClosedChannelException();
            }

            int len = dst.remaining();
            int totalRead = 0;
            int bytesRead = 0;
            synchronized (readLock) {
                while (totalRead < len) {
                    int bytesToRead = Math.min((len - totalRead),
                                               TRANSFER_SIZE);
                    if (buf.length < bytesToRead)
                        buf = new byte[bytesToRead];
                    if ((totalRead > 0) && !(in.available() > 0))
                        break; // block at most once
                    try {
                        begin();
                        bytesRead = in.read(buf, 0, bytesToRead);
                    } finally {
                        end(bytesRead > 0);
                    }
                    if (bytesRead < 0)
                        break;
                    else
                        totalRead += bytesRead;
                    dst.put(buf, 0, bytesRead);
                }
                if ((bytesRead < 0) && (totalRead == 0))
                    return -1;

                return totalRead;
            }
        }
        ...
    }
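
The short read is easy to reproduce outside Parquet. A minimal standalone sketch (the FilterInputStream wrapper that forces available() to return 0 is contrived, but mimics how many decompressing/decorated streams behave):

    import java.io.ByteArrayInputStream;
    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channels;
    import java.nio.channels.ReadableByteChannel;

    public class ShortReadRepro {
        public static void main(String[] args) throws IOException {
            byte[] data = new byte[16384]; // stands in for a 16 KiB dictionary page

            // Wrap the stream so available() always reports 0.
            FilterInputStream in = new FilterInputStream(new ByteArrayInputStream(data)) {
                @Override
                public int available() {
                    return 0;
                }
            };

            ReadableByteChannel channel = Channels.newChannel(in);
            ByteBuffer dst = ByteBuffer.allocate(data.length);
            // A single read() stops after TRANSFER_SIZE bytes:
            // prints "read 8192 of 16384".
            int read = channel.read(dst);
            System.out.println("read " + read + " of " + data.length);
        }
    }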

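A fix needs to loop until the requested number of bytes has actually been read, instead of trusting a single channel read. A minimal sketch of that idea (an illustration only, not necessarily the patch that lands upstream):

    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    final class ChannelUtil {
        // Keep reading until the buffer is full; fail loudly on a premature EOF
        // instead of silently returning a half-filled (zero-padded) buffer.
        static void readFully(ReadableByteChannel channel, ByteBuffer dst) throws IOException {
            while (dst.hasRemaining()) {
                if (channel.read(dst) < 0) {
                    throw new EOFException(dst.remaining() + " bytes still expected at end of stream");
                }
            }
        }
    }
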
Component(s)

Core