apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Default PubsubMessage coder will drop message id and orderingKey #23525

Open TheNeuralBit opened 1 year ago

TheNeuralBit commented 1 year ago

What happened?

We register multiple coders for PubsubMessage: https://github.com/apache/beam/blob/9b2f87d0201e8923a021c6bedbe0f64e37704014/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java#L32-L40

But in practice, only the first one takes effect; the others are no-ops and will never be used (see https://github.com/apache/beam/pull/22216#discussion_r951821329 for a detailed explanation). This means that whenever the coder for a PCollection<PubsubMessage> is inferred, we use PubsubMessageWithAttributesCoder, which does not encode messageId or orderingKey.
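The effect can be sketched with a small, self-contained Java model. None of the names below (`SimpleMsg`, `MsgCoder`, `PayloadOnlyCoder`, `FullCoder`, `FirstWinsRegistry`) are Beam classes; they are hypothetical stand-ins that assume the registrar behaves as described above, i.e. only the first coder registered for the type is ever returned. Attributes are omitted for brevity, and `DataOutputStream.writeUTF` stands in for Beam's own string encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class FirstCoderWinsDemo {
  public static void main(String[] args) {
    FirstWinsRegistry registry = new FirstWinsRegistry();
    registry.register(new PayloadOnlyCoder()); // registered first, so it is always chosen
    registry.register(new FullCoder());        // never consulted by infer()
    SimpleMsg out = roundTrip(registry.infer(), new SimpleMsg("hello", "id-1", "key-1"));
    System.out.println(out.messageId + " " + out.orderingKey); // null null
  }

  /** Encodes then decodes m, as happens when a runner serializes elements between stages. */
  static SimpleMsg roundTrip(MsgCoder coder, SimpleMsg m) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      coder.encode(m, out);
      out.flush();
      return coder.decode(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

/** Hypothetical stand-in for PubsubMessage: payload plus two optional fields. */
final class SimpleMsg {
  final String payload;
  final String messageId;   // may be null
  final String orderingKey; // may be null

  SimpleMsg(String payload, String messageId, String orderingKey) {
    this.payload = payload;
    this.messageId = messageId;
    this.orderingKey = orderingKey;
  }
}

/** Hypothetical stand-in for Coder<PubsubMessage>. */
interface MsgCoder {
  void encode(SimpleMsg m, DataOutputStream out) throws IOException;
  SimpleMsg decode(DataInputStream in) throws IOException;
}

/** Writes only the payload, so messageId and orderingKey are silently lost. */
final class PayloadOnlyCoder implements MsgCoder {
  public void encode(SimpleMsg m, DataOutputStream out) throws IOException {
    out.writeUTF(m.payload);
  }

  public SimpleMsg decode(DataInputStream in) throws IOException {
    return new SimpleMsg(in.readUTF(), null, null);
  }
}

/** Writes every field; optional ones sit behind a 0/1 presence byte. */
final class FullCoder implements MsgCoder {
  public void encode(SimpleMsg m, DataOutputStream out) throws IOException {
    out.writeUTF(m.payload);
    writeNullable(m.messageId, out);
    writeNullable(m.orderingKey, out);
  }

  public SimpleMsg decode(DataInputStream in) throws IOException {
    String payload = in.readUTF();
    String messageId = readNullable(in);
    String orderingKey = readNullable(in);
    return new SimpleMsg(payload, messageId, orderingKey);
  }

  static void writeNullable(String s, DataOutputStream out) throws IOException {
    if (s == null) {
      out.writeByte(0);
    } else {
      out.writeByte(1);
      out.writeUTF(s);
    }
  }

  static String readNullable(DataInputStream in) throws IOException {
    return in.readByte() == 0 ? null : in.readUTF();
  }
}

/** Registry in which the first provider registered for the type always wins. */
final class FirstWinsRegistry {
  private final List<MsgCoder> providers = new ArrayList<>();

  void register(MsgCoder coder) {
    providers.add(coder);
  }

  MsgCoder infer() {
    return providers.get(0);
  }
}
```

Running `main` should print `null null`: the inferred coder accepted the message but discarded both optional fields without any error, while `FullCoder` would have preserved them.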

Issue Priority

Priority: 2

Issue Component

Component: io-java-gcp

TheNeuralBit commented 1 year ago

Marking this P1 since it could lead to data loss (in rare cases). We might mitigate this by making PubsubMessageWithAttributesCoder (and the other PubsubMessage coders) raise an error if the unsupported fields are non-null.
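A minimal sketch of that mitigation, assuming a simplified message type rather than Beam's `PubsubMessage`: the coder checks for fields it cannot represent and fails loudly instead of dropping them. `Message` and `StrictPayloadOnlyCoder` are hypothetical names; in Beam itself the natural exception would presumably be `CoderException` (a subclass of `IOException`), for which a plain `IOException` stands in here.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class StrictCoderDemo {
  public static void main(String[] args) throws IOException {
    StrictPayloadOnlyCoder coder = new StrictPayloadOnlyCoder();
    // A message without unsupported fields encodes normally: 2-byte length + "ok".
    System.out.println(coder.encode(new Message("ok", null, null)).length); // 4
    try {
      coder.encode(new Message("hello", "id-1", null));
    } catch (IOException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}

/** Hypothetical stand-in for PubsubMessage; attributes omitted for brevity. */
final class Message {
  final String payload;
  final String messageId;   // may be null
  final String orderingKey; // may be null

  Message(String payload, String messageId, String orderingKey) {
    this.payload = payload;
    this.messageId = messageId;
    this.orderingKey = orderingKey;
  }
}

/** Payload-only coder that refuses to silently drop fields it cannot encode. */
final class StrictPayloadOnlyCoder {
  byte[] encode(Message m) throws IOException {
    if (m.messageId != null || m.orderingKey != null) {
      throw new IOException(
          "this coder cannot encode messageId/orderingKey; use a coder that supports them");
    }
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeUTF(m.payload);
    out.flush();
    return bytes.toByteArray();
  }
}
```

The design trades silent data loss for a pipeline failure, which is usually the better outcome when the dropped fields may matter downstream.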

A better long-term fix would be to default to a coder that handles all fields, but this could break update compatibility, so we'd need to be careful.

apilloud commented 1 year ago

@egalpin @TheNeuralBit Is this fixed by #22216?

egalpin commented 1 year ago

@apilloud It was introduced by https://github.com/apache/beam/pull/22216 rather than fixed by it. Part of the complexity, at least for me, is that multiple coders are registered against the same class in PubsubCoderProviderRegistrar. Since all Pub/Sub messages of type org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage appear to use the first coder in the registrar by default, any message carrying properties beyond those supported by PubsubMessageWithAttributesCoder will have those properties silently nullified.

Assuming the "superset" coder, which supports all fields, were the default, what would the drawback be in cases where, say, the messageId is never used? It would take more bytes to represent each PubsubMessage, but are there other drawbacks? If not, it seems easiest to make the "superset" coder the default and treat the other coders as data-size optimizations that users could opt in to.
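To get a feel for the extra bytes, here is a rough size comparison under an assumed encoding: the payload as a length-prefixed string (`writeUTF`, 2-byte length) and each optional field behind a one-byte 0/1 presence marker in the style of Beam's `NullableCoder`. Beam's actual coders use different length prefixes and also encode attributes, so the numbers are illustrative only; `EncodedSizeDemo` and its methods are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class EncodedSizeDemo {
  public static void main(String[] args) {
    System.out.println(payloadOnlySize("hello"));       // 7
    System.out.println(fullSize("hello", null, null));  // 9
    System.out.println(fullSize("hello", "id-1", "k")); // 18
  }

  /** Attributes-only style: just the payload (attributes omitted for brevity). */
  static int payloadOnlySize(String payload) {
    return sizeOf(out -> out.writeUTF(payload));
  }

  /** "Superset" style: payload plus both optional fields behind presence markers. */
  static int fullSize(String payload, String messageId, String orderingKey) {
    return sizeOf(out -> {
      out.writeUTF(payload);
      writeNullable(out, messageId);
      writeNullable(out, orderingKey);
    });
  }

  /** One marker byte (0 = null, 1 = present), then the value if present. */
  static void writeNullable(DataOutputStream out, String value) throws IOException {
    if (value == null) {
      out.writeByte(0);
    } else {
      out.writeByte(1);
      out.writeUTF(value);
    }
  }

  interface Body {
    void writeTo(DataOutputStream out) throws IOException;
  }

  /** Number of bytes the given body writes. */
  static int sizeOf(Body body) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try {
      DataOutputStream out = new DataOutputStream(bytes);
      body.writeTo(out);
      out.flush();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.size();
  }
}
```

Under these assumptions, a message with no messageId or orderingKey costs only two extra marker bytes (9 vs. 7); the overhead grows only when the fields are actually populated.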

I do like the idea of having the coders raise an error when unsupported fields are non-null. I think this could be an effective defense against data loss, and one that could be implemented relatively quickly.

TheNeuralBit commented 1 year ago

> are there other drawbacks?

The only other drawback I'm aware of is that we'd likely break update compatibility for pipelines that use the default coder. It might be good to discuss this on the dev list to weigh our options. Would you like to start that thread, @egalpin?

apilloud commented 1 year ago

Only a small subset of Dataflow users care about update compatibility, and we don't guarantee it indefinitely anyway. As long as there is some path to reproduce the previous behavior (it could require a code change by the user, such as explicitly declaring the old coder), that should be sufficient.
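In Beam, the usual way to opt out of inference is to call `setCoder(...)` on the `PCollection`, for example with `PubsubMessageWithAttributesCoder.of()`. The precedence rule this path relies on, an explicitly set coder always winning over the inferred default, can be modeled in plain Java. `ModelPCollection` and the coder-name strings are hypothetical and not part of Beam; the "superset" default is an assumption about a possible future change, not current behavior.

```java
import java.util.Objects;

class PinnedCoderDemo {
  public static void main(String[] args) {
    // Hypothetical future default: the "superset" coder is inferred.
    ModelPCollection inferred = new ModelPCollection("WithAttributesAndMessageIdAndOrderingKey");
    System.out.println(inferred.getCoder());

    // A pipeline that needs the old encoding pins it explicitly.
    ModelPCollection pinned =
        new ModelPCollection("WithAttributesAndMessageIdAndOrderingKey").setCoder("WithAttributes");
    System.out.println(pinned.getCoder());
  }
}

/** Hypothetical stand-in for a PCollection: an inferred coder plus an optional explicit one. */
final class ModelPCollection {
  private final String inferredCoder;
  private String explicitCoder; // null until setCoder is called

  ModelPCollection(String inferredCoder) {
    this.inferredCoder = Objects.requireNonNull(inferredCoder);
  }

  /** Explicitly chosen coder; takes precedence over inference. */
  ModelPCollection setCoder(String coderName) {
    this.explicitCoder = Objects.requireNonNull(coderName);
    return this;
  }

  String getCoder() {
    return explicitCoder != null ? explicitCoder : inferredCoder;
  }
}
```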

egalpin commented 1 year ago

Good idea, I'll start a thread on the dev list.

kennknowles commented 1 year ago

Any update on this one?

egalpin commented 1 year ago

Ah, this one fell off my radar, unfortunately. I've just started a dev@ thread. Thanks for the reminder!

kennknowles commented 1 year ago

Noting that there is an open PR and I pinged it.

kennknowles commented 1 year ago

PR closed due to lack of activity. I presume you ran out of time? Maybe someone else can pick this up.

egalpin commented 1 year ago

Thanks @kennknowles 👍 I just haven't been able to find the time for testing. Agreed, I'm happy to collaborate or hand this off if someone else has capacity!

codertimu commented 10 months ago

I've run into this bug again. To help others, here is the stack trace of the error that appears in the Dataflow console when using PubsubIO.readMessagesWithAttributesAndMessageIdAndOrderingKey().

Beam version: 2.51.0

org.apache.beam.sdk.coders.CoderException: NullableCoder expects either a byte valued 0 (null) or 1 (present), got 50
        org.apache.beam.sdk.coders.NullableCoder.decode(NullableCoder.java:90)
        org.apache.beam.sdk.coders.NullableCoder.decode(NullableCoder.java:79)
        org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.decode(PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java:73)
        org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.decode(PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java:35)
        org.apache.beam.sdk.coders.Coder.decode(Coder.java:154)
        org.apache.beam.runners.dataflow.worker.PubsubReader$PubsubReaderIterator.decodeMessage(PubsubReader.java:129)
        org.apache.beam.runners.dataflow.worker.WindmillReaderIteratorBase.advance(WindmillReaderIteratorBase.java:60)
        org.apache.beam.runners.dataflow.worker.WindmillReaderIteratorBase.start(WindmillReaderIteratorBase.java:46)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
        org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:834)
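The error says that `NullableCoder` read a presence byte of 50, which is the ASCII code for the character `2`. One plausible reading, not confirmed in this thread, is that the bytes being decoded were written in a different layout than the one the coder expects, so the decoder landed on string data where it wanted a 0/1 marker. The sketch below reproduces that kind of failure with hypothetical helpers (`writeWithoutMarker`, `decodeWithMarker`) and made-up data; it does not model Dataflow's actual wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class MarkerMismatchDemo {
  public static void main(String[] args) throws IOException {
    byte[] bytes = writeWithoutMarker("hello", "2024-abc");
    try {
      decodeWithMarker(bytes);
    } catch (IOException e) {
      System.out.println(e.getMessage()); // expected 0 (null) or 1 (present), got 50
    }
  }

  /** Writer's layout: payload, then the message id as raw UTF-8 with no presence marker. */
  static byte[] writeWithoutMarker(String payload, String messageId) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeUTF(payload);
    out.write(messageId.getBytes(StandardCharsets.UTF_8));
    out.flush();
    return bytes.toByteArray();
  }

  /** Reader's layout: payload, then a NullableCoder-style 0/1 marker before the message id. */
  static String decodeWithMarker(byte[] data) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    in.readUTF(); // payload
    byte marker = in.readByte(); // here this is '2' (50), the first byte of the message id
    if (marker == 0) {
      return null;
    }
    if (marker != 1) {
      throw new IOException("expected 0 (null) or 1 (present), got " + marker);
    }
    return in.readUTF();
  }
}
```

Running `main` should print `expected 0 (null) or 1 (present), got 50`, mirroring the shape of the Dataflow error above.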