Closed benjchristensen closed 9 years ago
In use cases 1 and 3, I am not sure what is subscribing to the concatWith; could you elaborate on how you see this? Is concatWith an eager subscribe operation? To me it seems we leak the write everywhere, whereas the same stream could be unified and composed in a single connection.write call, where the returned publisher would report completed writes and let its subscribers manually request more data to write.
connection.write(protocolRequest.concatWith(
connection.getInput()
.concatMap(response -> { //note that concatMap could happen after the concatWith too
// handshake response
if(response instanceof HANDSHAKE && response == ABC) {
// handshake ABC so output in "ABC" format for next write
return ABC-formatted-request()
} else if(response instanceof HANDSHAKE && response == DEF) {
// handshake DEF so output in "DEF" format for next write
return DEF-formatted-request()
} else {
// response data here
return doStuffAsynchronouslyWith(response);
}
})
)).subscribe(postWritesSubscriber); // optional, would default to the IO writer auto-requesting the user-defined write publisher
what is subscribing to the concatWith
For brevity I didn't include the outer handler code. These would all be returning a composed Publisher lazily to the server implementation which would subscribe and run the lifecycle. Thus, the Publisher emitted by concatWith is returned to the server which subscribes to invoke the lifecycle for that connection.
Is concatWith an eager subscribe operation
No, nothing here is eager. It all composes and waits to be subscribed to by the server. This is a handler definition.
it seems we leak the write everywhere
In what way is it leaked? Just that the developer can call write multiple times?
the same stream could be unified and composed in a single connection.write call
That is elegant code and may work well for this example.
The one concern I can think of right now is related to flushing, as it would only work if it flushed on every onNext (or had a complicated flushSelector that knew how to differentiate). For example, the protocolRequest portion would immediately complete after writing the contents asynchronously, then connection.getInput() would wait for data, but the peer would never respond if protocolRequest was not flushed.
With the independent calls to write, each can naturally call flush when onComplete is emitted:
connection.write(protocolRequest)
connection.write(ABC-formatted-request)
A single write, though, would just see a single onComplete and flush once:
connection.write(protocolRequest.concatWith(ABC-formatted-request))
Thus, if they are all just emitted as onNexts on a single Publisher, then how does it know to call flush after protocolRequest but then efficiently stream ABC-formatted-request without flushing every onNext? The only way I can see this working is a non-trivial flushSelector somehow interpreting the onNext data dynamically. I imagine it would have to count onNext signals or inspect data, neither of which sounds good as an option.
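To make the flushing difference concrete, here is a minimal sketch in plain Java. It is only an illustration under stated assumptions: RecordingConnection and its write(List<String>) method are hypothetical stand-ins, not the reactive-ipc API, and a list stands in for a Publisher. Each write emits its items and flushes once when the "stream" completes, which mirrors flushing on onComplete.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a connection; not the reactive-ipc API.
class RecordingConnection {
    final List<String> events = new ArrayList<>();

    // Writes every item, then flushes once when the "stream" completes,
    // mirroring a flush triggered by onComplete.
    void write(List<String> items) {
        for (String item : items) {
            events.add("write:" + item);
        }
        events.add("flush");
    }
}

class FlushSketch {
    // Two independent writes: each one flushes on its own completion.
    static List<String> separateWrites() {
        RecordingConnection c = new RecordingConnection();
        c.write(List.of("protocolRequest"));
        c.write(List.of("abc-1", "abc-2"));
        return c.events;
    }

    // One concatenated write: a single completion, hence a single flush.
    static List<String> singleWrite() {
        RecordingConnection c = new RecordingConnection();
        List<String> all = new ArrayList<>(List.of("protocolRequest"));
        all.addAll(List.of("abc-1", "abc-2"));
        c.write(all);
        return c.events;
    }

    public static void main(String[] args) {
        System.out.println(separateWrites());
        System.out.println(singleWrite());
    }
}
```

In the first case a flush lands right after protocolRequest, so the peer can answer before the ABC data is streamed. In the second case protocolRequest is only flushed together with everything else, which is the situation described above.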
This leads back to the discussion in #21. If there is a mechanism for a developer to easily and deterministically control flushing, then my stance on write would change, as it would be very elegant to return just a single Publisher and let everything be composed into it.
Ah, talking with @NiteshKant reminded me of a key point I completely missed in my response, and far more important than flush semantics.
The other key reason why individual write calls are so important is error and completion handling on each write independently.
If write is just done via the output Publisher, then code cannot make the choice to wait on a previous write.
Using concatWith allows waiting, whereas mergeWith allows writing concurrently without waiting. These are very important to allow.
Equally important, conditional error handling cannot be composed onto writes if just a single Publisher is returned.
For example, protocolRequest.concatWith(connection.getInput() specifically uses concatWith to wait until the write completes successfully, or to skip the rest if it fails.
Here are simple examples showing merge and concat behavior with writes:
// this writes A and if it succeeds then writes B
connection.write(publisherA).concatWith(connection.write(publisherB))
// which is the same as this
concat(connection.write(publisherA), connection.write(publisherB))
// this writes A and B concurrently
connection.write(publisherA).mergeWith(connection.write(publisherB))
// which is the same as this
merge(connection.write(publisherA), connection.write(publisherB))
// since merge is concurrent we could use a single write for it
connection.write(publisherA.mergeWith(publisherB));
// which is the same as this
connection.write(merge(publisherA, publisherB));
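The difference in failure behavior can be sketched with CompletableFuture<Void> standing in for the Publisher<Void> returned by write. This is an assumption made purely for illustration: write, concat, and merge below are hypothetical helpers, not reactive-ipc or RxJava calls, and a future is eager where a Publisher is lazy, so Supplier is used to defer starting each write.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

class WriteComposition {
    // Hypothetical write: records the attempt and fails if asked to.
    static CompletableFuture<Void> write(List<String> log, String name, boolean fail) {
        log.add(name);
        return fail
            ? CompletableFuture.failedFuture(new RuntimeException(name + " failed"))
            : CompletableFuture.completedFuture(null);
    }

    // concat analog: the second write is started only if the first succeeds.
    static CompletableFuture<Void> concat(Supplier<CompletableFuture<Void>> first,
                                          Supplier<CompletableFuture<Void>> second) {
        return first.get().thenCompose(ignored -> second.get());
    }

    // merge analog: both writes are started regardless of each other's outcome.
    static CompletableFuture<Void> merge(Supplier<CompletableFuture<Void>> first,
                                         Supplier<CompletableFuture<Void>> second) {
        return CompletableFuture.allOf(first.get(), second.get());
    }

    // Names of the writes attempted when A fails and B would succeed.
    static List<String> attempted(boolean useConcat) {
        List<String> log = new CopyOnWriteArrayList<>();
        Supplier<CompletableFuture<Void>> a = () -> write(log, "A", true);
        Supplier<CompletableFuture<Void>> b = () -> write(log, "B", false);
        CompletableFuture<Void> result = useConcat ? concat(a, b) : merge(a, b);
        // Both compositions end in error because A failed.
        if (!result.isCompletedExceptionally()) {
            throw new IllegalStateException("expected a failed composition");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println("concat attempted: " + attempted(true));
        System.out.println("merge attempted:  " + attempted(false));
    }
}
```

With the concat analog only A is attempted, because its failure skips B. With the merge analog both A and B are attempted, and the failure of A is still reported on the composed result.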
I have provided an example in issue #21 comment which explains why feedback of completion/error for a write stream is important in the case of stream multiplexing on a single connection.
After a few discussions in #21 I realize that the use case in sample 3 of this issue may not be clear. So, here is an attempt to clarify it.
The sample is modeling a protocol that does the following:
1. Send a protocol request (protocolRequest).
2. If the request was sent successfully, listen for a handshake (two flavors, ABC and DEF, in the sample) or requests.
   - If the handshake is ABC, then send a reply for an ABC style handshake. Referred to as ABC-formatted-request(), which returns a Publisher.
   - If the handshake is DEF, then send a reply for a DEF style handshake. Referred to as DEF-formatted-request(), which returns a Publisher.
   - Otherwise, process the message with doStuffAsynchronouslyWith(response). (The variable response here is misleading. It should maybe just be a request.)

(The two handshake flavors aren't required but are provided just to demonstrate conditional processing.)
To draw an analogy between this hypothetical protocol and an established one, let me take HTTP/2 as an example. In the HTTP/2 case:
- protocolRequest would essentially be an HTTP/1.1 request with an Upgrade header requesting HTTP/2.
- handshake would be a connection preface frame sent by the endpoint to negotiate the settings.

Based on the above protocol definition, I am explaining the code below:
Step 1 in the protocol is done by:
connection.write(protocolRequest)
Step 2 says that if the request was successfully sent, then listen for handshake or requests. So, in order to make sure that input is subscribed only if the write was successful, we do:
connection.write(protocolRequest)
.concatWith(connection.getInput()
The sample now intends to make the request/handshake processing sequential, i.e. the next message is processed only when the previous message was successfully processed and written back to the peer. For this reason, the sample uses concatMap on the input:
connection.write(protocolRequest)
.concatWith(connection.getInput()
.concatMap(response -> {
This behavior is just an approach we took in this sample; it is not prescribed by the protocol.
Now, the processing of each message (handshake/request) is asynchronous and hence returns a Publisher, which is returned from this concatMap on the input. The sample at this point is just demonstrating how to process different messages asynchronously.
connection.write(protocolRequest) // negotiate protocol
.concatWith(connection.getInput()
.concatMap(response -> {
// handshake response
if(response instanceof HANDSHAKE && response == ABC) {
// handshake ABC so output in "ABC" format for next write
return connection.write(ABC-formatted-request)
} else if(response instanceof HANDSHAKE && response == DEF) {
// handshake DEF so output in "DEF" format for next write
return connection.write(DEF-formatted-request)
} else {
// response data here
return doStuffAsynchronouslyWith(response);
}
}))
There are a few assumptions here:
- The handler returns a Publisher<Void> which the reactive-ipc implementation subscribes to, and that subscription triggers this whole processing (discussed in issue #26).

In the above example there are two points where the write composition is used:
- The write of protocolRequest. Using concatWith we are making sure that input is not subscribed if the write failed.
- The concatMap() of input, where the asynchronous processing result of a message results in a write. The write, and hence the processing, is delayed (as concatMap does not eagerly subscribe to the mapped Publisher) until the previous processing is completed (written on the connection).

By providing the ability to compose writes, we can achieve the following variations:
- Not waiting for protocolRequest to be written before receiving further messages. So we would use
connection.write(protocolRequest) // negotiate protocol
.mergeWith(connection.getInput()
instead of
connection.write(protocolRequest) // negotiate protocol
.concatWith(connection.getInput()
- Processing messages concurrently instead of sequentially. So we would use
connection.getInput()
.flatMap(response -> {
instead of
connection.getInput()
.concatMap(response -> {
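As a rough illustration of sequential versus concurrent processing, here is a sketch that uses CompletableFuture<Void> in place of Publisher<Void>. Everything in it is an assumption for demonstration: process, sequential, and concurrent are hypothetical names, sequential only approximates concatMap, and concurrent only approximates flatMap. The handler returns futures that stay pending until completed by hand, so it is visible which messages have started.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ProcessingOrder {
    final List<String> started = new ArrayList<>();
    final List<CompletableFuture<Void>> pending = new ArrayList<>();

    // Hypothetical async handler: records the start and returns a future
    // that stays pending until the caller completes it by hand.
    CompletableFuture<Void> process(String message) {
        started.add(message);
        CompletableFuture<Void> f = new CompletableFuture<>();
        pending.add(f);
        return f;
    }

    // concatMap analog: each message starts only after the previous one finished.
    CompletableFuture<Void> sequential(List<String> messages) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (String m : messages) {
            chain = chain.thenCompose(ignored -> process(m));
        }
        return chain;
    }

    // flatMap analog: every message starts immediately.
    CompletableFuture<Void> concurrent(List<String> messages) {
        CompletableFuture<?>[] all = messages.stream()
            .map(this::process)
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(all);
    }

    public static void main(String[] args) {
        ProcessingOrder seq = new ProcessingOrder();
        seq.sequential(List.of("m1", "m2", "m3"));
        System.out.println("sequential started: " + seq.started);
        seq.pending.get(0).complete(null); // finish m1, which lets m2 start
        System.out.println("after m1 completes: " + seq.started);

        ProcessingOrder con = new ProcessingOrder();
        con.concurrent(List.of("m1", "m2", "m3"));
        System.out.println("concurrent started: " + con.started);
    }
}
```

In the sequential case only m1 has started until its future completes, at which point m2 starts. In the concurrent case all three messages start right away.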
Closing this as we all seem to agree on the need of write composition.
It is required that writes be composable, particularly when everything is async since non-blocking writes can not be ordered or waited upon without an abstraction permitting composition.
Consider the following use cases:
The approach taken above involves write returning a Publisher that can be composed. If that is not the desired implementation, these same composition capabilities must be accommodated by whatever is done in its place.