reactive-ipc / reactive-ipc-jvm

Reactive IPC for the JVM
Apache License 2.0

Additional considerations around the first PR iteration #20

Closed (jbrisbin closed this 9 years ago)

jbrisbin commented 9 years ago

We've spent quite a few hours discussing every detail of this PR, so we propose a call to discuss how we've taken concepts introduced in the original PR (like the FlushSelector) and expanded them to include additional functionality in a consistent way.

Another point of discussion is the behavior of the write functionality. We spent over two hours on one call just discussing the method signature. We feel like our approach here takes what we liked from the work so far and expands it to make it a little easier to work with when composing into streams of data.

Particular attention has been paid to avoiding GC overhead by keeping the number of wrapping or delegate objects at zero where possible. To that end, the concept of interception has been removed and the generics eliminated from the connection signature. This was causing no end of frustration and, in truth, is not really needed except when using the API directly, which is unlikely ever to be the case. We couldn't find good reasons to go through the pain of maintaining the chain of generics when transformations are applied to the connection, so we will instead rely on a much simpler mechanism to perform these functions: Reactive Streams components wired into the pipeline. This still allows RS-only components to do things like length-field decoding, line-ending decoding, and other tasks common in any TCP environment, using only the Reactive Streams APIs rather than an additional interception API that exists only in the RIPC kernel.

We decided in this iteration not to pursue end-user composition, because the signatures and interaction of the connection write were so different from previous iterations that it didn't make much sense to spin our wheels on that while some of the basic questions haven't been satisfactorily answered. That said, this PR attempts to make reading and writing so natural for Reactive Streams components that plugging directly into an RS pipeline becomes a simple plug-and-play matter, with only advanced functionality needing additional listeners.

Backpressure has been implemented on both the reading and the writing side. On the writing side, backpressure is dynamic: you can change the rate at which you pull items from your write Publisher based on the rate at which you are successfully writing items out. This logic is encapsulated in the DemandCalculator, which is implemented in a very simplistic way in the example SingletonPublisher in core.
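To make the idea of dynamic write-side backpressure concrete, here is a minimal sketch using `java.util.concurrent.Flow` (the JDK 9+ equivalent of the Reactive Streams interfaces). The `DemandCalculator` shape below (`initialRequest`/`nextRequest`), the doubling/halving strategy, and `WriteSubscriber` are assumptions made for illustration, not the PR's actual types; the `Predicate` stands in for a transport write that reports success or failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Hypothetical shape: how many items to request first, and how many after each completed batch.
interface DemandCalculator {
    long initialRequest();
    long nextRequest(long written, long failed);
}

// Assumed strategy: double the batch after a clean batch (up to a cap), halve it after any failed write.
final class DoublingDemandCalculator implements DemandCalculator {
    private final long cap;
    private long current = 1;

    DoublingDemandCalculator(long cap) { this.cap = cap; }

    @Override public long initialRequest() { return current; }

    @Override public long nextRequest(long written, long failed) {
        current = failed > 0 ? Math.max(1, current / 2) : Math.min(cap, current * 2);
        return current;
    }
}

// Write-side Subscriber: pulls from the write Publisher in batches sized by the calculator.
final class WriteSubscriber<T> implements Flow.Subscriber<T> {
    private final Predicate<? super T> write; // stand-in for the transport write; true = success
    private final DemandCalculator calculator;
    private Flow.Subscription subscription;
    private long outstanding, written, failed;

    WriteSubscriber(Predicate<? super T> write, DemandCalculator calculator) {
        this.write = write;
        this.calculator = calculator;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        requestBatch(calculator.initialRequest());
    }

    @Override public void onNext(T item) {
        if (write.test(item)) written++; else failed++;
        if (--outstanding == 0) { // batch fully delivered: size the next one from its outcome
            long next = calculator.nextRequest(written, failed);
            written = 0;
            failed = 0;
            requestBatch(next);
        }
    }

    private void requestBatch(long n) {
        outstanding = n; // set before request() in case the Publisher emits synchronously
        subscription.request(n);
    }

    @Override public void onError(Throwable t) { /* a real transport would close the connection */ }
    @Override public void onComplete() { /* a real transport would flush and release resources */ }
}

public class DemandDemo {
    // Drives the subscriber by hand and records every request(n) it issues.
    static List<Long> run() {
        List<Long> requests = new ArrayList<>();
        WriteSubscriber<String> subscriber =
            new WriteSubscriber<String>(item -> !item.equals("x"), new DoublingDemandCalculator(8));
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { requests.add(n); }
            @Override public void cancel() { }
        });
        for (String item : List.of("a", "b", "c", "d", "x", "e", "f")) {
            subscriber.onNext(item); // "x" simulates a failed write
        }
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 4, 2]
    }
}
```

Driven by hand with a recording `Subscription`, a run of successful writes grows the requests 1, 2, 4, and a single failed write in the next batch halves the following request to 2. Any other policy (fixed window, rate-based) would fit behind the same interface.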

jbrisbin commented 9 years ago

@benjchristensen @NiteshKant @rstoyanchev @smaldini I tried to lay out some of the rationale for taking the approach we did in expanding on Nitesh's PR and in some cases ignoring some things just for the time being so we can focus on one problem at a time.

Since some of this approach looks considerably different from the previous PR, it's not going to be readily apparent how we took what was suggested there and tried to keep, expand, or otherwise refine those concepts to cover as many use cases as possible. I suggest a call to discuss this, since we spent many (many) hours going over every line of code in Nitesh's PR as well as our own experiments. We have cussed and discussed some of these things to a considerable extent, and I think it would be good to present a distillation of that discussion in a chat rather than through GH comments.

rstoyanchev commented 9 years ago

As Jon mentioned the key focus area has been around TcpConnection, taking the proposal from Nitesh on how to write, flush, and get write confirmations, and evolving the concepts (hopefully). There are many other aspects of the proposal by Nitesh that we have initial thoughts on but have not discussed sufficiently. It's worth talking through the TcpConnection before moving on. Baby steps.

rstoyanchev commented 9 years ago

The currently proposed Connection has one write(Publisher) method that can be called many times where flush occurs on completion, and a second write method also taking a FlushSelector. There are good concepts there but lots of question marks too.

For example, using a single Publisher is a natural starting point for thinking about writing. I could use the first write method, but then I get neither useful write confirmations nor useful options to flush. I'd probably use the second method with the FlushSelector instead, but I still get no useful write confirmations. This is not to say that write confirmations are the ultimate issue, only that if the API provides them, they should make sense for the common case, which is one write at a time (vs batching).

Also, calling write multiple times with a Publisher makes working with backpressure more difficult. I think I'd have to write first, wait until that write has completed, and only then know I can write again. Maybe I'm wrong, but at least it's clear that it's not as simple as request(1) -> onNext(T).

It would be much clearer to have only a single Publisher representing the write side (which would also mirror the read side). Hence what we are proposing.

As far as write confirmations and flushing are concerned, eventually we arrived at the idea that they are quite closely related. How do we even provide write notifications to begin with? At first we thought the transport implementation should provide some sort of Publisher mechanism that provides each item after it has been written as a way of telling you that it has been written but then we realized what we are talking about is a notification mechanism, not necessarily a fit for a reactive streams Publisher with backpressure. From there if you look at the FlushSelector it's pretty much that. It tells you that something was written and how many items have been written. So rather than focusing on flushing alone, we see it more generally as part of an IO layer notification mechanism where one might listen to write completions and flush as part of that.

So we are proposing an event listener mechanism on the connection. You can listen to "write completion" events, for example, and for each written item you can return a boolean flag, much like the FlushSelector, to indicate that flushing should occur. The exact details of that interface can be discussed. We already have some ideas on how to improve it, and a lot can be done with it, which is great. There are also many other kinds of useful events that could be exposed, such as changes in readability or writability, transport errors, etc. We just wanted to communicate the general concept first before going further with it.
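A minimal sketch of the listener idea, assuming a hypothetical `WriteCompleteListener` that, like the FlushSelector, receives the written item and a running count and returns `true` to request a flush. `RecordingConnection` is a hypothetical in-memory stand-in that appends written items and `FLUSH` markers to a list instead of touching a socket, and `ListPublisher` is a small synchronous Publisher written only for this example. It uses `java.util.concurrent.Flow` in place of the `org.reactivestreams` interfaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical listener with the same shape as the FlushSelector: return true to flush after this item.
interface WriteCompleteListener<W> {
    boolean onWriteComplete(W item, long writtenSoFar);
}

// Hypothetical in-memory connection: a single writer(Publisher) call, listeners registered separately.
final class RecordingConnection<W> {
    final List<String> wire = new ArrayList<>(); // written items plus "FLUSH" markers
    private final List<WriteCompleteListener<? super W>> listeners = new ArrayList<>();

    void addWriteCompleteListener(WriteCompleteListener<? super W> listener) {
        listeners.add(listener);
    }

    void writer(Flow.Publisher<W> publisher) {
        publisher.subscribe(new Flow.Subscriber<W>() {
            private Flow.Subscription subscription;
            private long written;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // one write at a time: request(1) -> onNext(T)
            }

            @Override public void onNext(W item) {
                wire.add(String.valueOf(item)); // stand-in for the actual transport write
                written++;
                boolean flush = false;
                for (WriteCompleteListener<? super W> listener : listeners) {
                    flush |= listener.onWriteComplete(item, written); // |= so every listener is notified
                }
                if (flush) wire.add("FLUSH");
                subscription.request(1);
            }

            @Override public void onError(Throwable t) { wire.add("ERROR: " + t.getMessage()); }
            @Override public void onComplete() { wire.add("FLUSH"); } // flush whatever remains
        });
    }
}

// Small synchronous Publisher over a list that honours request(n), written for this example.
final class ListPublisher<T> implements Flow.Publisher<T> {
    private final List<T> items;
    ListPublisher(List<T> items) { this.items = items; }

    @Override public void subscribe(Flow.Subscriber<? super T> sub) {
        sub.onSubscribe(new Flow.Subscription() {
            private int index;
            private long demand;
            private boolean emitting, cancelled, completed;

            @Override public void request(long n) {
                if (cancelled) return;
                if (n <= 0) { cancelled = true; sub.onError(new IllegalArgumentException("n <= 0")); return; }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                if (emitting) return; // re-entrant call from onNext: the loop below sees the new demand
                emitting = true;
                while (demand > 0 && index < items.size() && !cancelled) {
                    demand--;
                    sub.onNext(items.get(index++));
                }
                emitting = false;
                if (index == items.size() && !cancelled && !completed) {
                    completed = true;
                    sub.onComplete();
                }
            }

            @Override public void cancel() { cancelled = true; }
        });
    }
}

public class ListenerDemo {
    static List<String> run() {
        RecordingConnection<String> connection = new RecordingConnection<>();
        connection.addWriteCompleteListener((item, n) -> n % 2 == 0); // flush every second item
        connection.writer(new ListPublisher<>(List.of("a", "b", "c", "d", "e")));
        return connection.wire;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, b, FLUSH, c, d, FLUSH, e, FLUSH]
    }
}
```

With a listener that flushes after every second item, writing a through e produces `a, b, FLUSH, c, d, FLUSH, e, FLUSH`, where the final flush comes from completion. Because the listener is registered on the connection rather than passed to `writer(...)`, any component holding the connection can add one.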

A more general comment on flushing: it's something that components further up the composition chain might be interested in, but it makes no sense to expose any kind of "flush" operation at that level, nor is it feasible to do that, to send flush signals through the data pipeline, to wrap data in an envelope, etc. None of these make sense. We felt the FlushSelector was a good start, since it keeps the concern at the I/O layer. By turning it into a listener mechanism (rather than a write method argument) and generalizing it, it becomes more accessible.

benjchristensen commented 9 years ago

> concept of interception has been removed

That is a critical aspect of the proposed design to allow extension. I don't see how this proposal addresses the extension use cases such as logging, caching, authentication, compression, etc in a compositional manner.

benjchristensen commented 9 years ago

On first pass of this code I'm not a fan of the EventListener and Handler aspects of this proposal, nor is it clear why that approach was needed as opposed to sticking to the RS solution.

benjchristensen commented 9 years ago

I'd like to see this proposal include examples of how it is used and how the adapter layers look.

For example, the other PR has this: https://github.com/reactive-ipc/reactive-ipc-jvm/pull/19/files#diff-6327a044d2d50969539215f133a15744R25

benjchristensen commented 9 years ago

Another thought while trying to understand this ... is the use of EventListener intended to always be hidden behind an adapter?

Can you provide an example of an adapter like we did in #19 to see how you're thinking it would be layered?

rstoyanchev commented 9 years ago

Take the write method with the FlushSelector proposed by Nitesh. Suppose you are only allowed to call it once, with a single Publisher to write. The contract could then change to a writer(Publisher) method plus another method to register a FlushSelector. If you look at the FlushSelector more closely, you can see it provides the last written item. So it's not just about flushing: it notifies you that something was written, which is convenient given that we have been discussing how write completion notifications can be exposed. Returning Publisher<Void> from a write method that also takes a Publisher is inadequate. Instead, you could simply observe notifications of items that have been written. Hence the realization that the FlushSelector could serve both purposes, and the resulting WriteCompleteListener, which, by the way, is identical to the FlushSelector if you compare the two.

So naming aside it's almost identical to the original proposal but focusing on a single Publisher for writing plus the ability to track every write when it is completed plus the ability to flush using any strategy equivalent to the FlushSelector.

rstoyanchev commented 9 years ago

> is the use of EventListener intended to always be hidden behind an adapter?

It has nothing to do with the adapter; it serves the same purpose as the FlushSelector from the original proposal. Note, however, that the FlushSelector has to be provided at the time of writing, and hence only whatever component sits at the boundary of the reactive-ipc layer (perhaps some operator from a composition library?) can supply it. By contrast, the listener mechanism is exposed on the Connection and is therefore much more accessible to anyone interested.

jbrisbin commented 9 years ago

> On first pass of this code I'm not a fan of the EventListener and Handler aspects of this proposal, nor is it clear why that approach was needed as opposed to sticking to the RS solution.

To summarize our discussion around this: not everything needs to be hit with the RS hammer. Case in point: write notifications. If you provide a Publisher which produces write notifications you get a Subscription in your Subscriber. Besides the allocation of the Subscription and whatever resources are required by it, what does .request(N) mean in that case? Can you really apply backpressure to write complete notifications? The only way to do so is to push back on the writer Publisher and tell it "I can't process more write complete notifications so don't send me any more". In the transport layer, if you can't send write complete notifications then that implies you can't create them, which implies you can't do any more writing. That would be okay except the writer Publisher has its own Subscriber (provided by the transport) and Subscription through which demand is calculated. There's no direct connection from the write completion "stream" to the writing "stream". If you can't use .request(N) in a meaningful way, then it's possible that backpressure doesn't apply in that situation. If backpressure doesn't apply, then we probably shouldn't be using the RS API which can be extremely expensive in terms of GC and object allocation if it's overused.

There are events happening to the connection that are orthogonal to reading and writing (core concerns of Reactive Streams in this kernel). IMO those events should be processed by simple callbacks rather than shoehorned into the RS API because it's cheaper and those events don't have a flow capable of reacting to backpressure. e.g. you can't apply backpressure to a connection close notification.

EDIT: Stephane and I have been talking about this issue and I think there is a case to be made that a Processor component could be used for notifications that connects backpressure from the end of the downstream, which would in this case be write notifications, to the upstream, which would be the write Subscription that's requesting items from the Publisher. If there exists a Subscriber of write complete events, for example, then the Subscription talking to the write Publisher would actually wait to do any requests until it heard from the downstream, which would be the Subscription in the write complete Subscriber. We have a use case for this exact thing in another area in Reactor and we're going to try and implement this pattern and see where it leads. Whether it's appropriate for what we're doing here in RIPC is another discussion.

jbrisbin commented 9 years ago

> I'd like to see this proposal include examples of how it is used and how the adapter layers look.

I fully intend to provide some Reactor code that uses these abstractions to show an end-user experience (hopefully today). But we've spent so much time on getting this put together I simply didn't have time to put that in and I wanted to get something put out there at this level because I knew there would be need for discussion of this first.

jbrisbin commented 9 years ago

> That is a critical aspect of the proposed design to allow extension. I don't see how this proposal addresses the extension use cases such as logging, caching, authentication, compression, etc in a compositional manner.

I'm not saying there's no need for the concept of interception at all, I'm simply pointing out that as we have discussed it, I think we're missing the forest for the trees.

We already have a very competent extension mechanism in the form of the Reactive Streams pipeline. It is the interception mechanism. If you want to compose additional functionality into the read side like authentication, then you simply use an AuthenticationProcessor which you subscribe to the Publisher<?> returned from .reader(). You then pass the Processor on up the chain as if it had come from the connection.
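A minimal sketch of the Processor-as-interceptor idea, using `java.util.concurrent.Flow`. `ListPublisher` stands in for the Publisher returned from `.reader()`, and `MapProcessor` is a hypothetical 1:1 transforming Processor (here trimming line endings, in the spirit of the decoding examples mentioned earlier in the thread) that forwards downstream demand upstream unchanged, so backpressure propagates through the interceptor. It assumes the Processor is subscribed to the reader before the user's Subscriber attaches, and it supports only one downstream Subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

// Small synchronous Publisher over a list that honours request(n); stands in for connection.reader().
final class ListPublisher<T> implements Flow.Publisher<T> {
    private final List<T> items;
    ListPublisher(List<T> items) { this.items = items; }

    @Override public void subscribe(Flow.Subscriber<? super T> sub) {
        sub.onSubscribe(new Flow.Subscription() {
            private int index;
            private long demand;
            private boolean emitting, cancelled, completed;

            @Override public void request(long n) {
                if (cancelled) return;
                if (n <= 0) { cancelled = true; sub.onError(new IllegalArgumentException("n <= 0")); return; }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                if (emitting) return; // re-entrant call from onNext: the loop below sees the new demand
                emitting = true;
                while (demand > 0 && index < items.size() && !cancelled) {
                    demand--;
                    sub.onNext(items.get(index++));
                }
                emitting = false;
                if (index == items.size() && !cancelled && !completed) {
                    completed = true;
                    sub.onComplete();
                }
            }

            @Override public void cancel() { cancelled = true; }
        });
    }
}

// Hypothetical 1:1 transforming Processor: demand and cancellation from downstream go straight upstream.
final class MapProcessor<T, R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> fn;
    private Flow.Subscription upstream;
    private Flow.Subscriber<? super R> downstream;

    MapProcessor(Function<? super T, ? extends R> fn) { this.fn = fn; }

    @Override public void onSubscribe(Flow.Subscription s) { upstream = s; }

    @Override public void subscribe(Flow.Subscriber<? super R> s) {
        downstream = s;
        s.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { upstream.request(n); }
            @Override public void cancel() { upstream.cancel(); }
        });
    }

    @Override public void onNext(T item) { downstream.onNext(fn.apply(item)); }
    @Override public void onError(Throwable t) { downstream.onError(t); }
    @Override public void onComplete() { downstream.onComplete(); }
}

// The "user" end of the chain: requests one item at a time and collects what arrives.
final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = new ArrayList<>();
    Throwable error;
    boolean completed;
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
    @Override public void onNext(T item) { received.add(item); subscription.request(1); }
    @Override public void onError(Throwable t) { error = t; }
    @Override public void onComplete() { completed = true; }
}

public class InterceptorDemo {
    static List<String> run() {
        Flow.Publisher<String> reader = new ListPublisher<>(List.of("GET /a\r\n", "GET /b\r\n"));
        MapProcessor<String, String> lineDecoder = new MapProcessor<String, String>(String::trim);
        reader.subscribe(lineDecoder); // attach the interceptor to the read side first
        CollectingSubscriber<String> user = new CollectingSubscriber<>();
        lineDecoder.subscribe(user);   // hand the Processor up the chain as if it were reader()
        return user.received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [GET /a, GET /b]
    }
}
```

The user's Subscriber only ever sees a `Flow.Publisher<String>`, so it cannot tell whether it is attached to the connection directly or through one or more interceptors; a logging or authentication stage would have the same shape.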

Providing any mechanism to inject itself into the Reactive Streams pipeline that doesn't follow the spec is problematic in any number of ways; it doesn't propagate backpressure, it doesn't respect the specification, etc... After removing the generics signatures and being freed from the generics hell that Java puts you in just to do simple things, it's no longer necessary to provide some way to translate <R,W> into <NEWR,NEWW>.

Not having to do that wrapping also results in a more compact stack trace; deep stack traces are sometimes ridiculously difficult to deal with when debugging through many layers of anonymous inner classes.

jbrisbin commented 9 years ago

@benjchristensen FWIW @rstoyanchev @smaldini and I had some more discussion around interception and as I'm creating the Reactor components for this, I can see use cases for turning an RIPC TcpConnection into something else rather than dealing with a TcpConnection directly.

This is just to point out that we didn't leave interception out because we don't think it has a place, but because we were attacking one issue at a time (just like the concept of an abstract Server). I have a commit to this PR coming that will show Reactor code using these underlying APIs, and I can definitely see a use for some kind of transformation at the server level.

jbrisbin commented 9 years ago

@benjchristensen Here's some example code I just added that uses Reactor to handle data (in and out) and do interception in the Stream (the .log()). We're cheating here, of course, because every Reactor Action is a Reactive Streams Processor, so we don't need any additional code to plug this in. But imagine that we're just using the Processor type directly rather than a Reactor Stream; you get the same effect.

rstoyanchev commented 9 years ago

BTW the generics in the current intercept proposal don't work. The TcpServer has the same type parameters as the start method, so even if intercept goes from ByteBuf to String, for example, the start method shows red (fails to compile). Try it.

jbrisbin commented 9 years ago

FWIW I introduced an Interceptor and added it to the newly-created TcpServer interface to allow you to change the type of connection being used and thus avoid a delegate situation where you have to use a Handler<TcpConnection> but translate that to something else. This interception allows you to change the connection from the RIPC TcpConnection to something arbitrary that makes sense for the composition library. In this case I'm using my own ReactorTcpConnection type.

This is in line with #15 and actually makes the code more succinct.
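One way to get the connection-type interceptor described above while keeping the generics sound (the problem raised about the earlier intercept signature) is to have `intercept` return a server parameterized by the new connection type. This is a hypothetical sketch: `Server`, `TcpConnection`, and `LibraryConnection` are stand-ins (the last playing the role of something like `ReactorTcpConnection`), and `start` simply replays a list of already accepted connections instead of binding a socket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the RIPC TcpConnection; not the real type.
final class TcpConnection {
    final String remoteAddress;
    TcpConnection(String remoteAddress) { this.remoteAddress = remoteAddress; }
}

// Hypothetical library-specific connection type wrapping the transport connection.
final class LibraryConnection {
    private final TcpConnection delegate;
    LibraryConnection(TcpConnection delegate) { this.delegate = delegate; }
    String describe() { return "library view of " + delegate.remoteAddress; }
}

// C is the connection type that handlers passed to start() receive.
final class Server<C> {
    private final Function<TcpConnection, C> adapter;

    private Server(Function<TcpConnection, C> adapter) { this.adapter = adapter; }

    static Server<TcpConnection> create() {
        return new Server<TcpConnection>(Function.identity());
    }

    // Returns a server with a NEW type parameter, so start() is typed against the intercepted connection.
    <C2> Server<C2> intercept(Function<? super C, ? extends C2> interceptor) {
        return new Server<C2>(adapter.andThen(interceptor));
    }

    // Simulation only: replays already "accepted" connections instead of binding a socket.
    void start(Consumer<? super C> handler, List<TcpConnection> accepted) {
        for (TcpConnection connection : accepted) {
            handler.accept(adapter.apply(connection));
        }
    }
}

public class InterceptDemo {
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Server<LibraryConnection> server = Server.create().intercept(LibraryConnection::new);
        // The handler is typed against LibraryConnection; no cast or delegate handler is needed.
        server.start(conn -> seen.add(conn.describe()),
                     List.of(new TcpConnection("10.0.0.1"), new TcpConnection("10.0.0.2")));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [library view of 10.0.0.1, library view of 10.0.0.2]
    }
}
```

Because `intercept` introduces the new type parameter `C2`, the handler passed to `start` compiles against `LibraryConnection`, and no delegate `Handler<TcpConnection>` is required.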

Here's the updated example using Reactor of an echo server with logging "interceptors" (code name for simply putting an RS Processor between the .reader() and the user) and using the new connection interceptor.

jbrisbin commented 9 years ago

I tried to add an example implementation of RxTcpServer today but I'm not familiar enough with Rx to do that to my satisfaction. It seemed I was being very verbose and things just weren't working like I expected so I gave up on that and figured I'd leave that part to the experts.

I added another example of using the EventListener approach to make very clean extensions of the transport by adding a ChannelActiveListener that can configure the Netty pipeline. I used this in the Reactor implementation to add a Reactor Buffer channel handler that converts the Netty ByteBuf to a Reactor Buffer which allows us to use our built-in Codec support.

One of the things I'm finding I like about this approach so far is that everything has a very clean and clear extension point without introducing unnecessary abstractions at the protocol level. It also meant all I had to change in my test code was the generic type (from ByteBuf to String) and pass the appropriate Codec.

benjchristensen commented 9 years ago

Instead of prolonging this discussion with further back-and-forth, here are 3 use cases that seem not to be supported by this proposal:

These 3 are use cases of #19 that seem to not be accounted for in this counter proposal. I'd like your help in either showing me how they are accommodated, or in disproving our perception of them as requirements.

By specifying them in separate issues we can discuss each independently and independent of any given PR.

NiteshKant commented 9 years ago

I am closing this PR in favor of #19 which has been merged.