Closed. vedran-kolka-syntio closed this issue 1 year ago.
We didn't really focus on the Schema Registry integration, and in particular we never focused on the Azure one. I'll try to have a look, but I don't think it is feasible in that way. Also, I do think AD must be used to set everything up: https://learn.microsoft.com/en-us/azure/event-hubs/schema-registry-json-schema-kafka
Also it should be '#bean:{{defaultAzureCredential}}' and not '{{bean:defaultAzureCredential}}'
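To make the two notations concrete, the endpoint parameters would differ roughly like this (a sketch reusing the property key and bean name from later in this thread; as the rest of the thread shows, the second form only works once the camel-kafka fix discussed below is in place):

additionalProperties.schema.registry.credential: '{{bean:defaultAzureCredential}}'   # resolved as a property placeholder, not a bean
additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'      # resolved as a bean lookup in the registry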
Thank you for the response. Referencing it like that also causes an Exception, but a different one:
Caused by: java.lang.IllegalArgumentException: Property with key [defaultAzureCredential] not found in properties from text: kafka:camel-eh-to-mq?additionalProperties.schema.registry.credential=#bean:{{defaultAzureCredential}}&additionalProperties.schema.registry.url=...
And do you think integration with Apicurio would be feasible, if the problem is in authenticating to Azure?
I think it's just a matter of understanding which parameters should be specified as additional properties, even for the Azure one. I need to test the situation a bit.
@davsclaus I do think the additionalProperties parameter is a map, so the syntax in YAML should be slightly different.
What version of camel-jbang are you using? You can check with:
camel version
And would you be able to share the example (you can email me in private if so: the YAML and the JAR)? That would allow me to investigate this quicker.
At first hunch I suspect it's the additionalProperties option in camel-kafka that may need special support for dealing with # references, as it's a special multi-valued option there, but it may indicate a general problem with those kinds of options.
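To sketch what that multi-valued option looks like in practice (values are placeholders taken from the examples below): each additionalProperties.<key> entry on the kafka endpoint is collected into a map, the prefix is stripped, and <key> is passed on to the underlying Kafka client configuration:

parameters:
  additionalProperties.schema.registry.url: 'https://mykafka.servicebus.windows.net'
  additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'
# the raw Kafka consumer config then contains schema.registry.url and schema.registry.credential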
We have an example of integration between Camel Kafka on Quarkus and Apicurio, but not through kamelets and not by using beans.
My Camel version: Camel JBang version: 3.20.4
I will email you the whole route and JAR in a minute. I really appreciate the support!
I will check out the Quarkus example, but unfortunately I have to stick with kamelets and beans.
Please add me to the email in cc. I'd like to have something to try.
You can also use jbang to export to a real maven/gradle based project (quarkus, spring boot, or main).
What I did with your yaml was:
camel export --gav=com.foo:acme:1.0 -dir=mycode --runtime=camel-main
and I can load it into IDEA and have the classpath with the JARs I need. I had to add:
# camel-k: dependency=mvn:com.azure:azure-identity:1.9.0
# camel-k: dependency=mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1
I have a fix in the works, and created a ticket to fix this in the camel-kafka component: https://issues.apache.org/jira/browse/CAMEL-19387
Your example can now run with the fix and I can use this syntax:
# camel-k: dependency=mvn:com.azure:azure-identity:1.9.0
# camel-k: dependency=mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1
- beans:
    - name: defaultAzureCredential
      #type: "#class:my.custom.azure.credential.SimpleCredential"
      type: "#class:com.foo.acme.MyCredential"
      properties:
        username: "scott"
- route:
    id: "kafka-to-mq"
    from:
      uri: "kafka:cheese"
      parameters:
        brokers: "localhost:1234"
        consumerRequestTimeoutMs: "1234"
        valueDeserializer: 'com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer'
        additionalProperties.schema.registry.url: 'https://mykafka.servicebus.windows.net'
        additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'
    steps:
      - to: "log:foo"
And I implemented a custom bean
package com.foo.acme;

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import reactor.core.publisher.Mono;

public class MyCredential implements TokenCredential {

    private String username;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    @Override
    public Mono<AccessToken> getToken(TokenRequestContext tokenRequestContext) {
        return null;
    }

    @Override
    public AccessToken getTokenSync(TokenRequestContext request) {
        return TokenCredential.super.getTokenSync(request);
    }
}
Notice the syntax is
additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'
Thank you for the help!
Is that how beans should always be referenced, or only in additionalProperties?
Also, will you close this issue when the fix is available for use?
There is a CI build and if it completes, it publishes the fix into an ASF snapshot repo: http://repository.apache.org/content/groups/snapshots/
The CI job (it runs daily but I kicked it now - it may fail for odd reasons) https://ci-builds.apache.org/job/Camel/job/Camel%20Daily%20Snapshot%20Deploy/job/camel-3.20.x/37/
You can try the fix with jbang.
Edit this file (it is usually in that directory): .jbang/cache/urls/f20491f9ebc0e16d2fca028eb3d4f03aafdf8cd62b8af1a9d1730bfe382d8165/CamelJBang.java
And change it to use 3.20.5-SNAPSHOT for camel-bom and camel-jbang-core as shown below:
//JAVA 11+
//REPOS mavencentral,apache-snapshot=http://repository.apache.org/content/groups/snapshots/
//DEPS org.apache.camel:camel-bom:${camel.jbang.version:3.20.5-SNAPSHOT}@pom
//DEPS org.apache.camel:camel-jbang-core:${camel.jbang.version:3.20.5-SNAPSHOT}
//DEPS org.apache.camel.kamelets:camel-kamelets:${camel-kamelets.version:3.20.4}
Also you can change to use the 3.20.5-SNAPSHOT version in Maven projects and add that ASF snapshot repo to the repositories in the pom.xml file: http://repository.apache.org/content/groups/snapshots/
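For a Maven project that could look roughly like this in the pom.xml (a sketch; the repository id is arbitrary):

<repositories>
  <repository>
    <id>apache-snapshots</id>
    <url>https://repository.apache.org/content/groups/snapshots/</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>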
The fix is coming in 3.20.6 / 3.21 / 4.0-RC1 etc
We are going to add an example similar to the eventhubs one with schema registry too
That would be perfect
You can run this example locally with jbang today, and download a patched JAR with the fix.
Then it works with the released Camel 3.20.4
1) Download the patched JAR from https://repository.apache.org/content/groups/snapshots/org/apache/camel/camel-kafka/3.20.5-SNAPSHOT/
Make sure to take the newest date; I am using camel-kafka-3.20.5-20230525.085449-18.jar. Download this JAR to the folder where you have your Camel YAML route and rename it to camel-kafka-3.20.6-fix.jar.
2) Have a folder with the source. Here is what I have:
-rw-r--r--  1 davsclaus  staff     722 May 25 10:24 MyCredential.java
-rw-r--r--  1 davsclaus  staff     743 May 25 16:29 azure.yaml
-rw-r--r--@ 1 davsclaus  staff  251101 May 25 16:27 camel-kafka-3.20.6-fix.jar
This source is in a gist at: https://gist.github.com/davsclaus/acce5925c4abce08aff5d41d7747ae44
3) You can now run camel with the patched JAR
camel run azure.yaml MyCredential.java camel-kafka-3.20.6-fix.jar
If you don't have other files that should be excluded, you can also run with: camel run *
4) When Camel starts up, it logs that it has added the patched JAR:
2023-05-25 16:34:05.944 INFO 32691 --- [ main] org.apache.camel.main.MainSupport : Apache Camel (JBang) 3.20.4 is starting
2023-05-25 16:34:05.967 INFO 32691 --- [ main] org.apache.camel.main.MainSupport : Additional jars added to classpath: camel-kafka-3.20.6-fix.jar
The additionalProperties.schema.registry.credential works now. Thank you for the support!
Unfortunately now I have a problem with a boolean in the additionalProperties, but I will raise a new issue for that tomorrow.
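(For reference, the full configuration shared later in this thread handles this boolean case with Camel's #valueAs syntax, for example:
additionalProperties.specific.avro.reader: '#valueAs(boolean):true'
)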
Camel 3.20.6 is expected to be released this week
Camel 3.20.6 has been released https://camel.apache.org/blog/2023/06/RELEASE-3.20.6/
Thank you for the update! The issue with the credentials has been resolved so do you plan on closing the issue or are you keeping it open until an example is produced?
@vedran-kolka-syntio - Sorry for any delayed reply. We are currently very busy and focused on bringing Apache Camel 4 to GA which is scheduled for this month.
Also it's PTO time for people on and off these days, so we will have more time to get back to this subject towards the end of September.
I think it's valuable for the Camel community to have examples or snippets showing how to work with Azure Kafka. So any help from you or your team is appreciated.
Please if you want to help then ping back later in September. Thanks.
I'll restart working on the example soon, probably next week. Sorry for the delay but I've been busy.
Is the boolean issue resolved?
I was trying to create an example, but I would like to have exactly your same settings @vedran-kolka-syntio, can you please share your full configuration for this integration? Thanks.
Sorry for the late response, but I am currently otherwise engaged at the job. I would love to contribute the example myself, as I might have some time for the weekend. Until then, the full configuration for the route is as follows:
# camel-k: dependency=mvn:com.ibm.mq:com.ibm.mq.allclient:9.2.5.0
# camel-k: dependency=mvn:org.apache.camel.kamelets:camel-kamelets-utils:3.20.1.1
# camel-k: dependency=mvn:kolka.camel:eventhubs:0.2
# camel-k: dependency=mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1
# camel-k: dependency=mvn:com.azure:azure-data-schemaregistry-apacheavro:1.1.4
# camel-k: dependency=mvn:com.azure:azure-identity:1.9.0
- beans:
    - name: wmqConnectionFactory
      type: "#class:com.ibm.mq.jms.MQConnectionFactory"
      properties:
        XMSC_WMQ_HOST_NAME: '{{mq.serverName}}'
        XMSC_WMQ_PORT: '{{mq.serverPort}}'
        XMSC_WMQ_CHANNEL: '{{mq.channel}}'
        XMSC_WMQ_QUEUE_MANAGER: '{{mq.queueManager}}'
        XMSC_WMQ_CONNECTION_MODE: 1
        XMSC_USERID: '{{mq.username}}'
        XMSC_PASSWORD: '{{mq.password}}'
        XMSC_CLIENT_ID: '{{?mq.clientId}}'
        XMSC_WMQ_SSL_CIPHER_SUITE: '*TLS12ORHIGHER'
    - name: kafkaHeaderDeserializer
      type: "#class:kolka.camel.eventhubs.StringKafkaHeaderDeserializer"
    - name: defaultAzureCredential
      type: "#class:kolka.camel.eventhubs.SimpleCredential"
- route:
    id: "kafka-to-mq"
    from:
      uri: "kafka:{{kafka.kafka-to-mq-topic}}"
      parameters:
        autoOffsetReset: earliest
        brokers: "{{kafka.bootstrapServers}}"
        saslJaasConfig: 'org.apache.kafka.common.security.plain.PlainLoginModule required username={{kafka.username}} password={{kafka.password}};'
        saslMechanism: PLAIN
        securityProtocol: SASL_SSL
        headerDeserializer: '#kafkaHeaderDeserializer'
        valueDeserializer: 'com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer'
        groupId: 'kolka-consumer-group'
        additionalProperties.schema.registry.url: '{{kafka.schema-registry-url}}'
        additionalProperties.schema.group: avro
        additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'
        additionalProperties.specific.avro.value.type: 'kolka.camel.eventhubs.models.Order'
        additionalProperties.specific.avro.reader: '#valueAs(boolean):true'
    steps:
      - set-header:
          name: "JMSCorrelationID"
          header:
            expression: "kafka.KEY"
      - remove-header:
          name: "kafka.KEY"
      - remove-header:
          name: "kafka.HEADERS"
      - to:
          uri: "jms:queue:{{mq.destinationName}}"
          parameters:
            connectionFactory: "{{bean:wmqConnectionFactory}}"
            allowAdditionalHeaders: "*"
            allowSerializedHeaders: true
I would like to try creating the example and contribute it, if that's alright.
For the example, I would leave only Kafka and Schema Registry related things. It would also include the implementation of SimpleCredential, as it is only a thin wrapper around Azure's DefaultAzureCredential.
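For reference, such a thin wrapper could look roughly like this (a sketch, assuming the azure-identity DefaultAzureCredentialBuilder API; the package name matches the one used in the configuration above):

package kolka.camel.eventhubs;

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import reactor.core.publisher.Mono;

// Thin wrapper so the credential can be declared as a Camel bean,
// since DefaultAzureCredential itself can only be created via its builder.
public class SimpleCredential implements TokenCredential {

    private final DefaultAzureCredential delegate = new DefaultAzureCredentialBuilder().build();

    @Override
    public Mono<AccessToken> getToken(TokenRequestContext request) {
        return delegate.getToken(request);
    }
}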
That would be super, thanks a lot. It would also be great to have some Terraform or infra instructions for creating the Azure resources. No rush. Thanks a lot for your time.
I assigned this to you @vedran-kolka-syntio
Using camel version 4.0.0, I am having the same problem for which I opened the issue, so I reverted back to using camel-kafka-3.22.0 (this one exactly). Is it possible that the fix isn't available in 4.0.0?
Can you report the error exactly? Thanks
I don't think we missed backporting the fix.
This is my current configuration:
# camel-k: dependency=mvn:org.apache.camel.kamelets:camel-kamelets-utils:3.20.1.1
# camel-k: dependency=mvn:org.apache.camel.kamelets:azure-identity:0.1
# camel-k: dependency=mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1
# camel-k: dependency=mvn:com.azure:azure-data-schemaregistry-apacheavro:1.1.4
# camel-k: dependency=mvn:com.azure:azure-identity:1.9.0
# camle-k: dependency=mvn:org.apache.camel:camel-kafka:3.20.6
- beans:
    - name: defaultAzureCredential
      type: "#class:org.apache.camel.kamelets.azure.DefaultAzureCredentialWrapper"
- route:
    id: "kafka-to-log"
    from:
      uri: "kafka:{{kafka.my-topic}}"
      parameters:
        autoOffsetReset: earliest
        brokers: "{{kafka.bootstrapServers}}"
        saslJaasConfig: 'org.apache.kafka.common.security.plain.PlainLoginModule required username={{kafka.username}} password={{kafka.password}};'
        saslMechanism: PLAIN
        securityProtocol: SASL_SSL
        valueDeserializer: 'com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer'
        groupId: 'my-consumer-group'
        additionalProperties.schema.registry.url: '{{kafka.schema-registry-url}}'
        additionalProperties.schema.group: avro
        additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'
        additionalProperties.specific.avro.value.type: '#valueAs(java.lang.Class):org.apache.camel.kamelets.eventhubs.models.Order'
        additionalProperties.specific.avro.reader: '#valueAs(boolean):true'
    steps:
      - to:
          uri: "kamelet:log-sink"
          parameters:
            showStreams: true
            showHeaders: true
            multiline: true
And this is the error:
2023-09-24 17:49:53.389 WARN 18716 --- [sumer[my-topic]] l.component.kafka.KafkaFetchRecords : Error creating org.apache.kafka.clients.consumer.KafkaConsumer due to: Failed to construct kafka consumer
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825) ~[kafka-clients-3.2.3.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[kafka-clients-3.2.3.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647) ~[kafka-clients-3.2.3.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:627) ~[kafka-clients-3.2.3.jar:?]
at org.apache.camel.component.kafka.DefaultKafkaClientFactory.getConsumer(DefaultKafkaClientFactory.java:34) ~[camel-kafka-3.20.1.jar:3.20.1]
at org.apache.camel.component.kafka.KafkaFetchRecords.createConsumer(KafkaFetchRecords.java:245) ~[camel-kafka-3.20.1.jar:3.20.1]
at org.apache.camel.component.kafka.KafkaFetchRecords.createConsumerTask(KafkaFetchRecords.java:205) ~[camel-kafka-3.20.1.jar:3.20.1]
at org.apache.camel.support.task.ForegroundTask.run(ForegroundTask.java:94) [camel-support-4.0.0.jar:4.0.0]
at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:127) [camel-kafka-3.20.1.jar:3.20.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class com.azure.core.credential.TokenCredential (java.lang.String is in module java.base of loader 'bootstrap'; com.azure.core.credential.TokenCredential is in unnamed module of loader org.apache.camel.main.download.DependencyDownloaderClassLoader @3e8f7922)
at com.microsoft.azure.schemaregistry.kafka.avro.AbstractKafkaSerdeConfig.getCredential(AbstractKafkaSerdeConfig.java:66) ~[azure-schemaregistry-kafka-avro-1.1.1.jar:?]
at com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializerConfig.getCredential(KafkaAvroDeserializerConfig.java:11) ~[azure-schemaregistry-kafka-avro-1.1.1.jar:?]
at com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.configure(KafkaAvroDeserializer.java:55) ~[azure-schemaregistry-kafka-avro-1.1.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718) ~[kafka-clients-3.2.3.jar:?]
... 13 more
The error looks exactly the same as the original error of this issue. Using camel-kafka-3.22.0 I was able to resolve it, like so:
camel run kafka-log.yaml camel-kafka-3.22.0.jar --properties application.properties
Can you go ahead with the PR with the example and use 3.22.0, so we can use it to reproduce? Thanks. Instructions to set up the Azure resources are welcome too. Thanks!
I don't have instructions for Azure resources yet, but I'll open the PR in a few minutes with everything else.
@davsclaus thoughts?
Is # camel-k: dependency=mvn:org.apache.camel.kamelets:azure-identity:0.1 a JAR you have made yourself internally? It's not a good idea to use org.apache.camel as that is for the official ASF project.
Is org.apache.camel.kamelets.eventhubs.models.Order a class from your own JAR?
And for
additionalProperties.specific.avro.value.type: '#valueAs(java.lang.Class):org.apache.camel.kamelets.eventhubs.models.Order'
Can you try with
additionalProperties.specific.avro.value.type: '#type:org.apache.camel.kamelets.eventhubs.models.Order'
Yes, I have made the JAR for the example, not for the company internally. It also contains Order. I will rename it.
Running what you suggested with camel-kafka-4.0.0 gives the following error upon KafkaConsumer construction:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825) ~[kafka-clients-3.2.3.jar:?]
...
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class com.azure.core.credential.TokenCredential (java.lang.String is in module java.base of loader 'bootstrap'; com.azure.core.credential.TokenCredential is in unnamed module of loader org.apache.camel.main.download.DependencyDownloaderClassLoader @3e8f7922)
Using camel-kafka-3.22.0 it fails earlier:
Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: kafka://my-topic?additionalProperties.schema.group=avro&additionalProperties.schema.registry.credential=%23bean%3AdefaultAzureCredential&additionalProperties.schema.registry.url=https%3A%2F%2Flabscamel-eventhubs-dev.servicebus.windows.net&additionalProperties.specific.avro.reader=%23valueAs%28boolean%29%3Atrue&additionalProperties.specific.avro.value.type=%23type%3Acom.acme.example.eventhubs.models.Order&autoOffsetReset=earliest&brokers=labscamel-eventhubs-dev.servicebus.windows.net%3A9093&groupId=my-consumer-group&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL&valueDeserializer=com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer due to: Error binding property (specific.avro.value.type=#type:com.acme.example.eventhubs.models.Order) with name: specific.avro.value.type on bean: {schema.registry.url=https://labscamel-eventhubs-dev.servicebus.windows.net, schema.group=avro, schema.registry.credential=com.acme.example.azure.DefaultAzureCredentialWrapper@322204dc, specific.avro.reader=true} with value: #type:com.acme.example.eventhubs.models.Order
at org.apache.camel.impl.engine.AbstractCamelContext.doGetEndpoint(AbstractCamelContext.java:856)
at org.apache.camel.impl.engine.AbstractCamelContext.getEndpoint(AbstractCamelContext.java:758)
at org.apache.camel.support.CamelContextHelper.getMandatoryEndpoint(CamelContextHelper.java:60)
at org.apache.camel.reifier.AbstractReifier.resolveEndpoint(AbstractReifier.java:195)
at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:96)
at org.apache.camel.reifier.RouteReifier.createRoute(RouteReifier.java:76)
... 26 more
Caused by: org.apache.camel.PropertyBindingException: Error binding property (specific.avro.value.type=#type:com.acme.example.eventhubs.models.Order) with name: specific.avro.value.type on bean: {schema.registry.url=https://labscamel-eventhubs-dev.servicebus.windows.net, schema.group=avro, schema.registry.credential=com.acme.example.azure.DefaultAzureCredentialWrapper@322204dc, specific.avro.reader=true} with value: #type:com.acme.example.eventhubs.models.Order
at org.apache.camel.support.PropertyBindingSupport.doSetPropertyValue(PropertyBindingSupport.java:548)
at org.apache.camel.support.PropertyBindingSupport.doBuildPropertyOgnlPath(PropertyBindingSupport.java:420)
at org.apache.camel.support.PropertyBindingSupport.doBindProperties(PropertyBindingSupport.java:297)
at org.apache.camel.support.PropertyBindingSupport$Builder.bind(PropertyBindingSupport.java:1906)
at org.apache.camel.support.PropertyBindingSupport.bindProperties(PropertyBindingSupport.java:132)
at org.apache.camel.component.kafka.KafkaComponent.createEndpoint(KafkaComponent.java:87)
at org.apache.camel.component.kafka.KafkaComponent.createEndpoint(KafkaComponent.java:34)
at org.apache.camel.support.DefaultComponent.createEndpoint(DefaultComponent.java:170)
at org.apache.camel.impl.engine.AbstractCamelContext.doGetEndpoint(AbstractCamelContext.java:822)
... 31 more
Caused by: java.lang.IllegalStateException: Cannot select single type: com.acme.example.eventhubs.models.Order as there are no beans in the registry with this type
at org.apache.camel.support.PropertyBindingSupport.resolveBean(PropertyBindingSupport.java:1597)
at org.apache.camel.support.PropertyBindingSupport.resolveValue(PropertyBindingSupport.java:902)
at org.apache.camel.support.PropertyBindingSupport.doSetPropertyValue(PropertyBindingSupport.java:538)
... 39 more
After defining an Order as a bean:
- beans:
    - name: order
      type: "#class:com.acme.example.eventhubs.models.Order"
The following error occurs upon deserializing the message (still using camel-kafka-3.22.0):
org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition my-topic-0 at offset 11. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1435) ~[kafka-clients-3.2.3.jar:?]
...
Caused by: java.lang.ClassCastException: class com.acme.example.eventhubs.models.Order cannot be cast to class java.lang.Class (com.acme.example.eventhubs.models.Order is in unnamed module of loader org.apache.camel.main.download.DependencyDownloaderClassLoader @3e8f7922; java.lang.Class is in module java.base of loader 'bootstrap')
Could you provide a link to any documentation describing the operations invoked with # (#valueAs, #bean, #type, #class, ...)? I am having trouble using these and cannot seem to find the docs.
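(For reference, based on how they are used elsewhere in this thread, the four prefixes behave roughly as follows; a sketch with the class and bean names from the configurations above:

# '#class:' creates a new instance of the given class
type: "#class:kolka.camel.eventhubs.SimpleCredential"
# '#bean:' looks up an existing bean by name in the registry
additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'
# '#type:' looks up a single bean by its type in the registry
additionalProperties.specific.avro.value.type: '#type:org.apache.camel.kamelets.eventhubs.models.Order'
# '#valueAs(<type>):' converts the literal value to the given type
additionalProperties.specific.avro.reader: '#valueAs(boolean):true'
)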
The old example works fine with 3.20.6 and 4.0.0. However, I think you have since added more configuration, in particular:
additionalProperties.specific.avro.value.type: '#valueAs(java.lang.Class):org.apache.camel.kamelets.eventhubs.models.Order'
I have not had time to dive more into this, but it would be good to have a reproducer. I also wonder if it's the kafka client / avro deserializer that does the classloading and cannot load this class.
It may be that dynamic classloading via jbang is complex by nature. I had another use-case today: https://issues.apache.org/jira/browse/CAMEL-19924
In the meantime, you can try to export this to a standard spring-boot / quarkus / camel-main project and run it with pure maven / java:
camel export * --runtime=spring-boot --directory=code
But it would still be great to get this issue resolved so it works OOTB with camel-jbang. However, I'm a bit busy this week.
You are right, running with Spring Boot works with 4.0.0, so I've added that as a note in the example's readme.
Trying to follow the README and run Produce.java, I'm getting:
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.1.0:java (default-cli) on project azure-identity: An exception occurred while executing the Java class. Status code 401, "{"error":{"code":"ErrorInRequest","message":"Authorization failed for specified action: SchemaGroupWrite,SchemaWrite. TrackingId:1a558a26-cdc0-4bca-9e7c-065076e7a3e0_G5, SystemTracker:pippo-test123-namespace.servicebus.windows.net:$schemaGroups\/avro\/schemas\/com.acme.example.eventhubs.models.Order, Timestamp:2023-10-02T10:03:43"}}
The application.properties seems to be correct and I used the Terraform main.tf.
Are you sure we don't need some more manual steps in Azure?
The command is:
mvn compile exec:java -Dexec.mainClass="com.acme.example.eventhubs.Produce"
in the Azure Identity folder.
Did you add anything to the pom.xml to run it that way? The command doesn't work for me.
I ran it in IntelliJ, and it seems to work for me, but I suspect it's because I have already authenticated to Azure using the Azure CLI.
Could you try running az login before running Produce.java?
I'm logged in already.
I didn't add anything special to the POM.
I'll try again and get back to you
I just modified the template, populated the fields, and renamed it to application.properties before running that command.
It worked. The Terraform configuration is missing the Schema Registry Contributor role assignment.
Great! Could you add it to the Terraform script? Or perhaps just mention the role as a prerequisite? Because assigning roles through Terraform also requires additional roles.
Done. I added a note about this; the role assignment only needs to be done once, so if we force it in the Terraform configuration it will start to fail after the first apply.
Hi, and thanks for all the examples of running Camel routes with JBang!
Still, I'm having trouble using beans to configure a route in YAML DSL, when a property of a Kamelet expects an object (not a string, int, ...).
The bean I am using is a class I implemented as a wrapper around DefaultAzureCredential implementing TokenCredential, because DefaultAzureCredential can only be instantiated using a builder class, which I cannot do in YAML DSL.
Use case: let's say messages on Kafka are in Avro format and use Azure Schema Registry for schemas. To integrate with a service whose consumers only understand JSON, a Camel route would need to read a Kafka message, deserialize it using the schema registry, convert it to JSON, and send it to the sink. The deserializer expects a property schema.registry.credential of type com.azure.core.credential.TokenCredential, which I define as a bean but can't seem to reference correctly. These are the relevant parts of the route definition:
The error when running camel run route.yaml --properties application.properties:
Referencing the bean like this (which works for the headerDeserializer):
results in the same Exception.
Basically, my question is: Is it possible to use custom SerDe classes for integration with various schema registries (in this case, Azure Schema Registry), as you would in a normal Kafka consumer/producer microservice, but in Camel's YAML DSL?