Closed joehanna closed 3 years ago
Hey Joe,
Thank you for your support of the library! :-)
Easy.MessageHub is an implementation of the Event Aggregator pattern and I have intentionally tried to avoid adding features that would deviate from the pattern. IMO any additional logic that would determine who receives the message would turn this into a Mediator which is not the problem the library is concerned with. In fact, I think the library is feature complete unless of course bugs are found which I will fix.
For your particular use-case, I would suggest housing this logic within your subscriber; Given you know the type of the message e.g. Person
, you can combine Easy.MessageHub with System.Threading.Channel
whereby any incoming invocation through the hub is funnelled into the Channel<Person>
within which you will have a bunch of Func<Person, bool>
deciding how you should handle the message.
I highly recommend using this library in conjunction with System.Threading.Channel as it unlocks many potentials which allows you to build higher-level features on top the library.
HTH.
for the record do you have an example of what are suggesting? You mean to have a "singleton" channel for that message type and then within the channel consumer redirect the event to who has to receive it based on the list of Fun<person, bool> ?
I have an event bus similar to your, where I built a SubscriptionManager
class to handle better the registration/unregistration process and in it I've a clause (something that I think could be useful to you too) and I've added a simple method like this to handle the "clause"
/// <summary>
/// Registers a <see cref="IEventBus"/> message <paramref name="handler"/> that will be automatically
/// unregistered once the component gets disposed.
/// </summary>
/// <param name="subscription">The subscription.</param>
protected void RegisterLifeTimeEventHandler<TEventBase>(Func<TEventBase, ValueTask> handler, Func<TEventBase, bool>? clause = null) where TEventBase : EventBase
{
_eventSubscriptionManager.RegisterSubscriptions(eventBus.Subscribe<TEventBase>(async (message) =>
{
// ensure we listen only to events we are interested into
if (clause != null && !clause(message)) return;
await handler(message).ConfigureAwait(false);
}));
}
it's not a perfect solution (performance wise) because this causes to have an handler wrapped within another handler but it does it's job
P.S. my implementation is opensourced here if you want to adopt the SubscriptionManager in your library https://github.com/MithrilMan/MithrilShards/tree/master/src/MithrilShards.Core/EventBus
Here's a quick example which should give you a good perf (so long as your not adding or removing registrations too many times) and more ideas on how to expand:
void Main()
{
IMessageHub hub = new MessageHub();
using var fancyHub = new MyFancyHub<MyBaseEvent>(hub);
Action<MyBaseEvent> handlesAllEvents = x => Console.WriteLine("[A] " + x.ToString());
Action<MyBaseEvent> handlesPositiveEvents = x => Console.WriteLine("[+] " + x.ToString());
Action<MyOtherEvent> handlesOtherEvents = x => Console.WriteLine("[O] " + x.ToString());
fancyHub.Register(handlesAllEvents);
fancyHub.Register(handlesPositiveEvents, x => x.Number > 0);
fancyHub.Register(handlesOtherEvents, x => x.Name == "Foo");
fancyHub.Publish(new MyBaseEvent { Number = 42 });
fancyHub.Publish(new MyOtherEvent { Number = -42, Name = "Foo" });
Console.ReadLine();
}
public sealed class MyFancyHub<T> : IDisposable
{
private readonly IMessageHub _hub;
private readonly Channel<T> _channel;
private readonly CancellationTokenSource _cts;
private readonly Guid _token;
private readonly List<FancyRegistration> _registrations;
public MyFancyHub(IMessageHub hub)
{
_hub = hub;
UnboundedChannelOptions options = new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
SingleWriter = false,
SingleReader = true
};
_channel = Channel.CreateUnbounded<T>(options);
_cts = new CancellationTokenSource();
_registrations = new List<FancyRegistration>();
_token = _hub.Subscribe<T>(x => _channel.Writer.TryWrite(x));
_ = StartReadingFromChannel(_channel.Reader, _cts.Token);
}
public void Publish<TPayload>(TPayload payload) where TPayload : T => _hub.Publish(payload);
public Guid Register<TPayload>(Action<TPayload> handler) where TPayload : T => Register(handler, x => true);
public Guid Register<TPayload>(Action<TPayload> handler, Predicate<TPayload> predicate) where TPayload : T
{
var registration = new FancyRegistration(handler, predicate, typeof(TPayload));
lock (_registrations)
{
_registrations.Add(registration);
}
return registration.Token;
}
public bool Remove(Guid token)
{
lock (_registrations)
{
int possibleIdx = _registrations.FindIndex(x => x.Token == token);
if (possibleIdx < 0)
{
return false;
}
_registrations.RemoveAt(possibleIdx);
return true;
}
}
public void Dispose()
{
_hub.Unsubscribe(_token);
_channel.Writer.Complete();
_cts.Cancel();
}
private Task StartReadingFromChannel(ChannelReader<T> reader, CancellationToken cToken) =>
Task.Factory.StartNew(async () =>
{
while (await reader.WaitToReadAsync(cToken).ConfigureAwait(false))
{
while (reader.TryRead(out T item))
{
if (cToken.IsCancellationRequested) { break; }
lock (_registrations)
{
foreach (var registration in _registrations)
{
bool handled = registration.TryHandle(item);
}
}
}
}
},
TaskCreationOptions.LongRunning);
private sealed class FancyRegistration
{
private readonly Delegate _handler;
private readonly Delegate _predicate;
private readonly Type _payloadType;
public FancyRegistration(Delegate handler, Delegate predicate, Type payloadType)
{
_handler = handler;
_predicate = predicate;
_payloadType = payloadType;
Token = Guid.NewGuid();
}
public Guid Token { get; }
public bool TryHandle<TIn>(TIn input) where TIn : T
{
Type inputType = input.GetType();
bool shouldHandle = inputType.IsAssignableTo(_payloadType);
if (!shouldHandle) { return false; }
if (_predicate is Predicate<TIn> pred && pred(input))
{
var handler = (Action<TIn>)_handler;
handler(input);
return true;
}
if ((bool)_predicate.DynamicInvoke(input))
{
_handler.DynamicInvoke(input);
return true;
}
return false;
}
}
}
public class MyBaseEvent
{
public int Number { get; set; }
public override string ToString() => Number.ToString();
}
public sealed class MyOtherEvent : MyBaseEvent
{
public string Name { get; set; }
public override string ToString() => $"{Number.ToString()} | {Name.ToString()}";
}
ok I see thanks, as I thought, even if I'd avoid DynamicInvoke that's very slow, you'd want to use a generic predicate only
Another thing to point out is that this way the consumer is on its own thread: events are executed sequentially but it's no more blocking and so you can't use events to drive the workflow if you want, it may or may not be problem depending your needs, it's fine as the user knows the limitations
what I mean is that you can't do like
class MyEvent{
public bool Cancel
}
var myEvent = new MyEvent();
eventbus.Publish(myEvent);
if(myEvent.Cancel){
stop();
}
P.S. I don't encourage anyway to have events that control flows, even if this way you can have your dynamic pipeline that can manage an object and sometime can be useful
The performance of DynamicInvoke
can easily be addressed I leave that as an excercise for the reader! And I agree that event cancellation as you showed is a bad idea ;-)
I have been using Easy MessageHub for years now and I couldn't live without - thank you!
Have you explored the idea of conditional subscriptions? ie. subscribing to not only the
Type
but a subset of instances by checking a field value of the object.Guid token = hub.Subscribe<Person>(p => Console.WriteLine($"Id is: {p.Id}"));
gives you all instances ofPerson
.If I only wanted to subscribe to
Person
whereGender == Male
, then it would allow for more targeted subscriptions and various areas of the app would not unnecessarily receive events to then have to filter the contents - this is a lot of unnecessary processing.I understand this could be achieved by subclassing
Person
but you would need to know the filter at build time. I have a case where the user can choose the filter value at runtime so it would be great to be able to add that to the subscription method.This idea is very much inspired by Rx but didn't want to have to use that whole library for limited cases.
Thanks!