kestra-io / plugin-gcp

Apache License 2.0
6 stars 9 forks source link

PubSub Realtime Trigger doesn't work #413

Closed wrussell1999 closed 2 weeks ago

wrussell1999 commented 2 weeks ago

Expected Behavior

Using the example, it should receive messages from the web console:

id: realtime-pubsub
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "Received: {{ trigger.data }}"

triggers:
  - id: trigger
    type: io.kestra.plugin.gcp.pubsub.RealtimeTrigger
    projectId: snappy-nomad-422714-t8
    topic: test-topic
    subscription: test-subscription
    serviceAccount: "{{ secret('GOOGLE_SA') }}"

also this example doesn't work (same but Service Account is pasted in - fill in the ...):

**```yaml
id: realtime-pubsub
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "Received: {{ trigger.data }}"

triggers:
  - id: trigger
    type: io.kestra.plugin.gcp.pubsub.RealtimeTrigger
    projectId: snappy-nomad-422714-t8
    topic: test-topic
    subscription: test-subscription
    serviceAccount: |
      {
        "type": "service_account",
        "project_id": "...",
        "private_key_id": "...",
        "private_key": "...",
        "client_email": "...",
        "client_id": "...",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
        "client_x509_cert_url": "...", 
        "universe_domain": "googleapis.com"
      }

Actual Behaviour

When the service account is provided as a secret (which works for other GCP tasks), I get this error before trying to send any messages to the topic:

kestra-1    | 2024-07-05 11:32:31,295 WARN  Gax-1574     flow.realtime-pubsub.trigger [namespace: company.team] [flow: realtime-pubsub] [trigger: trigger] [date: 2024-07-05T11:31:28.041Z] Worker Evaluate Failed with error 'Last unit does not have enough valid bits'
kestra-1    | java.lang.IllegalArgumentException: Last unit does not have enough valid bits
kestra-1    |   at java.base/java.util.Base64$Decoder.decode0(Unknown Source)
kestra-1    |   at java.base/java.util.Base64$Decoder.decode(Unknown Source)
kestra-1    |   at io.kestra.plugin.gcp.pubsub.model.Message.of(Message.java:75)
kestra-1    |   at io.kestra.plugin.gcp.pubsub.RealtimeTrigger.lambda$publisher$1(RealtimeTrigger.java:150)
kestra-1    |   at com.google.cloud.pubsub.v1.MessageDispatcher$3.run(MessageDispatcher.java:514)
kestra-1    |   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
kestra-1    |   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
kestra-1    |   at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
kestra-1    |   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
kestra-1    |   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
kestra-1    |   at java.base/java.lang.Thread.run(Unknown Source)

as well as

kestra-1    | 2024-07-05 11:33:08,244 WARN  Gax-1584     flow.realtime-pubsub.trigger [namespace: company.team] [flow: realtime-pubsub] [trigger: trigger] [date: 2024-07-05T11:32:32.246Z] Worker Evaluate Failed with error 'Illegal base64 character 20'
kestra-1    | java.lang.IllegalArgumentException: Illegal base64 character 20
kestra-1    |   at java.base/java.util.Base64$Decoder.decode0(Unknown Source)
kestra-1    |   at java.base/java.util.Base64$Decoder.decode(Unknown Source)
kestra-1    |   at io.kestra.plugin.gcp.pubsub.model.Message.of(Message.java:75)
kestra-1    |   at io.kestra.plugin.gcp.pubsub.RealtimeTrigger.lambda$publisher$1(RealtimeTrigger.java:150)
kestra-1    |   at com.google.cloud.pubsub.v1.MessageDispatcher$3.run(MessageDispatcher.java:514)
kestra-1    |   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
kestra-1    |   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
kestra-1    |   at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
kestra-1    |   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
kestra-1    |   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
kestra-1    |   at java.base/java.lang.Thread.run(Unknown Source)

If I paste the service account in manually, I get a different error:

kestra-1    | 2024-07-05 12:13:48,092 ERROR WorkerThread flow.realtime-pubsub.trigger com.google.common.io.BaseEncoding$DecodingException: Invalid input length 1625
kestra-1    | java.lang.IllegalArgumentException: com.google.common.io.BaseEncoding$DecodingException: Invalid input length 1625
kestra-1    |   at com.google.common.io.BaseEncoding.decode(BaseEncoding.java:219)
kestra-1    |   at com.google.api.client.util.Base64.decodeBase64(Base64.java:106)
kestra-1    |   at com.google.api.client.util.PemReader.readNextSection(PemReader.java:99)
kestra-1    |   at com.google.api.client.util.PemReader.readFirstSectionAndClose(PemReader.java:128)
kestra-1    |   at com.google.auth.oauth2.OAuth2Utils.privateKeyFromPkcs8(OAuth2Utils.java:239)
kestra-1    |   at com.google.auth.oauth2.ServiceAccountCredentials.fromPkcs8(ServiceAccountCredentials.java:436)
kestra-1    |   at com.google.auth.oauth2.ServiceAccountCredentials.fromJson(ServiceAccountCredentials.java:199)
kestra-1    |   at com.google.auth.oauth2.GoogleCredentials.fromStream(GoogleCredentials.java:200)
kestra-1    |   at com.google.auth.oauth2.ServiceAccountCredentials.fromStream(ServiceAccountCredentials.java:469)
kestra-1    |   at com.google.auth.oauth2.ServiceAccountCredentials.fromStream(ServiceAccountCredentials.java:452)
kestra-1    |   at io.kestra.plugin.gcp.CredentialService.credentials(CredentialService.java:26)
kestra-1    |   at io.kestra.plugin.gcp.AbstractTask.credentials(AbstractTask.java:28)
kestra-1    |   at io.kestra.plugin.gcp.pubsub.AbstractPubSub.createSubscription(AbstractPubSub.java:49)
kestra-1    |   at io.kestra.plugin.gcp.pubsub.Consume.createSubscription(Consume.java:35)
kestra-1    |   at io.kestra.plugin.gcp.pubsub.RealtimeTrigger.publisher(RealtimeTrigger.java:142)
kestra-1    |   at io.kestra.plugin.gcp.pubsub.RealtimeTrigger.evaluate(RealtimeTrigger.java:137)
kestra-1    |   at io.kestra.core.runners.WorkerTriggerRealtimeThread.doRun(WorkerTriggerRealtimeThread.java:31)
kestra-1    |   at io.kestra.core.runners.AbstractWorkerThread.run(AbstractWorkerThread.java:56)
kestra-1    | Caused by: com.google.common.io.BaseEncoding$DecodingException: Invalid input length 1625
kestra-1    |   at com.google.common.io.BaseEncoding$Base64Encoding.decodeTo(BaseEncoding.java:1030)
kestra-1    |   at com.google.common.io.BaseEncoding.decodeChecked(BaseEncoding.java:234)
kestra-1    |   at com.google.common.io.BaseEncoding.decode(BaseEncoding.java:217)
kestra-1    |   ... 17 common frames omitted

All these errors happen while the trigger is enabled.

Side note, we need more documentation on how to setup and test this plugin.

Steps To Reproduce

No response

Environment Information

Example flow

No response

loicmathieu commented 2 weeks ago

I made a small improvement to the trigger, it may had a race before.

Also, I use this command to create a message:

gcloud pubsub topics publish test-topic --message="SGVsbG8gV29ybGQ="

And I tested it, it works like a charm. Can you retest and give feedback?

wrussell1999 commented 2 weeks ago

I was successfully able to get it working with the service account as a secret as well as hard coded into the flow. Thanks!

Service Account as a secret: Screenshot 2024-07-09 at 18 04 13 A few tests

Service Account hard coded: Screenshot 2024-07-09 at 18 03 56

wrussell1999 commented 2 weeks ago

Resolved by https://github.com/kestra-io/kestra/commit/bf32d6f6e738f47142158774a36075d52550033f