apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: PubsubIO.readMessagesWithAttributesAndMessageIdAndOrderingKey is not working with Direct and Dataflow Runner [2.53.0] #30078

Closed nrj179 closed 9 months ago

nrj179 commented 9 months ago

What happened?

With the Direct Runner, the pipeline throws the following exception:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null String
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null String
    at org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:74)
    at org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:68)
    at org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:37)
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.encode(PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java:64)

With the Dataflow Runner, the pipeline does not receive the message at all.

RCA:

The method below does not call "setNeedsAttributes(true)" and "setNeedsMessageId(true)" the way PubsubIO.readMessagesWithAttributesAndMessageId does.

public static Read<PubsubMessage> readMessagesWithAttributesAndMessageIdAndOrderingKey() {
  return Read.newBuilder()
      .setCoder(PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of())
      .setNeedsOrderingKey(true)
      .build();
}

Because of this, the PubsubUnboundedSource.expand method picks the function that parses only the payload (not the one that also parses the message attributes, ordering key and message ID), and the resulting message is then passed to PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.encode.

if (getNeedsAttributes() || getNeedsMessageId()) {
  function = new PubsubMessages.ParsePubsubMessageProtoAsPayload();
} else {
  function = new DeserializeBytesIntoPubsubMessagePayloadOnly();
}
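
To make the branch selection concrete, here is a minimal, self-contained sketch that does not use Beam. `ReadFlags`, `chosenParser` and `ParserSelectionDemo` are hypothetical names made up for illustration; only the condition itself is copied from the snippet above, and the returned strings are just labels for the two parse functions.

```java
/** Hypothetical stand-in for the flags carried by the Read transform; not Beam code. */
final class ReadFlags {
    final boolean needsAttributes;
    final boolean needsMessageId;
    final boolean needsOrderingKey;

    ReadFlags(boolean needsAttributes, boolean needsMessageId, boolean needsOrderingKey) {
        this.needsAttributes = needsAttributes;
        this.needsMessageId = needsMessageId;
        this.needsOrderingKey = needsOrderingKey;
    }

    /** Same condition as the quoted snippet: needsOrderingKey is never consulted. */
    String chosenParser() {
        if (needsAttributes || needsMessageId) {
            return "ParsePubsubMessageProtoAsPayload";
        }
        return "DeserializeBytesIntoPubsubMessagePayloadOnly";
    }
}

final class ParserSelectionDemo {
    public static void main(String[] args) {
        // Flags as set by the method reported in this issue: only the ordering key.
        ReadFlags asReported = new ReadFlags(false, false, true);
        // Flags with attributes and message ID also requested (assumed desired state).
        ReadFlags withAllFlags = new ReadFlags(true, true, true);

        System.out.println("as reported:    " + asReported.chosenParser());
        System.out.println("with all flags: " + withAllFlags.chosenParser());
    }
}
```

With only the ordering-key flag set, the sketch falls through to the payload-only branch, which matches the behaviour described in the RCA.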

So encoding always fails when PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.encode tries to encode the message ID, which is null.

@Override
public void encode(PubsubMessage value, OutputStream outStream) throws IOException {
  PAYLOAD_CODER.encode(value.getPayload(), outStream);
  ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream);
  MESSAGE_ID_CODER.encode(value.getMessageId(), outStream); // fails here
  // TODO(discuss what to do with publish_time field)
  PUBLISH_TIME_CODER.encode(Timestamp.getDefaultInstance(), outStream);
  ORDERING_KEY_CODER.encode(value.getOrderingKey(), outStream);
}
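
The failure mode can be reproduced in isolation with a small sketch that does not depend on Beam. Everything here is a hypothetical stand-in: `NullRejectingStringCoder` imitates a string coder that refuses null (using a plain `IOException` in place of Beam's `CoderException`), and `DemoMessage` models a message whose ID was never populated because the payload-only parse function built it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a string coder that rejects null values. */
final class NullRejectingStringCoder {
    static void encode(String value, OutputStream out) throws IOException {
        if (value == null) {
            throw new IOException("cannot encode a null String");
        }
        out.write(value.getBytes(StandardCharsets.UTF_8));
    }
}

/** Hypothetical message holder with only the fields needed for this demo. */
final class DemoMessage {
    final byte[] payload;
    final String messageId; // null when only the payload was parsed

    DemoMessage(byte[] payload, String messageId) {
        this.payload = payload;
        this.messageId = messageId;
    }
}

final class NullMessageIdDemo {
    /** Encodes payload then message ID; returns "ok" or the failure message. */
    static String tryEncode(DemoMessage message) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            out.write(message.payload);
            NullRejectingStringCoder.encode(message.messageId, out); // fails when the ID is null
            return "ok";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(tryEncode(new DemoMessage(payload, null)));
        System.out.println(tryEncode(new DemoMessage(payload, "id-1")));
    }
}
```

The payload encodes fine in both cases; only the message with a null ID hits the error, which is consistent with the stack trace pointing at the string coder rather than the payload coder.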

Tested with the latest Beam version, 2.53.0.

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

Abacn commented 9 months ago

Fixed by #30079