synadia-io / flink-connector-nats

Nats Java Flink Connector
Apache License 2.0

NullPointerException while adding splits. #6

Open: scottf opened this issue 8 months ago

scottf commented 8 months ago

Observed behavior

We observed that the NATS Flink connector throws a NullPointerException during the recovery process, while the task is restoring its state. Below is the stack trace.

java.lang.NullPointerException: null
    at io.synadia.flink.source.NatsSourceReader.addSplits(NatsSourceReader.java:99) ~[poc-tsi-flink-aggregator-0.2.4-20240209.160817-1.jar:?]
    at org.apache.flink.streaming.api.operators.SourceOperator.open(SourceOperator.java:342) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
    Suppressed: java.lang.NullPointerException
        at io.synadia.flink.source.NatsSourceReader.close(NatsSourceReader.java:113) ~[poc-tsi-flink-aggregator-0.2.4-20240209.160817-1.jar:?]
        at org.apache.flink.streaming.api.operators.SourceOperator.close(SourceOperator.java:395) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1043) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:951) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:934) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.1.jar:1.17.1]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]

Expected behavior

A proper, descriptive error instead of a NullPointerException.
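To illustrate what "a proper error" could look like, here is a minimal Java sketch. It is hypothetical and not the connector's code: the reader checks its own state and throws an IllegalStateException naming the reader and the subjects involved, instead of letting a still-null dispatcher surface as a bare NullPointerException. FailFastReader, its id field, and the Object stand-in for the dispatcher are all invented for illustration.

```java
import java.util.List;

// Hypothetical sketch, not the connector's code: fail fast with a descriptive
// IllegalStateException instead of dereferencing a field that is still null.
class FailFastReader {
    private final String id;
    private Object dispatcher; // hypothetical stand-in for the real dispatcher, assigned in start()

    FailFastReader(String id) {
        this.id = id;
    }

    void start() {
        dispatcher = new Object();
    }

    void addSplits(List<String> subjects) {
        if (dispatcher == null) {
            // Report which reader and which subjects were affected, and why.
            throw new IllegalStateException("Reader " + id + ": addSplits(" + subjects
                    + ") was called before start(), so no dispatcher is available yet");
        }
        // ... subscribe to each subject here ...
    }
}
```

With this check, the log would show which reader failed and which subjects it was trying to subscribe to, which is more actionable than a NullPointerException with a null message.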

Server and client version

N/A

Host environment

No response

Steps to reproduce

TBD

scottf commented 8 months ago

Can you reproduce this? Here is line 99 of NatsSourceReader in context:

 94:    public void addSplits(List<NatsSubjectSplit> splits) {
 95:        for (NatsSubjectSplit split : splits) {
 96:            LOG.debug("{} | addSplits {}", id, split);
 97:            int ix = subbedSplits.indexOf(split);
 98:            if (ix == -1) {
 99:                dispatcher.subscribe(split.getSubject());
100:                subbedSplits.add(split);
101:            }
102:        }
103:    }

I'm not sure how either dispatcher or split could be null here.
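One possible explanation, offered as an unverified hypothesis: in the trace, addSplits is reached directly from SourceOperator.open, and Flink's SourceOperator appears to hand restored splits to the reader's addSplits() before it calls start(). If NatsSourceReader only creates its dispatcher in start(), then dispatcher would still be null on line 99 during recovery, and a similar not-yet-assigned field would explain the suppressed NPE in close() (line 113). The Java sketch below is not the connector's code. It queues splits that arrive before start() and subscribes them once the dispatcher exists. Dispatcher, Split, RecordingDispatcher, DeferringReader and RestoreOrderDemo are hypothetical stand-ins for the jnats and connector types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;

// Demo of the restore ordering: restored splits arrive before start().
class RestoreOrderDemo {
    public static void main(String[] args) {
        RecordingDispatcher dispatcher = new RecordingDispatcher();
        DeferringReader reader = new DeferringReader(() -> dispatcher);
        // Restored splits are added first; a reader that dereferences its dispatcher here would NPE.
        reader.addSplits(List.of(new Split("orders"), new Split("orders")));
        reader.start();
        // Splits assigned after start() are subscribed immediately.
        reader.addSplits(List.of(new Split("payments")));
        reader.close();
        System.out.println(dispatcher.subjects);
    }
}

// Hypothetical stand-in for the NATS dispatcher (not the jnats interface).
interface Dispatcher {
    void subscribe(String subject);
    void close();
}

// Hypothetical stand-in for NatsSubjectSplit; equality is by subject so contains() works.
final class Split {
    private final String subject;

    Split(String subject) { this.subject = Objects.requireNonNull(subject, "subject"); }

    String getSubject() { return subject; }

    @Override public boolean equals(Object o) {
        return o instanceof Split && ((Split) o).subject.equals(subject);
    }

    @Override public int hashCode() { return subject.hashCode(); }
}

// Test double that records subscriptions instead of talking to a NATS server.
final class RecordingDispatcher implements Dispatcher {
    final List<String> subjects = new ArrayList<>();
    boolean closed;

    @Override public void subscribe(String subject) { subjects.add(subject); }

    @Override public void close() { closed = true; }
}

// Reader sketch: splits that arrive before start() are queued, not dereferenced.
final class DeferringReader {
    private final Supplier<Dispatcher> dispatcherFactory;
    private final List<Split> pendingSplits = new ArrayList<>();
    private final List<Split> subbedSplits = new ArrayList<>();
    private Dispatcher dispatcher; // stays null until start()

    DeferringReader(Supplier<Dispatcher> dispatcherFactory) {
        this.dispatcherFactory = Objects.requireNonNull(dispatcherFactory, "dispatcherFactory");
    }

    void start() {
        dispatcher = dispatcherFactory.get();
        // Subscribe everything that was restored before the dispatcher existed.
        for (Split split : pendingSplits) {
            subscribeOnce(split);
        }
        pendingSplits.clear();
    }

    void addSplits(List<Split> splits) {
        for (Split split : splits) {
            if (dispatcher == null) {
                // Not started yet (e.g. state restore): remember the split for start().
                if (!pendingSplits.contains(split)) {
                    pendingSplits.add(split);
                }
            } else {
                subscribeOnce(split);
            }
        }
    }

    private void subscribeOnce(Split split) {
        if (!subbedSplits.contains(split)) {
            dispatcher.subscribe(split.getSubject());
            subbedSplits.add(split);
        }
    }

    void close() {
        // Guard against close() running after a failed or skipped start().
        if (dispatcher != null) {
            dispatcher.close();
        }
    }
}
```

Running RestoreOrderDemo should print [orders, payments]: the duplicate restored split is subscribed only once, and close() no longer depends on start() having run. Another option, under the same assumption, would be to create the connection and dispatcher in the reader's constructor so that both exist before any split is added.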