kestra-io / plugin-debezium

Apache License 2.0

Debezium Postgres trigger is evaluating but doesn't see any records #39

Closed Frolov-Viacheslav closed 5 months ago

Frolov-Viacheslav commented 11 months ago

Expected Behavior

The Debezium trigger should see all new records added during the interval. I have 2 flows with Debezium triggers connected to different Postgres servers. I think it is quite stable when only 1 flow includes a Debezium trigger.

Actual Behaviour

It worked fine after setup. However, after a few days, one of the triggers was still evaluating but no longer saw any records. The logs show: Ended after receiving 0 records: {} even though I had added a new record to the Postgres database. Restarting the Kestra Worker resumed the capture of new records (almost immediately, the trigger found the records that had been added before the restart). My second flow with the Debezium trigger worked correctly the whole time, so the issue affected only one flow.

There was another issue where the Debezium Postgres trigger stopped evaluating and did not produce any logs. A restart helped again. See the screenshot for an example of this case: Debezium ran at 04:13:05.901 and then not again until 22:23:47.637 (after my restart).

Also, the trigger sometimes finds new records only after a delay. I set interval: PT30S, but the trigger reports that 0 records were received several times and catches the new records only after approximately 5 minutes.

I tried to solve the issues myself with the following steps:

  1. Change the Kestra architecture from standalone to a webserver, executor, scheduler, worker setup. My idea was that it could improve performance.
  2. Rename the Debezium trigger to something unique (the ids of the triggers in the different flows were previously identical).

Maybe, there is any conflict between Debezium triggers and something like different stateName or something can help.

image

Steps To Reproduce

  1. Create a PUBLICATION on 2 different Postgres servers for some tables.
  2. Create 2 Kestra flows, each with a Debezium Postgres trigger. For example:
    triggers:
      - id: listen-debezium-identityserver
        type: "io.kestra.plugin.debezium.postgres.Trigger"
        hostname: "{{vars.sourceServer}}"
        port: '5432'
        username: "{{vars.replicationUser}}"
        password: "{{secret('PG_PROD_IDP_REPLICAION_USER_PASSWORD')}}"
        database: "{{vars.databaseName}}"
        sslMode: REQUIRE
        snapshotMode: INITIAL
        slotName: "{{vars.databaseName}}"
        includedTables: public.identifications,public.external_identifications
        pluginName: PGOUTPUT
        format: INLINE
        metadata: ADD_FIELD
        deleted: DROP
        interval: PT60S
  3. Run some SQL commands against the Postgres servers connected to the Debezium triggers.
  4. Check that the Debezium evaluations work fine.
  5. Leave the Kestra flows running for a few days (ideally, the Postgres servers should be in use all this time).
  6. Run SQL commands again and check the logs in the Kestra flows.

Environment Information

Example flow


tasks:
  - id: parallel
    type: io.kestra.core.tasks.flows.EachParallel
    value: "{{vars.tableNameList}}"
    tasks:
    - id: if
      type: io.kestra.core.tasks.flows.If
      condition: "{{trigger.uris[taskrun.value] ?? false }}"
      then:  
      - id: launch
        type: io.kestra.core.tasks.flows.Flow
        namespace: common
        flowId: syncTable
        inputs:
          fullTableName: "{{parents[0].taskrun.value}}"
          debeziumFile: "{{trigger.uris[parents[0].taskrun.value]}}"
          databaseUserName: "{{vars.destinationDatabaseUserName}}"
          databaseUserSecret: "{{vars.destinationDatabaseUserSecret}}"
          serverName: "{{vars.destinationServer}}"
          databaseName: "{{vars.databaseName}}"
        labels:
          destinationServer: "{{vars.destinationServer}}"
          sourceServer: "{{vars.sourceServer}}"
          tableName: "{{parents[0].taskrun.value}}"
        wait: true
        transmitFailed: true
        outputs:
          sql: "{{ outputs.node.outputFiles['script.sql'] }}"
          json: "{{ outputs.json.uri }}"

errors:
  - id: send_email
    type: io.kestra.plugin.notifications.mail.MailSend
    from: "{{vars.fromEmail}}"
    to: "{{vars.toEmail}}"
    username: "{{ secret('EMAIL_USERNAME') }}"
    password: "{{ secret('EMAIL_PASSWORD') }}"
    host: smtp.postmarkapp.com
    port: 587
    transportStrategy: SMTP
    subject: "Synchronization of {{flow.id}} database failed"
    htmlTextContent: |
      Synchronization of database from {{vars.sourceServer}} to {{vars.destinationServer}} failed.<br />
      Please check Kestra server for more details.

#logical decoding requires wal_level = logical
triggers:
  - id: listen-debezium-identityserver
    type: "io.kestra.plugin.debezium.postgres.Trigger"
    hostname: "{{vars.sourceServer}}"
    port: '5432'
    username: "{{vars.replicationUser}}"
    password: "{{secret('PG_PROD_IDP_REPLICAION_USER_PASSWORD')}}"
    database: "{{vars.databaseName}}"
    sslMode: REQUIRE
    snapshotMode: INITIAL
    slotName: "{{vars.databaseName}}"
    includedTables: public.identifications,public.external_identifications
    pluginName: PGOUTPUT
    format: INLINE
    metadata: ADD_FIELD
    deleted: DROP
    interval: PT60S

loicmathieu commented 5 months ago

Hi, sorry for the late reply.

If I understand correctly, you set up two flows with the same trigger using Debezium and PostgreSQL, and after some time only one of them still receives messages.

> Maybe, there is any conflict between Debezium triggers and something like different stateName or something can help.

As the two triggers are in different flows, they use different states to store the offset data, so this should not have any impact.

I'm not an expert on Debezium, but if you want to consume the same event source multiple times, I think you need to configure a different slot name for each consumer via the slotName property.
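As a rough illustration of that suggestion, here is a minimal sketch of two triggers, each in its own flow, reading from the same database through separate replication slots. The trigger ids and the slot names (kestra_flow_a, kestra_flow_b) are made up for the example; the other properties are copied from the flow in the report. A PostgreSQL replication slot can only be streamed by one connection at a time, which is why each consumer gets its own.

```yaml
# Flow A (hypothetical id and slot name)
triggers:
  - id: listen-debezium-flow-a
    type: "io.kestra.plugin.debezium.postgres.Trigger"
    hostname: "{{vars.sourceServer}}"
    port: '5432'
    username: "{{vars.replicationUser}}"
    password: "{{secret('PG_PROD_IDP_REPLICAION_USER_PASSWORD')}}"
    database: "{{vars.databaseName}}"
    pluginName: PGOUTPUT
    slotName: "kestra_flow_a"   # illustrative; unique to this consumer
    interval: PT60S
---
# Flow B (hypothetical id and slot name), same source database
triggers:
  - id: listen-debezium-flow-b
    type: "io.kestra.plugin.debezium.postgres.Trigger"
    hostname: "{{vars.sourceServer}}"
    port: '5432'
    username: "{{vars.replicationUser}}"
    password: "{{secret('PG_PROD_IDP_REPLICAION_USER_PASSWORD')}}"
    database: "{{vars.databaseName}}"
    pluginName: PGOUTPUT
    slotName: "kestra_flow_b"   # illustrative; must differ from flow A
    interval: PT60S
```

PostgreSQL slot names may only contain lowercase letters, digits, and underscores, so a templated value such as the database name only works if it already follows that rule.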

loicmathieu commented 5 months ago

Hi @Frolov-Viacheslav

If I understand correctly, it works fine for some time, then nothing is extracted, then sometimes it works fine again, and sometimes it needs a restart.

This really seems hard to debug!

To answer your question:

> Maybe, there is any conflict between Debezium triggers and something like different stateName or something can help.

The state is per flow, so you should change it only if you have two Debezium triggers in the same flow. You are safe, since you said you use two flows.

Since 0.13.1, when you reported this issue, we have refactored the scheduler, the component responsible for evaluating triggers. This may have fixed the issue, since you said that you didn't see any logs for some period of time. That points to an issue with trigger scheduling rather than with the Debezium trigger itself: if the trigger had been scheduled, you would have seen a log.
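For the case that does need a change, here is a minimal sketch of a single flow with two Debezium triggers, each given its own state name. It assumes the property is called stateName, as in the question above; the trigger ids, the state and slot names, and the vars.firstServer / vars.secondServer variables are hypothetical and only serve the example.

```yaml
# Hypothetical single flow containing two Debezium triggers.
triggers:
  - id: listen-debezium-first
    type: "io.kestra.plugin.debezium.postgres.Trigger"
    hostname: "{{vars.firstServer}}"      # hypothetical variable
    port: '5432'
    username: "{{vars.replicationUser}}"
    password: "{{secret('PG_PROD_IDP_REPLICAION_USER_PASSWORD')}}"
    database: "{{vars.databaseName}}"
    pluginName: PGOUTPUT
    slotName: "kestra_first"              # illustrative slot name
    stateName: "debezium-state-first"     # assumed property; distinct per trigger
    interval: PT60S
  - id: listen-debezium-second
    type: "io.kestra.plugin.debezium.postgres.Trigger"
    hostname: "{{vars.secondServer}}"     # hypothetical variable
    port: '5432'
    username: "{{vars.replicationUser}}"
    password: "{{secret('PG_PROD_IDP_REPLICAION_USER_PASSWORD')}}"
    database: "{{vars.databaseName}}"
    pluginName: PGOUTPUT
    slotName: "kestra_second"             # illustrative slot name
    stateName: "debezium-state-second"    # assumed property; distinct per trigger
    interval: PT60S
```

With distinct state names, the two triggers should not overwrite each other's stored offsets even though they live in the same flow.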

Can you test with 0.16?

loicmathieu commented 5 months ago

Having heard no feedback and having tested it successfully, I'll close this issue.