siddhi-io / siddhi

Stream Processing and Complex Event Processing Engine
http://siddhi.io
Apache License 2.0

Partitioned Distributed Sink encounters NullPointerException #1796

Open GitHub-Yann opened 1 year ago

GitHub-Yann commented 1 year ago

Description: Hello, I am new to Siddhi. When I follow the "Distributed Sink" section of the "Query Guide", I can't get the demo to run successfully.

(1) Is my usage wrong?

Affected Siddhi Version: siddhi-tooling-5.1.0

OS, DB, other environment details and versions:
CentOS Linux release 7.5.1804

Steps to reproduce: I have a service that exposes two endpoints: (1) /api/demo/test/post and (2) /api/demo/test/post2. Both expect a User entity.

Below are my Siddhi settings:

@App:name("HelloWorldPartitionApp")

@source(type = 'http', receiver.url = "http://0.0.0.0:8008/cargo123", @map(type = 'json'))
define stream CargoStream1 (name string,age int);

@sink(type='http',method='POST',@map(type='json',validate.json='true',@payload("""{"userName":"{{weight}}","age":"{{age}}","mobile":"{{totalWeight}}"}""")),
    @distribution(strategy='partitioned', partitionKey='weight',
        @destination(publisher.url='http://192.168.1.39:9911/api/demo/test/post'),
        @destination(publisher.url='http://192.168.1.39:9911/api/demo/test/post2')))
define stream OutputStream1(weight string,age int, totalWeight long);

@info(name='HelloWorldPartitionQuery')
from CargoStream1
select name as weight, age, sum(age) as totalWeight
insert into OutputStream1;
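
For readers unfamiliar with the 'partitioned' strategy, here is a minimal sketch of how a partition key value could be mapped to one of the two destinations above. It assumes a plain hash-modulo scheme; this is an illustration only, not Siddhi's confirmed implementation, and the class `PartitionSketch` and the method `destinationIndex` are hypothetical names.

```java
final class PartitionSketch {

    // Hypothetical helper (not a Siddhi API): maps a partition key value to a
    // destination index, assuming a simple hash-modulo scheme.
    static int destinationIndex(String partitionKeyValue, int destinationCount) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(partitionKeyValue.hashCode(), destinationCount);
    }

    public static void main(String[] args) {
        // The two destinations declared in the @distribution annotation.
        String[] destinations = {
            "http://192.168.1.39:9911/api/demo/test/post",
            "http://192.168.1.39:9911/api/demo/test/post2"
        };
        // In the app, partitionKey='weight', and 'weight' carries the value of 'name'.
        int index = destinationIndex("Tom", destinations.length);
        System.out.println("Tom -> " + destinations[index]);
        // prints "Tom -> http://192.168.1.39:9911/api/demo/test/post"
    }
}
```

Under this assumption, every event with the same 'weight' value would be sent to the same destination.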

My request body for cargo123:

{
    "name": "Tom",
    "age": 28
}
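
To make the reproduction easier, the request above could be sent with a small Java client like the sketch below. The target URL `http://localhost:8008/cargo123` is an assumption (the source listens on 0.0.0.0:8008, so the host depends on where the Siddhi runtime is running), and the class `CargoRequestSketch` and its methods are hypothetical names.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

final class CargoRequestSketch {

    // Builds the JSON body shown above. No escaping is done, so this is only
    // suitable for simple values such as "Tom".
    static String buildBody(String name, int age) {
        return "{\"name\":\"" + name + "\",\"age\":" + age + "}";
    }

    // Sends the body as an HTTP POST and returns the response status code.
    static int post(String url, String body) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode();
        conn.disconnect();
        return status;
    }

    public static void main(String[] args) throws Exception {
        String body = buildBody("Tom", 28);
        if (args.length > 0) {
            // Assumed usage: pass the receiver URL, e.g. http://localhost:8008/cargo123
            System.out.println("HTTP status: " + post(args[0], body));
        } else {
            // Without a URL argument, only print the body that would be sent.
            System.out.println(body);
        }
    }
}
```

Running it with the receiver URL as the only argument sends one event into CargoStream1.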

Related Issues:

[2022-10-18_15-40-38_886] ERROR {io.siddhi.core.stream.StreamJunction} - Error in 'HelloWorldPartitionApp' after consuming events from Stream 'OutputStream1', null. Hence, dropping event 'StreamEvent{ timestamp=1666078838886, beforeWindowData=null, onAfterWindowData=null, outputData=[Tom, 28, 28], type=CURRENT, next=null}' (Encoded) 

java.lang.NullPointerException
    at io.siddhi.extension.io.http.sink.HttpSink.sendRequest(HttpSink.java:841)
    at io.siddhi.extension.io.http.sink.HttpSink.publish(HttpSink.java:618)
    at io.siddhi.core.util.transport.SingleClientDistributedSink.publish(SingleClientDistributedSink.java:61)
    at io.siddhi.core.stream.output.sink.distributed.DistributedTransport.publish(DistributedTransport.java:125)
    at io.siddhi.core.stream.output.sink.Sink.publish(Sink.java:182)
    at io.siddhi.extension.map.json.sinkmapper.JsonSinkMapper.mapAndSend(JsonSinkMapper.java:211)
    at io.siddhi.core.stream.output.sink.SinkMapper.mapAndSend(SinkMapper.java:180)
    at io.siddhi.core.stream.output.sink.SinkCallback.receive(SinkCallback.java:55)
    at io.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:100)
    at io.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:176)
    at io.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:465)
    at io.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
    at io.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:104)
    at io.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:44)
    at io.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
    at io.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:183)
    at io.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:90)
    at io.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:128)
    at io.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:199)
    at io.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:474)
    at io.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:34)
    at io.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:45)
    at io.siddhi.core.stream.input.InputHandler.send(InputHandler.java:78)
    at io.siddhi.core.stream.input.source.PassThroughSourceHandler.sendEvent(PassThroughSourceHandler.java:35)
    at io.siddhi.core.stream.input.source.InputEventHandler.sendEvent(InputEventHandler.java:81)
    at io.siddhi.extension.map.json.sourcemapper.JsonSourceMapper.mapAndProcess(JsonSourceMapper.java:234)
    at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:152)
    at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:118)
    at io.siddhi.extension.io.http.source.HttpWorkerThread.run(HttpWorkerThread.java:62)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)