Open NiteshKant opened 9 years ago
IMO this should be handled by the Subscription implementation which can either request(1), which means "give me the next message to write to the client and flush it" or request(N), which means "give me the next N items and I'll write them in a batch".
In that case, the flush boundaries will be defined by the receiver of the content (the reactive-ipc server infrastructure in this case). However, the use case for providing flush semantics is for the producer to define the flush boundaries.
IOW, I, as an invoker of write, must be able to decide when to flush (based on business logic of what logically constitutes a batch); it should not be the channel implementation deciding when it is optimal to call flush, which is the request(n) semantics.
I think "batch writes" is a reasonable feature to give applications (the Publisher in this case) control over buffering of writes because they may be able to anticipate cases when that would be beneficial.
However isn't some control over flushing necessary simply because servers have a write buffer? For example if I'm sending a STOMP message over TCP when the full frame is written I want any pending writes flushed as soon as possible, not when the buffer fills up.
My only concern with disconnecting flush from onComplete is that you have to maintain a collection of active write subscribers that you can later onComplete internally if you expose a flush method. I was hoping to avoid having to track active writes and leave them to be active via scope.
The Publisher would remain in control of the flush since it doesn't have to provide the number of onNext calls requested--it just can't provide any more than that. So the write Publisher could be configured to only call onNext once and then onComplete, which causes the transport to flush.
We have implemented a batch write in Reactor by exposing a Consumer the user can get access to. In our case, we would provide a Subscriber to which the layer above could write batches by simply calling onNext and then onComplete.
Subscriber<T> writeBatch();
My only concern with disconnecting flush from onComplete is that you have to maintain a collection of active write subscribers that you can later onComplete internally if you expose a flush method.
The write method currently has no Javadoc. Do you mean for that to be called once providing a Publisher for outgoing data? It sounds more like you intend for it to be called any number of times since you refer to managing multiple write subscribers. Wouldn't it make more sense to have a single Publisher for output? Just like there is one Publisher for input.
So the write Publisher could be configured to only call onNext once and then onComplete, which causes the transport to flush.
So if I have to write and flush 5 times, I would call write 5 times, each time passing a Publisher? How would I then indicate that there will be no more writes?
What I was thinking about in terms of flushing is that it would simply translate to a call to Netty's Channel.flush(). I don't see why it has to be anything more than that.
Any of the @reactive-ipc/owners are welcome to change, tweak, or completely rewrite and reorganize the current code. It's just a sketch to get discussion going.
@jbrisbin is it that you think the proposal is to provide a separate flush method? What I was proposing was to provide a flush semantic via the connection, and the proposal was to provide flush boundaries on a stream of data, something like:
Observable<Void> writeAndFlush(Observable<W> msgs, Func1<W, Boolean> flushSelector);
I think what you are saying is that the flush is always on onCompleted, so we can either have a separate Publisher per batch or provide a "batch writer" kind of API on which the user can fire items to write and then onComplete to flush. Having a separate Publisher per batch means extra object allocation, and the batch writer API seems awkward as it is disconnected from the main write method and hence provides more abstractions than desired IMO.
However isn't some control over flushing necessary simply because servers have a write buffer? For example if I'm sending a STOMP message over TCP when the full frame is written I want any pending writes flushed as soon as possible, not when the buffer fills up.
@rstoyanchev It's a good point that the underlying implementation should be able to flush data, and it surely will be the case. However, it will be a transport implementation detail and need not be a part of the API. Flush here is more of a hint from the Publisher about what constitutes a batch and not a protocol that has to be strictly followed by the transport implementation. IOW, a flush boundary means at least flush at that boundary and not only flush on that boundary.
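The "at least, not only" reading of a flush boundary can be sketched roughly as below. This is only an illustration under assumptions: HintedWriter, its capacity and the flushHint parameter are hypothetical names, not part of any reactive-ipc API, and the "socket" is simply a list of flushed batches.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transport-side writer: it flushes whenever the producer hints a
// boundary, and may also flush on its own when its internal buffer fills up.
final class HintedWriter {
    private final int capacity;                            // transport's own limit (assumed)
    private final List<String> pending = new ArrayList<>();
    final List<List<String>> flushed = new ArrayList<>();  // stands in for the socket

    HintedWriter(int capacity) {
        this.capacity = capacity;
    }

    void write(String item, boolean flushHint) {
        pending.add(item);
        // Flush at least at the hinted boundary, but not only there.
        if (flushHint || pending.size() >= capacity) {
            flush();
        }
    }

    void flush() {
        if (!pending.isEmpty()) {
            flushed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }
}
```

With a capacity of 3, writing "a" and then "b" with a hint flushes those two items together, while three further unhinted writes are flushed by the transport on its own once the buffer is full.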
In the ZMQ API you can choose to always send a Msg with the indication that there are more coming via the ZMQ_SNDMORE flag. This is the batch semantic that I have in my mind when thinking about an abstraction that would cover that use case as well without having to introduce something unique to accommodate it.
IMO both ZMQ SNDMORE and Netty's flush semantics are the same. They place a somewhat arbitrary boundary around a series of writes. writeAndFlush then is simply a ZMQ .send() with the SNDMORE flag being false or absent. It's a write signal immediately followed by an endOfBoundary signal, which is interpreted in Netty as a flush but in ZMQ as a SNDMORE == false.
Part of the problem here is that Reactive Streams has no concept of "checkpointing". We can't send an interrupt down a pipeline. There's either the next chunk of data or there's a terminal complete. In order to flush a stream we have to be able to checkpoint it in some way without completing it to demarcate data boundaries.
If we use onComplete as the demarcation, then we have to recycle or throw away that Subscriber. Pros are that it's a clearly identifiable and an existing demarcation in Reactive Streams. Cons are that it will involve recycling or throwing away objects since onComplete is terminal.
Without using onComplete exclusively you would probably need a flow like:
Subscription.request(bufferSize)
(0..batchSize).onNext()
pending == 0
signal flush
Subscription.request(bufferSize)
onComplete && pending > 0
signal flush
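The flow above could be sketched roughly as follows. This is a minimal, single-threaded illustration that uses the JDK's java.util.concurrent.Flow interfaces in place of Reactive Streams; BatchingWriteSubscriber and flushedBatches are hypothetical names, and "flushing" just records the batch in a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical write subscriber: requests bufferSize items, flushes when the
// outstanding demand reaches zero, then requests the next bufferSize items.
final class BatchingWriteSubscriber<T> implements Flow.Subscriber<T> {
    private final int bufferSize;
    private final List<T> unflushed = new ArrayList<>();
    final List<List<T>> flushedBatches = new ArrayList<>(); // stands in for the socket
    private Flow.Subscription subscription;
    private long pending; // items requested but not yet received

    BatchingWriteSubscriber(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        pending = bufferSize;       // set before request() in case of synchronous emission
        s.request(bufferSize);
    }

    @Override
    public void onNext(T item) {
        unflushed.add(item);
        if (--pending == 0) {       // whole batch received: flush, then ask for more
            flush();
            pending = bufferSize;
            subscription.request(bufferSize);
        }
    }

    @Override
    public void onError(Throwable t) {
        unflushed.clear();          // sketch only: drop whatever was not flushed
    }

    @Override
    public void onComplete() {
        if (!unflushed.isEmpty()) { // partial batch left over at completion
            flush();
        }
    }

    private void flush() {
        flushedBatches.add(new ArrayList<>(unflushed));
        unflushed.clear();
    }
}
```

Note that the flush boundaries here are chosen by the receiver (via bufferSize), which is exactly the property debated in this thread.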
Part of the problem here is that Reactive Streams has no concept of "checkpointing". We can't send an interrupt down a pipeline. There's either the next chunk of data or there's a terminal complete.
The next chunk of data (or the next item).. there is a Buffer abstraction to represent it in the current code. Couldn't that be used not only as a container for a byte buffer but also to express additional write semantics such as a flush hint?
@rstoyanchev That was my thought. I wanted to see what others had to say before suggesting that since I seem to be playing from a different score here.
The next chunk of data (or the next item).. there is a Buffer abstraction to represent it in the current code. Couldn't that be used not only as a container for a byte buffer but also to express additional write semantics such as a flush hint?
I have two problems with this approach:
Buffer object: I think this is a bigger concern, separate from flush, and we can discuss it in issue #10.
In order to explain 1 above, let us consider this simple server that writes a range of integers to every connection. Such a server can be written as:
RxTcpServer.create(transport)
           .startAndAwait(connection -> connection.write(Observable.<Integer>range(1, 100)));
Now, let us consider a protocol in which the client requests n items at a time. n is sent at the time of handshake. I am omitting the handshake to reduce verbosity, and let us assume that I get the number n per client connection.
Now, I know that my data is static (it is a static list of 100), so I would want to write n items and then flush, as I know that the client will need at least n items and I know I have that many; otherwise I flush on complete.
If I were to annotate flush boundaries in the data, I would have to do something like:
RxTcpServer.create(transport)
           .startAndAwait(connection -> connection.write(Observable.<Integer>range(1, 100)
                   .map(/*convert i to Buffer and add flush boundaries*/)));
This means I would have one extra object allocation (Buffer) per item per connection.
I cannot even emit Buffer from the source, as that will mutate the Buffer with flush boundaries per subscription.
However, if the flushing semantics are decoupled from the source of data, we can layer it in any which way we want. In this case, it will be like:
RxTcpServer.create(transport)
           .startAndAwait(connection -> connection.write(Observable.<Integer>range(1, 100),
                                                         (count, integer) -> count % n == 0));
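To make the decoupling concrete, here is a rough, self-contained sketch of how a transport might apply such a selector. It is an assumption-laden illustration: FlushSelector and SelectorDrivenWriter are hypothetical names, a plain Iterable stands in for the Publisher, and flushed batches are collected in a list instead of being written to a socket.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback: decides, after each written item, whether to flush.
interface FlushSelector<T> {
    boolean shouldFlush(long count, T item);
}

final class SelectorDrivenWriter {
    // Writes every item, flushing wherever the selector says so and once more
    // at the end of the stream (the "flush on complete" behaviour).
    static <T> List<List<T>> write(Iterable<T> items, FlushSelector<T> selector) {
        List<List<T>> flushed = new ArrayList<>();
        List<T> pending = new ArrayList<>();
        long count = 0;
        for (T item : items) {
            pending.add(item);
            count++;
            if (selector.shouldFlush(count, item)) {
                flushed.add(pending);
                pending = new ArrayList<>();
            }
        }
        if (!pending.isEmpty()) {
            flushed.add(pending); // remaining items are flushed on completion
        }
        return flushed;
    }
}
```

The data source stays untouched: the same list of integers can be written with a different selector per connection, without allocating a wrapper object per item.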
I have experimented with a different way of flushing, one that provides the same capabilities and on the implementation level is very much the same. Rather than providing a FlushSelector through an overloaded write(Publisher, FlushSelector) method, in this alternative the Subscriber of the transport implements Flushable with a single flush() method. So a Publisher that cares about flushing can alternate between calls to onNext(T) and flush().
https://github.com/rstoyanchev/reactive-ipc-jvm/tree/tcp-poc
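The following is not the code from the linked branch; it is a minimal sketch of the idea under assumptions. It uses java.io.Flushable and the JDK's java.util.concurrent.Flow interfaces in place of Reactive Streams, runs synchronously on one thread, and the class names are hypothetical.

```java
import java.io.Flushable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical transport subscriber that also exposes flush().
final class FlushableWriteSubscriber implements Flow.Subscriber<Integer>, Flushable {
    private final List<Integer> pending = new ArrayList<>();
    final List<List<Integer>> flushed = new ArrayList<>(); // stands in for the socket

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(Integer item) { pending.add(item); }
    @Override public void onError(Throwable t) { pending.clear(); }
    @Override public void onComplete() { flush(); }          // flush on completion

    @Override
    public void flush() {
        if (!pending.isEmpty()) {
            flushed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }
}

// Hypothetical publisher of 1..count (count >= 1) that alternates onNext and flush().
final class FlushingRangePublisher implements Flow.Publisher<Integer> {
    private final int count;
    private final int flushEvery;

    FlushingRangePublisher(int count, int flushEvery) {
        this.count = count;
        this.flushEvery = flushEvery;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private boolean done;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && !done; i++) {
                    subscriber.onNext(next);
                    // Flush only if the subscriber happens to support it.
                    if (next % flushEvery == 0 && subscriber instanceof Flushable) {
                        try {
                            ((Flushable) subscriber).flush();
                        } catch (IOException e) {
                            done = true;
                            subscriber.onError(e);
                            return;
                        }
                    }
                    if (next++ == count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }
            }

            @Override
            public void cancel() { done = true; }
        });
    }
}
```

Subscribing a FlushableWriteSubscriber to new FlushingRangePublisher(5, 2) flushes after items 2 and 4, and once more on completion for the trailing item.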
The end result is functionally equivalent and much cleaner.
writer(Publisher), no awkward questions about trade-offs, when to use which, etc.
TcpConnection is cleaner without FlushSelector and flushing as a concept elevated to that level.
Flushable is a choice for the implementation, as it should be, since flushing may not apply.
@rstoyanchev can you provide an example of how this will work when the publisher is coming from a different component? Maybe taking the same example as I had provided in my previous comment:
let us consider this simple server that writes a range of integers to every connection.
In addition, let us assume that the range of integers is coming from a component IntegersProvider which has an interface:
public interface IntegersProvider {
    Publisher<Integer> provide();
}
So, now the server would be:
RxTcpServer.create(transport)
           .startAndAwait(connection -> connection.write(integersProvider.provide()));
Good point since a Publisher may not know how to flush or there may be reasons to use the same Publisher with different flushing strategies.
If the write Subscriber exposes some polymorphic behavior (like flushing) that a Publisher doesn’t know how to use or doesn’t want to, it can be adapted. I’ve experimented with a WritePublisher that can adapt any Publisher for writing purposes with flushing. Obtaining a Publisher for writing becomes:
WritePublisher.adapt(integersProvider.provide()).withFlushCount(n)
Note that this is only meant to illustrate an idea. The adapter can expose any number of alternative ways to flush. Instead of the simple flush count above, perhaps some flush trigger syntax (like a cron trigger), for example “5/100,2” could mean “flush after every 5th item for the first 100, every 2nd thereafter”. In general it should be very feasible to satisfy most common needs, especially those involving an item count, through a declarative option like that. For more advanced cases the WritePublisher could still be configured with some flush strategy that accepts the last written item and Flushable but maybe we don’t need that to start.
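The declarative trigger idea could be sketched roughly as below. The syntax is only the one floated in this comment, so everything here is an assumption: FlushTrigger is a hypothetical name, each comma-separated segment is read as "every" or "every/length", a segment without a length is open-ended, and lengths are counted per segment.

```java
// Hypothetical parser for a flush trigger such as "5/100,2": flush after every
// 5th item for the first 100 items, then after every 2nd item thereafter.
final class FlushTrigger {
    private final long[] every;  // flush interval of each segment
    private final long[] length; // number of items each segment covers

    private FlushTrigger(long[] every, long[] length) {
        this.every = every;
        this.length = length;
    }

    static FlushTrigger parse(String spec) {
        String[] parts = spec.split(",");
        long[] every = new long[parts.length];
        long[] length = new long[parts.length];
        for (int i = 0; i < parts.length; i++) {
            String[] p = parts[i].trim().split("/");
            every[i] = Long.parseLong(p[0].trim());
            // A segment without "/length" applies to all remaining items.
            length[i] = p.length > 1 ? Long.parseLong(p[1].trim()) : Long.MAX_VALUE;
            if (every[i] <= 0 || length[i] <= 0) {
                throw new IllegalArgumentException("Invalid segment: " + parts[i]);
            }
        }
        return new FlushTrigger(every, length);
    }

    // count is the 1-based number of items written so far.
    boolean shouldFlush(long count) {
        long start = 0; // items covered by earlier segments
        for (int i = 0; i < every.length; i++) {
            boolean openEnded = length[i] == Long.MAX_VALUE;
            if (openEnded || count <= start + length[i]) {
                return (count - start) % every[i] == 0;
            }
            start += length[i];
        }
        return false; // past the last bounded segment: no trigger applies
    }
}
```

For "5/100,2" this flushes at items 5, 10, ..., 100 and then at 102, 104, and so on. An adapter like the WritePublisher described above could consult such a trigger after each onNext.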
Note that this is only meant to illustrate an idea. The adapter can expose any number of alternative ways to flush.
I think it is useful to see how this approach will look with different adapter layers vis-a-vis, let us say, providing a simple flush() method on the connection.
Having to intercept subscriptions to know about a specific kind of Subscriber implementation in order to decide whether flushing is available or not does not seem too intuitive/clean to me. IMO it is too convoluted.
I think that's looking at it from the implementation side too much.
The idea with WritePublisher is that it is provided and it can be used to adapt any Publisher for writing to the transport with flushing. It becomes a good place to split out and expose any number of flush-related convenience options at will (declarative flush trigger, simple flush count, flush callback strategy,...) something we'd be hard-pressed to do in TcpConnection for good reasons. I actually liked the result of adding WritePublisher to my own sample and also having a single method for writing on TcpConnection is important because it communicates clearly there is one way to write and that has nothing to do with flushing.
I think it is useful to see how this approach will look like with different adapter layers
I'm assuming what can be done with RS should be possible with adapters as well but yes, need to do that in general.
I'm assuming what can be done with RS should be possible with adapters as well
I am sure we can make it work. It is a question of how intuitive or error prone it can be. In order to explain, let me elaborate how it will look like when used with RxJava:
Observable<ByteBuf> stream = Observable.interval(1, TimeUnit.SECONDS)
                                       .map(aLong -> Unpooled.buffer()
                                                             .writeBytes(("Interval: " + aLong).getBytes()));
RxTcpServer.create(transport)
           .startAndAwait(connection -> connection.write(stream.lift(subscriber -> {
               final Flushable flushable = (subscriber instanceof Flushable) ? (Flushable) subscriber : null;
               return new Subscriber<ByteBuf>(subscriber) {
                   private int count;

                   @Override
                   public void onCompleted() {
                       subscriber.onCompleted();
                   }

                   @Override
                   public void onError(Throwable e) {
                       subscriber.onError(e);
                   }

                   @Override
                   public void onNext(ByteBuf o) {
                       subscriber.onNext(o);
                       if (null != flushable && ++count % 2 == 0) {
                           // flush
                       }
                   }
               };
           })));
(The reason for using lift above is because that is how you can intercept subscription.)
If we have flush() as a method on the connection, it will look like:
Observable<ByteBuf> stream = Observable.interval(1, TimeUnit.SECONDS)
                                       .map(aLong -> Unpooled.buffer()
                                                             .writeBytes(("Interval: " + aLong).getBytes()));
RxTcpServer.create(transport)
           .startAndAwait(connection -> {
               final AtomicInteger itemCount = new AtomicInteger();
               return connection.write(stream.map(bb -> {
                   if (itemCount.incrementAndGet() % 2 == 0) {
                       connection.flush();
                   }
                   return bb; // map must return the item so that it is still written
               }));
           });
and when we have the flush selector as proposed in #19 this will look like:
RxTcpServer.create(transport)
           .startAndAwait(connection -> connection.write(stream, (count, item) -> count % 2 == 0));
In the above you can see the intuitiveness of the approaches.
Moving from intuitiveness to the error-prone nature of the design: let us say that, after applying our operator (via lift() in the first example), some other operator is applied to the stream. The subscriber type will then change (every operator introduces a subscriber in the chain). This transformation can be part of the reactive IPC code or user code, depending on how it is used. In the presence of such a transformation, the (subscriber instanceof Flushable) check will suddenly fail because the passed Subscriber is no longer the Subscriber used by the writer of the Publisher on the channel. This is a huge issue.
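The failure mode can be reproduced without any reactive library. The sketch below is an illustration under assumptions: it uses the JDK's java.util.concurrent.Flow interfaces rather than RxJava, and TransportSubscriber and PassThrough are hypothetical stand-ins for the transport's subscriber and for the subscriber that any intermediate operator introduces.

```java
import java.io.Flushable;
import java.util.concurrent.Flow;

final class InstanceofPitfall {

    // Stand-in for the transport's subscriber, which supports flushing.
    static final class TransportSubscriber implements Flow.Subscriber<String>, Flushable {
        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
        @Override public void flush() { }
    }

    // Stand-in for what an operator does: wrap the downstream subscriber and
    // delegate every signal. It does not implement Flushable itself.
    static final class PassThrough<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> downstream;

        PassThrough(Flow.Subscriber<? super T> downstream) {
            this.downstream = downstream;
        }

        @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
        @Override public void onNext(T item) { downstream.onNext(item); }
        @Override public void onError(Throwable t) { downstream.onError(t); }
        @Override public void onComplete() { downstream.onComplete(); }
    }

    // The check a flush-aware publisher or operator would perform.
    static boolean canFlush(Flow.Subscriber<?> subscriber) {
        return subscriber instanceof Flushable;
    }
}
```

canFlush returns true for the bare TransportSubscriber but false once it is wrapped in a PassThrough, even though the underlying transport has not changed.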
It becomes a good place to split out and expose any number of flush-related convenience options at will (declarative flush trigger, simple flush count, flush callback strategy,...) something we'd be hard-pressed to do in TcpConnection for good reasons.
Is there a good reason that the alternative approaches I proposed above are not appropriate? Also, with those approaches, do you think the different strategies you mentioned cannot be applied without providing overloaded methods on the Connection object?
there is one way to write and that has nothing to do with flushing.
Isn’t it so that we are talking about two different ways of writing, viz.,
Write with flush (if applicable) at the completion of the stream.
Write with different flush strategies.
Whether we push it one level deep by having the flush as part of the Subscriber or on Connection does not really change that, right?
This is not at all what I meant but I do see the challenge when used with RxJava. A Publisher is a simple interface that's easy to implement to decorate another Publisher transparently. The same is not possible with an Observable so you are going down the path of applying an operator which again is not what I intended at all.
I'm not saying that Observable should implement an interface. I understand Observable is about composition and it's functional by nature. However when an Observable is provided to the RxConnection write method, it will no longer be used for its composition API. It will be turned to a Publisher. In other words the write method in RxConnection is nothing but a convenience to spare you from saying toPublisher so I feel there is a way to not lose that convenience and still be able to decorate the Publisher. We'll play around and get back to you.
Isn’t it so that we are talking about two different ways of writing, viz., Write with flush (if applicable) at the completion of the stream. Write with different flush strategies.
This is exactly what I'd like to avoid. To me the write(Publisher) method looks like first class citizen and write(Publisher, FlushSelector) like something to use when the other doesn't work. So now I start thinking how can I elegantly use the first method so I get the flush behavior I want? Perhaps I can split into two Publishers, or perhaps if I'm really cool I might think of something along the lines of observable.window(5,timeUnit.Second).flatMap( conn.writer(it)).subscribe(flushSubscriber). I really don't think that flushing should become so pervasive a question.
There should be one way to write and flushing should be expressed as a separate concern. The fact that there is a flush in onComplete is just something that happens. Not another flushing strategy.
Even something like this:
connection.write(stream, (count, item) -> count % 2 == 0)
which I'll admit looks nice and short, but it is imperative even for the simplest of cases. You're not saying when to flush but how to flush, vs something like flushEvery(5).
This is not at all what I meant but I do see the challenge when used with RxJava. A Publisher is a simple interface that's easy to implement to decorate another Publisher transparently. The same is not possible with an Observable so you are going down the path of applying an operator which again is not what I intended at all.
The example I gave was not for decorating Observable per se; it was for intercepting subscriptions (as I mentioned in my comment: "The reason for using lift above is because that is how you can intercept subscription."), and implementing an operator is a way to intercept subscriptions. It was because the proposed design requires knowledge of which interface a Subscriber implements.
However when an Observable is provided to the RxConnection write method, it will no longer be used for its composition API
Not really. It just depends on how the implementation uses it. E.g., I can transform the passed Observable to implement a flushing strategy, metrics, insights, etc., which means applying an operator on the passed Observable. That modifies the Observable and hence has the issue of changing the Subscriber instance, as I mentioned earlier.
This is exactly what I'd like to avoid. To me the write(Publisher) method looks like first class citizen and write(Publisher, FlushSelector) like something to use when the other doesn't work.
It depends on whether we want to provide flush semantics on the connection or not. If we do, flush is a first class citizen, just like flush is a first class citizen in an OutputStream, a Netty channel, a ServletOutputStream, etc. Either we provide flush or we do not; providing a flush semantic and hiding it in another abstraction is something I do not follow.
I really don't think that flushing should become so pervasive a question.
Where do you think flush is getting pervasive here? Is it so that every user of an API uses all functionalities of the API? Simple example: Java Collection has two methods to add elements, add and addAll. I can surely use add in a loop if I want to, and addAll becomes totally redundant. That doesn't mean addAll is pervasive; it is just another option if required by some user.
You're not saying when to flush but how to flush vs something like flushEvery(5).
AFAIU, we are only saying when to flush, how to flush (how to flush data on the physical socket) is an implementation detail.
I won't answer inline as we seem to be going in circles. Rather than spending any more time on this, could we go with the writer(Publisher) method with flushing onComplete for now, something we seem to agree on and move on? Perhaps we need to progress a little more before coming back to this question.
I agree with Rossen. This discussion is going nowhere and we have places to be and other thorny problems to solve.
Please let's start with common ground and make forward momentum to flesh out these other issues. I suspect that as we get further into this we'll see an obvious but good solution. I think right now we're just going to have to agree to disagree on the FlushSelector (which at this point is more of a premature optimization IMO).
I'd hate to see this effort miss its goals because we spent a month arguing over a feature not universal to all implementations we know will come soon (ZeroMQ and Java Chronicle to name two) and used only in specific use cases in transports that actually do support it.
I think we agree more than we disagree on the other issues and we should focus on those and actually iterate some code.
This has been discussed partially in issue #3 here and here
Flush semantics are intended to reduce the number of system calls needed to write data to a physical connection, and hence provide a way to do "batch writes" as one system call. This is a fairly established semantic across network libraries.
This issue is to discuss whether we need flush semantics as part of a TcpConnection (and also for other transports).
Since the only pure RS way of writing to a connection is via a Publisher, if we provide flush, we will have to connect it to the items emitted by that Publisher, which essentially means that we should get feedback from somewhere, upon each emitted item, on whether this emission should be followed by a flush or not.
One way of doing so would be like what RxNetty does (in an unreleased version) here via a flushSelector like:
The flushSelector function is invoked after every onNext on the stream and if the selector returns true, a flush is invoked. With this we do not need an explicit flush function but the concept is available to the user.