DataStax Bulk Loader (DSBulk) is an open-source, Apache-licensed, unified tool for loading into and unloading from Apache Cassandra(R), DataStax Astra and DataStax Enterprise (DSE)
Apache License 2.0
Refactor LogManager to remove UnicastProcessor #459
I'd like to propose to refactor `LogManager`.

It's one of the most complex classes in DSBulk, and it now also carries a maintenance risk: it relies heavily on `UnicastProcessor` and `FluxSink`, two Reactor classes that were deprecated for removal and removed in Reactor 3.5.0.

These classes are supposed to be replaced by the new `Sinks.Many` API, but the new API does not offer support for multi-threaded access to the sink, see: https://stackoverflow.com/questions/65029619/how-to-call-sinks-manyt-tryemitnext-from-multiple-threads

`UnicastProcessor` was chosen for two reasons:

1. Many threads could be invoking the same error logging routines in parallel.
2. Access to different log files must be serialized in order to avoid printing garbled content.

`UnicastProcessor` is capable of serializing incoming messages received in parallel and processing them sequentially, which elegantly solves both of the problems above.

The closest thing in the new API to `UnicastProcessor` would be `Sinks.many().unicast()`, but last time I tried, it failed with `FAIL_NON_SERIALIZED` almost immediately when 2 threads attempted to send messages to the same sink in parallel.

So we are left with the task of re-implementing a way to serialize messages passed to the sink, or of imagining a whole different design.
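To make "re-implementing a way to serialize messages" more concrete, here is a minimal sketch in plain Java of the queue-and-drain idea: concurrent callers enqueue, and whichever thread wins a work-in-progress counter delivers everything sequentially. It deliberately uses no Reactor classes. `SerializingEmitter` and `SerializingEmitterDemo` are hypothetical names for illustration, not existing DSBulk or Reactor types, and a real replacement would still need to handle completion, errors and backpressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical helper: funnels concurrent emit() calls into one sequential consumer. */
final class SerializingEmitter<T> {
  private final Queue<T> queue = new ConcurrentLinkedQueue<>();
  private final AtomicInteger wip = new AtomicInteger(); // "work in progress" counter
  private final Consumer<T> downstream;

  SerializingEmitter(Consumer<T> downstream) {
    this.downstream = downstream;
  }

  void emit(T item) {
    queue.add(item);
    // Only the thread that moves wip from 0 to 1 becomes the drainer;
    // every other caller just leaves its item in the queue and returns.
    if (wip.getAndIncrement() != 0) {
      return;
    }
    int missed = 1;
    do {
      T next;
      while ((next = queue.poll()) != null) {
        downstream.accept(next); // never invoked by two threads at once
      }
      // Subtract the emissions accounted for; loop again if more arrived meanwhile.
      missed = wip.addAndGet(-missed);
    } while (missed != 0);
  }
}

final class SerializingEmitterDemo {
  /** Emits from several threads into a non-thread-safe list; returns how many items arrived. */
  static int run(int threads, int perThread) {
    List<String> received = new ArrayList<>(); // deliberately not thread-safe
    SerializingEmitter<String> emitter = new SerializingEmitter<>(received::add);
    List<Thread> workers = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      final int id = t;
      Thread worker = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          emitter.emit("thread-" + id + " record-" + i);
        }
      });
      workers.add(worker);
      worker.start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return received.size();
  }

  public static void main(String[] args) {
    System.out.println(run(4, 1000));
  }
}
```

The trade-off is that the draining thread may end up doing work on behalf of other emitters, so a slow consumer (such as a file write) can hold one caller longer than the others.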
We can fix item 1 above by making it impossible for 2 threads to emit items on the same sink. This can most certainly be achieved with some refactoring. The key is to not reuse the same `Function` instance returned by `LogManager` in more than one `Flux.transform()` call.

That leaves us with just item 2 to solve. Maybe the new `Sinks` API is enough here? If not, we can always use locks, even if that goes against the general reactive philosophy.

\cc @weideng1 @absurdfarce
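For the lock-based fallback, a rough sketch of what serializing access to a single log file could look like, assuming one lock per file and multi-line entries that must not interleave. `LockedLogWriter` and `LockedLogWriterDemo` are hypothetical names, not existing DSBulk classes, and the `Source:` / `Error:` line layout is invented purely for the demo.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical wrapper: one lock per log file keeps multi-line entries contiguous. */
final class LockedLogWriter {
  private final Writer out;
  private final ReentrantLock lock = new ReentrantLock();

  LockedLogWriter(Writer out) {
    this.out = out;
  }

  void writeEntry(String header, String body) {
    lock.lock();
    try {
      // Both lines are written while holding the lock, so no other thread
      // can slip its own lines in between them.
      out.write(header);
      out.write("\n");
      out.write(body);
      out.write("\n");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      lock.unlock();
    }
  }
}

final class LockedLogWriterDemo {
  /** Writes two-line entries from several threads and checks that none were interleaved. */
  static boolean entriesIntact(int threads, int perThread) {
    StringWriter sink = new StringWriter();
    LockedLogWriter log = new LockedLogWriter(sink);
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      final int id = t;
      workers[t] = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          log.writeEntry("Source: " + id + "-" + i, "Error: " + id + "-" + i);
        }
      });
      workers[t].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    String[] lines = sink.toString().split("\n");
    if (lines.length != threads * perThread * 2) {
      return false;
    }
    for (int i = 0; i < lines.length; i += 2) {
      // Each header line must be followed immediately by its matching body line.
      if (!lines[i].startsWith("Source: ")) {
        return false;
      }
      String key = lines[i].substring("Source: ".length());
      if (!lines[i + 1].equals("Error: " + key)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(entriesIntact(4, 500));
  }
}
```

Holding a lock blocks the calling thread, which is the "against the reactive philosophy" cost mentioned above, so this would only be reasonable if the critical section stays as short as a single entry write.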