apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Zeta] CDC doesn't produce events after checkpoints have finished #6900

Closed kaantecik closed 19 hours ago

kaantecik commented 1 month ago

What happened

I want to use SeaTunnel to capture changes from a MySQL database with CDC and synchronize them to another database. However, once the job is running, change events are only delivered while checkpoints are being created. Although the job mode is set to STREAMING, checkpoint creation stops after 3 to 30 checkpoints, and no further events are produced after that.
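
One way to pin down when checkpointing stops is to compare the time of the last completed checkpoint with the configured interval. The following is a minimal sketch, not part of SeaTunnel: `find_checkpoint_stall` is a hypothetical helper, the `slack` factor is an arbitrary choice, and the timestamps are made up for illustration (they would have to be collected from the engine logs by hand).

```python
from typing import List, Optional


def find_checkpoint_stall(
    completed_ms: List[int],
    interval_ms: int,
    now_ms: int,
    slack: float = 3.0,
) -> Optional[int]:
    """Return the time of the last completed checkpoint if checkpointing
    looks stalled, otherwise None.

    A stall is assumed when more than `slack` intervals have passed since
    the last completed checkpoint. The threshold is a heuristic.
    """
    if not completed_ms:
        return None
    last = max(completed_ms)
    if now_ms - last > slack * interval_ms:
        return last
    return None


# Hypothetical data: three checkpoints completed, then nothing for 45 s.
completed = [5000, 10000, 15000]
print(find_checkpoint_stall(completed, interval_ms=5000, now_ms=60000))  # 15000
```

Recording the returned timestamp next to the time of the last change event written to the sink would show whether events stop at the same moment as checkpoints.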

SeaTunnel Version

2.3.5

SeaTunnel Config

{
    "env": {
        "job.mode": "STREAMING",
        "checkpoint.interval": 5000
    },
    "source": [
        {
            "plugin_name": "MySQL-CDC",
            "base-url": "jdbc:mysql://localhost:3306/cdc",
            "result_table_name": "cdc",
            "username": "***",
            "password": "***",
            "table-names": [
                "cdc.users"
            ],
            "startup.mode": "initial",
            "table-names-config": [
                {
                    "table": "cdc.users",
                    "primaryKeys": [
                        "id"
                    ]
                }
            ]
        }
    ],
    "transform": [],
    "sink": [
        {
            "plugin_name": "jdbc",
            "source_table_name": [
                "cdc"
            ],
            "url": "jdbc:mysql://localhost:3307/cdc?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true",
            "driver": "com.mysql.cj.jdbc.Driver",
            "user": "***",
            "password": "***",
            "generate_sink_sql": true,
            "database": "cdc",
            "primary_keys": [
                "id"
            ],
            "table": "users"
        }
    ]
}
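
In this config the sink is connected to the source through table names: the source publishes `result_table_name` "cdc" and the sink lists "cdc" under `source_table_name`. The sketch below checks that wiring on a trimmed copy of the config. `unresolved_sink_inputs` is a hypothetical helper, not a SeaTunnel API, and accepting either a string or a list for `source_table_name` is an assumption made to match the list form used in this report.

```python
from typing import Any, Dict, List


def _as_list(value: Any) -> List[str]:
    """Normalize a missing value, a single name, or a list of names."""
    if value is None:
        return []
    if isinstance(value, str):
        return [value]
    return list(value)


def unresolved_sink_inputs(job: Dict[str, Any]) -> List[str]:
    """Return sink input names that no source or transform produces."""
    produced = set()
    for stage in job.get("source", []) + job.get("transform", []):
        produced.update(_as_list(stage.get("result_table_name")))

    missing = []
    for sink in job.get("sink", []):
        for name in _as_list(sink.get("source_table_name")):
            if name not in produced:
                missing.append(name)
    return missing


# Trimmed copy of the job config above (connection details omitted).
job = {
    "source": [{"plugin_name": "MySQL-CDC", "result_table_name": "cdc"}],
    "transform": [],
    "sink": [{"plugin_name": "jdbc", "source_table_name": ["cdc"]}],
}
print(unresolved_sink_inputs(job))  # []
```

An empty result only means the names line up; it says nothing about whether the engine accepts the list form.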

Running Command

# hazelcast-client.yaml
hazelcast-client:
  cluster-name: seatunnel
  properties:
    hazelcast.logging.type: log4j2
  network:
    cluster-members:
      - seatunnel-0.seatunnel.default.svc.cluster.local:5801

# hazelcast.yaml
hazelcast:
  cluster-name: seatunnel
  network:
    rest-api:
      enabled: true
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join:
      tcp-ip:
        enabled: true
        member-list:
          - localhost
    port:
      auto-increment: false
      port: 5801
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.operation.generic.thread.count: 50

# seatunnel.yaml
seatunnel:
  engine:
    history-job-expire-minutes: 1440
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 6000
      timeout: 7000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /tmp/seatunnel/checkpoint_snapshot
          storage.type: hdfs
          fs.defaultFS: file:///tmp/
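
Checkpoint settings appear in two places in this report: the engine config above (`interval: 6000`, `timeout: 7000`) and the job `env` (`"checkpoint.interval": 5000`). The sketch below resolves the values that would apply, assuming that job-level `checkpoint.interval` and `checkpoint.timeout` take precedence over the engine-level values. That precedence rule is an assumption here and should be checked against the documentation for the version in use; `effective_checkpoint` is a hypothetical helper.

```python
from typing import Any, Dict


def effective_checkpoint(engine: Dict[str, int], job_env: Dict[str, Any]) -> Dict[str, int]:
    """Resolve checkpoint settings, preferring job-level values.

    Assumption: keys set in the job `env` override the engine defaults.
    """
    return {
        "interval_ms": job_env.get("checkpoint.interval", engine["interval"]),
        "timeout_ms": job_env.get("checkpoint.timeout", engine["timeout"]),
    }


# Values copied from the engine config and the job env in this report.
engine = {"interval": 6000, "timeout": 7000}
job_env = {"job.mode": "STREAMING", "checkpoint.interval": 5000}
print(effective_checkpoint(engine, job_env))
# {'interval_ms': 5000, 'timeout_ms': 7000}
```

Under that assumption, a checkpoint would be triggered every 5 seconds and each one would have 7 seconds to complete. Whether the short timeout is related to the stall is not established in this report.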

# seatunnel-cluster.yaml
apiVersion: v1
kind: Service
metadata:
  name: seatunnel-headless-svc
spec:
  selector:
    app: seatunnel
  ports:
  - port: 5801
    name: seatunnel
  clusterIP: None
---
apiVersion: v1
kind: Service
metadata:
  name: seatunnel-external-service
spec:
  type: NodePort
  selector:
    app: seatunnel
  ports:
    - name: seatunnel
      protocol: TCP
      port: 5801
      targetPort: 5801
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: seatunnel
  annotations:
    configmap.reloader.stakater.com/reload: "hazelcast,hazelcast-client,seatunnelmap"
spec:
  serviceName: "seatunnel"
  replicas: 1 
  selector:
    matchLabels:
      app: seatunnel
  template:
    metadata:
      labels:
        app: seatunnel
    spec:
      containers:
        - name: seatunnel
          image: seatunnel # Image built by following https://seatunnel.apache.org/docs/2.3.5/start-v2/kubernetes/ and tagged "seatunnel"
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 5801
              name: client
          command: ["/bin/sh","-c","/opt/seatunnel/bin/seatunnel-cluster.sh -DJvmOption=-Xms2G -Xmx2G"]
          resources:
            limits:
              cpu: "1"
              memory: 4G
            requests:
              cpu: "1"
              memory: 2G
          volumeMounts:
            - mountPath: "/opt/seatunnel/config/hazelcast.yaml"
              name: hazelcast
              subPath: hazelcast.yaml
            - mountPath: "/opt/seatunnel/config/hazelcast-client.yaml"
              name: hazelcast-client
              subPath: hazelcast-client.yaml
            - mountPath: "/opt/seatunnel/config/seatunnel.yaml"
              name: seatunnelmap
              subPath: seatunnel.yaml
      volumes:
        - name: hazelcast
          configMap:
            name: hazelcast
        - name: hazelcast-client
          configMap:
            name: hazelcast-client
        - name: seatunnelmap
          configMap:
            name: seatunnelmap

kubectl create configmap hazelcast-client  --from-file=hazelcast-client.yaml
kubectl create configmap hazelcast  --from-file=hazelcast.yaml
kubectl create configmap seatunnelmap  --from-file=seatunnel.yaml

kubectl apply -f .\seatunnel-cluster.yaml
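
The report does not show how the job itself was submitted. Since the job config is JSON and the Hazelcast REST API is enabled, one possibility is submission over REST. The sketch below builds such a request, assuming the Zeta engine's `POST /hazelcast/rest/maps/submit-job` endpoint with an optional `jobName` query parameter. The base URL and job name are hypothetical, and `build_submit_request` is an illustrative helper, not a SeaTunnel API.

```python
import json
from typing import Any, Dict, Optional
from urllib import request
from urllib.parse import urlencode


def build_submit_request(
    base_url: str,
    job: Dict[str, Any],
    job_name: Optional[str] = None,
) -> request.Request:
    """Build (but do not send) a POST request carrying the job config."""
    url = f"{base_url.rstrip('/')}/hazelcast/rest/maps/submit-job"
    if job_name:
        url += "?" + urlencode({"jobName": job_name})
    body = json.dumps(job).encode("utf-8")
    return request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical address, e.g. the NodePort service forwarded to localhost.
req = build_submit_request("http://localhost:5801/", {"env": {}}, "cdc-sync")
print(req.full_url)
# http://localhost:5801/hazelcast/rest/maps/submit-job?jobName=cdc-sync

# Sending is left commented out so the sketch runs without a cluster:
# with request.urlopen(req) as resp:
#     print(resp.read().decode("utf-8"))
```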

Error Exception

-

Zeta or Flink or Spark Version

2.3.5

Java or Scala Version

-

Screenshots

-

github-actions[bot] commented 2 weeks ago

This issue has been automatically marked as stale because it has had no activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot] commented 19 hours ago

This issue has been closed because it has not received a response for too long. You can reopen it if you encounter similar problems in the future.