wso2 / product-sp

An open source, cloud-native streaming data integration and analytics product optimized for agile digital businesses
https://wso2.com/analytics-and-stream-processing/
Apache License 2.0

Exception with aggregation in 4.4.0 #1027

Open p-litvinov opened 5 years ago

p-litvinov commented 5 years ago

Description: The error occurs in WSO2 SP 4.4.0 when trying to use a read-only aggregation from `@store`. In WSO2 SP 4.3.0 the same app works fine.

```
osgi> [2019-07-03 23:03:19,540] ERROR {org.wso2.siddhi.core.stream.StreamJunction}
  - Error in 'demoError1' after consuming events from Stream 'RebuildCmd', Error executing 'store_select_query_TradeClientAggregation_YEARS', null.
    Hence, dropping event 'StreamEvent{ timestamp=1562184199538, beforeWindowData=null, onAfterWindowData=null, outputData=[1], type=CURRENT, next=null}'
org.wso2.siddhi.core.exception.StoreQueryRuntimeException: Error executing 'store_select_query_TradeClientAggregation_YEARS', null
	at org.wso2.siddhi.core.query.SelectStoreQueryRuntime.execute(SelectStoreQueryRuntime.java:68)
	at org.wso2.siddhi.core.aggregation.RecreateInMemoryData.recreateInMemoryData(RecreateInMemoryData.java:123)
	at org.wso2.siddhi.core.aggregation.AggregationRuntime.recreateInMemoryData(AggregationRuntime.java:365)
	at org.wso2.siddhi.core.aggregation.AggregationRuntime.find(AggregationRuntime.java:211)
	at org.wso2.siddhi.core.query.processor.stream.window.AggregateWindowProcessor.find(AggregateWindowProcessor.java:78)
	at org.wso2.siddhi.core.query.input.stream.join.JoinProcessor.process(JoinProcessor.java:87)
	at org.wso2.siddhi.core.query.processor.stream.window.LengthWindowProcessor.process(LengthWindowProcessor.java:137)
	at org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.processEventChunk(WindowProcessor.java:65)
	at org.wso2.siddhi.core.query.processor.stream.AbstractStreamProcessor.process(AbstractStreamProcessor.java:123)
	at org.wso2.siddhi.core.query.input.stream.join.JoinProcessor.process(JoinProcessor.java:123)
	at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:187)
	at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:97)
	at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:122)
	at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:181)
	at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:405)
	at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
	at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:97)
	at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
	at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:98)
	at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:187)
	at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:97)
	at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:133)
	at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:204)
	at org.wso2.siddhi.core.trigger.StartTrigger.start(StartTrigger.java:74)
	at org.wso2.siddhi.core.SiddhiAppRuntime.startWithoutSources(SiddhiAppRuntime.java:398)
	at org.wso2.siddhi.core.SiddhiAppRuntime.start(SiddhiAppRuntime.java:380)
	at org.wso2.carbon.siddhi.editor.core.internal.DebugRuntime.start(DebugRuntime.java:68)
	at org.wso2.carbon.siddhi.editor.core.internal.DebugProcessorService.start(DebugProcessorService.java:37)
	at org.wso2.carbon.siddhi.editor.core.internal.EditorMicroservice.start(EditorMicroservice.java:650)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at org.wso2.msf4j.internal.router.HttpMethodInfo.invokeResource(HttpMethodInfo.java:187)
	at org.wso2.msf4j.internal.router.HttpMethodInfo.invoke(HttpMethodInfo.java:143)
	at org.wso2.msf4j.internal.MSF4JHttpConnectorListener.dispatchMethod(MSF4JHttpConnectorListener.java:218)
	at org.wso2.msf4j.internal.MSF4JHttpConnectorListener.lambda$onMessage$57(MSF4JHttpConnectorListener.java:129)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
	at org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable.getConnection(RDBMSEventTable.java:1423)
	at org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable.getConnection(RDBMSEventTable.java:1411)
	at org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable.query(RDBMSEventTable.java:1712)
	at org.wso2.siddhi.core.table.record.AbstractQueryableRecordTable.query(AbstractQueryableRecordTable.java:90)
	at org.wso2.siddhi.core.query.SelectStoreQueryRuntime.execute(SelectStoreQueryRuntime.java:55)
	... 39 more
```

Suggested Labels:

Suggested Assignees:

Affected Product Version:

OS, DB, other environment details and versions:

Steps to reproduce:

1. Create an aggregation with a store in PostgreSQL or MySQL.
2. Put several events into the aggregation.
3. Run the second script with the read-only aggregation:

```
@App:name("demoError1")
@App:description("Aggregation with store exception.")

-- Trade aggregation, read-only --

define stream TradesVolumeFake (timestamp long, exchange string, client_id string, trade_volume double);

@store(type = 'rdbms', datasource = 'AF2_DB')
@purge(enable = "false")
define aggregation TradeClientAggregation
from TradesVolumeFake
select exchange, client_id, sum(trade_volume) as totalVolume,
       max(trade_volume) as maxVolume, min(trade_volume) as minVolume,
       count() as countTrades
group by exchange, client_id
aggregate by timestamp every sec ... year;

define trigger TriggerPreLoadTrades at 'start';

define stream RebuildCmd (t int);

@sink(type = 'log', prefix = 'CalcStream')
define stream CalcStream (_timestamp long, exchange string, client_id string, countTrades long);

from TriggerPreLoadTrades
select 1 as t
insert into RebuildCmd;

from RebuildCmd as cmd
right outer join TradeClientAggregation as agg
    on not (agg.exchange is null) -- agg.exchange == 'moex.eq' and agg.client_id == 'BC15981'
within
    -- '2019-05-09 00:00:00', '2019-05-15 00:00:00' -- for tests only
    -- time:timestampInMilliseconds(time:dateSub(time:dateFormat(currentTimeMillis(), 'yyyy-MM-dd 00:00:00.000'), 100, 'DAY', 'yyyy-MM-dd HH:mm:ss.SSS')), -- 100 DAYS is parameter
    time:timestampInMilliseconds(time:dateFormat(currentTimeMillis(), 'yyyy-MM-dd 00:00:00.000'))
per 'minutes'
select agg.AGG_TIMESTAMP as _timestamp, agg.exchange as exchange,
       agg.client_id as client_id, agg.countTrades as countTrades
       --- time:dateFormat(agg.AGG_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as time
order by _timestamp
--- limit 10
insert into CalcStream;
```
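For reference, the first script (the write-mode app that populates the aggregation, step 2) is not included in this report. A minimal sketch of what it might look like, reusing the aggregation definition above; the app name `demoWriter` and the `TradesVolume` input stream are assumptions:

```
@App:name("demoWriter")
@App:description("Hypothetical companion app that writes into TradeClientAggregation.")

-- Hypothetical input stream feeding the aggregation; in the real setup
-- events would arrive through a source (e.g. an HTTP or Kafka source).
define stream TradesVolume (timestamp long, exchange string, client_id string, trade_volume double);

-- Same aggregation and datasource as in the read-only app.
@store(type = 'rdbms', datasource = 'AF2_DB')
define aggregation TradeClientAggregation
from TradesVolume
select exchange, client_id, sum(trade_volume) as totalVolume,
       max(trade_volume) as maxVolume, min(trade_volume) as minVolume,
       count() as countTrades
group by exchange, client_id
aggregate by timestamp every sec ... year;
```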

Related Issues:

niveathika commented 5 years ago

@p-litvinov Was this run on the SP editor or the worker runtime?

p-litvinov commented 5 years ago

I tried running it on both runtimes, editor and worker. Both runtimes show the issue in 4.4.0, but with version 4.3.0 it works without any exception on both the worker and the editor.

niveathika commented 5 years ago

It seems the data source is not initialized properly, did you see any other errors? Was the app deployed successfully?

p-litvinov commented 5 years ago

Yes, the app is deployed successfully and the data source is initialized properly. I have another app where an aggregation with the same data source is used in write mode, and it works fine. In 4.3.0 both apps work fine with this data source.

p-litvinov commented 5 years ago

Maybe in 4.4.0 the app needs some additional rule when using an aggregation in read-only mode?

niveathika commented 5 years ago

No, it does not; a read-only aggregation simply does not need to connect to a source. In the Siddhi app with the read-only aggregation, let's define an event table and check whether we can connect to the data source.
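A minimal connectivity check along these lines might look like the following sketch; the app name, table name, and columns are hypothetical, and only the `AF2_DB` datasource reference comes from the original report:

```
@App:name("datasourceCheck")
@App:description("Hypothetical app to verify that the AF2_DB datasource is reachable.")

-- If this table fails to connect on deployment, the datasource itself is the problem,
-- independent of the aggregation logic.
@store(type = 'rdbms', datasource = 'AF2_DB')
define table ConnCheckTable (id int, note string);

define stream InputStream (id int, note string);

from InputStream
insert into ConnCheckTable;
```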

This is not reproducible on our end.