Closed buritos closed 4 years ago
@VaughnVernon made some changes for reducing allocation overhead from defensive use of ByteBuffers. I think that the new version is a good balance between safety and performance.
I used Mono
from reactor because that was what was already there. Perhaps we should switch to Completes
? How would I go about that?
@alexguzun perhaps you can improve on my solution in Outbound::broadcast
. I am not too familiar with reactor, so there are good chances that there is a better way to do what I did there.
I also looked into RSocketClienChannel::requestWith
but could not find a way to do something similar. @alexguzun is it correct that channelSocket.requestChannel(this.publisher)
returns a Flux with the Payloads from the responder?
@buritos Since I previously approved too early I am leaving this to @alexguzun to approve.
ManagedOutboundChannel::write(ByteBuffer)
write
into aMono<Void>
by default. The returnedMono
is used further by the caller to registerdoFinally
(e.g. release theConsumerByteBuffer
) and to control when processing starts.ByteBuffer::allocate
andByteBuffer::put
inRSocketOutboundChannel::write
.Outbound::sendTo
to usewriteAsync
Outbound::broadcast
to usewriteAsync
. We wrap the backing byte array into a read onlyByteBuffer
so that each thread gets its own position to read from.MockManagedOutboundChannel::write
to make a copy of the read onlyByteBuffer