IBMStreams / streamsx.kafka

Repository for integration with Apache Kafka
https://ibmstreams.github.io/streamsx.kafka/
Apache License 2.0

The "nConsecutiveRuntimeExc" variable never reaches 50 when exceptions occur #198

Closed anthonynassar closed 4 years ago

anthonynassar commented 4 years ago

I'm having an issue getting the job to stop after several consecutive exceptions. I tried to simulate a SaslAuthenticationException by providing wrong credentials to the KafkaConsumer client, expecting the job to crash with an exception after 50 retries, as the code is designed to do. Instead, the nConsecutiveRuntimeExc variable, which is incremented after each exception, is reset to zero in between two SaslAuthenticationExceptions. This happens in the runPollLoop() function of the AbstractKafkaConsumerClient class in the com.ibm.streamsx.kafka.clients.consumer package, specifically in the while (eventQueue.isEmpty()) { ... } loop. The reset happens at line 827 (nConsecutiveRuntimeExc = 0;): https://github.com/IBMStreams/streamsx.kafka/blob/ff349d26d8cf1a395e65d71850c9db877fa0da55/com.ibm.streamsx.kafka/impl/java/src/com/ibm/streamsx/kafka/clients/consumer/AbstractKafkaConsumerClient.java#L827

What could be the reason the code in the while loop keeps executing successfully in between two SaslAuthenticationExceptions?

int nConsecutiveRuntimeExc = 0;
        while (eventQueue.isEmpty()) {
            boolean doPoll = true;
            // can wait for 100 ms; throws InterruptedException:
            try {
                checkSpaceInMessageQueueAndPauseFetching (false);
            }
            catch (IllegalStateException e) {
                logger.warn ("runPollLoop(): " + e.getLocalizedMessage());
                // no space, could not pause - do not call poll
                doPoll = false;
            }
            if (doPoll) {
                try {
                    final long now = System.currentTimeMillis();
                    final long timeBetweenPolls = now -lastPollTimestamp;
                    if (lastPollTimestamp > 0) {
                        // this is not the first 'poll'
                        if (timeBetweenPolls >= maxPollIntervalMs) {
                            logger.warn("Kafka client did'nt poll often enaugh for messages. "  //$NON-NLS-1$
                                    + "Maximum time between two polls is currently " + maxPollIntervalMs //$NON-NLS-1$
                                    + " milliseconds. Consider to set consumer property '" //$NON-NLS-1$
                                    + ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + "' to a value higher than " + timeBetweenPolls); //$NON-NLS-1$
                        }
                    }
                    lastPollTimestamp = System.currentTimeMillis();
                    EnqueResult r = pollAndEnqueue (pollTimeout, throttleSleepMillis > 0l);
                    final int nMessages = r.getNumRecords();
                    final long nQueuedBytes = r.getSumTotalSize();
                    final Level l = Level.DEBUG;
                    //                    final Level l = DEBUG_LEVEL;
                    if (logger.isEnabledFor (l) && nMessages > 0) {
                        logger.log (l, MsgFormatter.format ("{0,number,#} records with total {1,number,#}/{2,number,#}/{3,number,#} bytes (key/value/sum) fetched and enqueued",
                                nMessages, r.getSumKeySize(), r.getSumValueSize(), nQueuedBytes));
                    }
                    tryAdjustMinFreeMemory (nQueuedBytes, nMessages);
                    nConsecutiveRuntimeExc = 0;
                    nPendingMessages.setValue (messageQueue.size());
                    if (throttleSleepMillis > 0l) {
                        synchronized (throttledPollWaitMonitor) {
                            throttledPollWaitMonitor.wait (throttleSleepMillis);
                        }
                    }
                } catch (SerializationException e) {
                    // The default deserializers of the operator do not 
                    // throw SerializationException, but custom deserializers may throw...
                    // cannot do anything else at the moment
                    // (may be possible to handle this in future Kafka releases
                    // https://issues.apache.org/jira/browse/KAFKA-4740)
                    throw e;
                } catch (Exception e) {
                    // catches also 'java.io.IOException: Broken pipe' when SSL is used
                    logger.warn ("Exception caugt: " + e, e);
                    if (++nConsecutiveRuntimeExc >= 50) {
                        logger.error (e);
                        throw new KafkaOperatorRuntimeException ("Consecutive number of exceptions too high (50).", e);
                    }
                    logger.info ("Going to sleep for 100 ms before next poll ...");
                    Thread.sleep (100l);
                }
            }
        }
ghost commented 4 years ago

I can reproduce this behaviour (I used SASL_PLAIN). Evidently, the SASL authentication happens asynchronously with the consumer.poll invocations. As long as the SASL client state is not FAILED, consumer.poll returns without throwing an exception.

We can see this in the following trace (I added an additional log line to the code to show each successful return of pollAndEnqueue):

09 Jan 2020 08:54:42.687+0100 [2875] DEBUG #splapptrc,J[6],P[2],Msgs M[SslTransportLayer.java:org.apache.kafka.common.network.SslTransportLayer.handshakeFinished:424]  - [SslTransportLayer channelId=-6 key=sun.nio.ch.SelectionKeyImpl@40ffc512] SSL handshake completed successfully with peerHost 'broker-5-crbn2y7t0p5mytnz.kafka.svc01.us-south.eventstreams.cloud.ibm.com' peerPort 9093 peerPrincipal 'CN=*.svc01.us-south.eventstreams.cloud.ibm.com, OU=Armonk, O=International Business Machines Corporation, L=Armonk, ST=New York, C=US' cipherSuite 'SSL_ECDHE_RSA_WITH_AES_256_GCM_SHA384'
09 Jan 2020 08:54:42.688+0100 [2875] DEBUG #splapptrc,J[6],P[2],Msgs M[SaslClientAuthenticator.java:org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState:351]  - Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
09 Jan 2020 08:54:42.750+0100 [2875] INFO #splapptrc,J[6],P[2],Msgs M[AbstractKafkaConsumerClient.java:com.ibm.streamsx.kafka.clients.consumer.AbstractKafkaConsumerClient.runPollLoop:818]  - ======= poll succeeded); resetting nConsecutiveRuntimeExc. r = n=0; kSz=0; vSz = 0
09 Jan 2020 08:54:42.828+0100 [2875] DEBUG #splapptrc,J[6],P[2],Msgs M[SaslClientAuthenticator.java:org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState:351]  - Set SASL client state to SEND_HANDSHAKE_REQUEST
09 Jan 2020 08:54:42.829+0100 [2875] DEBUG #splapptrc,J[6],P[2],Msgs M[SaslClientAuthenticator.java:org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState:351]  - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
09 Jan 2020 08:54:42.850+0100 [2875] INFO #splapptrc,J[6],P[2],Msgs M[AbstractKafkaConsumerClient.java:com.ibm.streamsx.kafka.clients.consumer.AbstractKafkaConsumerClient.runPollLoop:818]  - ======= poll succeeded); resetting nConsecutiveRuntimeExc. r = n=0; kSz=0; vSz = 0
09 Jan 2020 08:54:42.951+0100 [2875] INFO #splapptrc,J[6],P[2],Msgs M[AbstractKafkaConsumerClient.java:com.ibm.streamsx.kafka.clients.consumer.AbstractKafkaConsumerClient.runPollLoop:818]  - ======= poll succeeded); resetting nConsecutiveRuntimeExc. r = n=0; kSz=0; vSz = 0
09 Jan 2020 08:54:42.969+0100 [2875] DEBUG #splapptrc,J[6],P[2],Msgs M[SaslClientAuthenticator.java:org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState:351]  - Set SASL client state to INITIAL
09 Jan 2020 08:54:42.970+0100 [2875] DEBUG #splapptrc,J[6],P[2],Msgs M[SaslClientAuthenticator.java:org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState:351]  - Set SASL client state to INTERMEDIATE
09 Jan 2020 08:54:43.052+0100 [2875] INFO #splapptrc,J[6],P[2],Msgs M[AbstractKafkaConsumerClient.java:com.ibm.streamsx.kafka.clients.consumer.AbstractKafkaConsumerClient.runPollLoop:818]  - ======= poll succeeded); resetting nConsecutiveRuntimeExc. r = n=0; kSz=0; vSz = 0
09 Jan 2020 08:54:43.110+0100 [2875] DEBUG #splapptrc,J[6],P[2],Msgs M[SaslClientAuthenticator.java:org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState:351]  - Set SASL client state to FAILED
09 Jan 2020 08:54:43.111+0100 [2875] INFO #splapptrc,J[6],P[2],Msgs M[Selector.java:org.apache.kafka.common.network.Selector.pollSelectionKeys:617]  - [Consumer clientId=C-J6-Msgs, groupId=iP1989751531-j6-oP2407730] Failed authentication with broker-5-crbn2y7t0p5mytnz.kafka.svc01.us-south.eventstreams.cloud.ibm.com/169.48.196.106 (Authentication failed due to invalid credentials with SASL mechanism PLAIN)
09 Jan 2020 08:54:43.111+0100 [2875] DEBUG #splapptrc,J[6],P[2],Msgs M[NetworkClient.java:org.apache.kafka.clients.NetworkClient.handleDisconnections:882]  - [Consumer clientId=C-J6-Msgs, groupId=iP1989751531-j6-oP2407730] Node -6 disconnected.
09 Jan 2020 08:54:43.112+0100 [2875] ERROR #splapptrc,J[6],P[2],Msgs M[NetworkClient.java:org.apache.kafka.clients.NetworkClient.processDisconnection:737]  - [Consumer clientId=C-J6-Msgs, groupId=iP1989751531-j6-oP2407730] Connection to node -6 (broker-5-crbn2y7t0p5mytnz.kafka.svc01.us-south.eventstreams.cloud.ibm.com/169.48.196.106:9093) failed authentication due to: Authentication failed due to invalid credentials with SASL mechanism PLAIN
09 Jan 2020 08:54:43.112+0100 [2875] WARN #splapptrc,J[6],P[2],Msgs M[AbstractKafkaConsumerClient.java:com.ibm.streamsx.kafka.clients.consumer.AbstractKafkaConsumerClient.runPollLoop:844]  - Exception caugt: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed due to invalid credentials with SASL mechanism PLAIN

Here, the SASL state progresses from SEND_APIVERSIONS_REQUEST to FAILED while multiple calls to poll are made. Only in the FAILED state does poll throw the SaslAuthenticationException.
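
For reference, here is a minimal, self-contained sketch (outside the toolkit) that reproduces this with a plain KafkaConsumer: with deliberately wrong SASL PLAIN credentials, several poll() calls return normally with zero records before the client finally throws the SaslAuthenticationException. The bootstrap server, topic name, and credentials below are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.SaslAuthenticationException;

public class SaslPollProbe {
    public static void main (String[] args) {
        Properties props = new Properties();
        props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");
        props.put (ConsumerConfig.GROUP_ID_CONFIG, "sasl-probe");
        props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put ("security.protocol", "SASL_SSL");
        props.put ("sasl.mechanism", "PLAIN");
        props.put ("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"user\" password=\"wrong-password\";");

        int successfulPolls = 0;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<> (props)) {
            consumer.subscribe (Collections.singletonList ("test-topic"));
            while (true) {
                // returns normally (with an empty record set) while the SASL handshake is still in flight;
                // only when the SASL client state reaches FAILED does poll throw
                consumer.poll (Duration.ofMillis (100));
                successfulPolls++;
            }
        } catch (SaslAuthenticationException e) {
            System.out.println (successfulPolls + " poll() calls returned normally before: " + e);
        }
    }
}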

anthonynassar commented 4 years ago

So to work around the problem, I added a flag that is activated when an exception is caught. As a consequence, the nConsecutiveRuntimeExc variable keeps incrementing as long as, after the exception was detected, no messages/tuples are received by the client on each poll. In other words, receiving no messages in this situation is treated as part of the error condition. The mechanism is deactivated again, by setting the flag back to false, as soon as messages are received. If no exception has occurred, this code path is never taken, since the mechanism is deactivated by default, so it remains safe to poll Kafka and receive no messages when the topic is fully consumed.

I don't yet know what impact this change could have on the functioning of the application. Still, I suppose that other exception types will not rely on this mechanism and will instead increment the nConsecutiveRuntimeExc variable in the catch section when the exception is caught. The SaslAuthenticationException is just one of many exception types, but what is particular about it is that the poll function returns successfully while the SASL client transitions between states, even though it will fail later on. I suppose the other exception types will keep throwing on every poll and therefore increment the counter in the catch section. P.S. I didn't experiment with any other exception to validate this hypothesis, and some are complicated to simulate.

Since the SaslAuthenticationException generally occurs right after launching the application, applying this mechanism met our need to shut the application down as soon as a SaslAuthenticationException occurs, rather than keeping it running in a seemingly healthy state with no notification of a problem that could affect downstream applications/jobs consuming tuples from the Kafka client job.

Here is the part of the code that I modified in the runPollLoop() function:

        int nConsecutiveRuntimeExc = 0;
        boolean exceptionDetected = false;
        while (eventQueue.isEmpty()) {
            boolean doPoll = true;
            // can wait for 100 ms; throws InterruptedException:
            try {
                checkSpaceInMessageQueueAndPauseFetching (false);
            }
            catch (IllegalStateException e) {
                logger.warn ("runPollLoop(): " + e.getLocalizedMessage());
                // no space, could not pause - do not call poll
                doPoll = false;
            }
            if (doPoll) {
                try {
                    final long now = System.currentTimeMillis();
                    final long timeBetweenPolls = now -lastPollTimestamp;
                    if (lastPollTimestamp > 0) {
                        // this is not the first 'poll'
                        if (timeBetweenPolls >= maxPollIntervalMs) {
                            logger.warn("Kafka client did'nt poll often enaugh for messages. "  //$NON-NLS-1$
                                    + "Maximum time between two polls is currently " + maxPollIntervalMs //$NON-NLS-1$
                                    + " milliseconds. Consider to set consumer property '" //$NON-NLS-1$
                                    + ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + "' to a value higher than " + timeBetweenPolls); //$NON-NLS-1$
                        }
                    }
                    lastPollTimestamp = System.currentTimeMillis();
                    EnqueResult r = pollAndEnqueue (pollTimeout, throttleSleepMillis > 0l);
                    final int nMessages = r.getNumRecords();
                    final long nQueuedBytes = r.getSumTotalSize();
                    final Level l = Level.DEBUG;
                    //                    final Level l = DEBUG_LEVEL;
                    if (logger.isEnabledFor (l) && nMessages > 0) {
                        logger.log (l, MsgFormatter.format ("{0,number,#} records with total {1,number,#}/{2,number,#}/{3,number,#} bytes (key/value/sum) fetched and enqueued",
                                nMessages, r.getSumKeySize(), r.getSumValueSize(), nQueuedBytes));
                    }
                    tryAdjustMinFreeMemory (nQueuedBytes, nMessages);
                    // check whether an exception has already been detected and no messages are being received
                    if (exceptionDetected && nConsecutiveRuntimeExc < 50 && nMessages == 0) {
                        logger.info("No messages received, so the poll is considered a failed request. Incrementing nConsecutiveRuntimeExc.");
                        ++nConsecutiveRuntimeExc;
                    } else {
                        logger.debug("Reinitializing nConsecutiveRuntimeExc.");
                        nConsecutiveRuntimeExc = 0;
                        exceptionDetected = false;
                    }
                    nPendingMessages.setValue (messageQueue.size());
                    if (throttleSleepMillis > 0l) {
                        synchronized (throttledPollWaitMonitor) {
                            throttledPollWaitMonitor.wait (throttleSleepMillis);
                        }
                    }
                } catch (SerializationException e) {
                    // The default deserializers of the operator do not
                    // throw SerializationException, but custom deserializers may throw...
                    // cannot do anything else at the moment
                    // (may be possible to handle this in future Kafka releases
                    // https://issues.apache.org/jira/browse/KAFKA-4740)
                    throw e;
                } catch (Exception e) {
                    // catches also 'java.io.IOException: Broken pipe' when SSL is used
                    logger.warn ("Exception caught: " + e, e);
                    // activate the mechanism that treats the non-reception of tuples after an exception as part of the error condition
                    exceptionDetected = true;
                    if (++nConsecutiveRuntimeExc >= 50) {
                        logger.error (e);
                        throw new KafkaOperatorRuntimeException ("Consecutive number of exceptions too high (50).", e);
                    }
                    logger.info ("Going to sleep for 100 ms before next poll ...");
                    Thread.sleep (100l);
                }
            }
        }
ghost commented 4 years ago

Look at the following scenario: the topic you are subscribed to does not receive messages. You will not get messages - that's a normal condition. Then a temporary condition causes an exception that goes away within the interval during which the counter is increasing. Because fetching no messages is normal, the application never leaves the "exception" state and will re-launch once the counter reaches its maximum (50 for now).

In my opinion, fetching no messages is not a good indicator that a problem persists. Perhaps it is better to think of a different mechanism, for example the number of exceptions within a time window, or to apply such a mechanism only to a group of exceptions, or to exclude a group of exceptions from it. I don't actually know the exact exception class hierarchy for Kafka exceptions, but I know that there is a class RetriableException:

package org.apache.kafka.common.errors;

/**
 * A retryable exception is a transient exception that if retried may succeed.
 */
public abstract class RetriableException extends ApiException {
...
ghost commented 4 years ago

I suggest implementing the following exception handling in the KafkaConsumer for exceptions that occur when polling for messages:

  1. Exceptions of type RetriableException (transient errors; next poll may succeed) are only logged/traced. The thread doing the poll sleeps for 100 ms before polling again.
  2. All other exceptions, for example authentication-related exceptions, are counted in a sliding time window of size T. When the number of exceptions within the time window is higher than N, the exception is thrown, causing the operator to fail the PE, which is re-launched unless configured differently. Exceptions are counted partitioned per exception class. As values for T and N, I propose T = 15 seconds and N = 2. With these settings, the operator would fail if it hit the same exception 3 times within 15 seconds.
  3. When records are fetched (number of records > 0), the exception filter is reset. Counting starts again from zero.

This mechanism should make it possible to detect persistent error conditions.
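
To make the proposal more concrete, here is a rough sketch of such a filter (an illustration only, not the code that went into the toolkit; the class and method names are made up): exceptions are recorded per exception class in a sliding window of T milliseconds, and the filter reports that the operator should give up once more than N occurrences of the same class fall into the window. A poll that returns records resets all counters.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class SlidingWindowExceptionFilter {
    private final long windowMillis;   // T, e.g. 15000
    private final int maxPerWindow;    // N, e.g. 2
    private final Map<Class<?>, Deque<Long>> occurrences = new HashMap<>();

    public SlidingWindowExceptionFilter (long windowMillis, int maxPerWindow) {
        this.windowMillis = windowMillis;
        this.maxPerWindow = maxPerWindow;
    }

    /** Records one occurrence; returns true when the exception should be re-thrown. */
    public synchronized boolean offer (Exception e) {
        final long now = System.currentTimeMillis();
        Deque<Long> q = occurrences.computeIfAbsent (e.getClass(), k -> new ArrayDeque<>());
        q.addLast (now);
        // drop occurrences that have left the time window
        while (!q.isEmpty() && now - q.peekFirst() > windowMillis) {
            q.removeFirst();
        }
        return q.size() > maxPerWindow;
    }

    /** Called after a poll that fetched records; counting starts again from zero. */
    public synchronized void reset() {
        occurrences.clear();
    }
}

With windowMillis = 15000 and maxPerWindow = 2, the third occurrence of the same exception class within 15 seconds would make offer() return true, which matches the behaviour described above.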

anthonynassar commented 4 years ago

Your suggestion is satisfying and addresses the need to handle the different types of exceptions gracefully. I implemented a simplified version of your algorithm, one without the 15-second time window, which I would like to share here. Apart from that, and for my part, I consider that the issue can be closed.

protected void runPollLoop (long pollTimeout, long throttleSleepMillis) throws InterruptedException {
        if (throttleSleepMillis > 0l) {
            logger.log (DEBUG_LEVEL, MsgFormatter.format ("Initiating throttled polling (sleep time = {0} ms); maxPollRecords = {1}",
                    throttleSleepMillis, getMaxPollRecords()));
        }
        else {
            logger.log (DEBUG_LEVEL, MsgFormatter.format ("Initiating polling; maxPollRecords = {0}", getMaxPollRecords()));
        }
        synchronized (drainBuffer) {
            if (!drainBuffer.isEmpty()) {
                final int bufSz = drainBuffer.size();
                final int capacity = messageQueue.remainingCapacity();
                // restore records that have been put aside to the drain buffer
                if (capacity < bufSz) {
                    String msg = MsgFormatter.format ("drain buffer size {0} > capacity of message queue {1}", bufSz, capacity);
                    logger.error ("runPollLoop() - " + msg);
                    // must restart operator.
                    throw new RuntimeException (msg);
                }
                messageQueue.addAll (drainBuffer);
                final int qSize = messageQueue.size();
                drainBuffer.clear();
                logger.log (DEBUG_LEVEL, MsgFormatter.format ("runPollLoop(): {0,number,#} consumer records added from drain buffer to the message queue. Message queue size is {1,number,#} now.", bufSz, qSize));
            }
        }
        // continue polling for messages until a new event
        // arrives in the event queue
        fetchPaused = consumer.paused().size() > 0;
        logger.log (DEBUG_LEVEL, "previously paused partitions: " + consumer.paused());
        //int nConsecutiveRuntimeExc = 0;
        // per-class exception counters; declared outside the while loop so that the counts accumulate across poll iterations
        HashMap<String, Integer> exceptionMap = new HashMap<String, Integer>();
        while (eventQueue.isEmpty()) {
            boolean doPoll = true;
            // can wait for 100 ms; throws InterruptedException:
            try {
                checkSpaceInMessageQueueAndPauseFetching (false);
            }
            catch (IllegalStateException e) {
                logger.warn ("runPollLoop(): " + e.getLocalizedMessage());
                // no space, could not pause - do not call poll
                doPoll = false;
            }
            if (doPoll) {
                try {
                    final long now = System.currentTimeMillis();
                    final long timeBetweenPolls = now -lastPollTimestamp;
                    if (lastPollTimestamp > 0) {
                        // this is not the first 'poll'
                        if (timeBetweenPolls >= maxPollIntervalMs) {
                            logger.warn("Kafka client did'nt poll often enaugh for messages. "  //$NON-NLS-1$
                                    + "Maximum time between two polls is currently " + maxPollIntervalMs //$NON-NLS-1$
                                    + " milliseconds. Consider to set consumer property '" //$NON-NLS-1$
                                    + ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + "' to a value higher than " + timeBetweenPolls); //$NON-NLS-1$
                        }
                    }
                    lastPollTimestamp = System.currentTimeMillis();
                    EnqueResult r = pollAndEnqueue (pollTimeout, throttleSleepMillis > 0l);
                    final int nMessages = r.getNumRecords();
                    final long nQueuedBytes = r.getSumTotalSize();
                    final Level l = Level.DEBUG;
                    //                    final Level l = DEBUG_LEVEL;
                    if (logger.isEnabledFor (l) && nMessages > 0) {
                        logger.log (l, MsgFormatter.format ("{0,number,#} records with total {1,number,#}/{2,number,#}/{3,number,#} bytes (key/value/sum) fetched and enqueued",
                                nMessages, r.getSumKeySize(), r.getSumValueSize(), nQueuedBytes));
                    }
                    tryAdjustMinFreeMemory (nQueuedBytes, nMessages);

                    //nConsecutiveRuntimeExc = 0;
                    if (nMessages != 0l) exceptionMap.clear();

                    nPendingMessages.setValue (messageQueue.size());
                    if (throttleSleepMillis > 0l) {
                        synchronized (throttledPollWaitMonitor) {
                            throttledPollWaitMonitor.wait (throttleSleepMillis);
                        }
                    }
                } catch (SerializationException e) {
                    // The default deserializers of the operator do not
                    // throw SerializationException, but custom deserializers may throw...
                    // cannot do anything else at the moment
                    // (may be possible to handle this in future Kafka releases
                    // https://issues.apache.org/jira/browse/KAFKA-4740)
                    throw e;

                } catch (RetriableException e) {
                    logger.warn ("Exception caught: " + e, e);
                    logger.info ("Going to sleep for 100 ms before next poll ...");
                    Thread.sleep (100l);
                } catch (Exception e) {
                    // catches also 'java.io.IOException: Broken pipe' when SSL is used
                    String exceptionName = e.getClass().getCanonicalName();
                    if (exceptionMap.containsKey(exceptionName))
                        exceptionMap.put(exceptionName, exceptionMap.get(exceptionName) + 1);
                    else
                        exceptionMap.put(exceptionName, 0);

                    if (exceptionMap.get(exceptionName) >= 4) {
                        logger.error (e);
                        throw new KafkaOperatorRuntimeException ("Number of " + exceptionName + " exceptions is too high (5).", e);
                    }
                    logger.info ("Going to sleep for 120 ms before next poll ...");
                    Thread.sleep (120l);
                }
            }
        }
        logger.debug("Stop polling. Message in event queue: " + eventQueue.peek().getEventType()); //$NON-NLS-1$
    }
ghost commented 4 years ago

resolved in release 3.0.3