kestra-io / plugin-gcp

Apache License 2.0

PubSub Publish task fails when message contains orderingKey #348

Open shrutimantri opened 5 months ago

shrutimantri commented 5 months ago

Expected Behavior

A message can carry an orderingKey attribute. Messages that set orderingKey should be published successfully.

Actual Behaviour

If the task has messages with orderingKey, the execution fails. Here is the error log:

2024-03-23 12:31:27.869 Cannot publish a message with an ordering key when message ordering is not enabled in the Publisher client. Please create a Publisher client with setEnableMessageOrdering(true) in the builder.
2024-03-23 12:31:27.869 java.lang.IllegalStateException: Cannot publish a message with an ordering key when message ordering is not enabled in the Publisher client. Please create a Publisher client with setEnableMessageOrdering(true) in the builder.
    at com.google.common.base.Preconditions.checkState(Preconditions.java:512)
    at com.google.cloud.pubsub.v1.Publisher.publish(Publisher.java:255)
    at io.kestra.plugin.gcp.pubsub.Publish.lambda$buildFlowable$1(Publish.java:116)
    at io.kestra.core.utils.Rethrow.lambda$throwFunction$4(Rethrow.java:90)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:171)
    at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
    at reactor.core.publisher.MonoReduce$ReduceSubscriber.request(MonoReduce.java:222)
    at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:53)
    at reactor.core.publisher.MonoReduce$ReduceSubscriber.onSubscribe(MonoReduce.java:98)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
    at reactor.core.publisher.Mono.block(Mono.java:1727)
    at io.kestra.plugin.gcp.pubsub.Publish.run(Publish.java:97)
    at io.kestra.plugin.gcp.pubsub.Publish.run(Publish.java:28)
    at io.kestra.core.runners.Worker$WorkerThread.run(Worker.java:710)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:103)
        at reactor.core.publisher.Mono.block(Mono.java:1728)
        ... 3 more

It looks like the plugin should call setEnableMessageOrdering(true) on the Publisher builder when any message contains an orderingKey (or we could enable message ordering by default).
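
A minimal sketch of the first option, deciding the flag from the messages themselves. This is not the plugin's actual code: the class OrderingKeyCheck and the method needsMessageOrdering are hypothetical names, and messages are reduced to their ordering keys (null or empty meaning "not set"). The Publisher builder call is shown only in a comment, since it needs the google-cloud-pubsub dependency.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of plugin-gcp.
final class OrderingKeyCheck {
    private OrderingKeyCheck() {}

    /** Returns true if at least one message carries a non-empty ordering key. */
    static boolean needsMessageOrdering(List<String> orderingKeys) {
        for (String key : orderingKeys) {
            if (key != null && !key.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Ordering keys of the three messages in the example flow:
        // only the first message sets one ('foo').
        List<String> keys = Arrays.asList("foo", null, null);
        boolean enableOrdering = needsMessageOrdering(keys);

        // Assumption: the plugin would pass this flag to the client builder, e.g.
        //   Publisher.newBuilder(topicName)
        //            .setEnableMessageOrdering(enableOrdering)
        //            .build();
        System.out.println("enableMessageOrdering=" + enableOrdering);
    }
}
```

Because the plugin builds messages lazily from the "from" input, scanning them up front may not be practical; in that case enabling ordering unconditionally, or exposing it as a task property, would be the simpler route.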

Steps To Reproduce

  1. Take the example flow and set correct values (ensure orderingKey is set for at least one of the messages).
  2. Execute the flow.

Environment Information

Example flow

id: pubsub-publish
namespace: dev
tasks:
  - id: "publish"
    type: "io.kestra.plugin.gcp.pubsub.Publish"
    projectId: <project-id>
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    topic: smantri-test
    from:
      - data: "{{ 'base64-encoded-string-1' | base64encode }}"
        attributes:
          testAttribute: KestraTest
        messageId: '1234'
        orderingKey: 'foo'
      - data: "{{ 'base64-encoded-string-2' | base64encode }}"
      - attributes:
          testAttribute: KestraTest