Open apoorvmintri opened 9 months ago
The headers coming from camel are prefixed as CamelHeader.
@oscerd I believe I have found the issue/workaround. I will paste images here rather to make things more readable. FYI, I am using redpanda console to make life easier.
Using the connector config described in the problem above. I get the following info in my Kafka -
As you can see RecordId does not exist in the value or the header. Please note the "Account Name" I provided in Salesforce was "Using 3.18.2 without rawPayload" - I will come back to this.
I tried setting rawPayload=true
through both camel.component.salesforce
& camel.source.endpoint
, but that didnt work. With component.salesforce, I also had to provide loginUrl, clientId etc as well on top of the kamelet requirement as well - otherwise it would give an exception.
I decided to use this version to avoid kamelet altogether.
Please note the above image has "camel.component.salesforce.rawPayload": "true"
. I did try both true and false (default) for this property and this was the result.
The header is the same as before. But as you can see recordIds come with the value payload when rawPayload is set to true. Please also note I had to use the ByteArrayConverter
with marshal=json-jackson
There is no way I could find to set rawPayload=true
via kamelet or any other property as mentioned above. Hence I decided to force this property into the endpoint via the topic name -
"camel.kamelet.salesforce-source.topicName": "subscribe:/data/AccountChangeEvent?rawPayload=true&"
NOTE: The &
is very important at the end of the topicName otherwise when the endpoint is concatenated in the end with other properties, it adds ?
again, which will raise an exception.
By including this property via the topicName, I was able to receive the value as below.
The final workaround does not give a proper json rather a string with \
escape character. I was unable to use marshal as it would throw an exception with v3.18.2. This could be managed via the pipeline / Kafka consumer.
I also tried using SMT as follows, as mentioned by you in another github issue related to telegram I believe -
Unfortunately, this converts the value payload to a JWT style, which then needs to be decoded.
If developers want to use v3.18.2 the above workaround will do the job as intended but not ideal i.e. not clean. Otherwise we need to revert to v0.11.5 for now. Overall, I believe kamelet
is not ready for salesforce and required properties like rawPayload
is missing. FYI, these issues are pertaining to our use case and we aren't using all the potential properties. There might be other issues with kamelet when used with salesforce as source.
Also, we haven't tested salesforce as sink with v3.18.2 yet. Which might reveal other issues. Currently, I need to discuss with my team on testing sink versions for smooth sailing before finalizing the actual connector versions. I'll try to keep you posted.
I might be talking out of my arse here, but I think the fastest solution for the newer versions of kafka-connector for salesforce using kamelet is to allow developers to create the endpoint uri directly in the config class and remove all defaults from the properties. Having the option to use properties and/or directly creating the endpoint uri would be flexible.
We don't have the capacity to follow the development of Kamelets and at the same time generating the connectors as we have done in the past. We need to find a tradeoff and the tradeoff is using Kamelets as building block for Kafka connectors. We could add more options to the Kamelets, like I'll do for rawPayload. If you want to use the component fully and with all the options, my suggestion is using Camel plain with Camel-Salesforce and Camel-Kafka component and avoid using the Kafka connector from Camel-Kafka-Connector. We're not going to change Kamelets as building block in the short term and probably in the long term too.
And as always you're welcome to help on the Kamelets side or also by continuing reporting issues. Thanks for all your findings!
@oscerd That makes complete sense.
Honestly I would love to help out, figuring out the workaround and being a detective is fun but I have 0 java knowledge. But considering what you have mentioned regarding the way forward with kafka connector, I will talk to my company on developing a testing framework for salesforce connector - this won't be in java unfortunately as none of us in the team use it. Rather via simple setup & scripts like I am doing now. I will share those methodology & scripts.
Whenever new versions come out and as long as I am working on this project, I will ensure to test using those scripts and post results and findings on that thread.
So far I have the following -
I will be moving forward with testing salesforce sink connectors, will update you with my findings.
Hey @apoorvmintri , How did v0.11.5
solve the problem?
When i use rawPayload=true + CONNECT_VALUE_CONVERTER='org.apache.kafka.connect.storage.StringConverter' with SALESFORCE_CONNECTOR_VERSION='0.11.5' i get messages like
{data={schema=8pgFT5npbFPH6CEgjcgdqA, payload={LastModifiedDate=2024-02-20T08:54:58.000Z, ChangeEventHeader={commitNumber=1708419299123765249, commitUser=005Fg000002qeMrIAI, sequenceNumber=1, entityName=Fan__c, changeType=UPDATE, changedFields=[Ljava.lang.Object;@5c965d4a, changeOrigin=, transactionKey=00000be8-388d-9057-a1e6-e0528859ed76, commitTimestamp=1708419298000, recordIds=[Ljava.lang.Object;@2ad7ba7f}, Email__c=rambo@fighter.com}, event={replayId=4401}}, channel=/data/Fan__ChangeEvent}
not real json. JsonConverter for value throws null pointer exception.
@aonamrata I downloaded the jar file from https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-salesforce-kafka-connector/0.11.5/
Note this is v0.11.5 Add the jar file to your kafka location and add the path to the plugin path. i.e. the usual stuff.
And then used Kafka Connects REST API to create my connector -
{ "name": "sf-source-connector", "config": { "tasks.max":"1", "connector.class":"org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter", "camel.component.salesforce.rawPayload": "true", "camel.component.salesforce.notifyForFields": "ALL", "camel.idempotency.kafka.bootstrap.servers": "uds-kafka:9092", "camel.component.salesforce.sObjectQuery": "SELECT Id,Name FROM Account", "camel.source.path.topicName": "/data/AccountChangeEvent", "camel.component.salesforce.notifyForOperationCreate": "true", "camel.component.salesforce.notifyForOperationUpdate": "true", "camel.component.salesforce.notifyForOperationDelete": "true", "camel.component.salesforce.notifyForOperationUndelete": "true", "topics": "camelsfstream", "camel.component.salesforce.loginUrl": "https://login.salesforce.com/", "camel.component.salesforce.clientId": "Redacted", "camel.component.salesforce.clientSecret": "Redacted", "camel.component.salesforce.userName": "Redacted", "camel.component.salesforce.password": "Redacted", "camel.source.marshal":"json-jackson" } }
And got the correct json result in my kafka.
Now I'd to add that we have moved away from camel connectors cause we faced lots of issues with the salesforce sink connector with 0.11.5, 3.18.2 & 4.0.0. We just couldnt make it work and decided to build our own set of producers and consumers with python. The salesforce CDC was achieved with python based cometd client - https://pypi.org/project/aiosfstream/
You'll have to fork this project and update it if you plan to use it with py 3.10 or greater. So if you are using 3.7.x to 3.9.x then you should be fine. not sure if it'll work with 3.6.x or lower.
Connector: v3.18.2
Issue:
The salesforce source kafka connector does not contain the salesforce recordId/sObjectId on any event - Create, Update etc. Its not in the message value nor in the header. So unable to relate the update to the particular record that generated the update event. It only contains the fields, last modified date etc. Am I missing something?
Connector Config:
{ "name": "sf-source-connector", "config": { "tasks.max":"1", "connector.class":"org.apache.camel.kafkaconnector.salesforcesource.CamelSalesforcesourceSourceConnector", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter", "camel.kamelet.salesforce-source.notifyForFields": "ALL", "camel.main.streamCachingEnabled": "false", "camel.idempotency.kafka.bootstrap.servers": "uds-kafka:9092", "camel.kamelet.salesforce-source.query": "SELECT * FROM Account", "camel.kamelet.salesforce-source.topicName": "subscribe:/data/AccountChangeEvent", "camel.kamelet.salesforce-source.loginUrl": "https://login.salesforce.com/", "camel.kamelet.salesforce-source.clientId": "<Redacted>", "camel.kamelet.salesforce-source.clientSecret": "<Redacted>", "camel.kamelet.salesforce-source.userName": "<Redacted>", "camel.kamelet.salesforce-source.password": "<Redacted>", "camel.kamelet.salesforce-source.notifyForOperationCreate": "true", "camel.kamelet.salesforce-source.notifyForOperationUpdate": "true", "camel.kamelet.salesforce-source.notifyForOperationDelete": "true", "camel.kamelet.salesforce-source.notifyForOperationUndelete": "true", "topics": "camelsfstream" } }