Closed. Opened by @595972434; closed 2 years ago.
Hi, I'm not that familiar with Flink and I would be happy to see what is required to support Flink.
cc @slinkydeveloper
Hi, the question is what you expect the Flink support to look like. What are you trying to do? Which Flink API are you using (DataStream, Table)?
@slinkydeveloper Thanks for your reply. We are trying to add CloudEvent attributes to the records in Flink. For now, we plan to use the Table API with a Kafka sink (if that does not work, using DataStream is fine). When I try to use sdk-java in Flink (with DataStream), the main problem is that CloudEvent can't be serialized.
Do you have any suggestions for that? Thanks.
Flink DataStream can accept a SerializationSchema/DeserializationSchema for reading/writing records to external systems. You should adapt those interfaces to the format you want to use (e.g. JSON), so Flink can send it to your Kafka topic. One alternative is to directly use the Kafka ser/de provided by this library, setting it up from the Flink Kafka connector options.
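For illustration, a minimal sketch of the first option (the class name is hypothetical, and it assumes the cloudevents-json-jackson module is on the classpath): a SerializationSchema that writes each event in the CloudEvents JSON format.
import io.cloudevents.CloudEvent
import io.cloudevents.core.provider.EventFormatProvider
import io.cloudevents.jackson.JsonFormat
import org.apache.flink.api.common.serialization.SerializationSchema

// Sketch: write each CloudEvent as a structured JSON payload.
class CloudEventJsonSerializationSchema extends SerializationSchema[CloudEvent] {
  override def serialize(event: CloudEvent): Array[Byte] = {
    // Resolve the JSON event format registered by cloudevents-json-jackson;
    // resolving it here keeps the schema object itself easily serializable for Flink.
    val format = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)
    format.serialize(event)
  }
}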
Hi @slinkydeveloper, I have followed the Flink documentation to implement the KafkaSerializationSchema interface and reused the CloudEventSerializer to serialize the CloudEvent to a byte array. I can send the Kafka record this way, but I can't add any headers. I looked at the CloudEventSerializer manually and found that it does not implement this method:
@Override
public byte[] serialize(String topic, CloudEvent data) {
    throw new UnsupportedOperationException("CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)");
}
So I used a temporary way to call this function in my code, but the mock header does not show up in the Kafka record headers either.
val value = new CloudEventSerializer().serialize(topic, new RecordHeaders().add("id", "mockHeader".getBytes(Charset.defaultCharset())), element)
Here are the major code snippets, written in Scala.
// main class
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val kafkaConfig = new Properties()
kafkaConfig.setProperty("bootstrap.servers", "broker:9092")

val kafkaProducer = new FlinkKafkaProducer(
  "test-topic-out",
  CloudEventSeders.serializer("test-topic-out"),
  kafkaConfig,
  Semantic.AT_LEAST_ONCE
)

env.fromCollection(List("one", "two", "three"))
  .map(element => { LOG.info(s"Element is ${element}"); element })
  .map(element => CloudEventBuilder.v1()
    .withId("mock.data.id")
    .withType("au.com.realestate.property.mock.data")
    .withTime(OffsetDateTime.now)
    .withDataContentType("avro/binary")
    .withSource(URI.create("urn:rea:events:v1:external:mock-data"))
    .withExtension("partitionkey", "mockKey")
    .withData(element.getBytes())
    .build())
  .map(element => { LOG.info(s"CloudEvent is ${element.toString}"); element })
  .addSink(kafkaProducer)

env.execute(config.appName)
// implement KafkaSerializationSchema
object CloudEventSeders {
  def serializer(topic: String): KafkaSerializationSchema[CloudEvent] =
    new KafkaSerializationSchema[CloudEvent] {
      override def serialize(element: CloudEvent, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        val value = new CloudEventSerializer().serialize(topic, new RecordHeaders().add("id", "mockHeader".getBytes(Charset.defaultCharset())), element)
        new ProducerRecord(topic, "test-key".getBytes(Charset.defaultCharset()), value)
      }
    }
}
@595972434 you don't need to use headers; you can write the CloudEvent to Kafka as structured, look here: https://github.com/cloudevents/sdk-java/blob/master/kafka/src/main/java/io/cloudevents/kafka/CloudEventSerializer.java#L90
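For reference, a minimal sketch of such a structured-mode setup, assuming the ENCODING_CONFIG and EVENT_FORMAT_CONFIG keys documented for this library's Kafka serializer and the cloudevents-json-jackson format on the classpath:
import java.util.{HashMap => JHashMap}
import io.cloudevents.core.message.Encoding
import io.cloudevents.jackson.JsonFormat
import io.cloudevents.kafka.CloudEventSerializer

// Configure the serializer for structured mode: the whole event, attributes
// included, goes into the record value, so no Kafka headers are needed.
val config = new JHashMap[String, AnyRef]()
config.put(CloudEventSerializer.ENCODING_CONFIG, Encoding.STRUCTURED)
config.put(CloudEventSerializer.EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE)

val serializer = new CloudEventSerializer()
serializer.configure(config, false) // false: configuring the value serializer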
@slinkydeveloper Do you mean putting the CloudEvent attributes in the Kafka value body (structured mode)? We prefer the binary mode, since then we do not need to change our previous Kafka record Avro schema.
Do you mean putting the CloudEvent attributes in the Kafka value body (structured mode)?
yes
We prefer the binary mode, since then we do not need to change our previous Kafka record Avro schema.
Then isn't this enough? https://github.com/cloudevents/sdk-java/blob/master/kafka/src/main/java/io/cloudevents/kafka/CloudEventSerializer.java#L93
You create the header map manually, and then pass it to the new ProducerRecord:
RecordHeaders headers = new RecordHeaders();
byte[] body = new KafkaSerializerMessageWriterImpl(headers)
    .writeBinary(cloudEvent);
return new ProducerRecord<>(topic, partition, timestampToWrite, key, body, headers);
Is this what you're looking for?
@slinkydeveloper Yeah, that is what I want. I found that KafkaSerializerMessageWriterImpl extracts the header attributes defined on the CloudEvent and sets them on the headers variable. My final code looks like this:
override def serialize(element: CloudEvent, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
  val headers = new RecordHeaders()
  val value = new CloudEventSerializer().serialize(topic, headers, element)
  new ProducerRecord(topic, 0, "string".getBytes(Charset.defaultCharset()), value, headers)
}
Thank you very much for your help! 👍
Thanks @slinkydeveloper! :)
Hi, do we have a plan to support using this SDK in Flink? It seems that the CloudEvent class can not be serialized in Flink.