DanielMorales9 opened this issue 1 month ago
Hmmm, for streaming mode there should be one RecordWriterManager per bundle so I don't think there should be multiple threads trying to access one instance.
But it is weird. If you're writing to one destination with no partitions, there should be exactly one writer per bundle at any given time. Perhaps old writers are not closing properly? Can you check if you have any logs for "Encountered an error when closing data writer..." (line 118)?
P.S. are you running at Beam HEAD? We recently merged a change that adds a static table cache (#32686), so it should be loaded only once per RecordWriterManager
Currently running a pipeline to try to repro your error:
Map<String, Object> config =
    ImmutableMap.<String, Object>builder()
        .put("table", table)
        .put("catalog_name", "test")
        .put(
            "catalog_properties",
            ImmutableMap.<String, String>builder()
                .put("warehouse", warehouse)
                .put("gcp_project", "apache-beam-testing")
                .put("gcp_location", "us-central1")
                .put("catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog")
                .build())
        .put("triggering_frequency_seconds", 5)
        .build();

Schema taxiSchema =
    Schema.builder()
        .addStringField("ride_id")
        .addInt32Field("point_idx")
        .addDoubleField("latitude")
        .addDoubleField("longitude")
        .addStringField("timestamp")
        .addDoubleField("meter_reading")
        .addDoubleField("meter_increment")
        .addStringField("ride_status")
        .addInt32Field("passenger_count")
        .build();

Pipeline q = Pipeline.create(options);
q
    .apply(PubsubIO.readStrings().fromTopic("projects/pubsub-public-data/topics/taxirides-realtime"))
    .apply(JsonToRow.withSchema(taxiSchema))
    .apply(Managed.write(Managed.ICEBERG).withConfig(config));
q.run();
Will let it run for some time. The throughput is sitting at ~2k rows per second.
Hmmm, for streaming mode there should be one RecordWriterManager per bundle so I don't think there should be multiple threads trying to access one instance.
Strange, because I can see threads writing multiple files with few rows.
Can you check if you have any logs for "Encountered an error when closing data writer..." (line 118)?
I do not have any errors when closing writers.
P.S. are you running at Beam HEAD? We recently merged a change that adds a static table cache (https://github.com/apache/beam/pull/32686), so it should be loaded only once per RecordWriterManager
I am running version 2.59.0.
There have been quite a few improvements recently (see here). They should be available in 2.60.0 -- all except the two most recent PRs (#32686, #32688).
Can you try running the same pipeline against HEAD or the latest snapshot?
I noticed, however, that I have not provided the triggering frequency. I will check if it changes something and let you know.
I also found that the writer hangs indefinitely while trying to update the manifest file, without timing out. Not sure why that is 🤔
Operation ongoing in bundle for at least 02h10m00s without completing
Current user step name: Managed.ManagedTransform/ManagedSchemaTransformProvider.ManagedSchemaTransform/IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform/IcebergIO.WriteRows/Write Rows to Destinations/AppendFilesToTables/Append metadata updates to tables
Time spent in this step(millis): 1728598024092
Processing times in each step(millis)
Step name: Managed.ManagedTransform/ManagedSchemaTransformProvider.ManagedSchemaTransform/IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform/MapElements/Map
Time spent in this step: IntSummaryStatistics{count=90, sum=124, min=0, average=1.377778, max=53}
Step name: Managed.ManagedTransform/ManagedSchemaTransformProvider.ManagedSchemaTransform/IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform/IcebergIO.WriteRows/Write Rows to Destinations/AppendFilesToTables/Append metadata updates to tables
Time spent in this step: IntSummaryStatistics{count=90, sum=17162211, min=34058, average=190691.233333, max=996066}
Step name: Managed.ManagedTransform/ManagedSchemaTransformProvider.ManagedSchemaTransform/IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform/IcebergIO.WriteRows/Write Rows to Destinations/AppendFilesToTables/Group metadata updates by table/ReadStream
Time spent in this step: IntSummaryStatistics{count=942, sum=1470, min=0, average=1.560510, max=234}
at java.base@11.0.20/java.net.SocketInputStream.socketRead0(Native Method)
at java.base@11.0.20/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
at java.base@11.0.20/java.net.SocketInputStream.read(SocketInputStream.java:168)
at java.base@11.0.20/java.net.SocketInputStream.read(SocketInputStream.java:140)
at java.base@11.0.20/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:484)
at java.base@11.0.20/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:478)
at java.base@11.0.20/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
at java.base@11.0.20/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1459)
at java.base@11.0.20/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:1070)
at java.base@11.0.20/java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
at java.base@11.0.20/java.io.BufferedInputStream.read1(BufferedInputStream.java:292)
at java.base@11.0.20/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
at java.base@11.0.20/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:789)
at java.base@11.0.20/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:724)
at java.base@11.0.20/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1615)
at java.base@11.0.20/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1520)
at java.base@11.0.20/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)
at java.base@11.0.20/sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:334)
at app//com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at app//com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:152)
at app//com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
at app//com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:525)
at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:466)
at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeMedia(AbstractGoogleClientRequest.java:490)
at app//com.google.api.services.storage.Storage$Objects$Get.executeMedia(Storage.java:6522)
at app//com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openStream(GoogleCloudStorageReadChannel.java:933)
at app//com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openContentChannel(GoogleCloudStorageReadChannel.java:724)
at app//com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.performLazySeek(GoogleCloudStorageReadChannel.java:715)
at app//com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:307)
at app//com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:142)
at java.base@11.0.20/java.io.DataInputStream.read(DataInputStream.java:149)
at app//org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.read(HadoopStreams.java:123)
at app//org.apache.iceberg.avro.AvroIO$AvroInputStreamAdapter.read(AvroIO.java:117)
at app//org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:65)
at app//org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
at app//org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:76)
at app//org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:188)
at app//org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:187)
at app//org.apache.iceberg.ManifestFiles.copyManifestInternal(ManifestFiles.java:312)
at app//org.apache.iceberg.ManifestFiles.copyAppendManifest(ManifestFiles.java:264)
at app//org.apache.iceberg.MergingSnapshotProducer.copyManifest(MergingSnapshotProducer.java:288)
at app//org.apache.iceberg.MergingSnapshotProducer.add(MergingSnapshotProducer.java:279)
at app//org.apache.iceberg.MergeAppend.appendManifest(MergeAppend.java:68)
at app//org.apache.beam.sdk.io.iceberg.AppendFilesToTables$AppendFilesToTablesDoFn.processElement(AppendFilesToTables.java:93)
at app//org.apache.beam.sdk.io.iceberg.AppendFilesToTables$AppendFilesToTablesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
There have been quite a few improvements recently (see here). They should be available in 2.60.0 -- all except the two most recent PRs (#32686, #32688). Can you try running the same pipeline against HEAD or the latest snapshot?
Sure thing, will do that!
I noticed, however, that I have not provided the triggering frequency. I will check if it changes something and let you know.
Yep, the triggering frequency was one of the recently added features. Streaming writes before 2.60.0 may work in some cases, but it's rather unpredictable.
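For reference, here's roughly what setting it explicitly looks like in the Managed config -- a sketch only, reusing the same keys and variables as my repro above, with the 60-second value purely as an illustration:
Map<String, Object> config =
    ImmutableMap.<String, Object>builder()
        .put("table", table)
        .put("catalog_name", "test")
        // ... catalog_properties as in the repro above ...
        .put("triggering_frequency_seconds", 60) // target roughly one commit per minute
        .build();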
I also found that the writer hangs indefinitely while trying to update the manifest file, without timing out. Not sure why that is 🤔
This is also one of the improvements -- we drastically reduced the number of manifest files we write. From my experience, the more manifest files we have, the longer it takes to update the next one
Let me know how the new pipeline goes!
I used 2.60.0-SNAPSHOT and a triggering frequency of 60s, but after some time I see the errors again:
at java.base@11.0.20/jdk.internal.misc.Unsafe.park(Native Method)
at java.base@11.0.20/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
at java.base@11.0.20/java.util.concurrent.FutureTask.awaitDone(FutureTask.java:447)
at java.base@11.0.20/java.util.concurrent.FutureTask.get(FutureTask.java:190)
at app//com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(BaseAbstractGoogleAsyncWriteChannel.java:247)
at app//com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.close(BaseAbstractGoogleAsyncWriteChannel.java:168)
at java.base@11.0.20/java.nio.channels.Channels$1.close(Channels.java:177)
at java.base@11.0.20/java.io.FilterOutputStream.close(FilterOutputStream.java:188)
at app//com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.close(GoogleHadoopOutputStream.java:119)
at app//org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at app//org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
at app//org.apache.iceberg.hadoop.HadoopStreams$HadoopPositionOutputStream.close(HadoopStreams.java:188)
at java.base@11.0.20/java.io.FilterOutputStream.close(FilterOutputStream.java:188)
at java.base@11.0.20/java.io.FilterOutputStream.close(FilterOutputStream.java:188)
at app//org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:461)
at app//org.apache.iceberg.avro.AvroFileAppender.close(AvroFileAppender.java:94)
at app//org.apache.iceberg.ManifestWriter.close(ManifestWriter.java:213)
at app//org.apache.iceberg.ManifestFiles.copyManifestInternal(ManifestFiles.java:337)
at app//org.apache.iceberg.ManifestFiles.copyAppendManifest(ManifestFiles.java:264)
at app//org.apache.iceberg.MergingSnapshotProducer.copyManifest(MergingSnapshotProducer.java:288)
at app//org.apache.iceberg.MergingSnapshotProducer.add(MergingSnapshotProducer.java:279)
at app//org.apache.iceberg.MergeAppend.appendManifest(MergeAppend.java:68)
at app//org.apache.beam.sdk.io.iceberg.AppendFilesToTables$AppendFilesToTablesDoFn.processElement(AppendFilesToTables.java:104)
at app//org.apache.beam.sdk.io.iceberg.AppendFilesToTables$AppendFilesToTablesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
However, it looks like it eventually succeeds and one snapshot is produced:
Not sure if it can be of any help, but I am using the following gcs-connector version:
<dependency>
  <groupId>com.google.cloud.bigdataoss</groupId>
  <artifactId>gcs-connector</artifactId>
  <version>hadoop3-2.2.11</version>
</dependency>
We're using version hadoop2-2.2.16.
(I'm not sure whether there are any performance differences, but it may be worth trying.)
I canceled my previous repro attempt because it stayed healthy for 3+ hours; I'm attempting another run now with higher throughput and a 60s triggering frequency.
Any chance you can provide a repro?
Ahh I'm seeing the error now
java.lang.IllegalArgumentException: Expected all data writers to be closed, but found 1 data writer(s) still open
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
at org.apache.beam.sdk.io.iceberg.RecordWriterManager.close(RecordWriterManager.java:295)
at org.apache.beam.sdk.io.iceberg.WriteGroupedRowsToFiles$WriteGroupedRowsToFilesDoFn.processElement(WriteGroupedRowsToFiles.java:109)
And also seeing the same number of these errors:
Fortunately, when this error happens, the bundle is retried so there is no data loss. The pipeline also adapts well by autoscaling to more workers:
Seeing the following root causes:
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: Java heap space
My guess is a higher triggering frequency means the worker has to store more records in memory when writing to files. We could also be doing something inefficient with memory -- will investigate further
I updated the gcs-connector to hadoop3-2.2.16 and I can see high latency warnings on all types of operations on the manifest files.
Also, the request count on GCS is quite high compared to 20 RPS on Pub/Sub.
Data files are written, but the consumer is slow to commit a snapshot and a new manifest list.
The high latency warnings can be a little noisy -- they were added in 2.2.16 (ref).
Also, the request count on GCS is quite high compared to 20 RPS on Pub/Sub.
Agreed that's weird for just 20 rows/sec. Is this a project-based metric? Are you sure nothing else is writing to GCS at the same time?
Is this a project-based metric? Are you sure nothing else is writing to GCS at the same time?
No, it's at the bucket level. I know that most of the traffic is coming from this job because of the spike.
I can see from the metadata that the job creates too many small files. It's an average of 5 records per file.
I believe my pipeline was suffering from too much parallelism -- the work was split over too many concurrent threads within a worker, each one creating its own writer and eating up the worker's memory. High parallelism would also explain the many small files.
There's an activeIcebergWriters metric that roughly tracks how many open writers there are. Here's an example of way too many writers for only 1 worker:
I dropped the table, but it's the same situation: still 5 records per file. I'm not sure how to control the number of files per bundle. Do you respect the target file size as per spec?
not sure how to control the number of files per bundle
It's always one data file per bundle. The idea is to control the number of (concurrent) bundles
Do you respect the target file size as per spec?
We have a fixed 512MB max file size (ref)
Yes, I'm suffering from the same parallelism problem:
- Adding .apply(Redistribute.arbitrarily().withNumBuckets(N)) before the write step, reducing the parallelism to N
Is it similar to the Spark repartition? Does it shuffle data? How will it work with autoscaling enabled?
- Adding .apply(Redistribute.arbitrarily().withNumBuckets(N)) before the write step, reducing the parallelism to N
- Use the --numberOfWorkerHarnessThreads=N pipeline option, which sets an upper bound on the number of threads per worker
Right now, I have autoscaling disabled, and I will try to set N=2 and machineType=n1-standard-4.
Hmmm, I'm seeing an old metric that we dropped (manifestFilesWritten). Can you try using Beam version 2.61.0-SNAPSHOT?
Is it similar to the Spark repartition? Does it shuffle data?
Yes they're similar. The idea is to redistribute data across workers.
How will it work with autoscaling enabled?
This is hard to predict. In general autoscaling reacts to your backlog and throughput, and it may autoscale to more than the number of keys in your Redistribute.
Right now, I have autoscaling disabled, and I will try to set N=2 and machineType=n1-standard-4.
That's a good first step! Let me know how it goes -- honestly, you may end up only needing the Redistribute.
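For example, here's roughly where the Redistribute would sit in my repro pipeline above -- a sketch only, with a bucket count of 2 as an illustration, and taxiSchema, config, and options being the same objects defined earlier:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.transforms.Redistribute;
import org.apache.beam.sdk.values.Row;

Pipeline q = Pipeline.create(options);
q
    .apply(PubsubIO.readStrings().fromTopic("projects/pubsub-public-data/topics/taxirides-realtime"))
    .apply(JsonToRow.withSchema(taxiSchema))
    // cap the number of concurrent write bundles (and therefore open data writers)
    .apply(Redistribute.<Row>arbitrarily().withNumBuckets(2))
    .apply(Managed.write(Managed.ICEBERG).withConfig(config));
q.run();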
Yep, it works! 🥳 I can see only two threads writing now, and a stable commit-interval distribution.
However, I'm still uncertain about how Redistribute behaves when autoscaling is enabled in Dataflow. 🤔
I might need to run a load test 📈
My concern is that with a fixed number of buckets defined using withNumBuckets, autoscaling may cause inefficiencies. When autoscaling kicks in, if the number of workers exceeds the number of buckets (numOfBuckets < numWorkers), many workers could remain idle, leading to underutilization. This creates a scenario where the pipeline isn't truly elastic, as it can't dynamically scale with fluctuations in data volume.
At the same time, it's not feasible to skip redistribution entirely; as seen from earlier attempts, without it the job becomes non-performant and, in some cases, indefinitely stuck.
In contrast, I would expect IcebergIO to support dynamic redistribution similar to the behavior of Iceberg with Spark, where partitions are automatically adjusted based on target file sizes or other heuristics (e.g., the default 512MB). Such dynamic repartitioning ensures that as data volume grows or shrinks, the system can adjust on the fly to maintain efficiency.
Yep, it works! 🥳
Great stuff! Glad to see it getting off the ground :)
I would expect IcebergIO to support dynamic redistribution
Yep that's very valid. I was hoping #32612 would take care of this, but looks like we're not quite there yet.
In the meantime, for your concern about idle workers, you can always set an upper bound on autoscaling with the --maxNumWorkers=N pipeline option.
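If it helps, here's a rough sketch of wiring both knobs programmatically -- the values are placeholders, and this assumes the standard Dataflow options interfaces; the same flags can simply be passed on the command line instead:
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// args is your main(String[] args); equivalent to passing
// --maxNumWorkers=4 --numberOfWorkerHarnessThreads=2 on the command line.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
options.setMaxNumWorkers(4);                // upper bound for autoscaling
options.setNumberOfWorkerHarnessThreads(2); // upper bound on threads per worker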
What happened?
I am trying to stream data from Pub/Sub (with a throughput of 10-50 RPS) into an Iceberg table (not partitioned) using the IcebergIO connector and the Hive Metastore.
However, after some time, I see warning logs in the console like the ones below:
From the thread dump (see threadump.txt) it looks like a significant number of DataflowWorkUnits threads (287) are in the TIMED_WAITING state, specifically waiting for a Hive client from the pool.
However, I find it surprising that all these threads are attempting to create a new writer each time, which results in the concurrent reload of the table we saw above. Is this behavior expected? I suspect that the performance issues stem from the RecordWriterManager class not being thread-safe. Specifically, it appears that a race condition is occurring in this code snippet due to the check-then-act logic: https://github.com/apache/beam/blob/4cf99ea31aa2f3b4e8fa0da479939bbe92bd46dd/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java#L151-L161
Indeed, when I stop the job I am prompted with the error message: evidence that too many writers got created.
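To illustrate the concern with a hypothetical sketch (not the actual RecordWriterManager code): with a plain check-then-act on a shared map, two threads can both miss the check and each create a writer, whereas an atomic computeIfAbsent on a concurrent map creates at most one per destination.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CheckThenActSketch {
  // Hypothetical stand-in for a data writer; not a Beam class.
  static class Writer {}

  static final Map<String, Writer> WRITERS = new ConcurrentHashMap<>();

  // Racy: two threads can both see no entry for 'dest' and each create a Writer.
  static Writer checkThenAct(String dest) {
    if (!WRITERS.containsKey(dest)) {
      WRITERS.put(dest, new Writer());
    }
    return WRITERS.get(dest);
  }

  // Atomic alternative: at most one Writer is ever created per destination.
  static Writer atomic(String dest) {
    return WRITERS.computeIfAbsent(dest, d -> new Writer());
  }
}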
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components