ibm-messaging / kafka-connect-mq-sink

This repository contains a Kafka Connect sink connector for copying data from Apache Kafka into IBM MQ.
Apache License 2.0

Message builder is not passing custom header and correlation id #66

Closed by layshah72 1 month ago

layshah72 commented 1 month ago

I want to pass the correlation ID from a Kafka message header to the MQ message. My Kafka message has a JMSCorrelationID header, but its value is not being passed to the MQ message.

Kafka message publishing code:

    ProducerRecord<String, String> producerRecord =
            new ProducerRecord<>(topicName, null, System.currentTimeMillis(), correlationId, outputXml, null);
    producerRecord.headers().add(new RecordHeader("JMSCorrelationID", correlationId.getBytes()));
    producerRecord.headers().add(new RecordHeader("mq.reply.queue", replytoqueue.getBytes()));
    kafkaTemplate.send(producerRecord);

dalelane commented 1 month ago

please can you share the configuration you are using for the Connector

layshah72 commented 1 month ago

    {
      "name": "XXXX-mq-sink",
      "config": {
        "connector.class": "com.ibm.eventstreams.connect.mqsink.MQSinkConnector",
        "tasks.max": "1",
        "topics": "XXXXX",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "mq.queue.manager": "XXXX",
        "mq.connection.name.list": "XXXX(1441)",
        "mq.channel.name": "XXX",
        "mq.queue": "XXXX",
        "mq.message.body.jms": "true",
        "mq.message.builder": "com.ibm.eventstreams.connect.mqsink.builders.BaseMessageBuilder",
        "mq.kafka.headers.copy.to.jms.properties": "true",
        "mq.message.builder.key.header": "JMSCorrelationID",
        "offset.storage.file.filename": "",
        "mq.userid": "XXX"
      }
    }

dalelane commented 1 month ago

It looks like you're attempting a combination of a couple of approaches here.

  1. By setting mq.message.builder.key.header and putting correlationId as the fourth (key) argument in your ProducerRecord, you are providing the JMSCorrelationID value from the key in your Kafka message - I would've expected this to work
  2. By setting mq.kafka.headers.copy.to.jms.properties and creating a JMSCorrelationID header, it looks like you were also trying to set the JMSCorrelationID value in another way at the same time - this isn't currently supported, as JMSCorrelationID cannot be set as a standard String property, so I would've expected you to get a ConnectException saying something about JMSCorrelationID being a reserved property name

Have I understood this correctly?

layshah72 commented 1 month ago

Yes Dale, you are correct. I tried both approaches: setting the correlation ID from the message key, and setting it via a custom header. Neither approach is working, and on the IBM CMH queue they are receiving all 000000000.

Moreover, I am not getting any ConnectException. I am able to send messages to the CMH queue. The only issue is that they do not have a correlation ID.

layshah72 commented 1 month ago

correaltionid

dalelane commented 1 month ago

okay - so keeping them separate

Setting the correlation ID from a Kafka message key should work - if you're seeing problems with that, then this is a concern. It'd be useful to see logs from Connect to help investigate that.

Setting the correlation ID from a Kafka message header isn't currently possible - so it's interesting as a request for a new feature/enhancement, but not otherwise a surprise that it isn't working
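
To make the two cases above concrete, here is a minimal sketch of a pre-deployment check. It is not part of the connector: the class `CorrelationIdConfigCheck` and the method `findProblems` are hypothetical names invented for illustration, and the warning messages are assumptions based only on the behaviour described in this thread. The property names are the ones from the configuration posted above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CorrelationIdConfigCheck {
    // Property names as they appear in the configuration posted in this thread.
    public static final String KEY_HEADER = "mq.message.builder.key.header";
    public static final String COPY_HEADERS = "mq.kafka.headers.copy.to.jms.properties";
    public static final String JMS_CORRELATION_ID = "JMSCorrelationID";

    /**
     * Hypothetical helper: returns warnings for a connector config combined
     * with the header names the producer attaches to each Kafka record.
     */
    public static List<String> findProblems(Map<String, String> config, Set<String> headerNames) {
        List<String> problems = new ArrayList<>();
        boolean keyIsCorrelationId = JMS_CORRELATION_ID.equals(config.get(KEY_HEADER));
        // Boolean.parseBoolean(null) is false, so a missing property counts as "off".
        boolean copyHeaders = Boolean.parseBoolean(config.get(COPY_HEADERS));
        boolean hasCorrelationHeader = headerNames.contains(JMS_CORRELATION_ID);

        // Case 2 in the thread: a JMSCorrelationID header plus header copying.
        if (copyHeaders && hasCorrelationHeader) {
            problems.add("JMSCorrelationID header cannot be copied as a JMS string property");
        }
        // A header alone does not set the correlation ID; only the record key does.
        if (hasCorrelationHeader && !keyIsCorrelationId) {
            problems.add("correlation ID is only taken from the record key; set " + KEY_HEADER);
        }
        return problems;
    }

    public static void main(String[] args) {
        // Mirrors the original setup: both approaches enabled at once.
        Map<String, String> config = Map.of(
                KEY_HEADER, JMS_CORRELATION_ID,
                COPY_HEADERS, "true");
        Set<String> headers = Set.of(JMS_CORRELATION_ID, "mq.reply.queue");
        findProblems(config, headers).forEach(System.out::println);
    }
}
```

With the original setup (key header mapping and header copying both enabled, plus a JMSCorrelationID record header) the sketch reports the header-copy conflict. With only the key mapping and no JMSCorrelationID header it reports nothing.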

layshah72 commented 1 month ago

We removed the extra property and it worked. Many thanks.

    {
      "name": "XXXX-mq-sink",
      "config": {
        "connector.class": "com.ibm.eventstreams.connect.mqsink.MQSinkConnector",
        "tasks.max": "1",
        "topics": "XXXXX",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "mq.queue.manager": "XXXX",
        "mq.connection.name.list": "XXXX(1441)",
        "mq.channel.name": "XXX",
        "mq.queue": "XXXX",
        "mq.message.body.jms": "true",
        "mq.message.builder": "com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder",
        "mq.kafka.headers.copy.to.jms.properties": "true",   --- removed this
        "mq.message.builder.key.header": "JMSCorrelationID",
        "offset.storage.file.filename": "",
        "mq.userid": "XXX"
      }
    }

dalelane commented 1 month ago

that's good to hear - thanks for letting us know