bennidi / mbassador

Powerful event-bus optimized for high throughput in multi-threaded applications. Features: Sync and Async event publication, weak/strong references, event filtering, annotation driven
MIT License
955 stars 146 forks source link

Potential lambda enhancement #166

Open regbo opened 3 years ago

regbo commented 3 years ago

I wanted to find a way to use lambdas with this library, so that listeners can be created like this:

// Create a bus whose message type is Date.
MBassadorLFP<Date> bus = new MBassadorLFP<Date>();
// Register a lambda as a listener; subscribe(...) returns the same Consumer
// instance, so the reference can be kept for a later unsubscribe.
var listener1 = bus.subscribe(d -> {
    System.out.println("event consumer 1: " + d);
});

I was able to achieve this by adding 2 methods and a couple of hacks that depend on this library: https://github.com/jhalterman/typetools/

The methods are:

public Consumer<T> subscribe(Consumer<T> listener);

public Consumer<T> subscribe(Consumer<T> listener, HandlerOptions handlerOptions);

This allows me to simplify code as such:

Code:

// Create a bus whose message type is Date.
MBassadorLFP<Date> bus = new MBassadorLFP<Date>();
// Variant 1: subscribe an inline lambda and keep the returned Consumer so it
// can be unsubscribed later.
var listener1 = bus.subscribe(d -> {
    System.out.println("event consumer 1: " + d);
});
// Variant 2: hold the lambda in a variable first, then subscribe it.
Consumer<Date> listener2 = d -> {
    System.out.println("event consumer 2: " + d);
};
bus.subscribe(listener2);
// Both listeners are registered at this point.
System.out.println("should print 1 & 2");
bus.post(new Date()).now();
// Remove the first listener; only the second one remains.
bus.unsubscribe(listener1);
System.out.println("should print 2");
bus.post(new Date()).now();
// Remove the second listener; no consumer is left to receive the message.
bus.unsubscribe(listener2);
System.out.println("should print none");
bus.post(new Date()).now();
System.err.println("done");

Output:

should print 1 & 2
event consumer 2: Thu Feb 25 18:30:13 EST 2021
event consumer 1: Thu Feb 25 18:30:13 EST 2021
should print 2
event consumer 2: Thu Feb 25 18:30:13 EST 2021
should print none

The HandlerOptions class represents a builder class for the handler annotation. It is accessed in a hacky way by tracking threads. This works for me, but it'd be great if there was a way to have something similar implemented without the current thread hack. Below are the two class modifications:

MBassadorLFP.java

@SuppressWarnings("rawtypes")
public class MBassadorLFP<T> extends MBassador<T> {

    private static final Object CONSUMER_ACCEPT_METHOD_NAME = "accept";

    private static final Map<Thread, HandlerOptions> HANDLER_OPTIONS_TRACKER = new ConcurrentHashMap<>();

    /**
     * Default constructor using default setup. super() will also add a default
     * publication error logger
     */
    public MBassadorLFP() {
        this(getDefaultConfiguration());
    }

    /**
     * Construct with default settings and specified publication error handler
     *
     * @param errorHandler
     */
    public MBassadorLFP(IPublicationErrorHandler errorHandler) {
        this(getDefaultConfiguration().addPublicationErrorHandler(errorHandler));
    }

    /**
     * Construct with fully specified configuration
     *
     * @param configuration
     */
    public MBassadorLFP(IBusConfiguration configuration) {
        super(modifyConfiguration(configuration));
    }

    public Consumer<T> subscribe(Consumer<T> listener) {
        return subscribe(listener, null);
    }

    public Consumer<T> subscribe(Consumer<T> listener, HandlerOptions handlerOptions) {
        if (handlerOptions == null)
            handlerOptions = new HandlerOptions();
        var thread = Thread.currentThread();
        HANDLER_OPTIONS_TRACKER.put(thread, handlerOptions);
        try {
            subscribe((Object) listener);
        } finally {
            HANDLER_OPTIONS_TRACKER.remove(thread);
        }
        return listener;
    }

    private static IBusConfiguration modifyConfiguration(IBusConfiguration configuration) {
        if (configuration == null)
            configuration = getDefaultConfiguration();
        var syncPubSub = configuration.getFeature(Feature.SyncPubSub.class);
        if (syncPubSub == null) {
            syncPubSub = Feature.SyncPubSub.Default();
            configuration.addFeature(syncPubSub);
        }
        var delegate = syncPubSub.getMetadataReader();
        syncPubSub.setMetadataReader(new MetadataReader() {

            @Override
            public MessageListener getMessageListener(Class target) {
                MessageListener messageListener;
                if (delegate != null)
                    messageListener = delegate.getMessageListener(target);
                else
                    messageListener = super.getMessageListener(target);
                return modifyMessageListener(target, messageListener);
            }
        });
        return configuration;
    }

