camunda-community-hub / kafka-connect-zeebe

Kafka Connect for Zeebe.io
Apache License 2.0

Send valid JSON from Source #35

Status: Open. vringar opened this issue 4 years ago.

vringar commented 4 years ago

When sending messages from a Zeebe workflow with the example ping-pong configs, messages are overly escaped to the point that they aren't valid JSON anymore:

```
{\"key\":2251799813685264,
\"type\":\"ping\",
\"customHeaders\":{\"topic\":\"pong\"},
\"workflowInstanceKey\":2251799813685256,
\"bpmnProcessId\":\"ping-pong\",
\"workflowDefinitionVersion\":1,
\"workflowKey\":2251799813685249,
\"elementId\":\"ServiceTask_1sx6rbe\",
\"elementInstanceKey\":2251799813685263,
\"worker\":\"kafka-connector\",
\"retries\":3,
\"deadline\":1580056465256,
\"variables\":\"{\\\"foo\\\":\\\"bar\\\",\\\"key\\\":123456,\\\"name\\\":\\\"pong\\\"}\",
\"variablesAsMap\":{\"foo\":\"bar\",\"key\":123456,\"name\":\"pong\"}}
```

While this works for the round trip, other consumers that expect proper JSON fail on this message.

menski commented 4 years ago

@npepinpe After checking the code, I think this is expected at the moment. The source task publishes the record JSON with a string schema, which leads to the observed escaping. Can you confirm this?
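To illustrate the explanation above, here is a minimal sketch of what happens when a value with a string schema is serialized as JSON. The record JSON is treated as an opaque string and written out as a JSON string literal, so every quote in it is escaped. `toJsonStringLiteral` is a hypothetical helper, not Kafka's `JsonConverter`. It escapes only backslashes and double quotes, whereas a full encoder also escapes control characters. Because `variables` is already a string inside the record JSON, its quotes end up escaped twice. That produces the `\\\"` sequences in the message above.

```java
public class StringSchemaEscaping {

    // Hypothetical helper: encodes text as a JSON string literal.
    // Only backslashes and double quotes are escaped; a complete encoder
    // would also escape control characters such as newlines.
    static String toJsonStringLiteral(String text) {
        StringBuilder out = new StringBuilder("\"");
        for (char c : text.toCharArray()) {
            if (c == '\\' || c == '"') {
                out.append('\\');
            }
            out.append(c);
        }
        return out.append('"').toString();
    }

    public static void main(String[] args) {
        // Abridged record JSON as produced by the source task; note that
        // "variables" is itself a JSON string inside this object.
        String recordJson =
            "{\"type\":\"ping\",\"variables\":\"{\\\"foo\\\":\\\"bar\\\"}\"}";

        // prints {"type":"ping","variables":"{\"foo\":\"bar\"}"}
        System.out.println(recordJson);

        // With a string schema the whole record is encoded once more:
        // prints "{\"type\":\"ping\",\"variables\":\"{\\\"foo\\\":\\\"bar\\\"}\"}"
        System.out.println(toJsonStringLiteral(recordJson));
    }
}
```

The second printed line has the same shape as the reported message: one level of escaping on the record fields and two levels inside `variables`.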

npepinpe commented 4 years ago

That's correct: the current solution simply publishes the record as a string, and the other side has to parse it back as JSON. I'll try it with the JSON schema and report back.
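For the consumer side, "parsing it back" means two steps. First, decode the JSON string literal. Second, parse the resulting text as ordinary JSON. The sketch below covers only the first step. `fromJsonStringLiteral` is a hypothetical helper that handles the `\"` and `\\` escapes seen in this issue. It does not handle `\n`, `\t` or unicode escapes. A real consumer would use a JSON library for both steps.

```java
public class ParseBackOnConsumer {

    // Hypothetical helper: decodes a JSON string literal (a string-schema
    // value) back into the original record JSON text. It only covers the
    // backslash-escape-the-next-character case; see the lead-in for limits.
    static String fromJsonStringLiteral(String literal) {
        if (literal.length() < 2 || literal.charAt(0) != '"'
                || literal.charAt(literal.length() - 1) != '"') {
            throw new IllegalArgumentException("not a JSON string literal");
        }
        StringBuilder out = new StringBuilder();
        for (int i = 1; i < literal.length() - 1; i++) {
            char c = literal.charAt(i);
            if (c == '\\') {
                i++; // drop the backslash, keep the escaped character
                if (i >= literal.length() - 1) {
                    throw new IllegalArgumentException("dangling escape");
                }
                c = literal.charAt(i);
            }
            out.append(c);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Abridged value as it arrives on the topic.
        String onTopic =
            "\"{\\\"type\\\":\\\"ping\\\",\\\"variablesAsMap\\\":{\\\"foo\\\":\\\"bar\\\"}}\"";

        // Step one: undo the string encoding. The result is plain JSON that
        // can then be handed to any JSON parser (step two, not shown).
        // prints {"type":"ping","variablesAsMap":{"foo":"bar"}}
        System.out.println(fromJsonStringLiteral(onTopic));
    }
}
```

Every consumer of the topic has to repeat this extra decoding step, which is why the thread looks for a way to publish the JSON object directly.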

vringar commented 4 years ago

From our exploration, we found two options:

  1. Remove the JSON mapper (converter) from source.json, so that the already valid JSON generated in the SourceTask isn't escaped a second time
  2. Define a new schema of type Schema.Type.STRUCT with the right field mappings, and then pass the ActivatedJob directly
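Option 1 comes down to changing the source connector's value converter. The sketch below shows the relevant override as a Java map. It assumes that "removing the JSON mapper" means replacing `JsonConverter` with Kafka Connect's `StringConverter`, which is what berndruecker's reply below suggests. `value.converter` is a standard Kafka Connect connector setting and `org.apache.kafka.connect.storage.StringConverter` is Kafka's own class. The rest of the example's source.json is not quoted in this thread and is deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SourceConverterOverrides {

    // Converter override for the Zeebe source connector (option 1).
    // Only this entry is shown; connector.class and the other settings from
    // the example source.json are intentionally omitted.
    static Map<String, String> option1Overrides() {
        Map<String, String> config = new LinkedHashMap<>();
        // StringConverter writes the record string produced by the SourceTask
        // as UTF-8 bytes, instead of encoding it again as a JSON string literal.
        config.put("value.converter", "org.apache.kafka.connect.storage.StringConverter");
        return config;
    }

    public static void main(String[] args) {
        // Print in key=value form, e.g. for a standalone .properties file.
        option1Overrides().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With this override the bytes on the topic are the JSON object the SourceTask built, so consumers can parse them directly. The trade-off is that Kafka Connect no longer knows anything about the value's structure, which is what option 2 would address.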

We chose the first one because it was easier for us. @npepinpe, which JSON schema are you referring to? Is there already a Kafka data schema for JSON?

berndruecker commented 4 years ago

@vringar: So you switched from JsonConverter to a StringConverter (see https://github.com/zeebe-io/kafka-connect-zeebe/issues/18)? It might make sense to change that in the example too, right? Would you be willing to provide a pull request for it?

For the schema: I think we should switch to the typical Kafka transformation logic (https://github.com/zeebe-io/kafka-connect-zeebe/issues/18), which makes it more flexible. For now, though, we are still waiting for feedback on usage before investing in that.

Can you share anything about your use case? Also happy to get that via email offline.