rsocket / rsocket-rpc-java

Standard RSocket RPC Java Implementation
http://rsocket.io/
Apache License 2.0
172 stars 41 forks source link

Make metrics pluggable #26

Open OlegDokuka opened 5 years ago

OlegDokuka commented 5 years ago

User Story

As an RSocket-RPC-Java user, I want to have metrics pluggable

Actual Behavior

In the current implementation, metrics are built-in, so it is hardly manageable to eliminate it from the generated code.

Expected Behavior

The current implementation provides proper abstraction and code generation that does not require a user to live with opentracing and micrometer dependencies as an built-in code. In turn, it would be much easier to have an addon to current plugin that can generate adapters for metrics and tracing that could be plugged in as in the following sample:

new TracingAwareMetricsSnapshotHandlerServerAdapter(
    new MeterAwareMetricsSnapshotHandlerServerAdapter(
        new MetricsSnapshotHandlerServer(...),
        meterRegistry
    )
)

in that case server/client, impl will be metrics-free code which does the same and adapters code will look like the following (example of generate server adapter):

public class MeterAwareGreeter extends AbstractRSocketService {

    private final AbstractRSocketService delegate;

    private final Function<? super Publisher<Void>, ? extends Publisher<Void>> streamGreet6MeterFunction;
    private final Function<? super Publisher<Payload>, ? extends Publisher<Payload>> streamGreet1MeterFunction;
    private final Function<? super Publisher<Payload>, ? extends Publisher<Payload>> streamGreet4MeterFunction;
    private final Function<? super Publisher<Payload>, ? extends Publisher<Payload>> streamGreet2MeterFunction;
    private final Function<? super Publisher<Payload>, ? extends Publisher<Payload>> streamGreet3MeterFunction;
    private final Function<? super Publisher<Payload>, ? extends Publisher<Payload>> streamGreet5MeterFunction;

    public MeterAwareGreeter(AbstractRSocketService delegate, MeterRegistry registry) {
        this.delegate = delegate;
        this.streamGreet6MeterFunction = Metrics.timed(registry, "rsocket.server", "service", Greeter.SERVICE, "method", Greeter.METHOD_STREAM_GREET6);
        this.streamGreet1MeterFunction = Metrics.timed(registry, "rsocket.server", "service", Greeter.SERVICE, "method", Greeter.METHOD_STREAM_GREET1);
        this.streamGreet4MeterFunction = Metrics.timed(registry, "rsocket.server", "service", Greeter.SERVICE, "method", Greeter.METHOD_STREAM_GREET4);
        this.streamGreet2MeterFunction = Metrics.timed(registry, "rsocket.server", "service", Greeter.SERVICE, "method", Greeter.METHOD_STREAM_GREET2);
        this.streamGreet3MeterFunction = Metrics.timed(registry, "rsocket.server", "service", Greeter.SERVICE, "method", Greeter.METHOD_STREAM_GREET3);
        this.streamGreet5MeterFunction = Metrics.timed(registry, "rsocket.server", "service", Greeter.SERVICE, "method", Greeter.METHOD_STREAM_GREET5);
    }

    @Override
    public Class<?> getServiceClass() {
        return delegate.getServiceClass();
    }

    @Override
    public String getService() {
        return delegate.getService();
    }

    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        String methodName;

        try {
            ByteBuf metadata = payload.sliceMetadata();
            methodName = Metadata.getMethod(metadata);
        } catch (Throwable t) {
            payload.release();
            return Mono.error(t);
        }

        switch(methodName) {
            case "streamGreet6":
                return delegate.fireAndForget(payload)
                               .transform(streamGreet6MeterFunction);
            default: {
                payload.release();
                return Mono.error(new UnsupportedOperationException());
            }
        }
    }

    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        String methodName;

        try {
            ByteBuf metadata = payload.sliceMetadata();
            methodName = Metadata.getMethod(metadata);
        } catch (Throwable t) {
            payload.release();
            return Mono.error(t);
        }

        switch(methodName) {
            case "streamGreet1":
                return delegate.requestResponse(payload)
                               .transform(streamGreet1MeterFunction);
            case "streamGreet4":
                return delegate.requestResponse(payload)
                               .transform(streamGreet4MeterFunction);
            default: {
                payload.release();
                return Mono.error(new UnsupportedOperationException());
            }
        }
    }
....
}
bsideup commented 5 years ago

I think this approach is much cleaner and scales better.

It will also allow having different tracing implementations (Zipkin, OpenTracing, etc...)