Open

gordon-rennie opened 3 years ago

Serialisation / deserialisation using vulcan codecs in `fs2-kafka-vulcan` requires communication with a Schema Registry REST API, implemented in Confluent's Java code which `fs2-kafka-vulcan` wraps. Transient problems communicating with the registry's API - e.g. network problems, timeouts, unexpected server-side errors - bubble up from the schema registry client's Java code to `fs2-kafka-vulcan` and cause fatal exceptions unless deliberately handled in your application code - `fs2-kafka-vulcan` makes no attempt to retry. This makes Avro ser/des a weak point for `fs2-kafka` application resiliency.

For example, I recently experienced an unhealthy Kafka cluster affecting both broker and schema registry. Interactions with the broker were robust and self-recovered from various errors, but a `SocketTimeoutException` communicating with the Schema Registry during serialisation blew up the application (stacktrace).

Is there appetite to include Schema Registry retries in `fs2-kafka-vulcan`? As I see it, ideally the library would have some default retry config - similar to the situation with the Kafka broker producer/consumer. Having reviewed the Confluent Java code, there's no allowance for retries in the client or related classes, so this would have to be added in `fs2-kafka-vulcan`.
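To sketch what such a default retry config might look like - the names below are hypothetical, nothing like this exists in `fs2-kafka-vulcan` today:

```scala
import scala.concurrent.duration._

// Hypothetical settings class - purely illustrative of the proposal's shape,
// not an existing fs2-kafka-vulcan API.
final case class SchemaRegistryRetrySettings(
    maxRetries: Int = 5,                        // give up after this many attempts
    initialBackoff: FiniteDuration = 50.millis, // first delay, doubled per attempt
    maxBackoff: FiniteDuration = 5.seconds,     // cap on the exponential backoff
    retriableStatusCodes: Set[Int] = Set(408, 429, 500, 502, 503, 504)
)
```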
I have done some code analysis for serialisation:
`fs2-kafka-vulcan` interfaces with the Java code here:
https://github.com/fd4s/fs2-kafka/blob/de26d891bdf4f28585371d7e480ef99c324010f2/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala#L26

`F.pure(serializer.serialize(topic, value))` doesn't seem quite right to me - `serializer.serialize` calls into the Java code, which is not pure and performs IO, including checking a schema cache and (on a cache miss) making a schema registry API call. This seems like the call site that should be enhanced with retry logic.

`serializer.serialize(...)` invokes Java code in `KafkaAvroSerializer`, which defers to the `AbstractKafkaAvroSerializer` here. Failures there surface as `IOException` or `RestClientException`, and are caught and mapped here to either `SerializationException` (as observed in my linked stack trace) or an `org.apache.kafka.common.KafkaException` respectively. Information about the API response's `statusCode: int` and `errorCode: int` is preserved in the causing `RestClientException` and could be used to be selective about which cases to retry on.
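To make that concrete, here is a minimal sketch of suspending the call and retrying selectively on transient registry failures, assuming cats-effect and the Confluent client; `serializeWithRetry`, `isTransient`, and the backoff policy are illustrative assumptions, not existing `fs2-kafka-vulcan` code:

```scala
import cats.effect.Async
import cats.syntax.all._
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
import io.confluent.kafka.serializers.KafkaAvroSerializer

import java.io.IOException
import scala.concurrent.duration._

// Hypothetical predicate: the Confluent code wraps registry failures, so
// inspect the preserved cause and retry only plausibly transient ones.
def isTransient(e: Throwable): Boolean = e.getCause match {
  case rce: RestClientException => rce.getStatus >= 500 // server-side registry errors
  case _: IOException           => true                 // e.g. SocketTimeoutException
  case _                        => false
}

// Hypothetical wrapper: suspend the impure serialize call with `delay`
// and retry with exponential backoff while the failure looks transient.
def serializeWithRetry[F[_]](
    serializer: KafkaAvroSerializer,
    topic: String,
    value: AnyRef,
    retries: Int = 5,
    backoff: FiniteDuration = 50.millis
)(implicit F: Async[F]): F[Array[Byte]] =
  F.delay(serializer.serialize(topic, value)).handleErrorWith {
    case e if retries > 0 && isTransient(e) =>
      F.sleep(backoff) *> serializeWithRetry(serializer, topic, value, retries - 1, backoff * 2)
    case e =>
      F.raiseError(e)
  }
```

A real implementation would presumably also inspect the `errorCode` and make the policy configurable, but the shape is the same: suspend with `delay` rather than `pure`, then retry around the suspended effect.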
> `F.pure(serializer.serialize(topic, value))` doesn't seem quite right to me - `serializer.serialize` calls into the Java code, which is not pure and performs IO, including checking a schema cache and (on a cache miss) making a schema registry API call.

This is wrapped in a `defer` block, so it does end up getting suspended. But I agree it's not the clearest way to do it.
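For reference, roughly the pattern in question - a sketch assuming cats-effect's `Sync`, not the library's exact code:

```scala
import cats.effect.Sync
import io.confluent.kafka.serializers.KafkaAvroSerializer

// On its own, F.pure(...) would run the side-effecting serialize call
// eagerly; the surrounding defer suspends evaluation of the whole thunk,
// which is morally F.delay(serializer.serialize(topic, value)).
def serialize[F[_]](serializer: KafkaAvroSerializer, topic: String, value: AnyRef)(
    implicit F: Sync[F]
): F[Array[Byte]] =
  F.defer(F.pure(serializer.serialize(topic, value)))
```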
> This seems like the call site that should be enhanced with retry logic.

I agree - a PR would be very welcome!
> I agree - a PR would be very welcome!

:+1: thanks for the feedback! I am happy to take a crack at this (I will review and include deserialisation in scope, too).