lensesio / stream-reactor

A collection of open source Apache 2.0 Kafka Connectors maintained by Lenses.io.
https://lenses.io
Apache License 2.0

JMS connector exceptions with JMSReplyTo headers #1026

Closed: michaeljwood closed this issue 6 months ago

michaeljwood commented 7 months ago

Issue Guidelines

Please review these questions before submitting any issue.

What version of the Stream Reactor are you reporting this issue for?

6.0.3

Are you running the correct version of Kafka/Confluent for the Stream Reactor release?

Yes, 3.3.1

Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?

No

Have you read the docs?

Yes

What is the expected behaviour?

JMS source connector is able to process a message with a JMSReplyTo header set to a queue. JMS sink connector is able to process messages with a null JMSReplyTo header value.

What was observed?

JMS source connector encounters a DataException when processing a message with a JMSReplyTo header set to a queue. JMS sink connector encounters a NullPointerException when processing a message with a null JMSReplyTo header value.

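In the source connector, the JMSReplyTo header seems to need converting to a string here (JMSStructMessage.getStruct), because the reply_to field it is put into has a STRING schema. For the sink connector, I believe the problem is the .toString() call here (JMSHeadersConverterWrapper.convert), which throws when the header value is null.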
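A minimal Scala sketch of the two conversions described above, under stated assumptions. It is not the change from pull request 1027: FakeQueue is a hypothetical stand-in for a JMS Destination such as com.ibm.mq.jms.MQQueue (so the example runs without JMS on the classpath), and ReplyToConversions, replyToAsString and headerValuesAsStrings are illustrative names. Skipping null header values on the sink side is an assumed policy; forwarding them in some other form would also avoid the exception.

```scala
// Hypothetical stand-in for a JMS Destination such as com.ibm.mq.jms.MQQueue;
// the string form used here is illustrative only.
final class FakeQueue(name: String) {
  override def toString: String = s"queue:///$name"
}

object ReplyToConversions {

  // Source side: reply_to is a STRING field, so store the destination's string
  // form instead of the Destination object. A missing JMSReplyTo stays null,
  // which assumes the field is declared optional.
  def replyToAsString(replyTo: AnyRef): String =
    Option(replyTo).map(_.toString).orNull

  // Sink side: call toString only on header values that are present; entries
  // whose value is null are skipped instead of raising a NullPointerException.
  def headerValuesAsStrings(headers: Seq[(String, Any)]): Map[String, String] =
    headers.collect { case (key, value) if value != null => key -> value.toString }.toMap
}
```

Wrapping the possibly-null value in Option keeps the null check and the conversion in a single expression at each call site.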

What is your Connect cluster configuration (connect-avro-distributed.properties)?

kind: Deployment
apiVersion: apps/v1
metadata:
  name: ck-connect-test-connect
spec:
  replicas: 6
  selector:
    matchLabels:
      strimzi.io/cluster: ck-connect-test
      strimzi.io/kind: KafkaConnect
      strimzi.io/name: ck-connect-test-connect
  template:
    metadata:
      labels:
        app.kubernetes.io/part-of: strimzi-ck-connect-test
        app.kubernetes.io/instance: ck-connect-test
        strimzi.io/name: ck-connect-test-connect
        app.kubernetes.io/managed-by: strimzi-cluster-operator
        strimzi.io/kind: KafkaConnect
        app.kubernetes.io/name: kafka-connect
        strimzi.io/cluster: ck-connect-test
    spec:
      restartPolicy: Always
      serviceAccountName: ck-connect-test-connect
      schedulerName: default-scheduler
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: node-role.kubernetes.io/kafka
                    operator: Exists
      terminationGracePeriodSeconds: 30
      securityContext: {}
      containers:
        - resources:
            limits:
              cpu: '2'
              memory: 20Gi
            requests:
              cpu: 200m
              memory: 5Gi
          readinessProbe:
            httpGet:
              path: /
              port: rest-api
              scheme: HTTP
            initialDelaySeconds: 90
            timeoutSeconds: 10
            periodSeconds: 20
            successThreshold: 1
            failureThreshold: 6
          terminationMessagePath: /dev/termination-log
          name: ck-connect-test-connect
          command:
            - /opt/kafka/kafka_connect_run.sh
          livenessProbe:
            httpGet:
              path: /
              port: rest-api
              scheme: HTTP
            initialDelaySeconds: 90
            timeoutSeconds: 10
            periodSeconds: 20
            successThreshold: 1
            failureThreshold: 6
          env:
            - name: KAFKA_CONNECT_CONFIGURATION
              value: >
                offset.storage.topic=test.connect.cluster.offsets

                value.converter=org.apache.kafka.connect.json.JsonConverter

                config.storage.topic=test.connect.cluster.configs

                key.converter=org.apache.kafka.connect.json.JsonConverter

                group.id=ck-connect-test

                status.storage.topic=test.connect.cluster.status

                access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS

                access.control.allow.origin=*

                config.providers=file

                config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

                connector.client.config.override.policy=All

                offset.flush.interval.ms=15000

                offset.flush.timeout.ms=45000

                rebalance.timeout.ms=180000

                task.shutdown.graceful.timeout.ms=45000
            - name: KAFKA_CONNECT_METRICS_ENABLED
              value: 'true'
            - name: KAFKA_CONNECT_BOOTSTRAP_SERVERS
              value: 'kafka-bootstrap.kafka-test.svc.cluster.local:9093'
            - name: STRIMZI_KAFKA_GC_LOG_ENABLED
              value: 'false'
            - name: STRIMZI_DYNAMIC_HEAP_PERCENTAGE
              value: '75'
            - name: STRIMZI_JAVA_SYSTEM_PROPERTIES
              value: >-
                -Djava.security.disableSystemPropertiesFile=true
                -Dcom.ibm.mq.jms.SupportMQExtensions=true
            - name: KAFKA_CONNECT_TLS
              value: 'true'
            - name: KAFKA_CONNECT_TRUSTED_CERTS
              value: cluster-ca-cert-proxy/ca.crt
            - name: KAFKA_CONNECT_TLS_AUTH_KEY
              value: ck-connect-test-proxy/user.key
            - name: KAFKA_CONNECT_TLS_AUTH_CERT
              value: ck-connect-test-proxy/user.crt
            - name: MQAPPLNAME
              value: ck-connect-test
          ports:
            - name: rest-api
              containerPort: 8083
              protocol: TCP
            - name: tcp-prometheus
              containerPort: 9404
              protocol: TCP
          imagePullPolicy: Always
          volumeMounts:
            - name: strimzi-tmp
              mountPath: /tmp
            - name: kafka-metrics-and-logging
              mountPath: /opt/kafka/custom-config/
            - name: cluster-ca-cert-proxy
              mountPath: /opt/kafka/connect-certs/cluster-ca-cert-proxy
            - name: ck-connect-test-proxy
              mountPath: /opt/kafka/connect-certs/ck-connect-test-proxy
            - name: ext-conf-openshift-credentials
              mountPath: /opt/kafka/external-configuration/openshift-credentials
            - name: ext-conf-ibm-mq-jndi
              mountPath: /opt/kafka/external-configuration/ibm-mq-jndi
          terminationMessagePolicy: File
          image: >-
            image-registry.openshift-image-registry.svc:5000/kafka-test/ck-connect:657f456bce
      serviceAccount: ck-connect-test-connect
      volumes:
        - name: strimzi-tmp
          emptyDir:
            medium: Memory
            sizeLimit: 10Mi
        - name: kafka-metrics-and-logging
          configMap:
            name: ck-connect-test-connect-config
            defaultMode: 420
        - name: cluster-ca-cert-proxy
          secret:
            secretName: cluster-ca-cert-proxy
            defaultMode: 288
        - name: ck-connect-test-proxy
          secret:
            secretName: ck-connect-test-proxy
            defaultMode: 288
        - name: ext-conf-openshift-credentials
          secret:
            secretName: ck-connect-test-properties
            defaultMode: 288
        - name: ext-conf-ibm-mq-jndi
          configMap:
            name: ibm-mq-jndi-test
            defaultMode: 288
      dnsPolicy: ClusterFirst
      tolerations:
        - key: node-role.kubernetes.io/kafka
          operator: Exists
          effect: NoSchedule
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 0
      maxSurge: 1
  revisionHistoryLimit: 10
  progressDeadlineSeconds: 600

What is your connector properties configuration (my-connector.properties)?

Source connector

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  generation: 2
  labels:
    strimzi.io/cluster: ck-connect-test
  name: my-source-connector
spec:
  class: io.lenses.streamreactor.connect.jms.source.JMSSourceConnector
  config:
    connect.jms.destination.selector: JNDI
    connect.jms.url: 'file:/opt/kafka/external-configuration/ibm-mq-jndi/'
    connect.jms.converter.throw.on.error: true
    connect.jms.kcql: >-
      INSERT INTO OUTPUT_TOPIC SELECT * FROM INPUT_TOPIC WITHTYPE
      TOPIC
    connect.jms.username: >-
      ${file:/opt/kafka/external-configuration/openshift-credentials/openshift.credentials:username}
    connect.jms.subscription.name: my-subscription
    connect.jms.connection.factory: ConnectionFactory
    connect.jms.password: >-
      ${file:/opt/kafka/external-configuration/openshift-credentials/openshift.credentials:password}
    connect.jms.initial.context.factory: com.sun.jndi.fscontext.RefFSContextFactory
  tasksMax: 1

Sink connector

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: ck-connect-test
  name: my-connector
spec:
  class: io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
  config:
    connect.jms.destination.selector: JNDI
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: SOURCE-TOPIC
    consumer.override.max.poll.records: '100'
    value.converter.schemas.enable: false
    connect.jms.url: 'file:/opt/kafka/external-configuration/ibm-mq-jndi/'
    connect.jms.converter.throw.on.error: true
    key.converter: org.apache.kafka.connect.storage.StringConverter
    connect.jms.kcql: >-
      INSERT INTO OUTPUT_TOPIC SELECT * FROM INPUT_TOPIC WITHFORMAT TEXT
      WITHTYPE TOPIC
    connect.jms.username: >-
      ${file:/opt/kafka/external-configuration/openshift-credentials/openshift.credentials:username}
    connect.jms.connection.factory: ConnectionFactory
    key.converter.schemas.enable: false
    connect.jms.password: >-
      ${file:/opt/kafka/external-configuration/openshift-credentials/openshift.credentials:password}
    connect.jms.initial.context.factory: com.sun.jndi.fscontext.RefFSContextFactory
  tasksMax: 1

Please provide full log files (redact any sensitive information)

Source connector exception

org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type STRING: class com.ibm.mq.jms.MQQueue for field: "reply_to"
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
    at io.lenses.streamreactor.connect.jms.source.domain.JMSStructMessage$.getStruct(JMSStructMessage.scala:59)
    at io.lenses.streamreactor.connect.jms.source.converters.JMSStructMessageConverter.convert(JMSStructMessageConverter.scala:26)
    at io.lenses.streamreactor.connect.jms.source.readers.JMSReader.convert(JMSReader.scala:60)
    at io.lenses.streamreactor.connect.jms.source.readers.JMSReader.$anonfun$poll$3(JMSReader.scala:52)
    at scala.collection.immutable.Vector1.map(Vector.scala:1911)
    at scala.collection.immutable.Vector1.map(Vector.scala:377)
    at io.lenses.streamreactor.connect.jms.source.readers.JMSReader.$anonfun$poll$1(JMSReader.scala:52)
    at scala.collection.StrictOptimizedIterableOps.flatMap(StrictOptimizedIterableOps.scala:118)
    at scala.collection.StrictOptimizedIterableOps.flatMap$(StrictOptimizedIterableOps.scala:105)
    at scala.collection.immutable.Vector.flatMap(Vector.scala:113)
    at io.lenses.streamreactor.connect.jms.source.readers.JMSReader.poll(JMSReader.scala:48)
    at io.lenses.streamreactor.connect.jms.source.JMSSourceTask.poll(JMSSourceTask.scala:87)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
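The DataException above comes from Kafka Connect's schema validation: Struct.put checks the value against the field's schema, and a STRING field accepts only java.lang.String (or null, if the field is optional). The sketch below is a simplified, hypothetical re-creation of that check in Scala; StringFieldCheck and FakeQueue are not Connect classes, and it returns Left(reason) where Connect would throw. It shows why a queue object is rejected for reply_to while its string form passes.

```scala
// Hypothetical stand-in for a non-String JMS Destination object.
final case class FakeQueue(name: String)

// Simplified, hypothetical re-creation of the STRING branch of Connect's
// schema check; Left(reason) marks values Connect would reject.
object StringFieldCheck {
  def validate(field: String, value: Any, optional: Boolean): Either[String, Any] =
    value match {
      case null if optional => Right(null)
      case null             => Left(s"null used for required field: $field")
      case s: String        => Right(s)
      case other =>
        Left(s"Invalid Java object for schema with type STRING: ${other.getClass} for field: $field")
    }
}
```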

Sink connector exception

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.lang.NullPointerException
    at io.lenses.streamreactor.common.errors.ThrowErrorPolicy.handle(ErrorPolicy.scala:61)
    at io.lenses.streamreactor.common.errors.ErrorHandler.handleError(ErrorHandler.scala:81)
    at io.lenses.streamreactor.common.errors.ErrorHandler.handleTry(ErrorHandler.scala:60)
    at io.lenses.streamreactor.common.errors.ErrorHandler.handleTry$(ErrorHandler.scala:41)
    at io.lenses.streamreactor.connect.jms.sink.writers.JMSWriter.handleTry(JMSWriter.scala:36)
    at io.lenses.streamreactor.connect.jms.sink.writers.JMSWriter.write(JMSWriter.scala:87)
    at io.lenses.streamreactor.connect.jms.sink.JMSSinkTask.$anonfun$put$2(JMSSinkTask.scala:75)
    at scala.Option.foreach(Option.scala:437)
    at io.lenses.streamreactor.connect.jms.sink.JMSSinkTask.put(JMSSinkTask.scala:75)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    ... 10 more
Caused by: java.lang.NullPointerException
    at io.lenses.streamreactor.connect.jms.sink.converters.JMSHeadersConverterWrapper.$anonfun$convert$1(JMSHeadersConverterWrapper.scala:32)
    at io.lenses.streamreactor.connect.jms.sink.converters.JMSHeadersConverterWrapper.$anonfun$convert$1$adapted(JMSHeadersConverterWrapper.scala:31)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
    at io.lenses.streamreactor.connect.jms.sink.converters.JMSHeadersConverterWrapper.convert(JMSHeadersConverterWrapper.scala:31)
    at io.lenses.streamreactor.connect.jms.sink.writers.JMSWriter.createJMSRecord(JMSWriter.scala:66)
    at io.lenses.streamreactor.connect.jms.sink.writers.JMSWriter.$anonfun$write$2(JMSWriter.scala:75)
    at scala.collection.immutable.Vector1.map(Vector.scala:1911)
    at scala.collection.immutable.Vector1.map(Vector.scala:377)
    at io.lenses.streamreactor.connect.jms.sink.writers.JMSWriter.$anonfun$write$1(JMSWriter.scala:75)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at scala.util.Try$.apply(Try.scala:210)
    at io.lenses.streamreactor.connect.jms.sink.writers.JMSWriter.write(JMSWriter.scala:74)
    ... 14 more
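Following the "Caused by" chain: the NullPointerException is raised inside the Try in JMSWriter.write, ErrorHandler hands the failure to ThrowErrorPolicy, which rethrows it as a ConnectException, and WorkerSinkTask then exits because the exception is unrecoverable. The hypothetical Scala sketch below mirrors that chain with stand-in names (SinkWriteSketch, FatalSinkException are not connector classes); convertHeaders deliberately reproduces the unguarded toString call rather than a fix.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the ConnectException thrown by ThrowErrorPolicy.
final class FatalSinkException(cause: Throwable) extends RuntimeException(cause)

object SinkWriteSketch {
  // Unguarded conversion, mirroring the failing toString call:
  // a header whose value is null makes v.toString throw a NullPointerException.
  def convertHeaders(headers: Seq[(String, Any)]): Map[String, String] =
    headers.map { case (k, v) => k -> v.toString }.toMap

  // Mirrors JMSWriter.write: the conversion runs inside a Try and, as with a
  // throw-style error policy, any failure is rethrown wrapped. In the real
  // connector, WorkerSinkTask then treats that as unrecoverable.
  def write(records: Seq[Seq[(String, Any)]]): Unit =
    Try(records.map(convertHeaders)) match {
      case Success(_)     => ()
      case Failure(error) => throw new FatalSinkException(error)
    }
}
```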
davidsloan commented 7 months ago

Hi @michaeljwood

I've pushed a branch: https://github.com/lensesio/stream-reactor/pull/1027

Do you want to try this and see if it solves the problem you've reported?

Let me know if you need help building it.

michaeljwood commented 7 months ago

Alright, got that built and deployed to my test cluster, and it appeared to be working. Thank you @davidsloan!

davidsloan commented 6 months ago

This has now been released in https://github.com/lensesio/stream-reactor/releases/tag/6.1.0