fdelbrayelle closed this issue 4 years ago
The body of JacksonSerde.java.ejs could be as follows:
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class JacksonSerde<T> implements Serde<T> {

    private final Serializer<T> serializer;
    private final Deserializer<T> deserializer;

    public JacksonSerde(Class<T> cls) {
        this.deserializer = new JacksonDeserializer<>(cls);
        this.serializer = new JacksonSerializer<>();
    }

    public static <T> JacksonSerde<T> of(Class<T> cls) {
        return new JacksonSerde<>(cls);
    }

    @Override
    public void configure(Map<String, ?> settings, boolean isKey) {
        this.serializer.configure(settings, isKey);
        this.deserializer.configure(settings, isKey);
    }

    @Override
    public void close() {
        this.deserializer.close();
        this.serializer.close();
    }

    @Override
    public Serializer<T> serializer() {
        return this.serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return this.deserializer;
    }
}
The body of JacksonSerializer.java.ejs could be as follows:
import com.fasterxml.jackson.core.JsonProcessingException;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JacksonSerializer<T> extends JacksonAbstract implements Serializer<T> {

    public JacksonSerializer() {
        super();
    }

    @Override
    public byte[] serialize(String topic, T message) {
        // Return null for a null message so it mirrors JacksonDeserializer,
        // which maps null bytes to null. An empty byte array would make
        // readValue fail on the consumer side.
        if (null == message) {
            return null;
        }
        try {
            return getMapper().writeValueAsBytes(message);
        } catch (JsonProcessingException e) {
            throw new SerializationException(e);
        }
    }
}
The body of JacksonDeserializer.java.ejs could be as follows:
import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class JacksonDeserializer<T> extends JacksonAbstract implements Deserializer<T> {

    private final Class<T> cls;

    public JacksonDeserializer(Class<T> cls) {
        super();
        this.cls = cls;
    }

    @Override
    public void configure(Map<String, ?> settings, boolean isKey) {
        // Unneeded
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (null == bytes) {
            return null;
        }
        try {
            return getMapper().readValue(bytes, this.cls);
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }
}
The body of JacksonAbstract.java.ejs could be as follows:
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;

import java.io.Closeable;

public class JacksonAbstract implements Closeable {

    // Single ObjectMapper shared by every serializer and deserializer.
    private static ObjectMapper mapper;

    static {
        mapper = new ObjectMapper();
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.registerModule(new JavaTimeModule());
        mapper.registerModule(new ParameterNamesModule());
        mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
        mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
    }

    public static ObjectMapper getMapper() {
        return mapper;
    }

    @Override
    public void close() {
        // Unneeded
    }
}
And then JacksonSerde would be used like this in KafkaProperties: JacksonSerde.of(Entity.class).serializer() or JacksonSerde.of(Entity.class).deserializer().
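The pairing of serializer() and deserializer() can be pictured as a round trip: whatever one side writes, the other should read back, including null. The sketch below is only an illustration under loud assumptions. The nested Serializer and Deserializer interfaces are hypothetical stand-ins that mirror the shape of Kafka's interfaces so the example compiles without kafka-clients, the StringSerde class is invented for the example, a plain UTF-8 codec stands in for the Jackson ObjectMapper, and the topic name "entity-topic" is made up.

```java
import java.nio.charset.StandardCharsets;

public class SerdeRoundTrip {

    // Hypothetical stand-ins mirroring the shape of Kafka's Serializer and
    // Deserializer; declared here only so the sketch needs no dependencies.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Toy serde (assumption): a UTF-8 String codec replaces the ObjectMapper.
    static final class StringSerde {

        Serializer<String> serializer() {
            // null in, null out, so the deserializer sees a null payload
            return (topic, data) ->
                data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }

        Deserializer<String> deserializer() {
            return (topic, bytes) ->
                bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Serializes a value and reads it back through the matching deserializer.
    public static String roundTrip(String value) {
        StringSerde serde = new StringSerde();
        byte[] bytes = serde.serializer().serialize("entity-topic", value);
        return serde.deserializer().deserialize("entity-topic", bytes);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("{\"id\":1}"));
        System.out.println(roundTrip(null));
    }
}
```

With the real classes, the same round trip would presumably go through JacksonSerde.of(Entity.class) on both sides, with the producer holding the serializer and the consumer holding the deserializer.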
Linked with #63
Only create an EntitySerde.java.ejs instead of the Jackson* classes above.