paulpaul1076 opened this issue 4 months ago
The error "Premature end of Content-Length delimited message body (expected: 297,221,738; received: 143,086,296)" most likely indicates that either the client or the server closed the connection before all the data was received/sent.
This is coming from the Apache HTTP client being used by the S3 client when it talks to S3.
I'd suggest taking a look at the timeout settings in https://github.com/apache/iceberg/blob/29eebdd89efdcb9fb32e53325dfd2b29b9a3377b/aws/src/main/java/org/apache/iceberg/aws/HttpClientProperties.java#L83-L155 and playing around with those.
Another theory is that the S3 client gets GC'd prematurely when you call the Spark procedure, which would also end in that error.
If none of that helps, another alternative would be to switch from the Apache HTTP client to the url-connection-client via http-client.type=urlconnection (see L37ff in HttpClientProperties).
@nastra the Scala code works fine; the problem is inside Iceberg. It's Spark SQL that fails with this exception. I reproduced this 100 times: Scala works, Spark SQL fails.
I saw the code in Iceberg that converts the Spark SQL call into a RewriteDataFiles action; maybe there's a bug in that conversion.
@paulpaul1076 do you have a chance to try with http-client.type=urlconnection? It's of course also possible that there's a bug in RewriteDataFilesProcedure that went unnoticed.
If you have an easy way to reproduce this and if you're interested in contributing, it would be great if you could submit a patch to address this issue (PRs are always welcome) and we'll get somebody to review it.
Yea, I can try with that setting. Where do I set it, by the way? Do I have to rebuild the Iceberg jars?
The problem is not the RewriteDataFiles Spark action, it's the procedure called from SQL. The Spark action works, like I said, when I call it directly from Scala/Java.
I just need to set a Spark option like this, right?
spark.sql.catalog.my_catalog.http-client.type=urlconnection
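As a minimal sketch of where such a setting goes: Spark forwards every `spark.sql.catalog.<name>.<property>` entry to the catalog named `<name>`, so an Iceberg catalog property like `http-client.type` only needs that prefix. The class and method names below are hypothetical helpers for illustration; `my_catalog` is the catalog name from the question above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: prefixes Iceberg catalog properties with Spark's per-catalog conf prefix. */
final class CatalogConfKeys {

  static Map<String, String> toSparkConf(String catalogName, Map<String, String> catalogProps) {
    String prefix = "spark.sql.catalog." + catalogName + ".";
    Map<String, String> sparkConf = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : catalogProps.entrySet()) {
      sparkConf.put(prefix + e.getKey(), e.getValue());
    }
    return sparkConf;
  }

  public static void main(String[] args) {
    Map<String, String> props = new LinkedHashMap<>();
    // Property name and value as suggested in the thread.
    props.put("http-client.type", "urlconnection");
    // Each printed entry can be passed as --conf key=value to spark-submit.
    toSparkConf("my_catalog", props).forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```

The same key/value pairs can also be passed to `SparkSession.builder().config(key, value)`; no rebuild of the Iceberg jars is involved, only the right dependency on the classpath.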
Looks like iceberg-aws-bundle doesn't include this class:
Exception in thread "main" java.lang.NoClassDefFoundError: software/amazon/awssdk/http/urlconnection/UrlConnectionHttpClient$Builder
Should I just add the AWS SDK to the classpath as well? I thought iceberg-aws-bundle was supposed to include all of that.
Anyway, I got it to work; now there's a similar exception, worded a bit differently:
org.apache.iceberg.exceptions.RuntimeIOException: java.io.EOFException: Reached the end of stream with 8009878 bytes left to read
at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:149) ~[iceberg-spark-runtime-3.5_2.12-1.4.3.jar:?]
at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:126) ~[iceberg-spark-runtime-3.5_2.12-1.4.3.jar:?]
at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:65) ~[iceberg-spark-runtime-3.5_2.12-1.4.3.jar:?]
at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:49) ~[iceberg-spark-runtime-3.5_2.12-1.4.3.jar:?]
at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:140) ~[iceberg-spark-runtime-3.5_2.12-1.4.3.jar:?]
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120) ~[spark-sql_2.12-3.5.0.jar:3.5.0]
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158) ~[spark-sql_2.12-3.5.0.jar:3.5.0]
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.5.0.jar:3.5.0]
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.5.0.jar:3.5.0]
at scala.Option.exists(Option.scala:376) ~[scala-library-2.12.18.jar:?]
Ah yeah, unfortunately this is a bug: we need to add this to the aws-bundle, which I've done in https://github.com/apache/iceberg/pull/9685. In the meantime you'd have to add either the full AWS SDK or just the url-connection-client dependency.
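Until the bundle includes it, a quick way to tell whether the url-connection-client module is actually visible to the driver or executors is to try loading the class from the NoClassDefFoundError above (minus the `$Builder` inner class). This is a minimal diagnostic sketch; the `ClasspathCheck` class is a hypothetical helper, not part of Iceberg or the AWS SDK.

```java
/** Hypothetical diagnostic: reports whether a class can be loaded from the current classpath. */
final class ClasspathCheck {

  static boolean isOnClasspath(String className) {
    try {
      // initialize=false: only locate and load the class, don't run its static initializers
      Class.forName(className, false, ClasspathCheck.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      // Missing class, or a dependency of it is missing or incompatible
      return false;
    }
  }

  public static void main(String[] args) {
    String cls = "software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient";
    System.out.println(cls + " on classpath: " + isOnClasspath(cls));
  }
}
```

Running the same check inside the Spark job (rather than locally) is what matters, since the error is raised wherever S3FileIO builds its HTTP client.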
Ok, thanks for checking. That means it's probably not directly related to the underlying HTTP client. If you have a fully reproducible example with some data, that would greatly help anyone who might be able to debug this issue. Could you also provide the entire stack trace, and ideally the full log, so we can understand the timings and what else is happening?
Basically, I had a streaming job that was writing small files. I stopped it after there were around 2k files, tried compacting, and it failed with these Content-Length exceptions. I'll try to find some free time later to reproduce this.
@nastra where should I upload the data for you? I'll upload it, then you can register the table in your catalog. I used the Hive catalog, but I don't think it matters.
Anyway, this data is also easy to reproduce: I just made a Spark streaming job that writes random data:
package org.example;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import static org.apache.spark.sql.functions.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class StreamingSparkPartitioned {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        SparkSession spark = SparkSession.builder()
                .getOrCreate();
        spark.sql("ALTER TABLE " + args[0] + " SET TBLPROPERTIES('compatibility.snapshot-id-inheritance.enabled'='true')");
        System.out.println("TBL PROPS SHOW:");
        spark.sql("SHOW TBLPROPERTIES " + args[0] + "('compatibility.snapshot-id-inheritance.enabled')").show();
        Column expression = when(expr("value % 3 = 1"), "stupid_event").otherwise(
                when(expr("value % 3 = 2"), "smart_event")
                        .otherwise("neutral_event"));
        DataStreamWriter<Row> streamingDF = spark.readStream()
                .format("rate-micro-batch")
                .option("rowsPerBatch", "300000")
                .load()
                .withColumn("event_type", expression)
                .withColumn("date", current_timestamp().cast("date"))
                .withColumn("some_number", rand().multiply(4))
                .withColumn("date2", expr("date_sub(date, cast(some_number as int))"))
                .withColumn("random_str", split(lit("ABCDEFGHIJK"), ""))
                .withColumn("random_str2", repeat(concat_ws("", shuffle(col("random_str"))), 500))
                .writeStream()
                .option("checkpointLocation", "s3a://obs-zdp-warehouse-stage-mz/test/checkpoint_2");
        streamingDF
                .format("iceberg")
                .outputMode("append")
                .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
                .option("fanout-enabled", "true")
                .partitionBy("date", "date2", "event_type")
                .toTable(args[0])
                .awaitTermination();
    }
}
It ran for a while and generated data over a few days, around 2k files, and then I started running this:
spark.sql("CALL iceberg_catalog.system.rewrite_data_files(table => 'stage.test_partitioned')");
or this:
spark.sql("CALL iceberg_catalog.system.rewrite_data_files(table => 'stage.test_partitioned', options => map('partial-progress.enabled', 'true'))");
for compaction, and it fails with that Content-Length exception.
Btw, as I said, the Scala DSL for compaction works; Spark SQL doesn't.
I compared the job parameters in the Spark UI and they are absolutely identical, so it's not as if some SparkSession config made it work for the Scala DSL.
Thanks @paulpaul1076, I will try and reproduce this next week on my end
Let me know if you manage to do it or not.
@nastra The easiest way to reproduce it is to use my streaming job and just leave it running, maybe even for a few days. Also schedule the compaction job in Airflow (here you must use SQL specifically, not the Scala DSL) to run, for example, every 3 hours. Leave them be and come back after a day or more. This is what I did, and I saw failed runs of the compaction job in Airflow (some runs succeeded, so not every compaction run failed). When I ran with the Scala DSL, there were no failed runs at all.
@paulpaul1076 I don't have an Airflow setup but I ran a streaming job locally and created 4000+ files.
The specific setup I used was from the Spark quickstart example with MinIO + REST catalog, but I'm not able to reproduce this issue when using the REST catalog (I don't have a Hive setup to try this out).
Can you please try if you're able to reproduce this against the REST catalog + MinIO from the quickstart example?
Since you mentioned that you have lots of small files of about 200kb: I noticed that Premature end of Content-Length delimited message body (expected: 297,221,738; received: 143,086,296) indicates that it expects a file of 297MB but only received 143MB.
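To make the numbers in such messages easier to compare with the file_size_in_bytes values in the table metadata, here is a small hypothetical helper that extracts the expected and received byte counts from the message text and computes the gap. It only parses the message format quoted in this thread; it is not an Iceberg or Apache HttpClient API.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: pulls the byte counts out of a "Premature end of Content-Length" message. */
final class ContentLengthGap {

  // Matches e.g. "(expected: 297,221,738; received: 143,086,296)"
  private static final Pattern GAP = Pattern.compile("expected: ([\\d,]+); received: ([\\d,]+)");

  /** Returns {expected, received, missing} in bytes, or null if the message doesn't match. */
  static long[] parse(String message) {
    Matcher m = GAP.matcher(message);
    if (!m.find()) {
      return null;
    }
    long expected = Long.parseLong(m.group(1).replace(",", ""));
    long received = Long.parseLong(m.group(2).replace(",", ""));
    return new long[] {expected, received, expected - received};
  }

  public static void main(String[] args) {
    String msg = "Premature end of Content-Length delimited message body "
        + "(expected: 297,221,738; received: 143,086,296)";
    long[] gap = parse(msg);
    System.out.printf("expected=%d received=%d missing=%d (%.1f%% received)%n",
        gap[0], gap[1], gap[2], 100.0 * gap[1] / gap[0]);
  }
}
```

For the message above, 154,135,442 bytes are missing, i.e. the read stopped a bit short of halfway through the object.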
I also played with the timeout values from https://github.com/apache/iceberg/blob/29eebdd89efdcb9fb32e53325dfd2b29b9a3377b/aws/src/main/java/org/apache/iceberg/aws/HttpClientProperties.java#L83-L155 and set them to very low values, but I was not able to get the exact error you were seeing. Could you maybe try in your Hive setup with the timeouts set to around 1-3 seconds, just to see if it fails? That way we'd know which of the timeout configurations we'd have to raise in order for this error to go away (if possible).
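One way to run that experiment is to lower one timeout per run, so a failure points at a single setting. The sketch below just generates the Spark conf flags. The three property names are my recollection of the Apache HTTP client section of HttpClientProperties and are an assumption here; check them against the linked file for the Iceberg version in use. `iceberg_catalog` is the catalog name used elsewhere in this thread.

```java
import java.time.Duration;

/** Hypothetical helper: builds one Spark conf flag that shortens a single HTTP client timeout. */
final class ShortTimeouts {

  // ASSUMED property names; verify against HttpClientProperties for your Iceberg version.
  static final String[] TIMEOUT_KEYS = {
    "http-client.apache.socket-timeout-ms",
    "http-client.apache.connection-timeout-ms",
    "http-client.apache.connection-acquisition-timeout-ms"
  };

  static String confFlag(String catalogName, String property, Duration timeout) {
    return "spark.sql.catalog." + catalogName + "." + property + "=" + timeout.toMillis();
  }

  public static void main(String[] args) {
    // One flag per run: lower a single timeout (here to 2 s) and see whether the job fails.
    for (String key : TIMEOUT_KEYS) {
      System.out.println("--conf " + confFlag("iceberg_catalog", key, Duration.ofSeconds(2)));
    }
  }
}
```

Changing one value at a time is slower than lowering all of them at once, but it is what lets you tell which timeout would need to be raised.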
@paulpaul1076 if you could also provide a full log for me to better understand what's happening on the system, that would be really great.
@nastra thank you very much, I will try with the timeouts tomorrow and let you know! As for the REST catalog, I will have to ask our DevOps team to set it up, which may take a while, so I won't be able to get back to you right away.
@nastra also, regarding the job expecting 297MB and receiving only 143MB: that's because I previously compacted this table multiple times while keeping the streaming job running, and at some point it stopped compacting. I guess I forgot to mention that. That's why there were big files as well as small files.
@nastra these are the logs from the driver that does compaction and fails with this content length exception, and from one of the executors:
Btw, this exception also started popping up when querying with Trino:
Caused by: io.trino.spi.TrinoException: Error opening Iceberg split s3a://obs-zdp-warehouse-stage-mz/stage.db/test_simply_partitioned3/data/date=2024-02-14/00177-46924-33e1de6e-67aa-445d-89b2-a3baaa190813-00001.parquet (offset=4, length=6386727): Read 49152 tail bytes of file s3a://obs-zdp-warehouse-stage-mz/stage.db/test_simply_partitioned3/data/date=2024-02-14/00177-46924-33e1de6e-67aa-445d-89b2-a3baaa190813-00001.parquet failed: Incorrect file size (6386731) for file (end of stream not reached): s3a://obs-zdp-warehouse-stage-mz/stage.db/test_simply_partitioned3/data/date=2024-02-14/00177-46924-33e1de6e-67aa-445d-89b2-a3baaa190813-00001.parquet
at io.trino.plugin.iceberg.IcebergPageSourceProvider.createParquetPageSource(IcebergPageSourceProvider.java:1073)
at io.trino.plugin.iceberg.IcebergPageSourceProvider.createDataPageSource(IcebergPageSourceProvider.java:546)
at io.trino.plugin.iceberg.IcebergPageSourceProvider.createPageSource(IcebergPageSourceProvider.java:333)
at io.trino.plugin.iceberg.IcebergPageSourceProvider.createPageSource(IcebergPageSourceProvider.java:249)
at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider.createPageSource(ClassLoaderSafeConnectorPageSourceProvider.java:48)
at io.trino.split.PageSourceManager.createPageSource(PageSourceManager.java:61)
at io.trino.operator.TableScanOperator.getOutput(TableScanOperator.java:296)
at io.trino.operator.Driver.processInternal(Driver.java:395)
at io.trino.operator.Driver.lambda$process$8(Driver.java:298)
at io.trino.operator.Driver.tryWithLock(Driver.java:694)
at io.trino.operator.Driver.process(Driver.java:290)
at io.trino.operator.Driver.processForDuration(Driver.java:261)
at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:887)
at io.trino.execution.executor.timesharing.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:187)
at io.trino.execution.executor.timesharing.TimeSharingTaskExecutor$TaskRunner.run(TimeSharingTaskExecutor.java:565)
at io.trino.$gen.Trino_428____20240214_031724_2.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
It started happening after I began running compaction regularly. Also, I've been running compaction with the Scala DSL (not Spark SQL) for a few days and it didn't fail a single time... So weird that Spark SQL and Trino fail.
@nastra So, I seem to have discovered new info about what's going on. For some reason there are 2 entries for the same file in the Iceberg metadata:
(Screenshots: queried in Trino; queried in Spark.)
This is why all this is happening. How could Iceberg write 1 file twice into its metadata, but with different file_size_in_bytes?
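The check behind those screenshots is essentially a GROUP BY on the table's files metadata table, e.g. SELECT file_path, count(*) FROM iceberg_catalog.stage.test_partitioned.files GROUP BY file_path HAVING count(*) > 1. The sketch below does the same thing in plain Java over (file_path, file_size_in_bytes) rows, which is handy if the rows have already been collected; the class and its row type are hypothetical illustration helpers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: finds data files listed more than once in the table metadata. */
final class DuplicateFileEntries {

  /** The two columns of the files metadata table that matter here. */
  static final class FileRow {
    final String filePath;
    final long fileSizeInBytes;

    FileRow(String filePath, long fileSizeInBytes) {
      this.filePath = filePath;
      this.fileSizeInBytes = fileSizeInBytes;
    }
  }

  /** Maps each file_path that appears more than once to every file_size_in_bytes recorded for it. */
  static Map<String, List<Long>> duplicatePaths(List<FileRow> rows) {
    Map<String, List<Long>> sizesByPath = new LinkedHashMap<>();
    for (FileRow row : rows) {
      sizesByPath.computeIfAbsent(row.filePath, p -> new ArrayList<>()).add(row.fileSizeInBytes);
    }
    // Keep only paths referenced by two or more entries.
    sizesByPath.values().removeIf(sizes -> sizes.size() < 2);
    return sizesByPath;
  }

  public static void main(String[] args) {
    // Made-up rows; real ones would come from the files metadata table.
    List<FileRow> rows = Arrays.asList(
        new FileRow("data/date=2024-02-14/a.parquet", 6386731L),
        new FileRow("data/date=2024-02-14/a.parquet", 6390000L),
        new FileRow("data/date=2024-02-14/b.parquet", 204800L));
    System.out.println(duplicatePaths(rows));
  }
}
```

A path with two different sizes means at least one metadata entry describes bytes that are no longer what is stored at that path.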
Discussed in Slack: this is due to Iceberg's streaming writer not writing unique file names. This PR should fix it: https://github.com/apache/iceberg/pull/9255. I'm waiting for Iceberg 1.5.0, then I will test and post an update. There's also a similar issue here: https://github.com/apache/iceberg/issues/8953, where the OP says his problem went away when he switched to using foreachBatch.
The problem happens when using the direct streaming writer: when you restart your streaming job, it may create files with names that have been used before, so files get overwritten and there are multiple entries for the same file in the Iceberg metadata.
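To illustrate why a restart can collide with earlier files, here is a hypothetical sketch, not Iceberg's actual file-naming code. The name shape is loosely modeled on the data file name in the Trino error above (partition id, task id, an id, a counter). If every part of the name is derived from values that repeat after a restart, the new file lands on an existing object, while the older metadata entry still records the old size, which would plausibly explain reads that expect more (or fewer) bytes than the object holds.

```java
import java.util.Locale;
import java.util.UUID;

/** Hypothetical illustration of deterministic vs. unique data file names. */
final class FileNameCollision {

  // Every input can repeat after a restart -> same name -> the old object is overwritten.
  static String deterministicName(int partitionId, long taskId, String queryId, int fileCount) {
    return String.format(Locale.ROOT, "%05d-%d-%s-%05d.parquet", partitionId, taskId, queryId, fileCount);
  }

  // A fresh random UUID per call makes a repeated name practically impossible.
  static String uniqueName(int partitionId, long taskId, int fileCount) {
    return String.format(Locale.ROOT, "%05d-%d-%s-%05d.parquet", partitionId, taskId, UUID.randomUUID(), fileCount);
  }

  public static void main(String[] args) {
    String beforeRestart = deterministicName(177, 46924L, "query-1", 1);
    String afterRestart = deterministicName(177, 46924L, "query-1", 1);
    System.out.println("collision: " + beforeRestart.equals(afterRestart));
    System.out.println("unique: " + uniqueName(177, 46924L, 1));
  }
}
```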
@nastra unfortunately this doesn't seem to be the only reason for the Content-Length exception. We have now discovered that it still fails, even though I stopped using the direct streaming writer and switched to foreachBatch.
In one of our test tables today I got this exception:
org.apache.iceberg.aws.shaded.org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 223,998,698; received: 143,802,368)
I looked for files in the files metadata table whose file_size_in_bytes is 223,998,698 and found nothing. Where could it even get this number?
And once again, Spark actions work from Java/Scala DSL, but don't work from Spark SQL.
@nastra so, I deployed a REST catalog locally, registered this table in it, and now Spark SQL's rewrite_data_files works. We will have to deploy it in production and do a more extensive test, but I believe you were right: the Hive catalog is buggy and we should use the REST catalog.
My guess would be that the 223MB is the uncompressed size, whereas the 143MB is the compressed size. But this is just a wild guess, and I don't know why that would happen, since you mentioned you don't have any files that large.
Just to update this. We deployed the Nessie catalog in prod and this issue persists for some odd reason.
I don't think this is related to catalogs.
Catalogs just keep track of the table metadata file. Here the call stack is about Spark reading the Parquet file from storage using the iceberg-parquet reader and some Iceberg metadata.
@ajantha-bhat yea, so I'm wondering whether there are some settings that could be tuned to make this work in Spark SQL.
The thing is, I ran both the Spark DSL and Spark SQL versions and compared the settings in the Environment tab of the Spark UI, and they're absolutely identical. So it boggles my mind that this happens.
For future reference: the difference between the Procedure and the Action here came down to the catalog being configured differently in the two contexts. The Action was running with HadoopFileIO and the Procedure was using S3FileIO. When the Action was switched to also use S3FileIO, the same error replicated. So the underlying issue here is with S3FileIO.
Thanks for helping narrow this down @RussellSpitzer 👍 We still need to figure out the solution to this problem, but I am not sure how to reproduce it locally with small data / a test case.
@RussellSpitzer thanks a lot for helping with this. I want to give a bit more detail (we discussed this with Russell in the Iceberg Slack).
This is how I would load my catalog for the RewriteDataFiles action (I tried this with Nessie and Hive, so the kind of catalog you use does not matter):
HiveCatalog catalog = new HiveCatalog();
catalog.setConf(spark.sparkContext().hadoopConfiguration());
Map<String, String> properties = new HashMap<>();
properties.put("warehouse", "s3a://obs-zdp-warehouse-stage-mz/");
properties.put("uri", "thrift://******");
catalog.initialize("hive", properties);
String tableName = args[0];
String[] schemaAndTable = tableName.split("\\.");
TableIdentifier tableId = TableIdentifier.of(schemaAndTable);
Table table = catalog.loadTable(tableId);
For the rewrite_data_files procedure this is how the catalog is loaded:
Table table = Spark3Util.loadIcebergTable(spark, tableNameStr);
As you can see, for the RewriteDataFiles action I construct the catalog myself from scratch and supply all the settings manually, whereas the rewrite_data_files procedure takes all its configs from the SparkSession settings:
spark.sql.catalog.iceberg_catalog.io-impl: org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.iceberg_catalog.s3.access-key-id: (redacted)
spark.sql.catalog.iceberg_catalog.s3.endpoint: https://redacted
spark.sql.catalog.iceberg_catalog.s3.secret-access-key: (redacted)
So, as you can see, for the procedure I was supplying S3FileIO, and this is what caused this bug. As soon as I removed these 4 settings from my SparkSession conf, the bug went away, because the rewrite_data_files procedure started using HadoopFileIO instead of S3FileIO.
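A simplified model of why the two code paths ended up with different FileIO implementations, as described above: Spark hands a catalog every spark.sql.catalog.iceberg_catalog.* entry with that prefix removed, and the Hive catalog uses io-impl when it is set and falls back to HadoopFileIO otherwise. This is a sketch of that behavior, not Spark or Iceberg source code; the fallback shown is what this thread observed for the Hive catalog, and other catalog types may default differently.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of how the procedure's catalog ends up with S3FileIO. */
final class ProcedureFileIo {

  static final String HADOOP_FILE_IO = "org.apache.iceberg.hadoop.HadoopFileIO";

  /** Step 1: keep only this catalog's Spark conf entries and strip the prefix. */
  static Map<String, String> catalogProperties(Map<String, String> sparkConf, String catalogName) {
    String prefix = "spark.sql.catalog." + catalogName + ".";
    Map<String, String> props = new TreeMap<>();
    sparkConf.forEach((key, value) -> {
      if (key.startsWith(prefix)) {
        props.put(key.substring(prefix.length()), value);
      }
    });
    return props;
  }

  /** Step 2: io-impl wins if present; otherwise fall back to HadoopFileIO. */
  static String fileIoImpl(Map<String, String> catalogProps) {
    return catalogProps.getOrDefault("io-impl", HADOOP_FILE_IO);
  }

  public static void main(String[] args) {
    Map<String, String> sparkConf = new TreeMap<>();
    sparkConf.put("spark.sql.catalog.iceberg_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
    sparkConf.put("spark.sql.catalog.iceberg_catalog.s3.endpoint", "https://redacted");
    System.out.println(fileIoImpl(catalogProperties(sparkConf, "iceberg_catalog")));

    // Removing the io-impl setting switches the procedure to the fallback.
    sparkConf.remove("spark.sql.catalog.iceberg_catalog.io-impl");
    System.out.println(fileIoImpl(catalogProperties(sparkConf, "iceberg_catalog")));
  }
}
```

The hand-built HiveCatalog in the action never received io-impl, which is why it silently used HadoopFileIO even though the SparkSession settings looked identical.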
@danielcweeks FYI.
@ajantha-bhat you don't need large scale to reproduce it at all. For me this problem started after the first run of rewrite_data_files: the second run started failing as soon as my Iceberg table had files of around 300-500MB. Somehow, when S3FileIO loads these files, it throws this Content-Length exception.
I am experiencing the same issue, using the same setup as paulpaul1076. Thanks to this discussion I also tried switching from S3FileIO to the default, and so far it seems to be working well.
Apache Iceberg version
1.4.3 (latest release)
Query engine
Spark
Please describe the bug 🐞
I wrote this code in Scala:
And this code in SQL:
They are supposed to do the same thing; they even have exactly the same parameters.
Yet the Scala one works fine, but the SQL one constantly throws this exception:
I run this code on a table with 1800 small files of around 200kb each.