apache / skywalking

APM, Application Performance Monitoring System
https://skywalking.apache.org/
Apache License 2.0

[Bug] concurrent increase by CounterWindow may break the PriporityQueue #12503

Closed kael-aiur closed 3 months ago

kael-aiur commented 3 months ago

Search before asking

Apache SkyWalking Component

OAP server (apache/skywalking)

What happened

I have a custom meter yaml:

filter: "{ tags -> tags['service'] != null && tags['instance'] != null && tags['resource'] != null  }"
expSuffix: instance(['service'],['instance'], Layer.GENERAL)
metricPrefix: sentinel
metricsRules:
  - name: sw8_receive_request_total_sum
    exp:  sw8_receive_request_total.sum(['service','instance','resource']).increase('PT1M')
  - name: sw8_pass_requests_total_sum
    exp: sw8_pass_requests_total.sum(['service','instance','resource']).increase('PT1M')

When many instances report this meter, I sometimes get an exception stack like the one below:

2024-08-02 13:58:24,274 org.apache.skywalking.oap.meter.analyzer.dsl.Expression 89 [KafkaConsumer-1-thread-14] ERROR [] - [9.5.0-d47165d] failed to run "(sw8_receive_request_total.sum(['service','instance','resource']).increase('PT1M')).instance(['service'],['instance'], Layer.GENERAL)"
java.lang.NullPointerException: Cannot read field "_1" because "t2" is null
    at io.vavr.Tuple2.compareTo(Tuple2.java:90) ~[vavr-0.10.3.jar:?]
    at io.vavr.Tuple2.compareTo(Tuple2.java:111) ~[vavr-0.10.3.jar:?]
    at io.vavr.Tuple2.compareTo(Tuple2.java:43) ~[vavr-0.10.3.jar:?]
    at java.util.PriorityQueue.siftDownComparable(PriorityQueue.java:694) ~[?:?]
    at java.util.PriorityQueue.poll(PriorityQueue.java:583) ~[?:?]
    at org.apache.skywalking.oap.meter.analyzer.dsl.counter.CounterWindow.increase(CounterWindow.java:62) ~[classes/:?]
    at org.apache.skywalking.oap.meter.analyzer.dsl.Sample.increase(Sample.java:50) ~[classes/:?]
    at org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily.lambda$increase$16(SampleFamily.java:279) ~[classes/:?]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[?:?]
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992) ~[?:?]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[?:?]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575) ~[?:?]
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:?]
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616) ~[?:?]
    at org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily.increase(SampleFamily.java:280) ~[classes/:?]
    at org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily$increase$0.call(Unknown Source) ~[?:?]
    at Script1.run(Script1.groovy:1) ~[?:?]
    at org.apache.skywalking.oap.meter.analyzer.dsl.Expression.run(Expression.java:78) ~[classes/:?]
    at org.apache.skywalking.oap.meter.analyzer.Analyzer.analyse(Analyzer.java:133) ~[classes/:?]
    at org.apache.skywalking.oap.meter.analyzer.MetricConvert.toMeter(MetricConvert.java:109) ~[classes/:?]
    at org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessor.lambda$process$6(MeterProcessor.java:135) ~[classes/:?]
    at java.util.ArrayList.forEach(ArrayList.java:1511) ~[?:?]
    at org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessor.process(MeterProcessor.java:135) ~[classes/:?]
    at org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.MeterServiceHandler.handle(MeterServiceHandler.java:85) ~[classes/:?]
    at org.apache.skywalking.oap.server.analyzer.agent.kafka.KafkaFetcherHandlerRegister.lambda$runTask$1(KafkaFetcherHandlerRegister.java:122) ~[classes/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:840) [?:?]

Because org.apache.skywalking.oap.meter.analyzer.dsl.counter.CounterWindow is a singleton, its windows field (a ConcurrentHashMap) returns the same PriorityQueue object for a given meter. When the same meter is increased by concurrent calls, unexpected exceptions occur inside that PriorityQueue. Source code:

public class CounterWindow {

    public static final CounterWindow INSTANCE = new CounterWindow();

    private final Map<ID, Tuple2<Long, Double>> lastElementMap = new ConcurrentHashMap<>();
    private final Map<ID, Queue<Tuple2<Long, Double>>> windows = new ConcurrentHashMap<>();

    public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) {
        ID id = new ID(name, labels);
        // concurrent callers receive the same PriorityQueue instance for the same meter
        Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>());
        window.offer(Tuple.of(now, value));  // may fail or corrupt the queue under concurrent access
        long waterLevel = now - windowSize;
        Tuple2<Long, Double> peek = window.peek();
        if (peek._1 > waterLevel) {
            return peek;
        }

        Tuple2<Long, Double> result = peek;
        while (peek._1 < waterLevel) {
            result = window.poll();  // may fail or corrupt the queue under concurrent access
            peek = window.element();  // may fail or corrupt the queue under concurrent access
        }

        // Choose the slot closest to the expected timestamp
        if (waterLevel - result._1 <= peek._1 - waterLevel) {
            return result;
        }

        return peek;
    }
    //....
}
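The race is easy to demonstrate outside SkyWalking. The sketch below (hypothetical class name, plain Long elements in place of Tuple2) hammers one shared PriorityQueue from several threads, mimicking concurrent increase() calls on the same meter. With the lock, each offer/poll/offer iteration nets exactly one element; the unlocked variant routinely finishes with a wrong size or throws from inside the queue, just like the stack trace above:

```java
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PriorityQueueRaceDemo {
    // Hammer one shared PriorityQueue from many threads, mimicking how
    // CounterWindow hands the same queue to concurrent increase() calls.
    // Each iteration does offer/poll/offer, a net gain of one element.
    static int run(boolean locked, int threads, int opsPerThread) throws InterruptedException {
        Queue<Long> window = new PriorityQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < opsPerThread; i++) {
                    try {
                        if (locked) {
                            synchronized (window) {
                                window.offer(System.nanoTime());
                                window.poll();
                                window.offer(System.nanoTime());
                            }
                        } else {
                            // Unsynchronized: sift operations from two threads
                            // interleave and corrupt the internal heap array.
                            window.offer(System.nanoTime());
                            window.poll();
                            window.offer(System.nanoTime());
                        }
                    } catch (RuntimeException e) {
                        // NPE / NoSuchElementException from a corrupted queue.
                    }
                }
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        return window.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // With the lock, the final size is deterministic: threads * opsPerThread.
        System.out.println("locked size   = " + run(true, 8, 1000));
        // Without the lock, the size is frequently wrong (even negative).
        System.out.println("unlocked size = " + run(false, 8, 1000));
    }
}
```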

What you expected to happen

For the shared PriorityQueue, a synchronization lock should be added around the window so that each queue is mutated by one thread at a time:

    public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) {
        ID id = new ID(name, labels);
        Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>());
        synchronized (window) { // serialize all access to this window's queue
            window.offer(Tuple.of(now, value));
            long waterLevel = now - windowSize;
            Tuple2<Long, Double> peek = window.peek();
            if (peek._1 > waterLevel) {
                return peek;
            }

            Tuple2<Long, Double> result = peek;
            while (peek._1 < waterLevel) {
                result = window.poll();
                peek = window.element();
            }

            // Choose the slot closest to the expected timestamp
            if (waterLevel - result._1 <= peek._1 - waterLevel) {
                return result;
            }

            return peek;
        }
    }
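An alternative to the explicit synchronized block is to funnel the whole read-modify-write through ConcurrentHashMap.compute, which the JDK documents as running the remapping function atomically per key. This is only a standalone sketch, not SkyWalking's code: the nested Point record and the String key stand in for vavr's Tuple2 and the project's ID class.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class ComputeBasedWindow {
    // Hypothetical stand-in for vavr's Tuple2<Long, Double>.
    record Point(long ts, double val) {}

    private final Map<String, Queue<Point>> windows = new ConcurrentHashMap<>();

    public Point increase(String id, double value, long windowSize, long now) {
        AtomicReference<Point> out = new AtomicReference<>();
        // ConcurrentHashMap.compute runs the remapping function atomically
        // for a given key, so concurrent calls for the same id are serialized
        // without an explicit synchronized block.
        windows.compute(id, (key, window) -> {
            if (window == null) {
                window = new PriorityQueue<>(Comparator.comparingLong(Point::ts));
            }
            window.offer(new Point(now, value));
            long waterLevel = now - windowSize;
            Point peek = window.peek();
            if (peek.ts() > waterLevel) {
                out.set(peek);
                return window;
            }
            Point result = peek;
            // Safe: the element just offered has ts == now >= waterLevel,
            // so the loop stops before the queue can run empty.
            while (peek.ts() < waterLevel) {
                result = window.poll();
                peek = window.element();
            }
            // Choose the slot closest to the expected timestamp.
            out.set(waterLevel - result.ts() <= peek.ts() - waterLevel ? result : peek);
            return window;
        });
        return out.get();
    }
}
```

The trade-off is that compute holds a bin-level lock while the window logic runs, which can also block unrelated keys hashed to the same bin; the per-window synchronized block proposed above keeps the map operations lock-free.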

How to reproduce

Use a custom metric rule that contains an increase call, for example sw8_receive_request_total.sum(['service','instance','resource']).increase('PT1M'). Set debug breakpoints and wait until a batch of custom metrics has been reported, then release all breakpoints at once; this reproduces the problem with high probability.

Anything else

(screenshot) In this way you can see that some queues have been completely corrupted (size < 0), and such a queue stays broken until the service is restarted.

Are you willing to submit a pull request to fix on your own?

Code of Conduct

wu-sheng commented 3 months ago

@kezhenxu94 Could you recheck?

@kael-aiur As you have addressed the issue, would you send a pull request to review?

kael-aiur commented 3 months ago

> @kezhenxu94 Could you recheck?
>
> @kael-aiur As you have addressed the issue, would you send a pull request to review?

Yeah, I will send a pull request later.