ibm-messaging / kafka-connect-mq-sink

This repository contains a Kafka Connect sink connector for copying data from Apache Kafka into IBM MQ.
Apache License 2.0
35 stars 43 forks source link

syncpoint limit reached #10

Closed julienmmm closed 6 years ago

julienmmm commented 6 years ago

Hello,

I am testing the connector from a topic of +-100000 messages and it fails with the following error: [2018-10-26 13:24:25,797] ERROR JMS exception {} (com.ibm.eventstreams.connect.mqsink.JMSWriter:223) com.ibm.msg.client.jms.DetailedTransactionRolledBackRuntimeException: JMSCMQ0002: The method 'MQCMIT' failed. An IBM MQ call failed. Please see the linked exception for more information. at com.ibm.msg.client.jms.DetailedTransactionRolledBackException.getUnchecked(DetailedTransactionRolledBackException.java:274) at com.ibm.msg.client.jms.internal.JmsErrorUtils.convertJMSException(JmsErrorUtils.java:173) at com.ibm.msg.client.jms.internal.JmsContextImpl.commit(JmsContextImpl.java:275) at com.ibm.eventstreams.connect.mqsink.JMSWriter.commit(JMSWriter.java:220) at com.ibm.eventstreams.connect.mqsink.MQSinkTask.flush(MQSinkTask.java:106) at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125) at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:379) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:209) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: com.ibm.msg.client.jms.DetailedJMSException: JMSWMQ0028: At least one asynchronous put message failed or gave a warning. When a message is put asynchronously, any problems are not reported as exceptions thrown from the send() method. Instead they are passed to the exception listener, if one is defined. The exception listener is driven at syncpoint for transacted sessions, or every SEND_CHECK_COUNT asynchronous puts for non-transacted sessions. Please see the linked exception for more information on the failures and/or warnings. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at com.ibm.msg.client.commonservices.j2se.NLSServices.createException(NLSServices.java:319) at com.ibm.msg.client.commonservices.nls.NLSServices.createException(NLSServices.java:226) at com.ibm.msg.client.wmq.internal.WMQSession.callMQSTAT(WMQSession.java:671) at com.ibm.msg.client.wmq.internal.WMQSession.syncpoint(WMQSession.java:2007) at com.ibm.msg.client.wmq.internal.WMQSession.commit(WMQSession.java:856) at com.ibm.msg.client.jms.internal.JmsSessionImpl.commitTransaction(JmsSessionImpl.java:2871) at com.ibm.msg.client.jms.internal.JmsSessionImpl.commit(JmsSessionImpl.java:775) at com.ibm.msg.client.jms.internal.JmsContextImpl$SessionWrapper.commit(JmsContextImpl.java:2176) at com.ibm.msg.client.jms.internal.JmsContextImpl.commit(JmsContextImpl.java:269) ... 13 more Caused by: com.ibm.mq.MQException: JMSWMQ0002: Extra diagnostics provided by MQSTAT: compcode '2' reason '2024' object name 'TEST.Q1 ' resolved object name 'null' object type '1' queue manager name 'MQ_ESB02D ' resolved queue manager name 'null' put failure count '42858' put success count '10000' put warning count '0'. at com.ibm.msg.client.wmq.internal.WMQSession.callMQSTAT(WMQSession.java:669) ... 19 more [2018-10-26 13:24:25,799] ERROR MQ error: CompCode 2, Reason 2024 (com.ibm.eventstreams.connect.mqsink.JMSWriter:318)

The mq error code 2024 indicates 'MQRC_SYNCPOINT_LIMIT_REACHED'. so I suppose that because of the large number of message in the topic the syncpoint limit has been reached.

Is there a way to force an mq commit every 'X' kafka messages ?

AndrewJSchofield commented 6 years ago

Hi, The short answer is that the sink connector doesn't obviously have a way to control this at the moment. The MQ source connector does have a mechanism for controlling the maximum number of uncommitted messages, and a similar mechanism would be helpful for you here. I'll have a think about how best to do this.

Thanks.

AndrewJSchofield commented 6 years ago

Looking at the latest Kafka Connect interfaces, I think there are more options for controlling things. The real fix here is to take advantage of the new interfaces for requesting a commit and use it to keep the number of uncommitted messages down. Really, we just want to batch them to make disk writing on the queue manager more efficient and batching 50 or so messages at a time would do that nicely.

By default, the sink connector is called to flush only every 60 seconds and this permits a rather large number of messages to build up when things are busy.

Probably the simplest option for now is to set the value of the "offset.flush.interval.ms" configuration value in the worker configuration file to be a much smaller value. For example, if you set it to 2000, the sink connector will flush (and thus call MQCMIT) every 2 seconds.

julienmmm commented 6 years ago

Indeed,

I had the same conslusion as yours. I've set "offset.flush.interval.ms" to 1000 ms and ti works with no error now. Thanks for your help!