    private static MessageListener modifyMessageListener(Class target, MessageListener messageListener) {
        if (!Consumer.class.isAssignableFrom(target))
            return messageListener;
        var handlers = messageListener.getHandlers();
        if (handlers != null && handlers.length > 0)
            return messageListener;
        MessageHandler messageHandler = getConsumerMessageHandler(target, messageListener);
        if (messageHandler != null)
            messageListener.addHandler(messageHandler);
        return messageListener;
    }

    @SuppressWarnings("unchecked")
    private static MessageHandler getConsumerMessageHandler(Class target, MessageListener messageListener) {
        var handlerOptions = HANDLER_OPTIONS_TRACKER.get(Thread.currentThread());
        if (handlerOptions == null)
            return null;
        var applyMethods = getConsumerAcceptMethods(target);
        if (applyMethods.length != 1)
            return null;
//*** WHERE THINGS GET SPICY ***
        var rawArgument = TypeResolver.resolveRawArgument(Consumer.class, target);
        if (rawArgument == null || TypeResolver.Unknown.class.equals(rawArgument))
            return null;
        var handler = applyMethods[0];
        Handler handlerConfig = handlerOptions.toHandler();
        Enveloped enveloped = new Enveloped() {

            @Override
            public Class<? extends Annotation> annotationType() {
                return Enveloped.class;
            }

            @Override
            public Class[] messages() {
                return new Class[] { rawArgument };
            }
        };
        if (!handlerConfig.enabled())
            return null;
        var handlerProperties = MessageHandler.Properties.Create(handler, handlerConfig, enveloped,
                new IMessageFilter[0], messageListener);
        handlerProperties.put(MessageHandler.Properties.Enveloped, false);
        MessageHandler handlerMetadata = new MessageHandler(handlerProperties);
        return handlerMetadata;
    }

    private static Method[] getConsumerAcceptMethods(Class target) {
        var methods = ReflectionUtils.getMethods(m -> {
            if (Modifier.isAbstract(m.getModifiers()))
                return false;
            if (!CONSUMER_ACCEPT_METHOD_NAME.equals(m.getName()))
                return false;
            if (m.getParameterCount() != 1)
                return false;
            if (!Object.class.isAssignableFrom(m.getParameterTypes()[0]))
                return false;
            return true;
        }, target);
        return methods;
    }

    private static IBusConfiguration getDefaultConfiguration() {
        return new BusConfiguration().addFeature(Feature.SyncPubSub.Default())
                .addFeature(Feature.AsynchronousHandlerInvocation.Default())
                .addFeature(Feature.AsynchronousMessageDispatch.Default());
    }

}

HandlerOptions.java

/**
 * Mutable, chainable set of overrides for the attributes of the
 * {@link Handler} annotation. Every attribute left {@code null} falls back to
 * the annotation's own default value.
 */
public class HandlerOptions {

    /**
     * A real {@link Handler} annotation instance carrying only default values.
     * It is read once from the {@code @Handler} annotation placed on
     * {@link #toHandler()}, which exists solely to provide these defaults.
     */
    private static final Handler DEFAULTS = loadDefaults();

    private Filter[] filters;
    private String condition;
    private Invoke delivery;
    private Integer priority;
    private Boolean rejectSubtypes;
    private Boolean enabled;
    private Class<? extends HandlerInvocation> invocation;

    /** Creates options with no overrides; every attribute uses its default. */
    public HandlerOptions() {

    }

    /**
     * Creates options with the given overrides. Any argument may be
     * {@code null} to keep the corresponding annotation default.
     *
     * @param filters        override for {@code Handler.filters()}
     * @param condition      override for {@code Handler.condition()}
     * @param delivery       override for {@code Handler.delivery()}
     * @param priority       override for {@code Handler.priority()}
     * @param rejectSubtypes override for {@code Handler.rejectSubtypes()}
     * @param enabled        override for {@code Handler.enabled()}
     * @param invocation     override for {@code Handler.invocation()}
     */
    public HandlerOptions(Filter[] filters, String condition, Invoke delivery, Integer priority, Boolean rejectSubtypes,
            Boolean enabled, Class<? extends HandlerInvocation> invocation) {
        this.filters = filters;
        this.condition = condition;
        this.delivery = delivery;
        this.priority = priority;
        this.rejectSubtypes = rejectSubtypes;
        this.enabled = enabled;
        this.invocation = invocation;
    }

