wso2 / product-ei

An open-source, high-performance hybrid integration platform that lets developers integrate quickly with any application, data, or system.
https://wso2.com/integration/
Apache License 2.0

[Inbound] Kafka inbound endpoint doesn't work with kafka_2.12-1.1.0 or kafka_2.11-1.1.0 #2239

Open indikasampath2000 opened 6 years ago

indikasampath2000 commented 6 years ago

Description: The Kafka inbound endpoint doesn't receive any messages from the topic with the above Kafka versions. The consumer is created successfully, and no errors are logged. To set this up, copy the following JARs from {KAFKA_HOME}/lib to {EI_HOME}/lib (listed here for kafka_2.12-1.1.0):

kafka_2.12-1.1.0.jar
kafka-clients-1.1.0.jar
metrics-core-2.2.0.jar
scala-library-2.12.4.jar
scala-logging_2.12-3.7.2.jar
zkclient-0.10.jar
zookeeper-3.4.10.jar

Suggested Labels:

Suggested Assignees:

Affected Product Version: Enterprise Integrator 6.2.0

OS, DB, other environment details and versions:

Steps to reproduce: Create a Kafka inbound endpoint.

<inboundEndpoint name="HelloWorldKafkaIEP" onError="HelloWorldKafkaErrorSeq" protocol="kafka" sequence="HelloWorldKafkaInSeq" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="interval">100</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="zookeeper.connect">localhost:2181</parameter>
        <parameter name="group.id">hello-world-group</parameter>
        <parameter name="content.type">application/xml</parameter>
        <parameter name="consumer.type">highlevel</parameter>
        <parameter name="topics">HelloWorldTopic</parameter>
        <parameter name="auto.commit.enable">false</parameter>
        <parameter name="auto.offset.reset">smallest</parameter>
        <parameter name="exclude.internal.topics">false</parameter>
        <parameter name="partition.assignment.strategy">roundrobin</parameter>
        <parameter name="offsets.storage">zookeeper</parameter>
        <parameter name="dual.commit.enabled">false</parameter>
    </parameters>
</inboundEndpoint>

Related Issues:

arunans23 commented 5 years ago

I tried the latest Kafka version (kafka_2.11-2.2.0). The Kafka inbound listener does not even start and throws the following error.

Caused by: java.lang.ClassNotFoundException: kafka.consumer.ConsumerTimeoutException cannot be found by synapse-core_2.1.7.wso2v111
    at org.eclipse.osgi.internal.loader.BundleLoader.findClassInternal(BundleLoader.java:475)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:421)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:412)
    at org.eclipse.osgi.internal.baseadaptor.DefaultClassLoader.loadClass(DefaultClassLoader.java:107)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 26 more

The Kafka version we are recommending in the documentation is very old (released March 2014). We should try to support the newer versions of Kafka in the next releases.

Ibaqu commented 4 years ago

The Kafka inbound listener can be configured as a custom inbound endpoint: https://github.com/wso2-extensions/esb-inbound-kafka/blob/release-1.0.7/docs/config.md
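
For readers comparing this with the configuration in the issue description, here is a rough sketch of what a custom inbound endpoint for Kafka might look like. It is not taken from this thread: the class name and the parameter names are as I understand them from the linked config.md and should be checked against the release you install. The broker address localhost:9092 and the deserializer choice are assumptions; the endpoint, sequence, topic, and group names are reused from the issue description.

```xml
<!-- Sketch only: verify the class and parameter names against the linked config.md. -->
<inboundEndpoint name="HelloWorldKafkaIEP" onError="HelloWorldKafkaErrorSeq" class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" sequence="HelloWorldKafkaInSeq" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="interval">100</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="inbound.behavior">polling</parameter>
        <!-- Assumed broker address; this consumer connects to the brokers, not to ZooKeeper. -->
        <parameter name="bootstrap.servers">localhost:9092</parameter>
        <parameter name="topic.name">HelloWorldTopic</parameter>
        <parameter name="group.id">hello-world-group</parameter>
        <parameter name="contentType">application/xml</parameter>
        <parameter name="poll.timeout">100</parameter>
        <!-- Assumed deserializers for plain string keys and payloads. -->
        <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
    </parameters>
</inboundEndpoint>
```

The visible difference from the configuration in the issue description is that protocol="kafka" is replaced by a class attribute, and the ZooKeeper-based settings (zookeeper.connect, consumer.type, offsets.storage) have no counterpart here.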

sreenath-centime commented 4 years ago

I am also facing issues with the Kafka inbound endpoint, but I was able to find a workaround.

Approach 1: I am using kafka_2.12-2.2.1 and created an inbound endpoint with the creation type set to kafka. I see the error below when starting the ESB project:

Caused by: java.lang.ClassNotFoundException: kafka.consumer.ConsumerTimeoutException cannot be found by synapse-core_2.1.7.wso2v133

Approach 2: With the same Kafka version, I was able to avoid this error by creating the inbound endpoint with the creation type set to custom instead of kafka, as described in the link above.

Could anyone please tell me the right way to do this? I would prefer to get approach 1 working.

arunans23 commented 4 years ago

@sreenath-centime Currently, approach 2 is recommended since approach 1 has some issues.

sreenath-centime commented 4 years ago

@arunans23, thanks for the prompt response.

sreenath-centime commented 4 years ago

Hi @arunans23, I have run into one more problem with the Kafka inbound endpoint. As suggested above, I used a custom inbound endpoint for Kafka and was able to proceed with my work.

Approach 1: I am using WSO2 Integration Studio 7.0.0 and configured a custom inbound endpoint for Kafka version 2.12-2.2.1, as suggested in the documentation: https://ei.docs.wso2.com/en/latest/micro-integrator/setup/feature_configs/configuring-kafka/ I am using the org.apache.synapse.kafka.poll-1.0.9.jar file, and everything works fine.

Approach 2: I recently downloaded WSO2 Integration Studio, which defaulted to version 7.0.2, and followed the same steps as in approach 1. When I start my Kafka inbound application, it fails with the error below. There is already an open bug for this, and we cannot proceed with this latest version because of it: https://github.com/wso2/devstudio-tooling-ei/issues/873#issuecomment-664838657

Caused by: java.lang.ClassNotFoundException: org.wso2.carbon.inbound.endpoint.protocol.generic.GenericPollingConsumer cannot be found by org.apache.synapse.kafka.poll_1.0.9_1.0.0

We also tried the latest version of the inbound endpoint, org.apache.synapse.kafka.poll_1.0.10.jar, but with no luck. We are still using Kafka version 2.12-2.2.1.

Kindly let us know how to proceed on this issue, or whether you can suggest an alternative. Thanks in advance.

arunans23 commented 4 years ago

@sreenath-centime We are very close to releasing Integration Studio 7.1.0. MI 1.2.0, which is bundled with it, should work with org.apache.synapse.kafka.poll_1.0.10.jar without any issues. I'll update this thread once the release work is completed. If this is urgent, you can try the pre-releases: https://github.com/wso2/devstudio-tooling-ei/releases

sreenath-centime commented 4 years ago

Thanks a lot, @arunans23. I will try it out.

sreenath-centime commented 3 years ago

@arunans23 I have tried the KafkaInboundEndpoint with Integration Studio 7.1.0, and it still has issues. Below is the error I get when I start my application, which contains a KafkaInboundEndpoint. Kindly let us know whether this issue is fixed in 7.1.0 or whether we should continue to use the CustomInboundEndpoint. We want to use the properties that the KafkaInboundEndpoint provides, which I guess are not supported by the CustomInboundEndpoint. Please confirm.

[2021-04-19 14:30:04,479] ERROR {org.apache.synapse.deployers.AbstractSynapseArtifactDeployer} - Deployment of the Synapse Artifact from file : /Applications/IntegrationStudio.app/Contents/Eclipse/runtime/microesb/tmp/carbonapps/-1234/1618822804300TestCompositeApplication_1.0.0.car/KafkaInboundEndpoint_1.0.0/KafkaInboundEndpoint-1.0.0.xml : Failed!
java.lang.NoClassDefFoundError: kafka/consumer/ConsumerTimeoutException
    at org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKAPollingConsumer.startsMessageListener(KAFKAPollingConsumer.java:79)
    at org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKAProcessor.init(KAFKAProcessor.java:97)
    at org.apache.synapse.inbound.InboundEndpoint.init(InboundEndpoint.java:78)
    at org.apache.synapse.deployers.InboundEndpointDeployer.deploySynapseArtifact(InboundEndpointDeployer.java:57)
    at org.apache.synapse.deployers.AbstractSynapseArtifactDeployer.deploy(AbstractSynapseArtifactDeployer.java:197)
    at org.wso2.micro.integrator.initializer.deployment.synapse.deployer.SynapseAppDeployer.deployArtifactType(SynapseAppDeployer.java:1086)
    at org.wso2.micro.integrator.initializer.deployment.synapse.deployer.SynapseAppDeployer.deployArtifacts(SynapseAppDeployer.java:123)
    at org.wso2.micro.integrator.initializer.deployment.application.deployer.CAppDeploymentManager.deploy(CAppDeploymentManager.java:129)
    at org.wso2.micro.integrator.initializer.deployment.AppDeployerServiceComponent.activate(AppDeployerServiceComponent.java:80)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.eclipse.equinox.internal.ds.model.ServiceComponent.activate(ServiceComponent.java:260)
    at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.activate(ServiceComponentProp.java:146)
    at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.build(ServiceComponentProp.java:345)
    at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponent(InstanceProcess.java:620)
    at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponents(InstanceProcess.java:197)
    at org.eclipse.equinox.internal.ds.Resolver.getEligible(Resolver.java:343)
    at org.eclipse.equinox.internal.ds.SCRManager.serviceChanged(SCRManager.java:222)
    at org.eclipse.osgi.internal.serviceregistry.FilteredServiceListener.serviceChanged(FilteredServiceListener.java:113)
    at org.eclipse.osgi.internal.framework.BundleContextImpl.dispatchEvent(BundleContextImpl.java:985)
    at org.eclipse.osgi.framework.eventmgr.EventManager.dispatchEvent(EventManager.java:234)
    at org.eclipse.osgi.framework.eventmgr.ListenerQueue.dispatchEventSynchronous(ListenerQueue.java:151)
    at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEventPrivileged(ServiceRegistry.java:866)
    at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEvent(ServiceRegistry.java:804)
    at org.eclipse.osgi.internal.serviceregistry.ServiceRegistrationImpl.register(ServiceRegistrationImpl.java:130)
    at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.registerService(ServiceRegistry.java:228)
    at org.eclipse.osgi.internal.framework.BundleContextImpl.registerService(BundleContextImpl.java:525)
    at org.eclipse.osgi.internal.framework.BundleContextImpl.registerService(BundleContextImpl.java:544)
    at org.wso2.micro.integrator.initializer.ServiceBusInitializer.activate(ServiceBusInitializer.java:162)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.eclipse.equinox.internal.ds.model.ServiceComponent.activate(ServiceComponent.java:260)
    at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.activate(ServiceComponentProp.java:146)
    at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.build(ServiceComponentProp.java:345)
    at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponent(InstanceProcess.java:620)
    at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponents(InstanceProcess.java:197)
    at org.eclipse.equinox.internal.ds.Resolver.getEligible(Resolver.java:343)
    at org.eclipse.equinox.internal.ds.SCRManager.serviceChanged(SCRManager.java:222)
    at org.eclipse.osgi.internal.serviceregistry.FilteredServiceListener.serviceChanged(FilteredServiceListener.java:113)
    at org.eclipse.osgi.internal.framework.BundleContextImpl.dispatchEvent(BundleContextImpl.java:985)
    at org.eclipse.osgi.framework.eventmgr.EventManager.dispatchEvent(EventManager.java:234)
    at org.eclipse.osgi.framework.eventmgr.ListenerQueue.dispatchEventSynchronous(ListenerQueue.java:151)
    at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEventPrivileged(ServiceRegistry.java:866)
    at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEvent(ServiceRegistry.java:804)
    at org.eclipse.osgi.internal.serviceregistry.ServiceRegistrationImpl.register(ServiceRegistrationImpl.java:130)
    at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.registerService(ServiceRegistry.java:228)
    at org.eclipse.osgi.internal.framework.BundleContextImpl.registerService(BundleContextImpl.java:525)
    at org.eclipse.osgi.internal.framework.BundleContextImpl.registerService(BundleContextImpl.java:544)
    at org.wso2.micro.integrator.ntask.core.internal.TasksDSComponent.activate(TasksDSComponent.java:88)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.eclipse.equinox.internal.ds.model.ServiceComponent.activate(ServiceComponent.java:260)
    at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.activate(ServiceComponentProp.java:146)
    at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.build(ServiceComponentProp.java:345)
    at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponent(InstanceProcess.java:620)
    at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponents(InstanceProcess.java:197)
    at org.eclipse.equinox.internal.ds.Resolver.buildNewlySatisfied(Resolver.java:473)
    at org.eclipse.equinox.internal.ds.Resolver.enableComponents(Resolver.java:217)
    at org.eclipse.equinox.internal.ds.SCRManager.performWork(SCRManager.java:816)
    at org.eclipse.equinox.internal.ds.SCRManager$QueuedJob.dispatch(SCRManager.java:783)
    at org.eclipse.equinox.internal.ds.WorkThread.run(WorkThread.java:89)
    at org.eclipse.equinox.internal.util.impl.tpt.threadpool.Executor.run(Executor.java:70)
Caused by: java.lang.ClassNotFoundException: kafka.consumer.ConsumerTimeoutException cannot be found by synapse-core_2.1.7.wso2v133
    at org.eclipse.osgi.internal.loader.BundleLoader.findClassInternal(BundleLoader.java:485)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:423)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:415)
    at org.eclipse.osgi.internal.loader.ModuleClassLoader.loadClass(ModuleClassLoader.java:155)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 68 more

sreenath-centime commented 3 years ago

@arunans23 I see the same issue with Integration Studio 7.2.0. The only way to use a Kafka connector in WSO2 seems to be the custom inbound endpoint. Can all the properties listed in the documentation for the Kafka inbound endpoint also be used with the custom inbound endpoint? Please clarify.

minhtranes commented 2 years ago

I encountered a similar issue when deploying a Kafka inbound endpoint. Studio: 8.0.0. Kafka: kafka_2.12-2.3.0.

Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.errors.WakeupException cannot be found by org.apache.synapse.kafka.poll_1.0.10_1.0.0
    at org.eclipse.osgi.internal.loader.BundleLoader.findClassInternal(BundleLoader.java:512)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:423)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:415)
    at org.eclipse.osgi.internal.loader.ModuleClassLoader.loadClass(ModuleClassLoader.java:155)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)