Not sure if we need a new issue, but I think that, in general, the async call and exception handling are not correct.
In KafkaSink:
@Override
public void process(StreamingInput<Tuple> stream, Tuple tuple)
        throws FileNotFoundException, IOException, UnsupportedStreamsKafkaConfigurationException {
    try {
        if (trace.isLoggable(TraceLevel.DEBUG))
            trace.log(TraceLevel.DEBUG, "Sending message: " + tuple);
        if (!topics.isEmpty())
            producerClient.send(tuple, topics);
        else
            producerClient.send(tuple);
    } catch (Exception e) {
        trace.log(TraceLevel.ERROR, "Could not send message: " + tuple, e);
        e.printStackTrace();
        resetProducerIfPropertiesHaveChanged();
    }
    if (producerClient.hasMessageException() == true) {
        trace.log(TraceLevel.WARN, "Found message exception");
        resetProducerIfPropertiesHaveChanged();
    }
}
The problem is that producerClient.send(...) is actually an asynchronous method: the exception may be set later, when the callback fires. This code, however, assumes send(...) is synchronous and checks for a message exception immediately after the call. And when we do get an exception, we reset the producer properties, which I believe means re-reading the properties from the app config and reconnecting.
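To illustrate the race, here is a minimal sketch against the standard Kafka producer API; the messageException field is a hypothetical stand-in for whatever backs hasMessageException():

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class AsyncSendSketch {
    private final KafkaProducer<String, String> producer;
    private volatile Exception messageException; // hypothetical stand-in for what hasMessageException() reads

    AsyncSendSketch(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    void send(String topic, String key, String message) {
        // send() returns immediately; any failure is reported later on the
        // producer's I/O thread, typically after the caller has moved on.
        producer.send(new ProducerRecord<>(topic, key, message), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null)
                    messageException = e;
            }
        });
        // Checking messageException (or hasMessageException()) right here would
        // usually observe the outcome of an earlier send, or none at all,
        // not the send just issued.
    }
}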
In here:
private void resetProducerIfPropertiesHaveChanged()
        throws FileNotFoundException, IOException, UnsupportedStreamsKafkaConfigurationException {
    OperatorContext context = this.getOperatorContext();
    if (newPropertiesExist(context)) {
        trace.log(TraceLevel.INFO,
                "Properties have changed. Initializing producer with new properties.");
        resetProducerClient(context);
    } else {
        trace.log(TraceLevel.INFO,
                "Properties have not changed, so we are keeping the same producer client and resetting the message exception.");
        producerClient.resetMessageException();
    }
}
Why do we want to reset when we have a message exception but the app config properties have not changed? Is this the right logic, that app config properties should determine how we respond to the producerClient having received an exception?
See PR #287
@chanskw Yeah, I noticed that, and I think that send cannot throw an exception, so we shouldn't be catching anything there.
Agreed, the resetMessageException() should not happen if an exception can be thrown from send (that specific method goes away with #287, but the same issue is there). Good catch!
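For illustration, if send() really cannot throw, process() reduces to something like this sketch (tuple contents deliberately left out of the trace; see the note on sensitive data below):

@Override
public void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception {
    if (trace.isLoggable(TraceLevel.DEBUG))
        trace.log(TraceLevel.DEBUG, "Sending message");
    if (!topics.isEmpty())
        producerClient.send(tuple, topics);
    else
        producerClient.send(tuple);
    // Any failure now surfaces only through the producer's async callback;
    // whether the operator should then fail, reset the region, or submit an
    // error tuple is exactly what this issue is trying to decide.
}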
Another related issue is what should happen if there's an exception sending a message in an autonomous region. Most operators will fail unless they have an explicit parameter indicating not to (or exceptions are being caught from the SPL app).
Changing the operator to fail would be a change in behaviour, but I think it's the correct thing to do; currently an application can look healthy while it's actually not writing any messages.
@ddebrunner do you have any advice on the best practices to handle exceptions in operators? Before CR was introduced, we generally tried to catch exceptions, log the error, and then submit tuples to the error port. The reason is that we wanted to be more resilient and not cause the application to crash. The downside is that the application may seem healthy when it really is not.
If the operator is in a CR, then upon an exception I believe the operator should now cause a reset to happen. This can be done by rethrowing the exception. However, what happens when the user has added the @exception annotation to the code? In that case, I believe the exception is caught by the runtime and the PE will not crash, so a reset will not happen. I cannot remember exactly, but is there an API for the operator to trigger a reset? Is this the right approach upon an exception?
In the case when the operator is not in a CR, should operators just crash? Should we be resilient and submit error tuples? What is the right behavior here?
Short answer: it depends. :-)
If the application has explicitly handled errors in some way, e.g. an optional error port, a @catch annotation, a parameter indicating to ignore errors, etc., then I don't believe the operator should fail or reset the region.
Otherwise I believe the operator should fail, which will cause the region to reset, or just a restart in an autonomous region.
Tracing the tuple to the log or trace is not recommended, as it could contain sensitive data.
@chanskw While I did create PR #287, after more thought I think we need to define what behaviour we want on error for the operator in a consistent region and in an autonomous region. I can write up the consistent region one and maybe have a stab at the autonomous region one.
One thing I was unclear about was the mechanism that checks for something having changed in the connection configuration. Would this only apply if the configuration came from an application configuration (4.2 only), or is there another mechanism that can update it?
I started a writeup here: https://github.com/IBMStreams/streamsx.messaging/wiki/KafkaProducerErrorHandling
@ddebrunner I took a quick stab at answering one of the questions in your proposal. I need to think about the rest a bit more.
re: One thing I was unclear about was the mechanism that checks for something having changed in the connection configuration. Would this only apply if the configuration came from an application configuration (4.2 only), or is there another mechanism that can update it?
The mechanism for checking for changes in the connection configuration was solely for application configuration.
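For illustration, a sketch of what that check might look like, assuming the producer properties come from a 4.2 application configuration; the configuration name "kafkaProperties" and the cached field are made up for the example:

import java.util.Map;
import com.ibm.streams.operator.OperatorContext;

class ConfigCheckSketch {
    // The properties the producer client was last built with.
    private Map<String, String> currentProperties;

    boolean newPropertiesExist(OperatorContext context) {
        // getApplicationConfiguration(...) exists as of Streams 4.2, which is
        // why this mechanism is 4.2-only.
        Map<String, String> latest =
                context.getPE().getApplicationConfiguration("kafkaProperties");
        return !latest.equals(currentProperties);
    }
}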
@chanskw
Why do we want to reset when we have a message exception but the app config properties have not changed? Is this the right logic, that app config properties should determine how we respond to the producerClient having received an exception?
There is no reset if the appConfig properties haven't changed. The exception was only being used for the purpose of deciding whether or not to reset, so I wanted to clear it in the case where we weren't going to "act on it" or had already acted on it (the exception also gets cleared if the operator is reset).
Thanks @ddebrunner ... I will try to look at this next week.
@chanskw Just to note that there is a reset() method on ConsistentRegionContext.
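A rough sketch of how an operator might use it on an async send failure; the method name is made up, and getOptionalContext() returns null when the operator is not in a consistent region:

import java.io.IOException;
import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.state.ConsistentRegionContext;

class ResetSketch {
    void onSendFailure(OperatorContext context, Exception failure) throws IOException {
        ConsistentRegionContext crContext =
                context.getOptionalContext(ConsistentRegionContext.class);
        if (crContext != null) {
            // In a consistent region, ask the runtime to reset the region
            // rather than silently swallowing the failure.
            crContext.reset();
        } else {
            // Autonomous region: rethrow so the operator fails and restarts,
            // per the behaviour discussed above.
            throw new RuntimeException(failure);
        }
    }
}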
Customers are using the new toolkit https://github.com/IBMStreams/streamsx.kafka, which was written from scratch. If you still encounter the issue there, please open an issue in that repository.