apache / incubator-heron

Apache Heron (Incubating) is a realtime, distributed, fault-tolerant stream processing engine from Twitter
https://heron.apache.org/
Apache License 2.0

NullPointerException happened in KafkaSpout running on Heron #3103

Status: Open. Yitian-Zhang opened this issue 5 years ago.

Yitian-Zhang commented 5 years ago

When I run a Storm topology that uses KafkaSpout on Heron, the following exception occurs:

[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.HeronInstance: 
Starting instance container_2_ads_2 for topology AdvertisingTopology and topologyId AdvertisingTopologyf7b4acbe-bdbc-4772-aaa4-9dd2f113f405 for component ads with taskId 2 and componentIndex 0 and stmgrId stmgr-2 and stmgrPort 31162 and metricsManagerPort 31067  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.HeronInstance: System Config: {heron.streammgr.network.backpressure.lowwatermark.mb=50, heron.streammgr.connection.write.batch.size.mb=1, heron.streammgr.stateful.buffer.size.mb=100, heron.instance.internal.bolt.write.queue.capacity=128, heron.instance.tuning.expected.spout.read.queue.size=512, heron.metricsmgr.network.write.batch.size.bytes=ByteAmount{32768 bytes}, heron.instance.reconnect.streammgr.interval.sec=PT5S, heron.instance.tuning.interval.ms=PT0.1S, heron.instance.emit.batch.size.bytes=ByteAmount{32768 bytes}, heron.logging.directory=log-files, heron.check.tmaster.location.interval.sec=120, heron.instance.reconnect.metricsmgr.interval.sec=PT5S, heron.streammgr.client.reconnect.tmaster.max.attempts=30, heron.streammgr.network.backpressure.highwatermark.mb=100, heron.instance.network.read.batch.size.bytes=ByteAmount{32768 bytes}, heron.instance.tuning.expected.metrics.write.queue.size=8, heron.instance.internal.spout.write.queue.capacity=128, heron.instance.force.exit.timeout.ms=PT2S, heron.tmaster.network.stats.options.maximum.packet.mb=1, heron.streammgr.xormgr.rotatingmap.nbuckets=3, heron.instance.set.control.tuple.capacity=1024, heron.metricsmgr.network.read.batch.size.bytes=ByteAmount{32768 bytes}, heron.streammgr.client.reconnect.tmaster.interval.sec=10, heron.instance.execute.batch.time.ms=PT0.016S, heron.metrics.export.interval.sec=PT1M, heron.streammgr.connection.read.batch.size.mb=1, heron.streammgr.cache.drain.size.mb=100, heron.tmaster.network.master.options.maximum.packet.mb=16, heron.tmaster.establish.retry.interval.sec=1, heron.metrics.max.exceptions.per.message.count=1024, heron.tmaster.stmgr.state.timeout.sec=60, heron.instance.network.write.batch.size.bytes=ByteAmount{32768 bytes}, heron.logging.err.threshold=3, heron.tmaster.network.controller.options.maximum.packet.mb=1, heron.tmaster.metrics.collector.maximum.exception=256, 
heron.instance.network.write.batch.time.ms=PT0.016S, heron.instance.network.options.socket.send.buffer.size.bytes=ByteAmount{6 MB (6553600 bytes)}, heron.streammgr.mempool.max.message.number=512, heron.logging.maximum.size.mb=100, heron.streammgr.tmaster.heartbeat.interval.sec=10, heron.instance.network.read.batch.time.ms=PT0.016S, heron.tmaster.metrics.network.bindallinterfaces=false, heron.streammgr.network.options.maximum.packet.mb=10, heron.instance.tuning.expected.bolt.write.queue.size=8, heron.metricsmgr.network.options.socket.received.buffer.size.bytes=ByteAmount{8 MB (8738000 bytes)}, heron.logging.maximum.files=5, heron.instance.network.options.socket.received.buffer.size.bytes=ByteAmount{8 MB (8738000 bytes)}, heron.instance.execute.batch.size.bytes=ByteAmount{32768 bytes}, heron.instance.acknowledgement.nbuckets=10, heron.metricsmgr.network.read.batch.time.ms=PT0.016S, heron.metricsmgr.network.options.socket.send.buffer.size.bytes=ByteAmount{6 MB (6553600 bytes)}, heron.metricsmgr.network.options.maximum.packetsize.bytes=ByteAmount{1 MB (1048576 bytes)}, heron.instance.tuning.expected.bolt.read.queue.size=8, heron.logging.flush.interval.sec=10, heron.streammgr.cache.drain.frequency.ms=10, heron.tmaster.establish.retry.times=30, heron.instance.network.options.maximum.packetsize.bytes=ByteAmount{10 MB (10485760 bytes)}, heron.instance.tuning.current.sample.weight=0.8, heron.instance.reconnect.streammgr.times=60, heron.logging.prune.interval.sec=300, heron.instance.reconnect.metricsmgr.times=60, heron.tmaster.metrics.collector.maximum.interval.min=PT3H, heron.tmaster.metrics.collector.purge.interval.sec=PT1M, heron.streammgr.client.reconnect.interval.sec=1, heron.instance.internal.spout.read.queue.capacity=1024, heron.instance.ack.batch.time.ms=PT0.128S, heron.instance.set.data.tuple.size.bytes=ByteAmount{8 MB (8388608 bytes)}, heron.instance.tuning.expected.spout.write.queue.size=8, heron.instance.internal.bolt.read.queue.capacity=128, 
heron.instance.set.data.tuple.capacity=1024, heron.instance.metrics.system.sample.interval.sec=PT10S, heron.streammgr.network.backpressure.threshold=3, heron.instance.emit.batch.time.ms=PT0.016S, heron.metricsmgr.network.write.batch.time.ms=PT0.016S, heron.instance.internal.metrics.write.queue.capacity=128}  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: Connecting to endpoint: /127.0.0.1:31162  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: Connecting to endpoint: /127.0.0.1:31067  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Connected to Stream Manager. Ready to send register request  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: Connected to Metrics Manager. Ready to send register request  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Stop writing due to not yet connected to Stream Manager.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Stop writing due to not yet connected to Stream Manager.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: We registered ourselves to the Stream Manager  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Handling assignment message from response  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: We received a new Physical Plan.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Push to Slave  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: We registered ourselves to the Metrics Manager  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.misc.PhysicalPlanHelper: Building configs for component: ads  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.misc.PhysicalPlanHelper: Added topology-level configs: {topology.acker.executors=2, topology.workers=3, topology.skip.missing.kryo.registrations=false, topology.enable.message.timeouts=true, topology.serializer.classname=org.apache.storm.serialization.HeronPluggableSerializerDelegate, topology.debug=false, topology.max.spout.pending=100, topology.kryo.factory=org.apache.storm.serialization.DefaultKryoFactory, topology.fall.back.on.java.serialization=false, topology.name=AdvertisingTopology, topology.component.parallelism=1, topology.stmgrs=3, topology.reliability.mode=ATLEAST_ONCE, topology.message.timeout.secs=30}  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.misc.PhysicalPlanHelper: Added component-specific configs: {topology.acker.executors=2, config.zkRoot=/ad-events/6647e83d-6bd8-454e-ad91-d3ec0a012e62, topology.workers=3, topology.skip.missing.kryo.registrations=false, topology.enable.message.timeouts=true, topology.serializer.classname=org.apache.storm.serialization.HeronPluggableSerializerDelegate, topology.debug=false, topology.max.spout.pending=100, topology.kryo.factory=org.apache.storm.serialization.DefaultKryoFactory, topology.fall.back.on.java.serialization=false, topology.name=AdvertisingTopology, topology.component.parallelism=1, config.topics=ad-events, topology.stmgrs=3, topology.reliability.mode=ATLEAST_ONCE, topology.message.timeout.secs=30, config.zkNodeBrokers=/brokers}  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Slave: Incarnating ourselves as ads with task id 2  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.spout.SpoutInstance: Is this topology stateful: false  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.spout.SpoutInstance: Enable Ack: true  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.spout.SpoutInstance: EnableMessageTimeouts: true  
[2018-11-01 22:43:49 +0800] [SEVERE] com.twitter.heron.instance.HeronInstance: Exception caught in thread: SlaveThread with id: 12 
java.lang.NullPointerException
    at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:80)
    at org.apache.storm.topology.IRichSpoutDelegate.open(IRichSpoutDelegate.java:53)
    at com.twitter.heron.instance.spout.SpoutInstance.init(SpoutInstance.java:173)
    at com.twitter.heron.instance.Slave.startInstanceIfNeeded(Slave.java:222)
    at com.twitter.heron.instance.Slave.handleNewAssignment(Slave.java:173)
    at com.twitter.heron.instance.Slave.handleNewPhysicalPlan(Slave.java:349)
    at com.twitter.heron.instance.Slave.access$300(Slave.java:49)
    at com.twitter.heron.instance.Slave$1.run(Slave.java:118)
    at com.twitter.heron.common.basics.WakeableLooper.executeTasksOnWakeup(WakeableLooper.java:160)
    at com.twitter.heron.common.basics.WakeableLooper.runOnce(WakeableLooper.java:89)
    at com.twitter.heron.common.basics.WakeableLooper.loop(WakeableLooper.java:79)
    at com.twitter.heron.instance.Slave.run(Slave.java:180)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.HeronInstance: Waiting for process exit in PT2S  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Slave: Closing the Slave Thread  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.metrics.MetricsCollector: Forcing to gather all metrics and flush out.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Slave: Shutting down the instance  
[2018-11-01 22:43:49 +0800] [WARNING] com.twitter.heron.common.basics.SysUtils: Failed to close com.twitter.heron.instance.Slave@4bef4d93 
java.lang.NullPointerException
    at org.apache.storm.kafka.KafkaSpout.close(KafkaSpout.java:136)
    at org.apache.storm.topology.IRichSpoutDelegate.close(IRichSpoutDelegate.java:58)
    at com.twitter.heron.instance.spout.SpoutInstance.clean(SpoutInstance.java:195)
    at com.twitter.heron.instance.spout.SpoutInstance.shutdown(SpoutInstance.java:204)
    at com.twitter.heron.instance.Slave.close(Slave.java:238)
    at com.twitter.heron.common.basics.SysUtils.closeIgnoringExceptions(SysUtils.java:66)
    at com.twitter.heron.instance.HeronInstance$SlaveExitTask.run(HeronInstance.java:428)
    at com.twitter.heron.instance.HeronInstance$DefaultExceptionHandler.handleException(HeronInstance.java:396)
    at com.twitter.heron.instance.HeronInstance$DefaultExceptionHandler.uncaughtException(HeronInstance.java:360)
    at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1057)
    at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1052)
    at java.lang.Thread.dispatchUncaughtException(Thread.java:1959)

[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Gateway: Closing the Gateway thread  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.metrics.MetricsCollector: Forcing to gather all metrics and flush out.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: Flushing all pending data in MetricsManagerClient  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Flushing all pending data in StreamManagerClient  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.SocketChannelHelper: Forcing to flush data to socket with best effort.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: To stop the HeronClient.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: MetricsManagerClient exits  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.SocketChannelHelper: Forcing to flush data to socket with best effort.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: To stop the HeronClient.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: StreamManagerClient exits.  
[2018-11-01 22:43:49 +0800] [SEVERE] com.twitter.heron.instance.HeronInstance: Instance Process exiting.

The topology code is as follows:

String zkServerHosts = "MY_ZK_IP:2181";
ZkHosts hosts = new ZkHosts(zkServerHosts);

SpoutConfig spoutConfig = new SpoutConfig(hosts, kafkaTopic, "/" + kafkaTopic, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

The NPE is reported at line 80 of the open method in the KafkaSpout class. In the decompiled KafkaSpout class, that line points to the following code:

public Object getValueAndReset() {
    List<PartitionManager> pms = KafkaSpout.this.coordinator.getMyManagedPartitions();
    Set<Partition> latestPartitions = new HashSet();
    Iterator var3 = pms.iterator();

    PartitionManager pm;
    while(var3.hasNext()) { // the line of NPE happened
        pm = (PartitionManager)var3.next();
        latestPartitions.add(pm.getPartition());
    }

    this.kafkaOffsetMetric.refreshPartitions(latestPartitions);
    var3 = pms.iterator();

    while(var3.hasNext()) {
        pm = (PartitionManager)var3.next();
        this.kafkaOffsetMetric.setOffsetData(pm.getPartition(), pm.getOffsetData());
    }

    return this.kafkaOffsetMetric.getValueAndReset();
}
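In the loop above, var3.hasNext() cannot throw an NPE by itself, because a standard JDK list never returns a null iterator. An NPE in this method would have to come from a null coordinator, a null pms list (thrown at pms.iterator()), or a null element pm. The following is a minimal, stdlib-only sketch of the same partition-collection pass with those null cases guarded. The class Manager and the method latestPartitions are hypothetical stand-ins, not storm-kafka's PartitionManager API; the sketch only illustrates which references in the loop can be null.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NullSafePartitions {
    // Hypothetical stand-in for a partition manager; only the partition id matters here.
    static final class Manager {
        final String partition;

        Manager(String partition) {
            this.partition = partition;
        }

        String getPartition() {
            return partition;
        }
    }

    // Collects the distinct partitions of the given managers.
    // A null list (e.g. from a coordinator that was never set up) is treated as empty,
    // and null entries are skipped instead of being dereferenced.
    static Set<String> latestPartitions(List<Manager> managers) {
        if (managers == null) {
            return Collections.emptySet();
        }
        Set<String> partitions = new HashSet<>();
        for (Manager pm : managers) {
            if (pm != null) {
                partitions.add(pm.getPartition());
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(latestPartitions(null).size()); // prints 0
        List<Manager> managers = Arrays.asList(new Manager("p0"), null, new Manager("p1"));
        System.out.println(latestPartitions(managers).size()); // prints 2
    }
}
```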

The dependencies in my pom.xml are as follows:

            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka</artifactId>
                <version>1.0.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_${scala.binary.version}</artifactId>
                <version>0.8.2.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.0.2</version>
            </dependency>
            <dependency>
                <groupId>com.twitter.heron</groupId>
                <artifactId>heron-storm</artifactId>
                <version>0.17.5</version>
            </dependency>

I have also tried replacing storm-kafka with heron-kafka and adding heron-api (removing storm-kafka entirely), but the same NPE still occurs. I don't know what causes this problem or how to fix it. Any help would be appreciated.

simingweng commented 5 years ago

If you want to use the Apache Storm KafkaSpout with Heron, you can use the following dependency:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>1.2.2</version>
    </dependency>

However, one Apache Storm source file that this dependency requires is missing from Heron's Storm compatibility library. You will need to manually copy the org.apache.storm.utils.Time class from the Apache Storm source tree into your project. We used Heron 0.17.8 with storm-kafka-client 1.2.2 successfully, but recently wrote a new Kafka spout for Heron because of a performance issue we found in storm-kafka-client.
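Before and after copying org.apache.storm.utils.Time into the project, one way to confirm whether the class is actually visible at runtime is a small reflection check. This is a sketch with a hypothetical class and helper name (ClassPresenceCheck.isClassPresent) that uses only the JDK's Class.forName(String, boolean, ClassLoader); to reflect what Heron really loads, it would need to run on the topology's own classpath, for example from a spout's open method or a test in the same build.

```java
public class ClassPresenceCheck {
    // Returns true if the named class can be loaded by this class's loader.
    // Passing initialize=false avoids running the checked class's static initializers.
    // LinkageError covers the case where the class file exists but its own
    // dependencies cannot be resolved.
    static boolean isClassPresent(String className) {
        try {
            Class.forName(className, false, ClassPresenceCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = "org.apache.storm.utils.Time";
        System.out.println(name + " present: " + isClassPresent(name));
    }
}
```

If the check prints false inside the topology, the copied class did not make it into the packaged jar.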