apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: PubsubIO used in batch incorrect batch cutoff size #28011

Open slilichenko opened 11 months ago

slilichenko commented 11 months ago

What happened?

This is due to incorrect initialization of a transform [1]: instead of the maximum batch size in bytes, the maximum number of batch records is passed. There is an additional issue with message validation: the error message here [2] is misleading. It should state that a single record's size exceeds the maximum batch size, rather than just quoting the byte counts from the generic Pub/Sub limits.

[1] https://github.com/apache/beam/blob/e59f001fdb3d5104cf7f82cdbe0b099e32c7b9c1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1363

[2] https://github.com/apache/beam/blob/e59f001fdb3d5104cf7f82cdbe0b099e32c7b9c1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java#L100
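The mix-up is easy to make because both limits are plain integers, so the compiler cannot catch a swap. A minimal sketch of the pattern (hypothetical `BatchSink` class and field names, not the actual PubsubIO builder):

```java
// Hypothetical builder mirroring the bug pattern: both limits are ints,
// so passing the record-count limit in the byte-limit position compiles
// without complaint.
public class BatchSink {
    private final int maxBatchRecords; // max number of messages per batch
    private final int maxBatchBytes;   // max total bytes per batch

    public BatchSink(int maxBatchRecords, int maxBatchBytes) {
        this.maxBatchRecords = maxBatchRecords;
        this.maxBatchBytes = maxBatchBytes;
    }

    public int getMaxBatchBytes() {
        return maxBatchBytes;
    }

    public static void main(String[] args) {
        int recordLimit = 1000;     // e.g. a messages-per-publish cap
        int byteLimit = 10_000_000; // e.g. a request-size cap in bytes

        // The bug: the record-count limit is passed where the byte limit
        // is expected, so batches are cut off after ~1000 bytes instead
        // of ~10 MB.
        BatchSink buggy = new BatchSink(recordLimit, recordLimit);
        System.out.println(buggy.getMaxBatchBytes()); // prints 1000
    }
}
```

One common defense against this class of bug is to use distinct wrapper types (or a builder with named setters) for the two limits so the compiler rejects the swap.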

Issue Priority

Priority: 3 (minor)

Issue Components

liferoad commented 11 months ago

Is this the same as #27000?

slilichenko commented 11 months ago

Yes, I was using 2.48.0 and validated that things work after upgrading to 2.49.0.

Technically, there is still a bug: the total message size (including attribute data) returned by the "validate" method is ignored, and the message data size alone is used to calculate the batch cutoff:

https://github.com/apache/beam/blob/369265db4cd2aa294e6c4c4a4cfc1c8a9e4614c3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1441
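The discrepancy can be sketched as two size calculations: the batch cutoff counts only the payload, while the actual Pub/Sub request limit (and the validation step) applies to the payload plus all attribute keys and values. A minimal illustration with hypothetical helper names, not Beam code:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical size helpers illustrating the discrepancy described above.
public class MessageSize {
    // What the batch cutoff effectively uses: payload bytes only.
    static int payloadSize(byte[] payload, Map<String, String> attributes) {
        return payload.length;
    }

    // What validation is based on: payload plus attribute keys and values.
    static int totalSize(byte[] payload, Map<String, String> attributes) {
        int size = payload.length;
        for (Map.Entry<String, String> e : attributes.entrySet()) {
            size += e.getKey().getBytes(StandardCharsets.UTF_8).length;
            size += e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        return size;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[100];
        Map<String, String> attrs = Map.of("trace-id", "abc123");
        // With sizeable attributes the batcher under-counts, so it can
        // assemble batches that exceed the real request limit.
        System.out.println(payloadSize(payload, attrs)); // 100
        System.out.println(totalSize(payload, attrs));   // 100 + 8 + 6 = 114
    }
}
```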

Abacn commented 1 month ago

Per the latest comment, changed the issue title and applied the proper priority tag.