siddhi-io / siddhi

Stream Processing and Complex Event Processing Engine
http://siddhi.io
Apache License 2.0
1.52k stars 527 forks source link

Siddhi Kafka Connection #1818

Open programmedrn opened 1 year ago

programmedrn commented 1 year ago

Description:

I'm Using siddhi-tooling-5.1.2, and trying to connect to kafka via web editor on chrome.

and this is my code,

`@App:name('KakfaUiSample') @App:description('Receives events via simulation and publish them to a kafka topic in json format')

@sink(type = 'log', @map(type = 'json', validate.json = "true")) define stream pm25LimitedStream(time string, pm25 double); --Send events to kafka_json_topic

@source(type = 'kafka', bootstrap.servers = "kafka109:9092", topic.list = "airmap4", group.id = "SiddhiUiKafka", threading.option = "single.thread", enable.offsets.commit="true", @map(type = 'json')) define stream KafkaAirmapStream (ts string, svcCode string, svcTgtSeq string, spotDevSeq string, groupTagCd string, occDt string, deviceModelId string, pm10 float, pm25 float, triOxygen float);

@info(name = 'pm25Limitation') from KafkaAirmapStream select ts as time, pm25 having pm25 > 70 order by time ASC insert into pm25LimitedStream; It works at the first time, and then I stopped to optimize some values, and then it started to give me an errors, saying [2023-06-08_18-26-51_768] ERROR {io.siddhi.extension.io.kafka.source.ConsumerKafkaGroup} - Error while creating KafkaConsumerThread for topic(s): [airmap4] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@636dc392 rejected from java.util.concurrent.ThreadPoolExecutor@599dc55c[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) at io.siddhi.extension.io.kafka.source.ConsumerKafkaGroup.run(ConsumerKafkaGroup.java:111) at io.siddhi.extension.io.kafka.source.KafkaSource.connect(KafkaSource.java:278) at io.siddhi.extension.io.kafka.source.KafkaSource.connect(KafkaSource.java:56) at io.siddhi.core.stream.input.source.Source.connectWithRetry(Source.java:160) at io.siddhi.core.SiddhiAppRuntimeImpl.startSources(SiddhiAppRuntimeImpl.java:502) at io.siddhi.core.SiddhiAppRuntimeImpl.start(SiddhiAppRuntimeImpl.java:427) at io.siddhi.distribution.editor.core.internal.DebugRuntime.start(DebugRuntime.java:93) at io.siddhi.distribution.editor.core.internal.DebugProcessorService.start(DebugProcessorService.java:42) at io.siddhi.distribution.editor.core.internal.EditorMicroservice.start(EditorMicroservice.java:761) at io.siddhi.distribution.editor.core.internal.EditorMicroservice.startWithVariables(EditorMicroservice.java:781) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.wso2.msf4j.internal.router.HttpMethodInfo.invokeResource(HttpMethodInfo.java:187) at org.wso2.msf4j.internal.router.HttpMethodInfo.invoke(HttpMethodInfo.java:143) at org.wso2.msf4j.internal.MSF4JHttpConnectorListener.dispatchMethod(MSF4JHttpConnectorListener.java:218) at org.wso2.msf4j.internal.MSF4JHttpConnectorListener.lambda$onMessage$58(MSF4JHttpConnectorListener.java:129) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) `

This is my /lib folder status kafka_2.13_3.4.0_1.0.0.jar siddhi-execution-json-2.0.5.jar siddhi-execution-regex-5.0.5.jar siddhi-execution-unique-5.0.5.jar siddhi-io-file-2.0.5.jar siddhi-io-kafka-5.0.7.jar siddhi-io-s3-1.0.2.jar siddhi-map-csv-2.0.3.jar siddhi-map-text-2.0.4.jar siddhi-store-rdbms-7.0.5.jar kafka_clients_3.4.0_1.0.0.jar siddhi-execution-list-1.0.0.jar siddhi-execution-reorder-5.0.3.jar siddhi-execution-unitconversion-2.0.2.jar siddhi-io-grpc-1.0.8.jar siddhi-io-nats-2.0.10.jar siddhi-io-tcp-3.0.4.jar siddhi-map-json-5.0.6.jar siddhi-map-xml-5.0.3.jar siddhi-store-redis-3.1.1.jar metrics_core_2.2.0_1.0.0.jar siddhi-execution-map-5.0.5.jar siddhi-execution-string-5.0.9.jar siddhi-io-cdc-2.0.5.jar siddhi-io-http-2.2.0.jar siddhi-io-prometheus-2.1.1.jar siddhi-map-avro-2.0.6.jar siddhi-map-keyvalue-2.0.5.jar siddhi-script-js-5.0.3.jar siddhi_io_kafka_5.0.17_1.0.0.jar scala_library_2.13.10_1.0.0.jar siddhi-execution-math-5.0.4.jar siddhi-execution-time-5.0.4.jar siddhi-io-email-2.0.5.jar siddhi-io-jms-2.0.3.jar siddhi-io-rabbitmq-3.0.4.jar siddhi-map-binary-2.0.4.jar siddhi-map-protobuf-1.0.4.jar siddhi-store-mongodb-2.1.0.jar zookeeper_3.6.3_1.0.0.jar

and my kafka version is 2.13 I'm using java11 What I can't understand is the above code works some time, but not often. How can I deal with this isseu?

Affected Siddhi Version:

OS, DB, other environment details and versions:

Steps to reproduce:

Related Issues: