Open jbrisbin opened 9 years ago
Given the consensus under https://github.com/reactive-streams/reactive-streams-jvm/issues/268 towards a Promise that is not a Reactive Streams Publisher, it makes sense to at least define the semantics of what using Publisher<Void> in reactive-ipc means. Another temporary option might be to create a sub-type of Publisher
To illustrate further: in the current Reactor sample, if read backpressure is added, no demand is generated, since Reactor operators do not generate demand by themselves. Demand appears only if the sample is written as follows, in which case consume() generates it:
TcpServer<ByteBuf, ByteBuf> transport = Netty4TcpServer.<ByteBuf, ByteBuf>create(0);
ReactorTcpServer.create(transport)
    .start(connection -> {
        connection.flatMap(inByteBuf -> {
            String text = "Hello " + inByteBuf.toString(Charset.defaultCharset());
            ByteBuf outByteBuf = Unpooled.buffer().writeBytes(text.getBytes());
            return connection.writeWith(Streams.just(outByteBuf));
        }).consume();
        return Streams.never();
    });
The assumption is that no demand is requested on the `Publisher<Void>` returned from the handler.
I think I see what is causing the confusion here. Let me start with the TCP echo example that is present for RxJava as well as Reactor. The code in the example really is wrong; ideally, one should write the echo server as:
(RxJava example)
RxTcpServer.create(transport)
    .startAndAwait(connection -> connection.write(connection.map(bb -> {
        String msgStr = "Hello " + bb.toString(defaultCharset());
        return Unpooled.buffer().writeBytes(msgStr.getBytes());
    })));
with an additional construct on the write to instruct a flush on each write, something like:
RxTcpServer.create(transport)
    .startAndAwait(connection -> connection.writeAndFlushOnEach(connection.map(bb -> {
        String msgStr = "Hello " + bb.toString(defaultCharset());
        return Unpooled.buffer().writeBytes(msgStr.getBytes());
    })));
However, we could not agree on the flush semantics in our discussions in issue #8 and hence I did not add the method to flush on each write (or similar).
When writing the example like this and returning the result of the writes from the connection handler, the demand is generated by the writer on the channel, which drives the demand on the read. The Publisher<Void> returned by the connection handler would then just trigger the subscribe (as designed).
Coming to the Promise question, I don't like the idea of introducing the Promise here, as a Promise is hot and always returns the same result (completion/error in case of Void). Returning a Promise<Void> from a write method, for example, would mean that we cannot re-subscribe to the returned Promise to replay the write, which I don't necessarily like.
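The difference between a hot result and a replayable write can be sketched with plain JDK types. This is only an illustration under assumptions: CompletableFuture stands in for a hot Promise<Void>, a Supplier of futures stands in for a deferred write, and the class and method names are hypothetical rather than part of reactive-ipc, Reactor or RxJava.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch: CompletableFuture plays the role of a hot Promise<Void>,
// a Supplier of futures plays the role of a deferred, replayable write.
class HotVersusDeferredWrite {
    // Counts how many times the simulated write actually ran.
    static final AtomicInteger writes = new AtomicInteger();

    // Simulated write; a real transport would write to the channel here.
    static CompletableFuture<Void> performWrite() {
        writes.incrementAndGet();
        return CompletableFuture.completedFuture(null);
    }

    // Hot: the write has already happened by the time the caller holds the handle,
    // so observing the result twice cannot replay it.
    static int observeHotTwice() {
        writes.set(0);
        CompletableFuture<Void> promise = performWrite();
        promise.join(); // first observer
        promise.join(); // second observer sees the same stored outcome
        return writes.get();
    }

    // Deferred: every call to get() performs the write again.
    static int observeDeferredTwice() {
        writes.set(0);
        Supplier<CompletableFuture<Void>> write = HotVersusDeferredWrite::performWrite;
        write.get().join();
        write.get().join();
        return writes.get();
    }

    public static void main(String[] args) {
        System.out.println("hot writes: " + observeHotTwice());           // hot writes: 1
        System.out.println("deferred writes: " + observeDeferredTwice()); // deferred writes: 2
    }
}
```

With the hot handle, a second observer can only see the stored outcome; with the deferred form, asking again runs the write again.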
In the reactive streams issue linked here, I see it leaning more towards introducing a 0 or 1 item Publisher (just like what RxJava introduced as Single) as opposed to a Promise per se, so I think the use of the term Promise is a bit confusing in this issue.
This exposes something we hadn't noticed before in TcpConnectionImpl:
@Override
public Publisher<Void> write(final Publisher<? extends W> data) {
    return new Publisher<Void>() {
        @Override
        public void subscribe(Subscriber<? super Void> s) {
            nettyChannel.write(data).addListener(new FutureToSubscriberBridge(s));
        }
    };
}
So if you subscribed twice, the write would be performed twice? Can you elaborate on the intent here?
In terms of the example, I see your point about how else it might have been written in which case demand is generated by the write side. That's perfectly valid but it's also possible for a TcpHandler to return Publisher
ReactorTcpServer.create(transport)
    .start(connection -> {
        Promise<Void> promise = Promises.prepare();
        connection.flatMap(inByteBuf -> {
            String input = inByteBuf.toString(Charset.defaultCharset()).trim();
            if ("quit".equalsIgnoreCase(input)) {
                promise.onComplete();
                return promise;
            } else {
                String text = "Hello " + inByteBuf.toString(Charset.defaultCharset());
                ByteBuf outByteBuf = Unpooled.buffer().writeBytes(text.getBytes());
                return connection.writeWith(Streams.just(outByteBuf));
            }
        }).consume();
        return promise;
    });
I'm not sure why I got the opposite impression of where the Reactive Streams discussion of Promise was headed. Ben's last comment was that he doesn't like the "is a" relationship of a Promise to a Publisher. Then Victor commented about a (new) spec and "converters for RS Publisher", which I took to mean that such a Promise would not be a RS Publisher.
Regardless, I think the question around what the use of Publisher<Void> means still stands as much as before, if not more. Do you foresee calls to onSubscribe and generating demand as per RS, for example?
So if you subscribed twice, the write would be performed twice? Can you elaborate on the intent here?
Yep, the write will be performed as many times as the returned Publisher is subscribed to. Such a Publisher is a "cold publisher", which means that it does nothing unless subscribed to. In RxJava, all out-of-the-box Observables are cold and the source is re-run on every subscribe. This behavior can be altered by using an operator such as cache, publish, etc., which caches the source and does not re-subscribe to the origin on each subscription.
As to why write specifically is lazy and re-run on every subscription: being lazy makes it possible to define the write as part of a declarative Rx chain which is run on subscription. It is re-run on every subscription to keep the subscriptions isolated (no shared state between subscriptions, and no extra checks to ensure there is only one subscription or to cache the result). It also is the most granular, low-level construct that supports both replay write (default) and single write (by using higher-level operators in higher layers like RxJava). OTOH, if we do not replay the write, then the use case where we have to actually send the same message multiple times can only be addressed by allocating a new source Publisher for every write.
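A minimal sketch of the cold write described above, using the JDK's java.util.concurrent.Flow interfaces as a stand-in for the Reactive Streams types. FakeChannel, write and IgnoringSubscriber are hypothetical names for illustration; the real TcpConnectionImpl goes through Netty and a listener bridge instead of completing synchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical sketch of a cold Publisher<Void>: nothing is written until
// subscribe() is called, and every subscription performs the write again.
class ColdWriteSketch {
    // Stand-in for the Netty channel: records what was written.
    static final class FakeChannel {
        final List<String> sent = new ArrayList<>();
        void write(String message) { sent.add(message); }
    }

    // Subscriber that ignores every signal and never requests anything.
    static final class IgnoringSubscriber implements Flow.Subscriber<Void> {
        @Override public void onSubscribe(Flow.Subscription s) { }
        @Override public void onNext(Void item) { }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    // Builds the lazy write; the channel is touched only inside subscribe().
    static Flow.Publisher<Void> write(FakeChannel channel, String message) {
        return subscriber -> {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { /* no items are ever emitted */ }
                @Override public void cancel() { }
            });
            channel.write(message);  // the side effect runs once per subscription
            subscriber.onComplete();
        };
    }

    // Subscribes the same publisher the given number of times and counts writes.
    static int writesAfterSubscribing(int subscriptions) {
        FakeChannel channel = new FakeChannel();
        Flow.Publisher<Void> lazyWrite = write(channel, "ping");
        for (int i = 0; i < subscriptions; i++) {
            lazyWrite.subscribe(new IgnoringSubscriber());
        }
        return channel.sent.size();
    }

    public static void main(String[] args) {
        System.out.println(writesAfterSubscribing(0)); // 0: assembling alone writes nothing
        System.out.println(writesAfterSubscribing(2)); // 2: one write per subscription
    }
}
```

Because each subscription is independent, no state is shared between subscribers and no single-subscription check is needed.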
Are there use cases, where replaying the write is required?
Yes, I have scenarios where I use this nature of RxNetty writes, e.g. for a TCP connection that sends scheduled heartbeats for as long as the connection is alive. For such cases, you can create and keep this lazy write Publisher and subscribe to it whenever the message is to be sent.
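The heartbeat use case might look roughly like the sketch below. It is an assumption-laden illustration, not RxNetty code: the JDK Flow types stand in for Reactive Streams, a counter stands in for the channel, and heartbeatWrite and sendHeartbeats are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one lazy write Publisher is created once and
// re-subscribed on every timer tick to send a heartbeat.
class HeartbeatSketch {
    static final Flow.Subscription NO_DEMAND = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    // Lazy write: sends one heartbeat per subscription (the counter stands in for the channel).
    static Flow.Publisher<Void> heartbeatWrite(AtomicInteger sent) {
        return subscriber -> {
            subscriber.onSubscribe(NO_DEMAND);
            sent.incrementAndGet();
            subscriber.onComplete();
        };
    }

    // Waits until at least `count` heartbeats completed, then returns how many were sent.
    static int sendHeartbeats(int count, long periodMillis) {
        AtomicInteger sent = new AtomicInteger();
        Flow.Publisher<Void> heartbeat = heartbeatWrite(sent); // allocated once, reused
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(() -> heartbeat.subscribe(new Flow.Subscriber<Void>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(Void item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { done.countDown(); }
        }), 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow(); // stop ticking once enough heartbeats were seen
        }
        return sent.get();
    }

    public static void main(String[] args) {
        System.out.println("heartbeats sent: " + sendHeartbeats(3, 10));
    }
}
```

Without replay on subscribe, each tick would have to build a fresh source Publisher for the same message.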
I think the question around what the use Publisher<Void> still stands as much as before, if not more.
Sure, we should discuss this. At this point, it appears to me that what we want is more of a disambiguation that the subscriber of the returned entity (Publisher<Void> as of now) would never generate a demand, the execution is just deferred. Am I correct?
At this point, it appears to me that what we want is more of a disambiguation that the subscriber of the returned entity (Publisher<Void> as of now) would never generate a demand, the execution is just deferred. Am I correct?
After some discussion, we generally agree with this (for now at least), and yes, it needs to be documented on TcpConnection and the TcpHandler.
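One possible reading of "never generates demand, execution is just deferred" is sketched below with the JDK Flow types. It assumes a publisher that still calls onSubscribe, as the Reactive Streams rules ask, and relies on terminal signals not requiring demand. The names are hypothetical and the write is simulated by a log entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical sketch: the subscriber of a Publisher<Void> never calls request(),
// yet the deferred work runs on subscribe and completion is still delivered.
class NoDemandCompletionSketch {
    // Deferred write that records each step in the shared log.
    static Flow.Publisher<Void> deferredWrite(List<String> log) {
        return subscriber -> {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { log.add("request(" + n + ")"); }
                @Override public void cancel() { log.add("cancel"); }
            });
            log.add("write");        // work is triggered by subscribe, not by demand
            subscriber.onComplete(); // terminal signal sent without any request(n)
        };
    }

    // Assembles the publisher, subscribes without requesting, and returns the event log.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Publisher<Void> result = deferredWrite(log);
        log.add("assembled"); // nothing has been written at this point
        result.subscribe(new Flow.Subscriber<Void>() {
            @Override public void onSubscribe(Flow.Subscription s) { log.add("onSubscribe"); } // no request
            @Override public void onNext(Void item) { log.add("onNext"); }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [assembled, onSubscribe, write, onComplete]
    }
}
```

The log contains no request entry, which is the property that would need documenting on TcpConnection and TcpHandler.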
That said, are there some differences between the two? If the Publisher
Looking at your example here, a subscribe is necessary to trigger the write. At the same time, Publisher
Regarding the Publisher
We should discuss the appropriateness of using a Publisher<Void> rather than an actual Promise. This is related to reactive-streams/reactive-streams-jvm#268 which suggests that a standardized Promise type should be created. In that discussion the issue is brought up that a Promise should not be a Publisher directly (though it could be converted to one).
We are using a Publisher<Void> in several places as a stand-in for a "real" Promise but we treat it differently than we do other publishers in that we don't call onSubscribe and we don't wait for a Subscription.request to do work. We expect the Subscriber passed to that Publisher to buffer the onComplete event until it's ready to handle it if it isn't ready immediately.
There are two issues here that may or may not be separate:
1) Should we use a "real" Promise type in RIPC that is not a Publisher and that deals with onComplete | onError as always terminal events with defined and well-known semantics?
2) Depending on the answer to 1) and the results of the work to create a standardized Promise type under the reactive-streams umbrella, should we define what the semantics of a Publisher<Void> are since they seem to be different than any other publishers and have to be treated differently as a result?