reactive-ipc / reactive-ipc-jvm

Reactive IPC for the JVM
Apache License 2.0

Server Interception #23

Open benjchristensen opened 9 years ago

benjchristensen commented 9 years ago

As a concrete requirement for server-as-a-function (https://github.com/reactive-ipc/reactive-ipc-jvm/issues/15), servers need the ability to intercept, short-circuit and compose.

Clients and servers have different needs that we (Nitesh and I) have found cannot be achieved with unidirectional stream pipelines (such as Reactive Streams and Rx). For example, a "cache filter" that works on both input and output requires a different signature than RS offers, since RS can only represent either input or output, but not both. These are not use cases that act on a single input/output pipeline or affect backpressure; those types of cases most definitely should just be processors/operators in an RS pipeline.

I would very much prefer that all interception could be achieved with RS, but over the past year we have learned it can't be done. This is why we are adding function composition over clients/servers in RxNetty 0.5, to enable functionality that RxJava Observables alone have not been able to achieve.

Additionally, a map function (as added here https://github.com/reactive-ipc/reactive-ipc-jvm/pull/20#issuecomment-87073657) is not sufficient for some use cases such as responding from a cache and short-circuiting the response. For example:

        Netty4TcpServer.<ByteBuf, ByteBuf>create(0)
                       .intercept(log())
                       .<ByteBuf, ByteBuf>intercept(handler -> {
                           final AtomicLong connCounter = new AtomicLong();
                           return input -> {
                               if (connCounter.incrementAndGet() % 2 == 0) {
                                   logger.error("Short-circuiting further processing.");
                                   return input.write(Publishers.just(Unpooled.buffer().writeBytes("Go Away!!! \n".getBytes())));
                               }
                               return handler.handle(input);
                           };
                       })
                       .start(connection ->
                                      toPublisher(toObservable(connection)
                                                          .flatMap(byteBuf -> {
                                                              String msg = "Hello " + byteBuf.toString(defaultCharset());
                                                              ByteBuf toWrite = Unpooled.buffer().writeBytes(msg.getBytes());
                                                              return toObservable(connection.write(Publishers.just(toWrite)));
                                                          })));

With just map interception the request still flows through to the final handler. In a cache use case it must short-circuit and return a new "server" that just responds from the cache.

Another example is a performance logger that captures response time. It must have access to both input and output so it can start the timer on input, complete it on output, and emit an event (or modify the response, etc.). Again, this is something a unidirectional map function cannot do, as it needs access to both the input and the output.

The function composition approach as proposed in https://github.com/reactive-ipc/reactive-ipc-jvm/issues/15 and similarly implemented in Finagle and Retrofit is an attempt at addressing these use cases.
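For readers unfamiliar with the shape being referenced, here is a minimal, dependency-free sketch in the Finagle style (the `Service`/`Filter` names follow Finagle's terminology and are illustrative only, not the proposed ripc API). The key point is that a filter sees both the request and the downstream service, so it can short-circuit, e.g. to answer from a cache:

```java
import java.util.function.Function;

public class FilterSketch {

    // Hypothetical service abstraction: request in, response out.
    interface Service<Req, Resp> extends Function<Req, Resp> {}

    // A filter sees the request AND the downstream service, so it can
    // inspect input, transform output, or skip the service entirely.
    interface Filter<Req, Resp> {
        Resp apply(Req request, Service<Req, Resp> next);

        // Compose: filter.andThen(service) yields a new Service.
        default Service<Req, Resp> andThen(Service<Req, Resp> service) {
            return req -> apply(req, service);
        }
    }

    public static void main(String[] args) {
        Service<String, String> echo = req -> "echo: " + req;

        // Cache-style filter: short-circuits without calling the handler.
        Filter<String, String> cache = (req, next) ->
                req.equals("cached") ? "from-cache" : next.apply(req);

        Service<String, String> server = cache.andThen(echo);
        System.out.println(server.apply("cached")); // from-cache
        System.out.println(server.apply("hello"));  // echo: hello
    }
}
```

Because a composed filter-plus-service is itself a `Service`, layering (logging, auth, caching) reduces to ordinary function composition.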

smaldini commented 9 years ago

Can't intercept be what Rx calls replay in the cache use case? Can't the performance logger be two observers, one upstream and one before write? If you pipe your input directly into a write(), it looks like we already have a kind of echo server addressing both input and output. Adding Processors in the middle would effectively behave like intercept, or am I missing something? I'd rather consider something like a connection.setup() or events() callback to observe low-level lifecycle events, for instance if you want to add handlers to your Netty pipeline or monitor open/close.

benjchristensen commented 9 years ago

Can't intercept be what Rx calls replay in the cache use case ?

A TcpConnection is made up of one Publisher for the input, and then one or more Publishers for the output.

Putting a replay on the input Publisher does not result in a cached response on the output; it would cause the input values to be repeated to the subscriber inside the handler.

Can't the performance logger be 2 observers one upstream and one before write?

Only if the developer closes over the state of both the input Publisher and the output Publisher(s), with side-effecting operators on both and shared state between them. This is generally not possible for infrastructure code, since it doesn't have access to the Publishers that other developers submit to the write method in their handler code. This is equivalent to saying Servlet Filters can't be used and every Servlet implementation must embed the performance-tracing code directly.

Interceptors should be orthogonal to the various handlers an application may implement. The performance logger should be composed into the server without affecting or relying upon developer handler code. At a minimum the interceptor needs to decorate both the input Publisher and output Publisher.
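As a sketch of that decoration, using `java.util.concurrent.Flow` publishers in place of Reactive Streams (the helper names `doOnFirst`/`doOnComplete` and the synchronous `just` publisher are hypothetical, and backpressure is ignored for brevity): the interceptor closes over shared timing state and wraps both publishers without touching handler code:

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

public class TimingInterceptorSketch {

    // Run an action on the first element of a publisher (sketch helper).
    static <T> Flow.Publisher<T> doOnFirst(Flow.Publisher<T> source, Runnable action) {
        return sub -> source.subscribe(new Flow.Subscriber<T>() {
            boolean seen;
            public void onSubscribe(Flow.Subscription s) { sub.onSubscribe(s); }
            public void onNext(T item) {
                if (!seen) { seen = true; action.run(); }
                sub.onNext(item);
            }
            public void onError(Throwable t) { sub.onError(t); }
            public void onComplete() { sub.onComplete(); }
        });
    }

    // Run an action when a publisher completes (sketch helper).
    static <T> Flow.Publisher<T> doOnComplete(Flow.Publisher<T> source, Runnable action) {
        return sub -> source.subscribe(new Flow.Subscriber<T>() {
            public void onSubscribe(Flow.Subscription s) { sub.onSubscribe(s); }
            public void onNext(T item) { sub.onNext(item); }
            public void onError(Throwable t) { sub.onError(t); }
            public void onComplete() { action.run(); sub.onComplete(); }
        });
    }

    // The interceptor closes over shared state (the start time) and
    // decorates input and output; the handler never sees the timing code.
    public static long measure(Flow.Publisher<String> input, Flow.Publisher<String> output) {
        AtomicLong start = new AtomicLong();
        AtomicLong elapsed = new AtomicLong(-1);
        Flow.Publisher<String> timedIn = doOnFirst(input, () -> start.set(System.nanoTime()));
        Flow.Publisher<String> timedOut =
                doOnComplete(output, () -> elapsed.set(System.nanoTime() - start.get()));
        timedIn.subscribe(noop());
        timedOut.subscribe(noop());
        return elapsed.get();
    }

    static Flow.Subscriber<String> noop() {
        return new Flow.Subscriber<String>() {
            public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(String item) {}
            public void onError(Throwable t) {}
            public void onComplete() {}
        };
    }

    // A trivial synchronous publisher for the demo (not backpressure-correct).
    static Flow.Publisher<String> just(String value) {
        return sub -> {
            sub.onSubscribe(new Flow.Subscription() {
                public void request(long n) {}
                public void cancel() {}
            });
            sub.onNext(value);
            sub.onComplete();
        };
    }

    public static void main(String[] args) {
        long nanos = measure(just("request"), just("response"));
        System.out.println("response time recorded: " + (nanos >= 0));
    }
}
```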

If you pipe your input directly into a write(), it looks like we have a kind of echo server already ...

Yes, it does achieve an echo server, but what does that demonstrate in relation to intercepting server input/output and achieving the use cases mentioned above? An echo server does not represent the types of use cases we are trying to solve, which involve multiplexed writing, conditional responses from caches, trace logging of input/output latency, and encryption/compression/authentication/authorization as part of the server (not the handler), among other things.

The behaviors of these interceptors should be similar to Servlet Filters which expose the input/output and filter chaining and support injecting and layering of functionality on a server – separate from the actual servlets (handlers) themselves.

I'd rather ...

What about the proposed function composition approach is unwanted? What use cases does it prevent or fail to achieve? It allows composing behavior into a server using patterns similar to those that make monadic Reactive Streams so powerful, and it achieves the decoupling and flexibility of Servlet Filters.

smaldini commented 9 years ago

I was thinking outside the write-multiplexing use case. You need a federated method call to achieve this as soon as you can have multiple writers. Otherwise, unlike Servlets, the Reactive Streams input/write output is already a filter chain; in fact there wouldn't be any servlets but only filters in an RS approach. Point taken that it doesn't work with #21 though.

rstoyanchev commented 9 years ago

Based on https://github.com/reactive-ipc/reactive-ipc-jvm/issues/27#issuecomment-89485436 I modified the TcpServerSample from https://github.com/reactive-ipc/reactive-ipc-jvm/pull/19 to use a TcpHandler that reads and writes String instead of ByteBuf. You can see the modifications at https://github.com/rstoyanchev/reactive-ipc-jvm/commit/ae1f0b1f3b5482e7da16bfeb7b2a1e4c417cfc35. When a connection is opened I get this:

java.lang.ClassCastException: io.netty.buffer.SimpleLeakAwareByteBuf cannot be cast to java.lang.String
at io.ripc.transport.netty4.tcp.TcpServerSample$StringToByteBufSubscriber.onNext(TcpServerSample.java:131)
at io.ripc.transport.netty4.tcp.ChannelToConnectionBridge.channelRead(ChannelToConnectionBridge.java:84)

As far as I can see, each time intercept is called, a new server (with potentially new I/O generic types) is returned:

TcpServer<A,A> --> interceptAtoB<A,A,B,B> --> interceptBtoC<B,B,C,C> --> ...

The above gives a TcpServer<C,C> which can be started with TcpHandler<C,C>. Now when an Object is read from the Netty channel, it is cast to (C) and this is the invocation order:

Netty channel --> ((C) Object) --> interceptBtoCHandler --> interceptAtoBHandler --> tcpHandler<C,C>

In my case the type C is String but what's coming from Netty is a ByteBuf, hence the ClassCastException. Unless I'm missing something, I don't see how to create a basic example where a TcpHandler reads and writes something converted through a TcpInterceptor.

On the read side, I'd expect the first handler to take whatever comes from Netty (e.g. ByteBuf) and then as it passes through the handler chain, the ByteBuf to be converted to whatever the handler expects. Instead the server casts immediately to whatever the handler expects.

rstoyanchev commented 9 years ago

Further thoughts following my last comment. There are issues with the implementation. The handlers are decorated, so the order of declaration (from left to right) is the reverse of the order of invocation (from right to left), except for the handler passed to the start() method, which is still invoked at the end. The order of declaration should be consistent with the order of invocation. More importantly, it doesn’t work in terms of generics (so I’ll assume it’s a bug). For example:

TcpServer<A,A> → TcpServer<B,B> → TcpServer<C,C> → TcpHandler<C,C>

the resulting TcpServer<C,C> passes its generic types down to the transport which assumes (incorrectly) that whatever comes from the Netty channel should match the generic type “C” and casts down.

I’ve experimented with a simple chain of handlers where both the order of declaration and the order of invocation are from left to right. Note that in doing so I left out the part where the server gets re-created. Instead there is one server and then a chain of interceptors, where an interceptor is essentially a handler that delegates to another and optionally transforms the connection:

https://github.com/rstoyanchev/reactive-ipc-jvm/tree/tcp-poc

I think that’s simple, intuitive, and functionally equivalent. Re-creation of the TcpServer could still be added, but is it really needed? I like the idea of TcpServer as the transport-layer boundary, dealing with whatever types go in and out of Netty, and then a handler chain that can transform the connection. It’s very much like Finagle’s Services & Filters.
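A toy version of that left-to-right chain (plain functions standing in for TcpHandler/TcpInterceptor; all names here are hypothetical, not the ripc API) shows conversion happening inside the chain rather than the transport casting the raw Netty object to the outermost generic type:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class HandlerChainSketch {

    // Hypothetical handler: consumes the connection's input type.
    interface Handler<T> { String handle(T input); }

    // An interceptor adapts a Handler<OUT> into a Handler<IN> by converting,
    // so each link accepts what the previous link actually produces.
    static <IN, OUT> Handler<IN> convert(Function<IN, OUT> conversion, Handler<OUT> next) {
        return input -> next.handle(conversion.apply(input));
    }

    public static void main(String[] args) {
        // The final handler works on String...
        Handler<String> stringHandler = msg -> "Hello " + msg;
        // ...but the transport produces bytes; an interceptor converts,
        // so declaration order (left to right) matches invocation order
        // and no downcast of the raw transport object is needed.
        Handler<byte[]> server =
                convert(bytes -> new String(bytes, StandardCharsets.UTF_8), stringHandler);
        System.out.println(server.handle("world".getBytes(StandardCharsets.UTF_8)));
        // Hello world
    }
}
```

In this shape the server boundary always deals in the transport's raw type, and type changes are explicit conversions in the chain, which is what avoids the ClassCastException discussed above.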