reactive-ipc / reactive-ipc-jvm

Reactive IPC for the JVM

Multiplexing TCP Writes #21

Closed benjchristensen closed 9 years ago

benjchristensen commented 9 years ago

A TcpConnection should support multiple writes to allow multiplexing. I suggest also that flush semantics need to be supported per writer.

For example:

write(publisherA);
write(publisherB);

Both A and B can emit onNext on the same TCP connection, and it is then up to the protocol to order the messages if required. For example, when implementing an HTTP/2-style protocol, where there can be multiple streams on the same connection and every stream frame is independent, one can write the items without any ordering across streams.
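
A rough sketch of the framing idea above (the StreamFrame type is an assumption for illustration, not an existing ReactiveIPC class): each frame carries the id of the logical stream it belongs to, so frames from different streams can be interleaved on the connection in any order.

// Hypothetical frame type: the streamId identifies the logical stream,
// so ordering is only needed within a stream, not across streams.
final class StreamFrame {
    final int streamId;
    final byte[] payload;

    StreamFrame(int streamId, byte[] payload) {
        this.streamId = streamId;
        this.payload = payload;
    }
}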

This could instead be done by forcing our APIs to only allow a single Publisher to be given to a connection, but then we should not have a write method that can be called more than once.

I suggest that write(Publisher<T> p) (or something matching this signature) exist and that it must support multiple concurrent writers/publishers so that multiplexing is embraced.

I also suggest that each writer needs to be able to control flush semantics independently of other writers.

Continuing the example above, if "A" and "B" are both writing, and "A" doesn't care when it flushes but "B" does, "B" should be able to trigger a flush. That flush will of course flush messages from both "A" and "B", but it is only triggered by "B".

Or "A" and "B" could have different logic as to when they need to flush and are both permitted independently to trigger flush semantics.

One approach is to overload the write method so that each writer provides its own control over flushing. If "A" does not care but "B" does, it could look like this:

write(publisherA);
write(publisherB, flushSelector);
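
One possible shape for that per-writer flush control, sketched here as an assumption (the actual FlushSelector contract appears later in this thread): a function consulted after each written item that decides whether to flush.

// Sketch of a flush-selector function; "B" could pass (count, item) -> true
// to flush after every item, while "A" uses the plain write(Publisher) overload.
@FunctionalInterface
interface FlushSelector<W> {
    boolean select(long count, W lastWrittenItem); // true => flush the channel now
}
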
smaldini commented 9 years ago

Quick question: why is multiplexing not something handled by the composition layer? Wouldn't it be easier in the end to address a single Publisher writer, or even a Processor writer (that propagates write acks to its subscribers)?

benjchristensen commented 9 years ago

The reasons we have found are:

1) it is awkward to enforce only 1 write per connection
2) the Publisher can't influence or invoke flush, and we needed the ability to allow each writer to control flushing independently of the others

The first can be perceived as aesthetic, but it also affects usability, since it is surprising to people not to be able to write more than once without composing to a single Publisher.

The second reason is more functionally concrete. If the second didn't exist I wouldn't argue strongly on the first.

NiteshKant commented 9 years ago

I think the question is also "what benefit would we get by disallowing multiple writes".

If we have a reason for that, we will be able to better discuss which is more important: allowing or disallowing multiple writes.

Control of flush per Publisher is important, and so is the ability to get a write result per Publisher. If we allow a single write and merge multiple write sources in the composition layer, we would not be able to give any feedback per write source, which is important if writes are independent.

IMO, disallowing multiple writes is a big assumption and we may be impeding requirements of many protocols built on top of TCP.

smaldini commented 9 years ago

I may not follow then: a publisher can publish chunked data T and then complete. Flush is orthogonal to that and could actually be mapped to other Reactive Streams behavior (e.g. on Subscription.request) or handled directly in the writer method, defaulting to flush on complete. Besides, write() can accept any form of Publisher and could make use of a "flushing" Publisher if detected (e.g. FlushPublisher extending Publisher). There are many more issues to consider in merging multiple writes, specifically the concurrency and the ordering.

But again I might not understand. I just don't want to create a publisher per chunk write :+1: and I don't see the need for write multiplexing at the IO level, since the writer is anyway a Subscriber that only accepts non-concurrent onNext signals. Unless we want to implement some kind of Thread Stealing / Serialized Subscriber... It's easy enough to use combinatory operations from Rx or another RS impl like Reactor to leave that decision to the user, e.g. https://gist.github.com/smaldini/4c85515c210503b2092e#file-gistfile1-java-L4 creates windows of things to accumulate in ByteBuf (Reactor Buffer) merged in flatMap.

smaldini commented 9 years ago

Also, if multiple writers for a single connection are allowed, how is Subscription#request meant to work then: are we splitting the demand, deduplicating? This assumes the writer uses the Reactive Streams contract to propagate backpressure via Subscriber.onSubscribe(Subscription), which is intended to have a single upstream publisher.

benjchristensen commented 9 years ago

I'll let @NiteshKant provide the detailed answer, but simply put, the answer for backpressured multiplexed writes is the same as the merge operator in Rx/RS. Each Publisher writing is being "merged" and thus has the backpressure semantics of an Rx/RS merge. Each has its own buffer and request semantics.

Putting aside other requirements for multiple write invocations, this should be done once and well in ReactiveIPC so users do not need an RS implementation with merge to do this.

That said, write is needed for completion, error handling, and flush control; since it's already needed for those, it is also exposed for multiple writes to allow multiplexing without requiring an RS impl with merge.
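
A hedged illustration of that equivalence (the Connection interface below is a stand-in, not the ReactiveIPC API; Observable.merge is RxJava 1, matching the Observable usage elsewhere in this thread):

import rx.Observable;

class MergeEquivalenceSketch {

    // Stand-in connection type for illustration only.
    interface Connection<W> {
        Observable<Void> write(Observable<W> data);
    }

    // Without multiplexed writes: the user merges first with an Rx/RS implementation.
    <W> void mergeThenWrite(Connection<W> connection, Observable<W> a, Observable<W> b) {
        connection.write(Observable.merge(a, b));
    }

    // With multiplexed writes: the connection merges internally, with the same
    // backpressure semantics as the merge operator.
    <W> void multiplexedWrites(Connection<W> connection, Observable<W> a, Observable<W> b) {
        connection.write(a);
        connection.write(b);
    }
}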

NiteshKant commented 9 years ago

In order to explain the usecase, let me take a more concrete example:

Let us assume we are creating a factorial server that multiplexes multiple factorial requests on the same connection. Each request has a requestId and the response contains the requestId to which this response belongs.

If we do not provide multiple writes, the code will look like this:

connection.write(connection.flatMap(req -> calculateFactorial(req)));

The above is absolutely fine until we get into errors from write. So, let us assume that one of the factorial calculations caused an encoder error (overflow, malformed response, etc.). What should be the state of write after this? Should we continue, or should we abort the rest of the writes? If we continue, how does the producer of the malformed response know that the response was not sent? If we abort, one malformed response aborts other independent results.

If we were giving feedback of write for each stream, one could easily rectify the error (if possible) as:

// I am assuming that the handler is returning a Publisher<Void> here, otherwise, this requires a subscribe here, which is another issue I will file.
connection.flatMap(req -> connection.write(calculateFactorial(req))
                                   .onErrorResumeNext(t -> {
                                       if (t instanceof EncoderException) {
                                           // corrected response.
                                           return connection.write(Observable.just(req));
                                       } else {
                                           return Observable.error(t);
                                       }
                                   }));

There are many more issues to consider in merging multiple writes, specifically the concurrency and the ordering.

Re: Concurrency

The assumption will be that the networking library supports concurrent writes (I do not know any mature library that does not). If not, then yes, we will have to introduce concurrency control to funnel the writes.

Re: Ordering

Ordering is totally governed by the protocol and is not required by all protocols. If we provide feedback on writes (by returning a Publisher) we can easily provide ordering across writes via concatMap. Can you elaborate on what issues we will get into w.r.t. ordering?
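
A hedged sketch of that concatMap-based ordering, reusing the factorial example (the Connection stand-in and Observable usage are assumptions, not the ReactiveIPC API): each write is subscribed only after the previous write's result completes, so responses go out in request order.

import java.math.BigInteger;
import rx.Observable;

class OrderedWritesSketch {

    // Stand-in for a connection whose write returns a completion Publisher.
    interface Connection {
        Observable<Void> write(Observable<String> data);
    }

    // concatMap subscribes to the next write only after the previous one completes.
    Observable<Void> writeInOrder(Connection connection, Observable<Integer> requests) {
        return requests.concatMap(req -> connection.write(calculateFactorial(req)));
    }

    // Emits a single response tagged with the request value.
    Observable<String> calculateFactorial(int n) {
        BigInteger result = BigInteger.ONE;
        for (int i = 2; i <= n; i++) {
            result = result.multiply(BigInteger.valueOf(i));
        }
        return Observable.just(n + ":" + result);
    }
}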

I just don't want to create a publisher per chunk write

Totally, the publisher will be per stream. In the factorial example above, it is per request.

I don't see the need for write multiplexing at IO level since the writer is anyway a Subscriber that only accepts non concurrent onNext signals.

My usecase above provides a reason for multiplexing. The fact that a channel is sequential should not mean that I cannot have multiple writers. Multiple writes are mainly about the nature of data processing, as opposed to the implementation detail of handling concurrency on the channel.

Also if multiple writers for a single connection are allowed, how is Subscription#request meant to work then, are we splitting the demand, dedup

As @benjchristensen mentioned, this is the same usecase as a stream merge. The scenario is that there are multiple streams to write. Whether the streams are merged at the composition layer or at the channel, the backpressure semantics do not change. You will have to split the demand upstream, as we cannot predict which stream will give the next item on request.

jbrisbin commented 9 years ago

This is very, very useful information. Thank you for taking the time to lay these specifics out. It has helped our understanding of your goals considerably. @rstoyanchev is going to provide some specific feedback as a result of our discussion today on this and #22. I think we'll spend more time talking about #23 tomorrow.

rstoyanchev commented 9 years ago

Indeed the examples and use cases here and in #22 have helped us understand the motivation for writing with multiple Publisher's. It makes sense to have ReactiveIPC support multiple independent output streams (multiplexing) while managing concurrency and back-pressure across those streams rather than leaving that as a problem to be solved separately. It also makes sense to allow logical streams to be split up (batching) in order to provide a natural opportunity for flushing and for composing and sequencing of I/O operations at boundaries meaningful to the layers above (e.g. an Object serialized as a sequence of data chunks).

That said, we still have concerns with the proposal. The main concern is that multiplexing and splitting a stream into batches are two completely different use cases, but the API does not in any way recognize that. For example when multiplexing it's perfectly okay and it's expected to call write(Publisher) concurrently. When splitting a stream into batches it becomes a question whether concurrent write(Publisher) calls (that are part of the same logical stream) should even be legal. Considering that each write is done without a corresponding back-pressure request, to respect back-pressure, batches should be sent sequentially, meaning that you wait for one Publisher to complete before sending another. Otherwise, how would you know if you're writing more than the implementation can handle?

Not only that but if the concurrent writing of batches is allowed, it becomes very hard for the implementation to split back-pressure requests fairly. Instead of splitting write capacity across several multiplexed logical streams, with concurrent batch writes there is no way to tell which logical stream is unfairly writing too much.

The second concern is that leaving the handling of concurrency to the transport layer is problematic. We should not assume that the underlying transport allows concurrent writes (e.g. Servlet HTTP response, Java standard WebSocket, ZeroMQ, Chronicle, Kafka?) which could lead to different implementations (some using a queue for serialization, others relying on the transport) and so it shouldn't be up to the transport layer to do this. If it is exposed as a feature of ReactiveIPC it should work the same and it should be customizable the same way. On the point of merging back-pressure, Reactive Streams does not have such a concept, so it needs to be defined first and it should be possible to customize it. A simple thought might be for each Publisher to be given some (fixed) write capacity, or for a Publisher to get more if others aren't writing as much, but then how do you manage that as more concurrent write(Publisher) calls are made?

Unfortunately we don't have an alternative proposal yet. This simply summarizes the concerns we have.

jbrisbin commented 9 years ago

I think one of the things that was hanging me up is that the write proposed is less of a write as it is usually known in the TCP world (as an active, push-based operation) and more of a send, much closer to, say, a ZeroMQ ZMsg.send(). We're just replacing a pre-allocated, fixed, bounded wrapper (the message) with a dynamic, bounded wrapper (the Publisher). A ZeroMQ message consists of a set of frames, very much like a Publisher has onNext signals. It is also of a fixed length, which is analogous to an onComplete signal.

A ZeroMQ library would need to be able to express the standard patterns like REQ/REP. Expressed as Groovy this would look like:

Publisher<Header> headers
Publisher<Body> body

connection.request {
    Publishers.concat(headers, body)
}.subscribe { resp ->
    // handle response
}

In this case there are no write confirmations because the ZMQ implementation doesn't provide that information. There's also no flushing as in Netty, only the actual send() call (you don't send a ZMQ message you don't know the size of). But in both cases the boundaries of the "message" are clearly demarcated by the Subscriber.onComplete signal.

NiteshKant commented 9 years ago

For example when multiplexing it's perfectly okay and it's expected to call write(Publisher) concurrently. When splitting a stream into batches it becomes a question whether concurrent write(Publisher) calls (that are part of the same logical stream) should even be legal.

PR #19 does not suggest implementing batching via multiple writes. I am seeing batching here only for the purpose of flushing and the flush semantics were based on the flushSelector on a single stream.

Not only that but if the concurrent writing of batches is allowed, it becomes very hard for the implementation to split back-pressure requests fairly. Instead of splitting write capacity across several multiplexed logical streams, with concurrent batch writes there is no way to tell which logical stream is unfairly writing too much.

I will answer this ignoring batching, as it is not the usecase for multiple writes.

With multiple writes, the problem is really a stream merging problem and honoring backpressure. If we are merging n streams and the downstream subscriber requires 1 item, there is no way to predict which of the n streams will emit the next item. The only thing we can do is request 1 item from each stream; whichever produces an item first will be sent to the downstream subscriber. This will result in buffering 1 item (in this case) per stream and hence a total of up to n-1 items. Maybe we can have some heuristics around a likely candidate to emit the next item, but it gets way too complex when the heuristics go wrong and we have to wait for some time before requesting from another subscriber. IMO, it is not worth the complexity/overhead.
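
A minimal sketch of that "request 1 from each" strategy (an illustration only, not the ReactiveIPC implementation; it assumes one Subscription per write Publisher):

import java.util.List;
import org.reactivestreams.Subscription;

// When the channel can take one more item, ask every upstream for one, since we
// cannot predict which will emit next; items from the streams that "lose" sit in
// small per-stream buffers (up to n-1 buffered items for a downstream demand of 1).
final class NaiveDemandSplitter {

    private final List<Subscription> upstreams; // one Subscription per write Publisher

    NaiveDemandSplitter(List<Subscription> upstreams) {
        this.upstreams = upstreams;
    }

    void onChannelDemand() {
        for (Subscription upstream : upstreams) {
            upstream.request(1);
        }
    }
}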

We should not assume that the underlying transport allows concurrent writes (e.g. Servlet HTTP response, Java standard WebSocket, ZeroMQ, Chronicle, Kafka?) which could lead to different implementations (some using a queue for serialization, others relying on the transport) and so it shouldn't be up to the transport layer to do this.

I did not quite understand the issue with multiple transports making sure that concurrent writes are supported. In fact, I believe it is better done at the transport layer. We can create adapters (using a common serialization technique) to adapt transports that do not support concurrent writes. If we do it in the ReactiveIPC core irrespective of what the underlying transport does, we would be introducing an unnecessary queuing point.
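
A minimal sketch of the kind of adapter meant here (the names and the RawWriter hook are assumptions, not part of any transport): writes from multiple streams are funneled through a lock so a transport that cannot handle concurrent writes still sees them one at a time.

import java.util.concurrent.locks.ReentrantLock;

final class SerializingWriteAdapter<W> {

    // Hypothetical hook into a transport that only supports single-threaded writes.
    interface RawWriter<T> {
        void writeAndFlush(T item);
    }

    private final RawWriter<W> transport;
    private final ReentrantLock lock = new ReentrantLock();

    SerializingWriteAdapter(RawWriter<W> transport) {
        this.transport = transport;
    }

    void write(W item) {
        lock.lock();
        try {
            transport.writeAndFlush(item); // only one writer touches the transport at a time
        } finally {
            lock.unlock();
        }
    }
}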

rstoyanchev commented 9 years ago

PR #19 does not suggest implementing batching via multiple writes. I am seeing batching here only for the purpose of flushing and the flush semantics were based on the flushSelector on a single stream.

Okay, forget the term batching for a moment. Here is what I meant. In the simplest case there is one Publisher (e.g. read a file and write it), or for multiplexing there are multiple Publishers (e.g. HTTP/2 push reading files and writing each with a separate Publisher). Each of those is a logical sequential stream as per the RS spec.

#22 points to something very different, where we split a single logical stream into multiple Publishers so we can flush and compose. However, we are now entering an entirely different mode of operation on the connection, where concurrent writes should probably be illegal and, furthermore, where writing again before the previous Publisher has completed should also be illegal.

In the HTTP/2 push example, I could choose to split a file into multiple Publishers, perhaps to let the browser re-prioritize which files are more important. Clearly I can't just call write in a loop:

Path fileToStream = Paths.get("stream.data");
long totalSize = Files.size(fileToStream);
File file = ... ;

for (long l = 0; l < totalSize; l += 1024L) {
    connection.write(new ChunkPublisher(file, l, l + 1024L));
}

To be clear we agree fully with the motivation for #21 and #22. It's more a question of what the API should look like. Perhaps something along the lines of providing a MultiplexWriter to the connection to make it clear that you have one or more concurrent multiplex streams, each of which can be split up further but is sequential and driven by demand:

public interface MultiplexWriter<W> extends Publisher<Publisher<W>> { 
    Publisher<Void> write(Publisher<W> data);
}
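
A hypothetical caller-side sketch of this proposal (assuming the MultiplexWriter interface above is available; none of these names are existing ReactiveIPC types): each logical multiplex stream gets its own write(...), while the connection consumes the writer as a Publisher<Publisher<W>> and drives each inner stream with its own demand.

import org.reactivestreams.Publisher;

class MultiplexWriterUsage {

    // Two independent logical streams are registered with the writer; composing
    // their completion publishers (sequencing, error handling) is left to the caller.
    <W> Publisher<Void> pushTwoStreams(MultiplexWriter<W> writer,
                                       Publisher<W> streamA,
                                       Publisher<W> streamB) {
        Publisher<Void> doneA = writer.write(streamA);
        Publisher<Void> doneB = writer.write(streamB);
        return doneA;
    }
}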

With multiple writes, the problem is really a stream merging problem and honoring backpressure ... IMO, it is not worth the complexity/overhead.

We disagree here. We see this as a clear point of variation to be exposed by the API for extension. If the merging of multiple write streams is a core feature of ReactiveIPC, then the way it distributes back-pressure is a key part of that mechanism and a key point of interest. Yes, in the simplest case you can ask for 1 item from each Publisher, but you could also take into account how much each Publisher has to write before making a decision, along the lines of Jon's DemandCalculator. Or, taking the HTTP/2 push example again, you might have specific clues available about how to prioritize the write streams.

I did not quite understand the issue with multiple transports making sure that concurrent writes are supported. In fact, I believe it is better done at the transport layer. We can create adapters (using a common serialization technique) to adapt transports that do not support concurrent writes. If we do it in the ReactiveIPC core irrespective of what the underlying transport does, we would be introducing an unnecessary queuing point.

Right, we don't want to impose a queuing mechanism on the Netty transport, which is perfectly capable of handling concurrent writes. However, we need to recognize that some transports will need such a mechanism and provide it so that it doesn't have to be re-implemented. Also, since merging, buffering, and back-pressure are very closely related, as much as possible we see this as a shared mechanism that simply lets transports like Netty do concurrent writes.

NiteshKant commented 9 years ago

#22 points to something very different, where we split a single logical stream into multiple Publishers so we can flush and compose.

I think there is a misunderstanding about the intent of issue #22. Splitting a single logical stream into multiple Publishers wasn’t the intent or a proposal in that issue. The issue was instead discussing how to provide control of flush per Publisher, when there are multiple publishers. The write composition is essentially providing the result of a write as a Publisher so that higher order functions can be composed over top of the write result. e.g.: (from @benjchristensen’s comment)

// this writes A and if it succeeds then writes B
connection.write(publisherA).concatWith(write(publisherB))
// this writes A and B concurrently
connection.write(publisherA).mergeWith(connection.write(publisherB))

I could choose to split a file into multiple Publishers, perhaps to let the browser re-prioritize which files are more important.

I think it will be a bad idea to split a single file into multiple chunks. A single source of data should be a single Publisher so that backpressure composes well through the Publisher. If the same file is split into multiple HTTP/2 streams, how would stream prioritization apply to a file?

Perhaps something along the lines of providing a MultiplexWriter to the connection to make it clear that you have one or more concurrent multiplex streams, each of which can be split up further but is sequential and driven by demand

As I mentioned earlier, multiple writes support is primarily driven by disjoint processing of data as opposed to achieving parallelization for write. What I read in your comment here is basically a concat usecase (but is sequential and driven by demand), wherein we have multiple writers but they are sequentially subscribed. If so, then it does not really solve the usecase of multiplexing writes. It is basically just giving write composition.

you could also take into account how much each Publisher has to write before making a decision

How would we know how much a Publisher has to write without requesting that many items from the Publisher? If we request and do not write, we are buffering.

along the lines of Jon's DemandCalculator.

Perhaps the cause of confusion is that Jon's DemandCalculator proposal intends to provide a generic scheme for demand calculation. IMO, demand is driven either by local limits (socket buffers or a soft buffer, e.g. write watermarks in Netty) or by the peer when the protocol supports flow control (as in HTTP/2). In both cases, this will live inside a specific protocol implementation, so I do not quite get the need for a generic abstraction for DemandCalculator.

I think we should discuss write backpressure in a separate issue, I will create one.

rstoyanchev commented 9 years ago

I think there is a misunderstanding about the intent of issue #22. Splitting a single logical stream into multiple Publishers wasn’t the intent or a proposal in that issue.

A good API should guide and shouldn't be open to misunderstanding. However, I think this comes down to more than a question of interpretation.

The issue was instead discussing how to provide control of flush per Publisher, when there are multiple publishers. The write composition is essentially providing the result of a write as a Publisher so that higher order functions can be composed over top of the write result. e.g.: (from @benjchristensen’s comment)

The 3rd example under #22 shows two writes that are part of one logical stream. They happen to be sequential in that example, but that may not always be the case.

I think it will be a bad idea to split a single file into multiple chunks. A single source of data should be a single Publisher so that backpressure composes well through the Publisher.

Ideally, perhaps, but that's not for us to say. There may be a reason to write some of a file, then flush, then do something, then write some more. Perhaps the answer is to use write(Publisher, FlushSelector) with a single Publisher? That would let me flush at any point but it's harder to figure out when to flush and clearly I can not compose after a flush through the FlushSelector.

As I mentioned earlier, multiple writes support is primarily driven by disjoint processing of data as opposed to achieving parallelization for write.

We are not trying to achieve parallelization of writes. On the contrary it's the very problem we are trying to point out.

It boils down to this. For multiplexing write(publisherA) and write(publisherB), as shown at the top of this issue, two writes are used to provide two independent logical streams. For flushing and composing as shown in the 3rd example of #22 two writes are used to split a single logical stream up for the purpose of flushing and composing. The former can be concurrent and called at any time. The latter should never be concurrent and should never be called until the last write is complete. The former makes sense expressed as a single RS stream. The latter, however, we see as an RS stream whose items are Publishers. Hence the idea of the MultiplexWriter above that extends Publisher<Publisher<W>>.

I do not quite get the need for a generic abstraction for DemandCalculator.

I think the idea is that it provides hints about how much a Publisher wants to write, so the name is probably a little misleading, but the main point is that write back-pressure should be a point of variation. Thanks for creating #25.

jbrisbin commented 9 years ago

The idea behind DemandCalculator is to provide a Subscriber with additional information about what possible values it can use in the next Subscription.request() call. It doesn't have to take that information into account. But it's a way to make the request size flexible and configurable and informed by information other than what the Subscriber has access to (which isn't much).

rstoyanchev commented 9 years ago

Based on the example under #22, I might want to send multiple protocol messages with flushing:

connection.write(protocolRequest)
    .concatWith(connection.getInput()
                    .concatMap(response -> {
                        if(response instanceof HANDSHAKE && response == ABC) {
                            connection.write(protocolMessage1);
                            connection.write(protocolMessage2);
                            ... 
                        } else if(response instanceof HANDSHAKE && response == DEF) {
                            ... 
                        } else {
                            ... 
                        }
                    })
    );

Or I might have a server-side component that's simply writing a stream of protocol messages and wants to flush after each message. Clearly I can't call write(Publisher) because this isn't disjoint data. I could use write(protocolMessage, flushSelector) but that's where the API breaks down I think. I'm left with using a flushing mechanism I don't want to use (counting chunks of data). I should be able to use write(Publisher) only and use a FlushSelector only when I have some other flushing strategy in mind that fits the idea of counting chunks.

NiteshKant commented 9 years ago

Clearly I can't call write(Publisher) because this isn't disjoint data.

I did not follow this; which part of the example above are you referring to?

connection.write(protocolRequest) or connection.write(protocolMessage1);?

I should be able to use write(Publisher) only and use a FlushSelector only when I have some other flushing strategy in mind that fits the idea of counting chunks.

Sure, isn't that the reason why we have write(Publisher) and write(Publisher, flushSelector) as two different methods?

clearly I can not compose after a flush through the FlushSelector.

You can think of flush as a side-effecting action. It side-effects a prior write by completing it (by flushing it to the socket). So one would compose on the write result and not on the flush action; this is the reason why flush does not have a result. Flush is really just an optimization: if absent, the socket is flushed on onComplete of the Publisher. If individual write results are required, then they should be separate writes that can be composed.

NiteshKant commented 9 years ago

@rstoyanchev I realize from the discussions here that issue #22 did not do justice to explaining what Example 3 in the code sample was trying to achieve. So, I have provided an explanation in this comment.

Specifically, I think your comments mixing flush with the sample there are conflating the usecase. We can discuss flush semantics in a different issue if there is confusion. Flush is an optimization, and its only connection with multiplexed writes is the ability to give control of flushing per write Publisher. Let me know if I am reading your comments incorrectly.

For flushing and composing as shown in the 3rd example of #22 two writes are used to split a single logical stream up for the purpose of flushing and composing.

The intent was not to split a single logical stream into multiple writes. All the writes (protocol request, handshake, and replies) in the sample are totally disjoint; they are wired together using write composition to follow the protocol. If the intent of the sample is not clear, we can discuss that first in that issue.

NiteshKant commented 9 years ago

@jbrisbin apologies for replying late to your comment.

write as it is usually known in the TCP world (as an active, push-based operation)

This is something that I have thought about in RxNetty with respect to the names. One argument is to be pedantic in expressing that the operation is not eager; the other is that the fact that an operation returns a Publisher makes it lazy. I have come to the conclusion that people dealing with these abstractions should have a basic understanding that all operations in this library are lazy and pull-based when they return a Publisher. If for any reason they are not lazy (I went back and forth on this for close() and write() in RxNetty, and in the next release I am making everything lazy), then the name can be explicit or the return type can suggest so. I am happy to discuss this further if you guys disagree.

In this case there are no write confirmations because the ZMQ implementation doesn't provide that information.

I have not played with ZMQ at all, but if this is the case, it makes me feel that the ZMQ abstractions are incorrect and not suitable for use in this kind of low-level library that provides insight into all networking operations.

rstoyanchev commented 9 years ago

@NiteshKant the example is clear. I asked a what-if question that modified your example a little. I never claimed it was the exact use case you had in mind.

Suppose I want to write objectA through objectZ with flushing after each. The write(Publisher) method has taught me that I can pass publisherA and it will flush after onComplete. That's simple and intuitive. However I can't just call write(publisherA) through write(publisherZ) can I? First I have to realize that's illegal. Then I have to use write(publisherAthruZ, FlushSelector) but my job of figuring out when to flush just became so much harder in trying to determine when objectA ends and when objectB starts by looking at the chunks that were written.

FlushSelector should be required only when that's the kind of strategy that fits the way you think about flushing (i.e. by counting written chunks or bytes). Overall I should be able to stick to one intuitive flushing model. I shouldn't be forced to alternate between write(Publisher) and write(Publisher, FlushSelector) within a few lines of code that do very similar things and where using FlushSelector is so much more work.

NiteshKant commented 9 years ago

However I can't just call write(publisherA) through write(publisherZ) can I?

Depends on whether ordering is important or not. If ordering is important, you can do:

write(A).concatWith(write(B)) ... .concatWith(write(Z))

Of course, this means you create a Publisher per object, which means more object allocations.

First I have to realize that's illegal.

I don't think we can say that is illegal. It totally depends on the protocol you are writing. If ordering/sequential writes are desired, then sure, it is illegal, but the TcpConnection does not make that decision; a protocol writer does. TcpConnection should be able to support ordered as well as unordered semantics.

but my job of figuring out when to flush just became so much harder in trying to determine when objectA ends and when objectB starts

I guess that is the disconnect: objectA ends when objectA is emitted. The flush selector does not work on bytes; it works on the item written (in this case objectA through objectZ).

This is the contract for the FlushSelector:

/**
 * A function that is used for determining when a flush has to be invoked on the underlying channel.
 *
 * @param <W> Type of items emitted by the stream using this selector.
 */
interface FlushSelector<W> {

    /**
     * Selects whether flush should be invoked on the channel.
     *
     * @param count The index of this item. Starts with 1.
     * @param lastWrittenItem Item which was last written before calling this selector.
     *
     * @return {@code true} if flush should be invoked.
     */
    boolean select(long count, W lastWrittenItem);

}

Based on this, a FlushSelector that flushes on each item would look like what I have in this example:

RxTcpServer.create(transport)
                   .intercept(log())
                   .startAndAwait(connection -> connection.write(connection.map(bb -> {
                       String msg = "Hello " + bb.toString(defaultCharset());
                       return Unpooled.buffer().writeBytes(msg.getBytes());
                   }), (count, item) -> true));

Notice that the flush selector (the lambda (count, item) -> true) always returns true.

NiteshKant commented 9 years ago

Maybe we should discuss flush semantics in issue #8, where we were discussing them some time back.

rstoyanchev commented 9 years ago

Depends on whether ordering is important or not. If ordering is important, you can do: write(A).concatWith(write(B)) ... .concatWith(write(Z)) Of course, this means you create a Publisher per object, which means more object allocations.

Yep there is a clear disconnect right here! When I say publisherA I don't mean a Publisher emitting ObjectA. My expectation is that the ReactiveIPC layer is doing the minimal possible and its input and output works with byte buffers and object conversion is left as a separate concern. So when I say PublisherA I assume a Publisher emitting all the data chunks for ObjectA.

I realize now your intent is to pass objects down to the Netty channel and rely on codecs? That means the application is coupled to the transport. I see a conversion mechanism independent of the transport as more useful.

NiteshKant commented 9 years ago

My expectation is that the ReactiveIPC layer is doing the minimal possible and its input and output works with byte buffers and object conversion is left as a separate concern.

Let us discuss this first. I have created issue #27 for it.

NiteshKant commented 9 years ago

I think the discussion digressed in this issue, but we seem to agree in general that we should support multiplexed writes on a channel. I am closing this issue; if there are any disagreements, please re-open.