cloudevents / sdk-java

Java SDK for CloudEvents
https://cloudevents.github.io/sdk-java/
Apache License 2.0

Running into exception while using cloudevent values in kafka streams #597

Open supriya-albal-polestar opened 9 months ago

supriya-albal-polestar commented 9 months ago

I want to join a Kafka stream with a KTable. The proof of concept works fine with plain stream data. However, when the values are CloudEvent objects, I keep running into serialization issues.

Here is my code sample:

Map<String, Object> ceSerializerConfigs = new HashMap<>();
ceSerializerConfigs.put(ENCODING_CONFIG, Encoding.STRUCTURED);
ceSerializerConfigs.put(EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);

CloudEventSerializer serializer = new CloudEventSerializer();
serializer.configure(ceSerializerConfigs, false);

CloudEventDeserializer deserializer = new CloudEventDeserializer();
deserializer.configure(ceSerializerConfigs, false);
Serde<CloudEvent> cloudEventSerde = Serdes.serdeFrom(serializer, deserializer);
KStream<String, CloudEvent> kStream = builder.stream("stream-topic", Consumed.with(Serdes.String(), cloudEventSerde));
KTable<String, CloudEvent> kTable = builder.table("ktable-topic", Consumed.with(Serdes.String(), cloudEventSerde));
KStream<String, CloudEvent> joined = kStream
    .join(kTable, (left, right) -> CloudEventBuilder.v1().withId(left.getId().concat(right.getId())).build());
joined.to(output, Produced.with(Serdes.String(), cloudEventSerde));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamProps);
kafkaStreams.start();

I also tried using WrapperSerde, as described in "Issue with configuring Serdes for Kafka Streams".

However, I keep running into this exception:

18:12:08.691 [basic-streams-updated-0630c691-0080-4e02-8c85-7bff650f34e9-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [basic-streams-updated-0630c691-0080-4e02-8c85-7bff650f34e9] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000002, topic=cloudevent-ktable, partition=0, offset=80, stacktrace=java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)
Caused by: java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:84) ~[cloudevents-kafka-2.5.0.jar:?]
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:38) ~[cloudevents-kafka-2.5.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:82) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:73) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:30) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$4(MeteredKeyValueStore.java:200) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:200) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:120) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:122) ~[kafka-streams-2.8.0.jar:?]

Am I missing anything? Any help from the CloudEvents team is appreciated.

pierDipi commented 5 months ago

The deserializer only supports the Kafka protocol binding for CloudEvents, as per the spec: https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/kafka-protocol-binding.md. I believe the records are not formatted as expected by the spec?
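
For readers following the stack trace in the question: the frames suggest that the KTable state store path (ValueAndTimestampSerializer, StateSerdes.rawValue) calls the headerless serialize overload, which CloudEventSerializer rejects. The sketch below models that mismatch with plain Java only. Every type in it (TwoSignatureSerializer, HeaderOnlySerializer, HeaderlessAdapter, storePut) is a hypothetical stand-in written for illustration, not a Kafka or CloudEvents SDK class, and headers are simplified to a Map<String, String>.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeaderlessSerializeSketch {

    // Hypothetical stand-in for a serializer that exposes both overloads.
    interface TwoSignatureSerializer<T> {
        byte[] serialize(String topic, T data);
        byte[] serialize(String topic, Map<String, String> headers, T data);
    }

    // Mimics the reported behaviour: only the headers-aware overload works.
    static class HeaderOnlySerializer implements TwoSignatureSerializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            throw new UnsupportedOperationException(
                "supports only the signature serialize(String, Headers, T)");
        }

        @Override
        public byte[] serialize(String topic, Map<String, String> headers, String data) {
            // The format is recorded in a header, not in the payload bytes.
            headers.put("content-type", "application/cloudevents+json");
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Adapter that forwards the headerless call with throwaway headers.
    static class HeaderlessAdapter implements TwoSignatureSerializer<String> {
        private final TwoSignatureSerializer<String> delegate;

        HeaderlessAdapter(TwoSignatureSerializer<String> delegate) {
            this.delegate = delegate;
        }

        @Override
        public byte[] serialize(String topic, String data) {
            // Whatever the delegate writes into these headers is discarded.
            return delegate.serialize(topic, new HashMap<>(), data);
        }

        @Override
        public byte[] serialize(String topic, Map<String, String> headers, String data) {
            return delegate.serialize(topic, headers, data);
        }
    }

    // Stand-in for a state store write: it has no headers to pass along.
    static byte[] storePut(TwoSignatureSerializer<String> serializer, String value) {
        return serializer.serialize("ktable-topic", value);
    }

    // Returns true when the unadapted serializer rejects the headerless call.
    static boolean failsWithoutAdapter() {
        try {
            storePut(new HeaderOnlySerializer(), "{\"id\":\"1\"}");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("direct call rejected: " + failsWithoutAdapter());
        byte[] bytes = storePut(
            new HeaderlessAdapter(new HeaderOnlySerializer()), "{\"id\":\"1\"}");
        System.out.println("adapted call wrote " + bytes.length + " bytes");
    }
}
```

Note that in this model the adapter throws away the content-type header, so whatever reads the stored bytes back cannot rely on headers to detect the format. Whether a similar adapter is appropriate for the real CloudEventSerializer depends on how the matching deserializer identifies the encoding, which this sketch does not cover.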