    /**
     * Reads the default-valued {@link Handler} annotation from
     * {@link #toHandler()}.
     *
     * @return the annotation instance holding the defaults
     * @throws IllegalStateException if the method or its annotation cannot be
     *                               found reflectively
     */
    private static Handler loadDefaults() {
        Handler defaults;
        try {
            defaults = HandlerOptions.class.getMethod("toHandler").getAnnotation(Handler.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("HandlerOptions.toHandler() not found", e);
        }
        if (defaults == null) {
            // Fail at class initialization instead of with an obscure
            // NullPointerException on first use.
            throw new IllegalStateException("@Handler annotation not visible on HandlerOptions.toHandler()");
        }
        return defaults;
    }

    /**
     * Exposes these options as a {@link Handler} annotation instance.
     *
     * <p>
     * The returned object is a live view: each attribute reads the current
     * value of this object's field when it is called, so setter calls made
     * afterwards are reflected. It does not override {@code equals},
     * {@code hashCode} or {@code toString}.
     *
     * @return a {@code Handler} backed by this object's overrides and the
     *         annotation defaults
     */
    @Handler
    public Handler toHandler() {
        return new Handler() {

            @Override
            public Class<? extends Annotation> annotationType() {
                return DEFAULTS.annotationType();
            }

            @Override
            public Filter[] filters() {
                return filters != null ? filters : DEFAULTS.filters();
            }

            @Override
            public String condition() {
                return condition != null ? condition : DEFAULTS.condition();
            }

            @Override
            public Invoke delivery() {
                return delivery != null ? delivery : DEFAULTS.delivery();
            }

            @Override
            public int priority() {
                return priority != null ? priority : DEFAULTS.priority();
            }

            @Override
            public boolean rejectSubtypes() {
                return rejectSubtypes != null ? rejectSubtypes : DEFAULTS.rejectSubtypes();
            }

            @Override
            public boolean enabled() {
                return enabled != null ? enabled : DEFAULTS.enabled();
            }

            @Override
            public Class<? extends HandlerInvocation> invocation() {
                return invocation != null ? invocation : DEFAULTS.invocation();
            }
        };
    }

    /**
     * @param filters override for {@code Handler.filters()}, or {@code null} for the default
     * @return this instance, for chaining
     */
    public HandlerOptions setFilters(Filter[] filters) {
        this.filters = filters;
        return this;
    }

    /** @return the filters override, or {@code null} if unset */
    public Filter[] getFilters() {
        return this.filters;
    }

    /**
     * @param condition override for {@code Handler.condition()}, or {@code null} for the default
     * @return this instance, for chaining
     */
    public HandlerOptions setCondition(String condition) {
        this.condition = condition;
        return this;
    }

    /** @return the condition override, or {@code null} if unset */
    public String getCondition() {
        return this.condition;
    }

    /**
     * @param delivery override for {@code Handler.delivery()}, or {@code null} for the default
     * @return this instance, for chaining
     */
    public HandlerOptions setDelivery(Invoke delivery) {
        this.delivery = delivery;
        return this;
    }

    /** @return the delivery override, or {@code null} if unset */
    public Invoke getDelivery() {
        return this.delivery;
    }

    /**
     * @param priority override for {@code Handler.priority()}, or {@code null} for the default
     * @return this instance, for chaining
     */
    public HandlerOptions setPriority(Integer priority) {
        this.priority = priority;
        return this;
    }

    /** @return the priority override, or {@code null} if unset */
    public Integer getPriority() {
        return this.priority;
    }

    /**
     * @param rejectSubtypes override for {@code Handler.rejectSubtypes()}, or {@code null} for the default
     * @return this instance, for chaining
     */
    public HandlerOptions setRejectSubtypes(Boolean rejectSubtypes) {
        this.rejectSubtypes = rejectSubtypes;
        return this;
    }

    /** @return the rejectSubtypes override, or {@code null} if unset */
    public Boolean getRejectSubtypes() {
        return this.rejectSubtypes;
    }

    /**
     * @param enabled override for {@code Handler.enabled()}, or {@code null} for the default
     * @return this instance, for chaining
     */
    public HandlerOptions setEnabled(Boolean enabled) {
        this.enabled = enabled;
        return this;
    }

    /** @return the enabled override, or {@code null} if unset */
    public Boolean getEnabled() {
        return this.enabled;
    }

    /**
     * @param invocation override for {@code Handler.invocation()}, or {@code null} for the default
     * @return this instance, for chaining
     */
    public HandlerOptions setInvocation(Class<? extends HandlerInvocation> invocation) {
        this.invocation = invocation;
        return this;
    }

    /** @return the invocation override, or {@code null} if unset */
    public Class<? extends HandlerInvocation> getInvocation() {
        return this.invocation;
    }

}