ReactiveX / RxNetty

Reactive Extension (Rx) Adaptor for Netty
Apache License 2.0
1.38k stars 255 forks source link

Interceptor API proposal #337

Closed elandau closed 8 years ago

elandau commented 9 years ago

The following proposal addresses the need for an interceptor/filter model that can allow requests and responses to be replaced, modified or hooked into. Use cases of interceptors include logging, caching, request augmentation (such as adding HTTP headers) and Hystrix. This implementation assumes that request execution can be modeled as a Func1<Request, Observable<Response>>. Interceptors are 'configured' or applied as an ordered list.

Request execution contract,

Func1<Request, Observable<Response>>

Each specific client implementation will have a single request execution class for the client's specific API. For example,

public class RequestExecution<Request, Response> implements Func1<Response, Observable<Response>> {
    private final Client client;

    public Operation(Client client) {
        this.client = client;
    }

    public Observable<Response> call(Request request) {
        return client.submit(request);
    }
}

The Interceptor contract extends the basic execution contract by making it possible to iterate through a list of Interceptors to construct the final request Observable stream.

Func2<Request, Func1<Request, Observable<Response>>

Interceptors are chained together as follows,

public class Executor<Request, Response> implements Func1<Request, Observable<Response>> {

    private final List<Interceptor<Request, Response>> interceptors;
    private final Func1<Request, Observable<Response>> operation;

    private static class ChainedInterceptor<Request, Response> implements Func1<Request, Observable<Response>> {
        private final Iterator<Interceptor<Request, Response>> iter;
        private final Interceptor<Request, Response> interceptor;
        private final Func1<Request, Observable<Response>> operation;

        public ChainedInterceptor(Iterator<Interceptor<Request, Response>> iter, Func1<Request, Observable<Response>> operation) {
            this.iter = iter;
            this.interceptor = iter.next();
            this.operation = operation;
        }

        @Override
        public Observable<Response> call(Request request) {
            if (iter.hasNext()) {
                return interceptor.call(request, new ChainedInterceptor<Request, Response>(iter, operation));
            }
            else {
                return interceptor.call(request, operation);
            }
        }
    }

    public Executor(List<Interceptor<Request, Response>> interceptors, Func1<Request, Observable<Response>> operation) {
        this.interceptors = interceptors;
        this.operation = operation;
    }

    @Override
    public Observable<Response> call(Request request) {
        if (interceptors.isEmpty()) {
            return operation.call(request);
        }
        else {
            return new ChainedInterceptor<Request, Response>(interceptors.iterator(), operation).call(request);
        }
    }
}

Example of a logging interceptor that doesn't modify the original request but does augment the returned Observable to log every stage during request execution.

public class LoggingInterceptor<Request, Response> implements Interceptor<Request, Response> {
    public static <Request, Response> LoggingInterceptor<Request, Response> create() {
        return new LoggingInterceptor<Request, Response>();
    }

    @Override
    public Observable<Response> call(final Request request, Func1<Request, Observable<Response>> next) {
        return next.call(request)
            .doOnSubscribe(RxUtil.info("begin : " + request))
            .doOnNext(RxUtil.info("next : " + request))
            .doOnCompleted(RxUtil.info("done : " + request))
            .doOnError(RxUtil.error("failed : " + request))
            ;
    }

    @Override
    public String toString() {
        return "LoggingInterceptor []";
    }
}

Example of a failure injecting interceptor that may optionally stop processing the interceptor chain, transform the response and introduce delays into request execution,

public class FailureInjectingInterceptor<Request, Response> implements Interceptor<Request, Response> {

    @Override
    public Observable<Response> call(Request request, Func1<Request, Observable<Response>> next) {
        Observable<Response> o;

        Throwable error = getError();
        if (error != null) {
            o = Observable.error(error);
        }
        else {
            o = next.call(request);
        }

        Func1<Response, Response> transformer = getResponseTransformer();
        if (transformer != null) {
            o = o.map(transformer);
        }

        long delay = getDelay();
        if (delay > 0) {
            o = o.delaySubscription(delay, getDelayUnits());
        }

        return o;
    }

    /**
     * @return Function to modify a successful response
     */
    private Func1<Response, Response> getResponseTransformer() {
        return null;
    }

    /**
     * @return Get subscription delay to introduce.  Note that this is an absolute delay to however
     * long the operation may take.
     */
    protected long getDelay() {
        return 0;
    }

    /**
     * @return Units to use for the delay.  Default is MILLISECONDS
     */
    protected TimeUnit getDelayUnits() {
        return TimeUnit.MILLISECONDS;
    }

    /**
     * @return Get error to inject
     */
    protected Throwable getError() {
        return null;
    }
}

Base contract for interceptors that just modify the request

public abstract class TransformingInterceptor<Request, Response> implements Interceptor<Request, Response> {

    @Override
    public Observable<Response> call(Request t, Func1<Request, Observable<Response>> next) {
        return next.call(transform(t));
    }

    protected abstract Request transform(Request request);
}

For example, this interceptor adds an HTTP header to every outgoing request

public class RequestHeaderAddingInterceptor<HttpRequest, Observable<HttpResponse>> {
    protected HttpRequest transform(HttpRequest request) {
        return request.withHeader("X_CustomHeader", "Value1");
    }
}
NiteshKant commented 9 years ago

Thanks @elandau !

Adding context from the 0.5.0 issue #303

This implementation assumes that request execution can be modeled as a Func1<Request, Observable>

Kind of yes, this is the API:

HttpClientRequest<I, O> createGet(String uri) where HttpClientRequest is an extension to Observable, such as

public abstract class HttpClientRequest<I, O> extends Observable<HttpClientResponse<O>>

So, I can think of these interceptors as part of the HttpClientRequest, eg:

RxNetty.createHttpClient()
             .createGet("http://localhost:8999/hello")
             .intercept((request, next) -> {
                  return next.call(request.addHeader("foo", "bar"));
             })
            .intercept((request, next) -> {
                 return Observable.error(new IllegalArgumentException("I dont like your request!));
            })
           .intercept((request, next) -> {
                 return next.call(request)
                                  .addHeader("foo", "bar"); // This API is not yet there i.e. to change response
            })

In this way chaining is implicit (order in which you add the interceptors) and hence does not require a different abstraction.

futurely commented 9 years ago

The most popular RESTful API specification Swagger implemented REST over WebSocket using the Atmosphere interceptor. Considering that Atomosphere was one of the fianlists of the Most innovative Java Technology of the JAX Innovation Awards 2014, you may want to take a look at the interceptors of Atmosphere and Nettosphere.

GrapeBaBa commented 8 years ago

Hi Kant, If we use TcpClient to send a custom protocol request, maybe also need interceptor. If you add to HttpClientRequest, how to reuse it? Also i want to know if native RxJava Observal abstraction have a intercept method?

NiteshKant commented 8 years ago

Interceptors are now available in the current snapshot and the examples can be found in rxnetty-examples module both for TCP & HTTP.