strimzi / strimzi-lab

A lab focused on deploying streaming applications on top of Strimzi
Apache License 2.0

AMQP KAFKA Connector #30

Closed iamrsaravana closed 3 years ago

iamrsaravana commented 3 years ago

I am planning to pull data from Hono using the Camel AMQP source connector on Strimzi Kafka Connect (Strimzi version 0.20.0). I followed the steps below to read data from Hono.

I downloaded the camel-amqp-kafka-connector package and the Qpid JMS JAR files from the links below:

    https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-amqp-kafka-connector/0.7.0/camel-amqp-kafka-connector-0.7.0-package.tar.gz
    https://downloads.apache.org/qpid/jms/0.51.0/apache-qpid-jms-0.51.0-bin.tar.gz

After downloading and extracting both archives, I created a Docker image using the commands below:

    cat <<EOF > Dockerfile
    FROM strimzi/kafka:0.20.1-kafka-2.6.0
    USER root:root
    RUN mkdir -p /opt/kafka/plugins/camel
    COPY ./camel-activemq-kafka-connector/* /opt/kafka/plugins/camel/
    USER 1001
    EOF

    docker build -f ./Dockerfile -t localhost:5000/my-connector-amqp_new .
    docker push localhost:5000/my-connector-amqp_new

Using the manifest below, I created the KafkaConnect resource (it uses the local image built above):

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
      image: 10.128.0.6:5000/my-connector-amqp_new
      replicas: 1
      bootstrapServers: my-cluster-kafka-bootstrap:9092
      config:
        group.id: connect-cluster
        offset.storage.topic: connect-cluster-offsets
        config.storage.topic: connect-cluster-configs
        status.storage.topic: connect-cluster-status
        config.storage.replication.factor: 1
        offset.storage.replication.factor: 1
        status.storage.replication.factor: 1

I referred to the example below to create the KafkaConnector properties:

https://github.com/apache/camel-kafka-connector/blob/master/examples/CamelAmqpSourceConnector.properties

The values in that file are as follows:

    name=CamelAmqpSourceConnector
    topics=mytopic
    tasks.max=1
    connector.class=org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector

    camel.source.path.destinationType=queue
    camel.source.path.destinationName=test-queue

    camel.component.amqp.includeAmqpAnnotations=true
    camel.component.amqp.connectionFactory=#class:org.apache.qpid.jms.JmsConnectionFactory
    camel.component.amqp.connectionFactory.remoteURI=amqp://localhost:5672
    camel.component.amqp.username=admin
    camel.component.amqp.password=admin
    camel.component.amqp.testConnectionOnStartup=true

I am using the configuration below. Could you suggest the correct value for the camel.component.amqp.connectionFactory property?

    apiVersion: kafka.strimzi.io/v1alpha1
    kind: KafkaConnector
    metadata:
      name: camelamqpsourceconnector
      labels:
        strimzi.io/cluster: my-connect-cluster-new
    spec:
      class: org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector
      tasksMax: 1
      config:
        camel.component.amqp.includeAmqpAnnotations: true
        camel.component.amqp.connectionFactory: org.apache.qpid.jms.JmsConnectionFactory
        camel.component.amqp.connectionFactory.remoteURI: amqp://$( kubectl get service eclipse-hono-dispatch-router-ext --output="jsonpath={.spec.clusterIP}" -n hono):15671
        camel.component.amqp.username: consumer@HONO
        camel.component.amqp.password: verysecret
        camel.component.amqp.testConnectionOnStartup: true
        camel.source.kafka.topic: mytopic
        camel.source.path.destinationType: queue
        camel.source.path.destinationName: test-queue

I am getting the error shown below. Could you help me with the values needed for the following properties?

    camel.component.amqp.connectionFactory: org.apache.qpid.jms.JmsConnectionFactory
    camel.component.amqp.connectionFactory.remoteURI: amqp://$( kubectl get service eclipse-hono-dispatch-router-ext --output="jsonpath={.spec.clusterIP}" -n hono):15671

Error message:

2020-12-26 12:50:54,474 INFO Creating task camelamqpsourceconnector-0 (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,479 INFO ConnectorConfig values: config.action.reload = restart connector.class = org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = null name = camelamqpsourceconnector predicates = [] tasks.max = 1 transforms = [] value.converter = null (org.apache.kafka.connect.runtime.ConnectorConfig) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,480 INFO EnrichedConnectorConfig values: config.action.reload = restart connector.class = org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = null name = camelamqpsourceconnector predicates = [] tasks.max = 1 transforms = [] value.converter = null (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,481 INFO TaskConfig values: task.class = class org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceTask (org.apache.kafka.connect.runtime.TaskConfig) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,481 INFO Instantiated task camelamqpsourceconnector-0 with version 0.7.0 of type org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceTask (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,482 INFO JsonConverterConfig values: converter.type = key decimal.format = BASE64 schemas.cache.size = 1000 schemas.enable = true (org.apache.kafka.connect.json.JsonConverterConfig) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,483 INFO Set up the key converter class 
org.apache.kafka.connect.json.JsonConverter for task camelamqpsourceconnector-0 using the worker config (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,485 INFO JsonConverterConfig values: converter.type = value decimal.format = BASE64 schemas.cache.size = 1000 schemas.enable = true (org.apache.kafka.connect.json.JsonConverterConfig) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,486 INFO Set up the value converter class org.apache.kafka.connect.json.JsonConverter for task camelamqpsourceconnector-0 using the worker config (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,486 INFO Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task camelamqpsourceconnector-0 using the worker config (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,492 INFO SourceConnectorConfig values: config.action.reload = restart connector.class = org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = null name = camelamqpsourceconnector predicates = [] tasks.max = 1 topic.creation.groups = [] transforms = [] value.converter = null (org.apache.kafka.connect.runtime.SourceConnectorConfig) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,493 INFO EnrichedConnectorConfig values: config.action.reload = restart connector.class = org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = null name = camelamqpsourceconnector predicates = [] tasks.max = 1 topic.creation.groups = [] transforms = [] value.converter = null 
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,497 INFO Initializing: org.apache.kafka.connect.runtime.TransformationChain{} (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,500 INFO ProducerConfig values: acks = -1 batch.size = 16384 bootstrap.servers = [my-cluster-kafka-bootstrap:9092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = connector-producer-camelamqpsourceconnector-0 compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 2147483647 enable.idempotence = false interceptor.classes = [] internal.auto.downgrade.txn.commit = false key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer linger.ms = 0 max.block.ms = 9223372036854775807 max.in.flight.requests.per.connection = 1 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 2147483647 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols 
= [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer (org.apache.kafka.clients.producer.ProducerConfig) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,517 WARN The configuration 'metrics.context.connect.group.id' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,518 WARN The configuration 'metrics.context.connect.kafka.cluster.id' was supplied but isn't a known config. 
(org.apache.kafka.clients.producer.ProducerConfig) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,518 INFO Kafka version: 2.6.0 (org.apache.kafka.common.utils.AppInfoParser) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,518 INFO Kafka commitId: 62abe01bee039651 (org.apache.kafka.common.utils.AppInfoParser) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,518 INFO Kafka startTimeMs: 1608987054518 (org.apache.kafka.common.utils.AppInfoParser) [StartAndStopExecutor-connect-1-2] 2020-12-26 12:50:54,543 INFO [Worker clientId=connect-1, groupId=connect-cluster-new] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1] 2020-12-26 12:50:54,600 INFO Starting CamelSourceTask connector task (org.apache.camel.kafkaconnector.CamelSourceTask) [task-thread-camelamqpsourceconnector-0] 2020-12-26 12:50:54,655 INFO [Producer clientId=connector-producer-camelamqpsourceconnector-0] Cluster ID: AxvFw7iiSDCEUx-RwUy0gw (org.apache.kafka.clients.Metadata) [kafka-producer-network-thread | connector-producer-camelamqpsourceconnector-0] 2020-12-26 12:50:54,660 INFO CamelAmqpSourceConnectorConfig values: camel.aggregation.size = 10 camel.aggregation.timeout = 500 camel.beans.aggregate = null camel.component.amqp.acceptMessagesWhileStopping = false camel.component.amqp.acknowledgementModeName = AUTO_ACKNOWLEDGE camel.component.amqp.allowAutoWiredConnectionFactory = true camel.component.amqp.allowAutoWiredDestinationResolver = true camel.component.amqp.allowReplyManagerQuickStop = false camel.component.amqp.allowSerializedHeaders = false camel.component.amqp.artemisStreamingEnabled = true camel.component.amqp.asyncConsumer = false camel.component.amqp.asyncStartListener = false camel.component.amqp.asyncStopListener = false camel.component.amqp.autoStartup = true camel.component.amqp.autowiredEnabled = true camel.component.amqp.cacheLevel = null camel.component.amqp.cacheLevelName = 
CACHE_AUTO camel.component.amqp.clientId = null camel.component.amqp.concurrentConsumers = 1 camel.component.amqp.configuration = null camel.component.amqp.connectionFactory = org.apache.qpid.jms.JmsConnectionFactory camel.component.amqp.consumerType = Default camel.component.amqp.defaultTaskExecutorType = null camel.component.amqp.destinationResolver = null camel.component.amqp.disableReplyTo = false camel.component.amqp.durableSubscriptionName = null camel.component.amqp.eagerLoadingOfProperties = false camel.component.amqp.eagerPoisonBody = Poison JMS message due to ${exception.message} camel.component.amqp.errorHandler = null camel.component.amqp.errorHandlerLogStackTrace = true camel.component.amqp.errorHandlerLoggingLevel = WARN camel.component.amqp.exceptionListener = null camel.component.amqp.exposeListenerSession = false camel.component.amqp.headerFilterStrategy = null camel.component.amqp.idleConsumerLimit = 1 camel.component.amqp.idleTaskExecutionLimit = 1 camel.component.amqp.includeAllJMSXProperties = false camel.component.amqp.includeAmqpAnnotations = true camel.component.amqp.jmsKeyFormatStrategy = null camel.component.amqp.jmsMessageType = null camel.component.amqp.lazyCreateTransactionManager = true camel.component.amqp.mapJmsMessage = true camel.component.amqp.maxConcurrentConsumers = null camel.component.amqp.maxMessagesPerTask = -1 camel.component.amqp.messageConverter = null camel.component.amqp.messageCreatedStrategy = null camel.component.amqp.messageIdEnabled = true camel.component.amqp.messageListenerContainerFactory = null camel.component.amqp.messageTimestampEnabled = true camel.component.amqp.password = verysecret camel.component.amqp.pubSubNoLocal = false camel.component.amqp.queueBrowseStrategy = null camel.component.amqp.receiveTimeout = 1000 camel.component.amqp.recoveryInterval = 5000 camel.component.amqp.replyTo = null camel.component.amqp.replyToDeliveryPersistent = true camel.component.amqp.replyToSameDestinationAllowed = false 
camel.component.amqp.requestTimeoutCheckerInterval = 1000 camel.component.amqp.selector = null camel.component.amqp.subscriptionDurable = false camel.component.amqp.subscriptionName = null camel.component.amqp.subscriptionShared = false camel.component.amqp.taskExecutor = null camel.component.amqp.testConnectionOnStartup = true camel.component.amqp.transacted = false camel.component.amqp.transactedInOut = false camel.component.amqp.transactionManager = null camel.component.amqp.transactionName = null camel.component.amqp.transactionTimeout = -1 camel.component.amqp.transferException = false camel.component.amqp.transferExchange = false camel.component.amqp.useMessageIDAsCorrelationID = false camel.component.amqp.username = consumer@HONO camel.component.amqp.waitForProvisionCorrelationToBeUpdatedCounter = 50 camel.component.amqp.waitForProvisionCorrelationToBeUpdatedThreadSleepingTime = 100 camel.error.handler = default camel.error.handler.max.redeliveries = 0 camel.error.handler.redelivery.delay = 1000 camel.idempotency.enabled = false camel.idempotency.expression.header = null camel.idempotency.expression.type = body camel.idempotency.kafka.bootstrap.servers = localhost:9092 camel.idempotency.kafka.max.cache.size = 1000 camel.idempotency.kafka.poll.duration.ms = 100 camel.idempotency.kafka.topic = kafka_idempotent_repository camel.idempotency.memory.dimension = 100 camel.idempotency.repository.type = memory camel.remove.headers.pattern = camel.source.camelMessageHeaderKey = null camel.source.component = amqp camel.source.contentLogLevel = OFF camel.source.endpoint.acceptMessagesWhileStopping = false camel.source.endpoint.acknowledgementModeName = AUTO_ACKNOWLEDGE camel.source.endpoint.allowReplyManagerQuickStop = false camel.source.endpoint.allowSerializedHeaders = false camel.source.endpoint.artemisStreamingEnabled = true camel.source.endpoint.asyncConsumer = false camel.source.endpoint.asyncStartListener = false camel.source.endpoint.asyncStopListener = false 
camel.source.endpoint.autoStartup = true camel.source.endpoint.cacheLevel = null camel.source.endpoint.cacheLevelName = CACHE_AUTO camel.source.endpoint.clientId = null camel.source.endpoint.concurrentConsumers = 1 camel.source.endpoint.connectionFactory = null camel.source.endpoint.consumerType = Default camel.source.endpoint.defaultTaskExecutorType = null camel.source.endpoint.destinationResolver = null camel.source.endpoint.disableReplyTo = false camel.source.endpoint.durableSubscriptionName = null camel.source.endpoint.eagerLoadingOfProperties = false camel.source.endpoint.eagerPoisonBody = Poison JMS message due to ${exception.message} camel.source.endpoint.errorHandler = null camel.source.endpoint.errorHandlerLogStackTrace = true camel.source.endpoint.errorHandlerLoggingLevel = WARN camel.source.endpoint.exceptionHandler = null camel.source.endpoint.exceptionListener = null camel.source.endpoint.exchangePattern = null camel.source.endpoint.exposeListenerSession = false camel.source.endpoint.headerFilterStrategy = null camel.source.endpoint.idleConsumerLimit = 1 camel.source.endpoint.idleTaskExecutionLimit = 1 camel.source.endpoint.includeAllJMSXProperties = false camel.source.endpoint.jmsKeyFormatStrategy = null camel.source.endpoint.jmsMessageType = null camel.source.endpoint.lazyCreateTransactionManager = true camel.source.endpoint.mapJmsMessage = true camel.source.endpoint.maxConcurrentConsumers = null camel.source.endpoint.maxMessagesPerTask = -1 camel.source.endpoint.messageConverter = null camel.source.endpoint.messageCreatedStrategy = null camel.source.endpoint.messageIdEnabled = true camel.source.endpoint.messageListenerContainerFactory = null camel.source.endpoint.messageTimestampEnabled = true camel.source.endpoint.password = null camel.source.endpoint.pubSubNoLocal = false camel.source.endpoint.receiveTimeout = 1000 camel.source.endpoint.recoveryInterval = 5000 camel.source.endpoint.replyTo = null camel.source.endpoint.replyToDeliveryPersistent = 
true camel.source.endpoint.replyToSameDestinationAllowed = false camel.source.endpoint.requestTimeoutCheckerInterval = 1000 camel.source.endpoint.selector = null camel.source.endpoint.subscriptionDurable = false camel.source.endpoint.subscriptionName = null camel.source.endpoint.subscriptionShared = false camel.source.endpoint.synchronous = false camel.source.endpoint.taskExecutor = null camel.source.endpoint.testConnectionOnStartup = false camel.source.endpoint.transacted = false camel.source.endpoint.transactedInOut = false camel.source.endpoint.transactionManager = null camel.source.endpoint.transactionName = null camel.source.endpoint.transactionTimeout = -1 camel.source.endpoint.transferException = false camel.source.endpoint.transferExchange = false camel.source.endpoint.useMessageIDAsCorrelationID = false camel.source.endpoint.username = null camel.source.endpoint.waitForProvisionCorrelationToBeUpdatedCounter = 50 camel.source.endpoint.waitForProvisionCorrelationToBeUpdatedThreadSleepingTime = 100 camel.source.marshal = null camel.source.maxBatchPollSize = 1000 camel.source.maxPollDuration = 1000 camel.source.path.destinationName = test-queue camel.source.path.destinationType = queue camel.source.pollingConsumerBlockTimeout = 0 camel.source.pollingConsumerBlockWhenFull = true camel.source.pollingConsumerQueueSize = 1000 camel.source.unmarshal = null camel.source.url = null topics = test (org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnectorConfig) [task-thread-camelamqpsourceconnector-0] 2020-12-26 12:50:54,905 INFO Setting initial properties in Camel context: [{camel.source.path.destinationName=test-queue, connector.class=org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector, camel.component.amqp.username=consumer@HONO, tasks.max=1, camel.component.amqp.connectionFactory=org.apache.qpid.jms.JmsConnectionFactory, camel.source.component=amqp, camel.component.amqp.testConnectionOnStartup=true, 
camel.component.amqp.connectionFactory.remoteURI=amqp://$( kubectl get service eclipse-hono-dispatch-router-ext --output="jsonpath={.spec.clusterIP}" -n hono):15671, camel.component.amqp.password=verysecret, camel.component.amqp.includeAmqpAnnotations=true, camel.source.kafka.topic=mytopic, camel.source.path.destinationType=queue, task.class=org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceTask, name=camelamqpsourceconnector}] (org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:54,985 INFO Using properties from: classpath:application.properties;optional=true (org.apache.camel.main.BaseMainSupport) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:55,175 INFO WorkerSourceTask{id=camelamqpsourceconnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:55,175 INFO WorkerSourceTask{id=camelamqpsourceconnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:55,176 ERROR WorkerSourceTask{id=camelamqpsourceconnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-camelamqpsourceconnector-0]
org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context
    at org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:144)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: Cannot find getter method: connectionFactory on bean: class org.apache.camel.component.amqp.AMQPComponent when binding property: connectionFactory.remoteURI
    at org.apache.camel.support.PropertyBindingSupport.doBuildPropertyOgnlPath(PropertyBindingSupport.java:282)
    at org.apache.camel.support.PropertyBindingSupport.doBindProperties(PropertyBindingSupport.java:210)
    at org.apache.camel.support.PropertyBindingSupport.access$100(PropertyBindingSupport.java:88)
    at org.apache.camel.support.PropertyBindingSupport$Builder.bind(PropertyBindingSupport.java:1785)
    at org.apache.camel.main.MainHelper.setPropertiesOnTarget(MainHelper.java:163)
    at org.apache.camel.main.BaseMainSupport.autoConfigurationFromProperties(BaseMainSupport.java:1133)
    at org.apache.camel.main.BaseMainSupport.autoconfigure(BaseMainSupport.java:424)
    at org.apache.camel.main.BaseMainSupport.postProcessCamelContext(BaseMainSupport.java:472)
    at org.apache.camel.main.SimpleMain.doInit(SimpleMain.java:32)
    at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
    at org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:141)
    ... 8 more
2020-12-26 12:50:55,176 ERROR WorkerSourceTask{id=camelamqpsourceconnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:55,177 INFO Stopping CamelSourceTask connector task (org.apache.camel.kafkaconnector.CamelSourceTask) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:55,177 INFO CamelSourceTask connector task stopped (org.apache.camel.kafkaconnector.CamelSourceTask) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:55,177 INFO [Producer clientId=connector-producer-camelamqpsourceconnector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer) [task-thread-camelamqpsourceconnector-0]

scholzj commented 3 years ago

I do not think the Strimzi-lab is the right place to ask this. The Apache Camel discussions might be the best place to ask about the actual configuration of the Camel connector. The Strimzi Discussions (https://github.com/strimzi/strimzi-kafka-operator/discussions) might be best to ask about how to deploy the configuration with Strimzi.

The way you add the connector to the Strimzi image seems good to me. But for the KafkaConnector custom resource, this part:

$( kubectl get service eclipse-hono-dispatch-router-ext --output="jsonpath={.spec.clusterIP}" -n hono)

would need to be evaluated before you create the custom resource. You should be able to double check with kubectl get kafkaconnector -o yaml to see what the actual value there is.
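One way to do that evaluation up front, sketched in shell under a few assumptions: the service and namespace names are the ones from the question, while `render_connector` and the `HONO_HOST` variable are illustrative names only. The command substitution runs in your local shell, so the manifest that reaches the cluster contains a plain address. Only the URI key is shown; the remaining config keys from the question would sit alongside it.

```shell
#!/usr/bin/env bash
# Sketch: resolve the address first, then render the KafkaConnector manifest.
# render_connector is a hypothetical helper, not part of Strimzi or kubectl.

render_connector() {
  local host="$1" port="$2"
  # Unquoted heredoc delimiter: the shell expands ${host} and ${port} here,
  # so the emitted YAML holds a literal URI rather than a "$( ... )" string.
  # Other config keys from the question are omitted for brevity.
  cat <<EOF
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: camelamqpsourceconnector
  labels:
    strimzi.io/cluster: my-connect-cluster-new
spec:
  class: org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector
  tasksMax: 1
  config:
    camel.component.amqp.connectionFactory.remoteURI: amqp://${host}:${port}
EOF
}

# Against a real cluster (not executed here):
#   HONO_HOST="$(kubectl get service eclipse-hono-dispatch-router-ext \
#       --output="jsonpath={.spec.clusterIP}" -n hono)"
#   render_connector "$HONO_HOST" 15671 | kubectl apply -n kafka -f -
```

Printing the rendered manifest before applying it makes it easy to confirm that no unevaluated shell syntax is left in the URI.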

PS: It would be much easier to read your comment and try to help if you properly format the different sections or logs as code. This way it is hard to actually just go through it.

iamrsaravana commented 3 years ago

Thanks for your reply. I gave the IP address directly in the KafkaConnector YAML file instead of "$( kubectl get service eclipse-hono-dispatch-router-ext --output="jsonpath={.spec.clusterIP}" -n hono)".

I am now getting the error below:

    org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context
        at org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:144)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.lang.IllegalArgumentException: Cannot find getter method: connectionFactory on bean: class org.apache.camel.component.amqp.AMQPComponent when binding property: connectionFactory.remoteURI
        at org.apache.camel.support.PropertyBindingSupport.doBuildPropertyOgnlPath(PropertyBindingSupport.java:282)
        at org.apache.camel.support.PropertyBindingSupport.doBindProperties(PropertyBindingSupport.java:210)
        at org.apache.camel.support.PropertyBindingSupport.access$100(PropertyBindingSupport.java:88)
        at org.apache.camel.support.PropertyBindingSupport$Builder.bind(PropertyBindingSupport.java:1785)
        at org.apache.camel.main.MainHelper.setPropertiesOnTarget(MainHelper.java:163)
        at org.apache.camel.main.BaseMainSupport.autoConfigurationFromProperties(BaseMainSupport.java:1133)
        at org.apache.camel.main.BaseMainSupport.autoconfigure(BaseMainSupport.java:424)
        at org.apache.camel.main.BaseMainSupport.postProcessCamelContext(BaseMainSupport.java:472)
        at org.apache.camel.main.SimpleMain.doInit(SimpleMain.java:32)
        at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
        at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
        at org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:141)

Can anybody suggest how to pass the source AMQP IP address in the configuration?

iamrsaravana commented 3 years ago

Now I am able to send data from Hono to Kafka through Strimzi Kafka Connect using the Camel AMQP source connector.

Follow the steps below to pull data from Hono and send it to Kafka:

1) Create the namespace:

    kubectl create namespace kafka

2) Install the latest Strimzi operator using the command below:

    kubectl apply -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

3) Create Kafka (1 pod) and ZooKeeper (2 pods) using the YAML configuration below:

kafka.yaml:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        version: 2.6.0
        replicas: 1
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: false
          - name: tls
            port: 9093
            type: internal
            tls: true
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          log.message.format.version: "2.6"
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 10Gi
            deleteClaim: false
      zookeeper:
        replicas: 2
        storage:
          type: persistent-claim
          size: 10Gi    
          deleteClaim: false
      entityOperator:
        topicOperator: {}
        userOperator: {}

Then apply it:

    kubectl apply -f kafka.yaml -n kafka

_PS: Here I use GlusterFS persistent storage._

4) Create the AMQP source connector image using the commands below.

I downloaded the camel-amqp-kafka-connector package and the Qpid JMS JAR files from the links below:

    https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-amqp-kafka-connector/0.7.0/camel-amqp-kafka-connector-0.7.0-package.tar.gz
    https://downloads.apache.org/qpid/jms/0.51.0/apache-qpid-jms-0.51.0-bin.tar.gz

After downloading and extracting both archives, I created the Docker image using the Dockerfile and commands below.

Dockerfile:

    FROM strimzi/kafka:0.20.1-kafka-2.6.0
    USER root:root
    RUN mkdir -p /opt/kafka/plugins/camel-kafka-connectors/camel-amqp-kafka-connector/
    COPY ./plugin/apache-qpid-jms-0.55.0/* /opt/kafka/plugins/camel-kafka-connectors/camel-amqp-kafka-connector/
    COPY ./camel-amqp-kafka-connector/* /opt/kafka/plugins/camel-kafka-connectors/camel-amqp-kafka-connector/
    USER 1001

Build and push the image:

    docker build -f ./Dockerfile -t localhost:5000/my-connector-amqp_new .
    docker push localhost:5000/my-connector-amqp_new

_PS: Here I have used a local Docker registry._
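Kafka Connect loads each plugin directory with its own class loader, so the Qpid JMS client JARs need to end up in the same directory as the connector JARs, which is what the two COPY lines in the Dockerfile do. As a rough pre-build check, the shell sketch below looks for both kinds of JAR in the build context. The helper name `check_plugin_dir` and the two file-name patterns are assumptions about how the extracted archives are laid out, so adjust them to what you actually see on disk.

```shell
#!/usr/bin/env bash
# Sketch: confirm the build context holds both the connector and the
# Qpid JMS client JARs before running "docker build".
# check_plugin_dir and the JAR name patterns are assumptions for illustration.

check_plugin_dir() {
  local dir="$1" pattern
  for pattern in 'camel-amqp-kafka-connector-*.jar' 'qpid-jms-client-*.jar'; do
    # find searches recursively; an empty result means the JAR is missing.
    if [ -z "$(find "$dir" -name "$pattern" -print 2>/dev/null)" ]; then
      echo "missing $pattern under $dir" >&2
      return 1
    fi
  done
}

# Usage (not executed here):
#   check_plugin_dir . && docker build -f ./Dockerfile -t localhost:5000/my-connector-amqp_new .
```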

5) Create the KafkaConnect resource using the YAML configuration below:

Kafkaconnect.yaml:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster-new
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
      image: 10.128.0.6:5000/my-connector-amqp_new5
      replicas: 1
      bootstrapServers: my-cluster-kafka-bootstrap:9092
      config:
        group.id: connect-cluster-new
        offset.storage.topic: connect-cluster-offsets
        config.storage.topic: connect-cluster-configs
        status.storage.topic: connect-cluster-status
        config.storage.replication.factor: 1
        offset.storage.replication.factor: 1
        status.storage.replication.factor: 1

Then apply it:

    kubectl apply -f Kafkaconnect.yaml -n kafka

6) Create the AMQP KafkaConnector using the YAML file below:

amqp_connector.yaml:

    apiVersion: kafka.strimzi.io/v1alpha1
    kind: KafkaConnector
    metadata:
      name: camelamqpsourceconnector
      labels:
        strimzi.io/cluster: my-connect-cluster-new
    spec:
      class: org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector
      tasksMax: 1
      config:
        camel.component.amqp.includeAmqpAnnotations: true
        camel.component.amqp.connectionFactory: '#class:org.apache.qpid.jms.JmsConnectionFactory'
        camel.component.amqp.connectionFactory.remoteURI: amqp://10.106.88.243:15672
        camel.component.amqp.username: consumer@HONO
        camel.component.amqp.password: verysecret
        camel.component.amqp.testConnectionOnStartup: true
        # Destination type (in my use case I am reading from Hono)
        camel.source.path.destinationType: queue
        # Address where Hono writes the data
        camel.source.path.destinationName: telemetry/LTTS_TENANT
        # Kafka topic you want to write to
        topics: mytopic

Then apply it:

    kubectl apply -f amqp_connector.yaml -n kafka

_PS: The IP addresses given here are specific to my cluster; they will differ in your environment._
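Compared with the configuration in the original question, one notable change in amqp_connector.yaml is the quoted '#class:...' value for the connection factory. The upstream properties example uses the same `#class:` prefix, and in YAML an unquoted `#` that follows whitespace starts a comment, so without the quotes the value would be dropped. That is presumably why the earlier binding of connectionFactory.remoteURI failed, although the thread does not confirm it. The shell sketch below greps a manifest for the unquoted form; `check_class_refs` is a made-up helper name, and the pattern only covers the simple `key: #class:...` case.

```shell
#!/usr/bin/env bash
# Sketch: flag YAML lines whose value starts with an unquoted "#class:".
# check_class_refs is a hypothetical helper name used only for illustration.

check_class_refs() {
  local file="$1"
  # Matches "key: #class:..." where the "#" directly follows the separator
  # whitespace; a quoted value such as '#class:...' does not match.
  if grep -nE ':[[:space:]]+#class:' "$file"; then
    echo "unquoted #class: value found; wrap it in quotes" >&2
    return 1
  fi
  return 0
}

# Usage (not executed here):
#   check_class_refs amqp_connector.yaml && kubectl apply -f amqp_connector.yaml -n kafka
```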

7) View the data in Kafka using the command below:

    kubectl run kafka-consumer -n kafka -ti --image=strimzi/kafka:0.20.1-kafka-2.6.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic mytopic
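Strimzi exposes the brokers through a service named after the Kafka resource, which is why the KafkaConnect manifest in step 5 points at my-cluster-kafka-bootstrap:9092 for the cluster named my-cluster. Here is a small shell sketch of that naming, assuming the plain listener on port 9092 from step 3; `bootstrap_address` is an illustrative helper, not a Strimzi command.

```shell
#!/usr/bin/env bash
# Sketch: derive the in-cluster bootstrap address from the Kafka resource name.
# bootstrap_address is a hypothetical helper for illustration only.

bootstrap_address() {
  local cluster="$1" port="${2:-9092}"
  printf '%s-kafka-bootstrap:%s\n' "$cluster" "$port"
}

# Example (not executed here): consume from the topic the connector writes to.
#   kubectl run kafka-consumer -n kafka -ti \
#     --image=strimzi/kafka:0.20.1-kafka-2.6.0 --rm=true --restart=Never -- \
#     bin/kafka-console-consumer.sh \
#     --bootstrap-server "$(bootstrap_address my-cluster)" --topic mytopic
```

If the consumer pod runs in a different namespace from the Kafka cluster, the short service name will not resolve and the namespace has to be appended to the host name.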

I hope this is useful for anybody who wants to pull data directly using the Camel AMQP connector.