apache / skywalking

APM, Application Performance Monitoring System
https://skywalking.apache.org/
Apache License 2.0

NullPointerException when using KafkaProducer.send(record) #6481

Closed: yaojingguo closed this issue 3 years ago

yaojingguo commented 3 years ago


Bug

  1. SkyWalking 8.4.0
  2. OS: macOS 10.15.5
  3. JDK: Oracle JDK 1.8.0_162
  4. kafka_2.13-2.7.0 server
  5. spring-kafka:2.2.6.RELEASE which depends on kafka-clients:2.6.0

After running the above code, I found the following exception in agent/logs/skywalking-api.log:

ERROR 2021-03-03 23:40:36:098 kafka-producer-network-thread | producer-1 InstMethodsInter : class[class org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback] before method[onCompletion] intercept failure 
java.lang.NullPointerException
    at org.apache.skywalking.apm.plugin.kafka.CallbackInterceptor.getSnapshot(CallbackInterceptor.java:83)
    at org.apache.skywalking.apm.plugin.kafka.CallbackInterceptor.beforeMethod(CallbackInterceptor.java:44)
    at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:76)
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:228)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:653)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:554)
    at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$0(Sender.java:743)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:566)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
    at java.lang.Thread.run(Thread.java:748)

Requirement or improvement

If KafkaProducer.send(record) is used, there is no callback. So allArguments[0] is null in CallbackConstructorInterceptor.onConstruct. When CallbackInterceptor.getSnapshot executes, cache.getCallback() returns null.

My first thought is to add a null check for cache.getCallback() in CallbackInterceptor's beforeMethod and afterMethod methods. If cache.getCallback() is null, do nothing.
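
The null-check idea above can be sketched in isolation. This is only an illustration under stated assumptions: Callback, CallbackCache and GuardedCallbackInterceptor below are simplified, hypothetical stand-ins, not the real Kafka or SkyWalking classes, and the recorded "events" merely stand in for whatever tracing work the plugin would do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; NOT the real Kafka or SkyWalking types.
interface Callback {
    void onCompletion(String metadata, Exception exception);
}

class CallbackCache {
    private Callback callback;

    Callback getCallback() {
        return callback;
    }

    void setCallback(Callback callback) {
        this.callback = callback;
    }
}

class GuardedCallbackInterceptor {
    // Records what the interceptor did, in place of real tracing calls.
    final List<String> events = new ArrayList<>();

    // Mirrors the idea for beforeMethod: do nothing when no user callback was given.
    void beforeMethod(CallbackCache cache) {
        if (cache == null || cache.getCallback() == null) {
            return; // the send(record) case: there is nothing to trace
        }
        events.add("before");
    }

    // Mirrors the idea for afterMethod with the same guard.
    void afterMethod(CallbackCache cache) {
        if (cache == null || cache.getCallback() == null) {
            return;
        }
        events.add("after");
    }
}

class NullGuardDemo {
    public static void main(String[] args) {
        GuardedCallbackInterceptor interceptor = new GuardedCallbackInterceptor();

        // No callback stored: both hooks return early instead of throwing.
        CallbackCache withoutCallback = new CallbackCache();
        interceptor.beforeMethod(withoutCallback);
        interceptor.afterMethod(withoutCallback);
        System.out.println(interceptor.events); // prints []

        // Callback stored: both hooks run.
        CallbackCache withCallback = new CallbackCache();
        withCallback.setCallback((metadata, exception) -> { });
        interceptor.beforeMethod(withCallback);
        interceptor.afterMethod(withCallback);
        System.out.println(interceptor.events); // prints [before, after]
    }
}
```

The guard is repeated in both hooks so that afterMethod never tries to finish work that beforeMethod skipped.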

wu-sheng commented 3 years ago

@zifeihan Please confirm the case.

zifeihan commented 3 years ago

Thanks for reporting this bug. Looking at the source code of different versions, I found that their processing logic did change, which may have caused the problem. In other words, our kafka-scenario test does not cover this case.

I recommend fixing it a different way: check whether the callback is null in CallbackConstructorInterceptor, like this:

public class CallbackConstructorInterceptor implements InstanceConstructorInterceptor {

    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        Callback callback = (Callback) allArguments[0];
        if (null == callback) {
            CallbackCache cache;
            if (null != objInst.getSkyWalkingDynamicField()) {
                cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
            } else {
                cache = new CallbackCache();
            }
            cache.setCallback(callback);
            objInst.setSkyWalkingDynamicField(cache);
        }
    }
}

I noticed that kafka-clients makes a similar check:

    private static class InterceptorCallback<K, V> implements Callback {
        private final Callback userCallback;
        private final ProducerInterceptors<K, V> interceptors;
        private final TopicPartition tp;

        private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
            this.userCallback = userCallback;
            this.interceptors = interceptors;
            this.tp = tp;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            metadata = metadata != null ? metadata : new RecordMetadata(this.tp, -1L, -1L, -1L, -1L, -1, -1);
            this.interceptors.onAcknowledgement(metadata, exception);
            if (this.userCallback != null) {
                this.userCallback.onCompletion(metadata, exception);
            }

        }
    }

Is it convenient for you to improve the test content in kafka-scenario? If not, I will improve kafka-scenario after you fix this bug.

yaojingguo commented 3 years ago

Is it convenient for you to improve the test content in kafka-scenario?

I will try to improve kafka-scenario after I fix this bug.

I think you meant null != callback where you wrote null == callback:

public class CallbackConstructorInterceptor implements InstanceConstructorInterceptor {

    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        Callback callback = (Callback) allArguments[0];
        if (null != callback) {
            CallbackCache cache;
            if (null != objInst.getSkyWalkingDynamicField()) {
                cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
            } else {
                cache = new CallbackCache();
            }
            cache.setCallback(callback);
            objInst.setSkyWalkingDynamicField(cache);
        }
    }
}

And I think that we should add a null check for ContextManager.activeSpan() in CallbackInterceptor.handleMethodException:

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
    Class<?>[] argumentsTypes, Throwable t) {
    ContextManager.activeSpan().log(t);
}

@zifeihan

zifeihan commented 3 years ago

And I think that we should add a null check for ContextManager.activeSpan() in CallbackInterceptor.handleMethodException:

Sorry, that was a typo on my part.

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
    Class<?>[] argumentsTypes, Throwable t) {
    ContextManager.activeSpan().log(t);
}

How about using this check:

    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t) {
        CallbackCache cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
        if (null != cache) {
            ContextManager.activeSpan().log(t);
        }
    }
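
To show how the constructor-side check and the cache check in the handler fit together, here is a minimal sketch. It rests on assumptions: EnhancedStub and ConstructorSideGuard are hypothetical stand-ins for EnhancedInstance and the two interceptors, and the sketch stores the callback itself in the dynamic field rather than a CallbackCache to keep it short.

```java
// Hypothetical stand-in for EnhancedInstance; NOT the real SkyWalking API.
class EnhancedStub {
    private Object dynamicField;

    Object getDynamicField() {
        return dynamicField;
    }

    void setDynamicField(Object value) {
        this.dynamicField = value;
    }
}

class ConstructorSideGuard {
    // Constructor-side idea: attach state only when a user callback exists.
    static void onConstruct(EnhancedStub objInst, Object[] allArguments) {
        Object callback = allArguments[0];
        if (callback != null) {
            objInst.setDynamicField(callback);
        }
    }

    // Handler-side idea: a missing field means "no callback, nothing to do".
    static boolean shouldHandle(EnhancedStub objInst) {
        return objInst.getDynamicField() != null;
    }

    public static void main(String[] args) {
        // Simulates send(record): the callback argument is null.
        EnhancedStub noCallback = new EnhancedStub();
        onConstruct(noCallback, new Object[] {null});
        System.out.println(shouldHandle(noCallback)); // prints false

        // Simulates send(record, callback): the field is populated.
        EnhancedStub withCallback = new EnhancedStub();
        onConstruct(withCallback, new Object[] {new Object()});
        System.out.println(shouldHandle(withCallback)); // prints true
    }
}
```

With this arrangement the decision is made once, at construction time, and every later hook only needs to test whether the field is present.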