apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Caused by: java.net.SocketException: Connection reset #9444

Open javrasya opened 8 months ago

javrasya commented 8 months ago

Apache Iceberg version

1.4.3 (latest release)

Query engine

Flink

Please describe the bug 🐞

I am using Flink 1.15 and trying to consume from an Iceberg table registered in the Glue catalog and located in S3. My Iceberg source is NOT in streaming mode, but my Flink app is, and it is bounded.

The application fails after roughly 10 minutes with the error shown in the stack trace at the end of this post.

I tried setting the following properties on my catalog loader, but no luck:

http-client.apache.expect-continue-enabled
http-client.apache.tcp-keep-alive-enabled
http-client.apache.socket-timeout-ms
http-client.apache.connection-timeout-ms

Can someone help me fix this problem?

Thanks in advance.

    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:156)
    at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.pollNext(HybridSourceReader.java:85)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: SplitFetcher thread 11 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
Caused by: org.apache.iceberg.exceptions.RuntimeIOException: Failed to find sync past position 0
    at org.apache.iceberg.avro.AvroIterable$AvroRangeIterator.<init>(AvroIterable.java:117)
    at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:83)
    at org.apache.iceberg.io.CloseableIterable.lambda$filter$0(CloseableIterable.java:109)
    at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
    at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:64)
    at org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:34)
    at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
    at org.apache.iceberg.flink.source.RowDataFileScanTaskReader.open(RowDataFileScanTaskReader.java:110)
    at org.apache.iceberg.flink.source.DataIterator.openTaskIterator(DataIterator.java:139)
    at org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator(DataIterator.java:129)
    at org.apache.iceberg.flink.source.DataIterator.hasNext(DataIterator.java:109)
    at org.apache.iceberg.flink.source.reader.ArrayPoolDataIteratorBatcher$ArrayPoolBatchIterator.hasNext(ArrayPoolDataIteratorBatcher.java:82)
    at org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader.fetch(IcebergSourceSplitReader.java:82)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    ... 6 more
Caused by: java.net.SocketException: Connection reset
    at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
    at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
    at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:484)
    at java.base/sun.security.ssl.SSLSocketInputRecord.readFully(SSLSocketInputRecord.java:467)
    at java.base/sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:243)
    at java.base/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:181)
    at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:111)
    at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1510)
    at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1477)
    at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:1066)
    at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
    at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
    at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:205)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176)
    at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
    at java.base/java.io.FilterInputStream.read(FilterInputStream.java:133)
    at software.amazon.awssdk.services.s3.checksums.ChecksumValidatingInputStream.read(ChecksumValidatingInputStream.java:112)
    at java.base/java.io.FilterInputStream.read(FilterInputStream.java:133)
    at software.amazon.awssdk.core.io.SdkFilterInputStream.read(SdkFilterInputStream.java:66)
    at org.apache.iceberg.aws.s3.S3InputStream.read(S3InputStream.java:109)
    at org.apache.iceberg.avro.AvroIO$AvroInputStreamAdapter.read(AvroIO.java:117)
    at org.apache.avro.file.DataFileReader$SeekableInputStream.read(DataFileReader.java:296)
    at org.apache.avro.file.DataFileReader$SeekableInputStream.read(DataFileReader.java:301)
    at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.read(BinaryDecoder.java:912)
    at org.apache.avro.file.DataFileReader.sync(DataFileReader.java:201)
    at org.apache.iceberg.avro.AvroIterable$AvroRangeIterator.<init>(AvroIterable.java:115)
    ... 20 more

javrasya commented 8 months ago

This happens more often when the consumption rate is high, for example when replaying historical messages. When I run the source in unbounded streaming mode with the INCREMENTAL_FROM_EARLIEST_SNAPSHOT streaming strategy instead of batch mode, the consumption rate inherently drops and the error occurs far less often. My app still fails occasionally, but it can recover, continue, and reach the end. It is very slow, but I will take it.

Could this be happening because S3 is throttling or something? Has anyone else observed anything like this before?

Note: The upstream commits every minute, which means we get a new snapshot every minute. That can lead to too many small files, so the service hitting the error in the original post may need to pull too many files and eventually hit the connection reset issue. This is just a theory; I couldn't verify it.

javrasya commented 8 months ago

I wrote my own S3FileIO that uses a custom S3InputStream, which retries when it hits a socket exception, and everything is stable that way.

Here is the part of S3InputStream.java I modified, with very primitive retry logic:

    @Override
    public int read() throws IOException {
        Preconditions.checkState(!closed, "Cannot read: already closed");
        positionStream();

        pos += 1;
        next += 1;
        readBytes.increment();
        readOperations.increment();

        try {
            int read = stream.read();
            currentRetry = 0;
            return read;
        } catch (SocketException e) {
            if (currentRetry < MAX_RETRIES) {
                currentRetry++;
                LOG.debug("Reopening stream after socket exception", e);
                return read();
            } else {
                throw e;
            }
        }
    }

I think Iceberg with Flink should do that out of the box. What do you think, @pvary @stevenzwu?

pvary commented 8 months ago

@javrasya: Why isn't this issue happening outside Flink? Isn't this a more general S3 issue?

javrasya commented 8 months ago

No idea, to be honest. It is very hard to pin down. It does not even happen when I run the app on a standalone Flink cluster on my local machine; it happens when the app runs on the AWS Managed Flink service in production. Maybe it is due to some network limitation or something funky going on. I tried many things: tweaking Apache HTTP client settings such as socket timeout, TCP keep-alive, and the maximum number of connections, and also trying the URLConnection HTTP client instead of the Apache HTTP client. None of it really helped. I saw other people facing the same issue outside of Flink and Iceberg, and none of the remedies they came up with helped me except retrying. But for that I needed to modify the bit running in the iceberg-flink library.

But regardless, it feels like retrying a limited number of times when that happens (in my code I hardcoded the limit to 3, for example) does no harm in the Iceberg Flink source code. It even sounds like a generic feature. Because it can also be an intermittent network issue for anyone using Iceberg with Flink and failing the entire stream for that sounds a bit harsh.

pvary commented 8 months ago

> Because it can also be an intermittent network issue for anyone using Iceberg with Flink and failing the entire stream for that sounds a bit harsh.

For the record, the fix you are suggesting is in the Iceberg AWS code, not in the Flink Iceberg connector code 😄

If we do retries in Iceberg we use Tasks for retries, like: https://github.com/apache/iceberg/blob/main/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java#L175
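
For readers unfamiliar with the pattern, here is a minimal plain-Java sketch of a bounded retry with exponential backoff that retries only on a chosen exception type. It is not Iceberg's Tasks API: the class `RetrySketch`, the method `callWithRetry`, and its parameters are hypothetical names invented for this illustration.

```java
import java.util.concurrent.Callable;

// Hypothetical helper for illustration only; it does not use Iceberg's Tasks utility.
final class RetrySketch {
    private RetrySketch() {}

    /**
     * Runs the action and retries it up to maxRetries times, but only when the
     * thrown exception is an instance of retryOn. The sleep doubles after each failure.
     */
    static <T> T callWithRetry(Callable<T> action, int maxRetries, long initialSleepMs,
                               Class<? extends Exception> retryOn) throws Exception {
        long sleepMs = initialSleepMs;
        for (int attempt = 0; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                // Give up on unrelated exceptions or once the retry budget is spent.
                if (!retryOn.isInstance(e) || attempt >= maxRetries) {
                    throw e;
                }
                Thread.sleep(sleepMs);
                sleepMs *= 2; // exponential backoff
            }
        }
    }
}
```

A caller would pass `java.net.SocketException.class` as `retryOn` to restrict retries to connection resets, so other failures still surface immediately.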

javrasya commented 8 months ago

True, the same FileIO can be used for Spark too; I forgot that :-) I did see that retry implementation in the source code. Thanks for sharing it, it is a good reference. I just needed very simple retry logic in place to see whether it really helps. I think it should be implemented that way instead.

Who do you think could be pulled into the discussion here? I know you mostly look after the Flink side of Iceberg. I am not really sure how the contributors are grouped.

pvary commented 8 months ago

Maybe @jackye1995?

amogh-jahagirdar commented 8 months ago

I think I remember coming across a discussion about retrying at the input stream level. The general concern was that changing the FileIO to accommodate this case leads to more complexity than it's worth (avoiding FileIO becoming a dumping ground for many properties that users are unsure how to tune, along with the maintenance complexity).

That being said, if this issue is causing problems across users it could be worth revisiting this and seeing how we can accommodate retries without complicating things too much.

amogh-jahagirdar commented 8 months ago

Let me see if I can find that discussion

amogh-jahagirdar commented 8 months ago

There's this old PR: https://github.com/apache/iceberg/pull/4912

javrasya commented 8 months ago

Thank you for jumping in, @amogh-jahagirdar. The way it is now, it is literally unusable for us, so I had to write my own S3FileIO together with all the nested classes just so that I could put a simple retry mechanism in place. I am not really sure how much that PR intended to cover, but SocketException seems to me the kind of exception that should be retried, and retrying it wouldn't do any harm.

Do you think reviving that PR, or maybe for the sake of simplicity starting a new one that targets only SocketExceptions, is a good idea?

shanzi commented 6 months ago

We are using the new Iceberg 1.5.0 with Flink 1.18.1 and have exactly the same issue. Is it possible that Flink opens more connections (especially with the FLIP-27 sources), or that there is an HTTP client leak?

We managed to use iceberg-flink-runtime-1.17-1.3.1 with Flink 1.18, and it works fine.

shanzi commented 6 months ago

After applying @javrasya's fix, we no longer encounter the connection reset error, but sometimes we get org.apache.http.conn.ConnectionPoolTimeoutException. So I guess it's very likely that there is a connection leak?

javrasya commented 6 months ago

@shanzi, the code I shared earlier caused some data loss. It was not closing the currently open stream, so be careful with it. I am sorry to see that my buggy code above is spreading. I have been using a safe version for a while, and I got the inspiration (copied/pasted) from the PR @amogh-jahagirdar mentioned. Here is the custom S3InputFile I have been using with my Flink and Spark projects: https://gist.github.com/javrasya/76ad0267399e379f5801a6d75c09882a

I have tested it with data at scale; my data loss problem went away 100%, and I have not been getting that socket closed exception anymore.
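
To illustrate the two points above (close the broken stream, then resume from the right offset), here is a minimal self-contained sketch. It is not the code from the gist and not Iceberg's S3InputStream: `RangeOpener` and `RetryingInputStream` are hypothetical names, and `RangeOpener` stands in for whatever issues a ranged read starting at a given byte offset. Unlike the earlier snippet, which incremented pos before the read and again on every recursive retry, this sketch advances the position only after a byte has actually been returned.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;

/** Hypothetical hook: opens a fresh stream positioned at the given byte offset. */
interface RangeOpener {
    InputStream openAt(long position) throws IOException;
}

/** Illustrative sketch only; single-byte reads, no seek support. */
class RetryingInputStream extends InputStream {
    private static final int MAX_RETRIES = 3; // the thread mentions a hardcoded limit of 3

    private final RangeOpener opener;
    private InputStream stream; // lazily opened, dropped after a failure
    private long pos = 0;       // offset of the next byte to return

    RetryingInputStream(RangeOpener opener) {
        this.opener = opener;
    }

    @Override
    public int read() throws IOException {
        int attempt = 0;
        while (true) {
            try {
                if (stream == null) {
                    stream = opener.openAt(pos); // resume exactly where we left off
                }
                int b = stream.read();
                if (b >= 0) {
                    pos += 1; // advance only after a successful read
                }
                return b;
            } catch (SocketException e) {
                closeQuietly(); // release the broken connection before retrying
                if (++attempt > MAX_RETRIES) {
                    throw e;
                }
            }
        }
    }

    private void closeQuietly() {
        if (stream != null) {
            try {
                stream.close();
            } catch (IOException ignored) {
                // the connection is already broken; nothing more to do
            }
            stream = null;
        }
    }

    @Override
    public void close() throws IOException {
        closeQuietly();
    }
}
```

Closing the failed stream before reopening is what returns the connection to the pool; skipping that step is the suspected cause of the pool timeouts reported earlier in this thread.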

javrasya commented 6 months ago

I have a strong feeling that the timeout error you get is also due to my buggy code. Because it does not close the stream when the socket closed exception is received, it most likely ends up timing out.

Otherwise, you can also play with the Apache HTTP client settings. I define the following ones explicitly for my Flink app:

      "http-client.apache.tcp-keep-alive-enabled": "true"
      "http-client.apache.socket-timeout-ms": "3600000"
      "http-client.apache.max-connections": "500"
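
As a rough sketch, the same three settings can be collected into a properties map in Java. The keys and values are the ones listed above; the class and method names (`HttpClientProps`, `apacheHttpClientProps`) are hypothetical, and how the map reaches the catalog (for example, merged into the properties given to the catalog loader) is an assumption that depends on your setup.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder class for illustration only.
final class HttpClientProps {
    private HttpClientProps() {}

    /** Builds the Apache HTTP client properties quoted in this thread. */
    static Map<String, String> apacheHttpClientProps() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("http-client.apache.tcp-keep-alive-enabled", "true");
        props.put("http-client.apache.socket-timeout-ms", "3600000"); // 1 hour
        props.put("http-client.apache.max-connections", "500");
        return props;
    }
}
```

These entries would typically be added alongside the other catalog properties rather than replacing them.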