Eventuous / dotnet-sample

Sample application using Eventuous .NET

In need of guidance on how to use the AddCompositionEventHandler in a SignalR use case #3

Closed JimiSweden closed 1 year ago

JimiSweden commented 1 year ago

Hi 😃

I want to add a CompositionEventHandler. I looked at the implementation of AddEventHandlerWithRetries in Eventuous.Subscriptions.Polly.SubscriptionBuilderExtensions, and also at UseCheckpointStore in Eventuous.EventStore.Subscriptions.SubscriptionBuilderExtensions.

But I can't get it to work 🤣 So please show me what I'm doing wrong 🙏

What I want to achieve

A wrapping handler that sends messages through the SignalR IHubContext after a MongoDB projection has been executed.

Everything works as expected with my wrapper: it executes after the wrapped event handler.

But my IHubContext, when used from BookingsHubOnMongoDbProjected*, never has any clients, although the client is online and gets messages from other places, such as inside MyBookingsProjection (added there for testing, but I don't want it there) and from a controller method in CommandApi**.

when using the BookingsHubService - Yes clients
when using the IHubContext - Yes clients

I have also tried a transient version of BookingsHubService (as a "don't think it will matter but I'm running out of ideas" attempt); the IHubContext still has no clients.

Implementation

Program.cs

//add SignalR before Eventuous due to DI of IHubContext in HubService inside Eventuous subscribers.
builder.Services.AddSignalR(hubOptions =>
{
    hubOptions.EnableDetailedErrors = true;
});

builder.Services.AddEventuous(builder.Configuration);
//...

Registrations.cs - AddEventuous()..

//...

//signalr stuff for AddEventHandlerWithSignalrMessaging
services.AddSingleton<IBookingsHubService, BookingsHubService>(s =>
    //added this as a test, but nothing changes.
    new BookingsHubService(s.GetRequiredService<IHubContext<BookingsHub>>())
);

services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
            "BookingsProjections",
            builder => builder
                //.Configure(cfg => cfg.ConcurrencyLimit = 2)
                .UseCheckpointStore<MongoCheckpointStore>()
                .AddEventHandler<BookingStateProjection>()
        // -- this does not work (my IHubContext never has any clients, although client is online and gets messages from other places, such as inside MyBookingsProjection and from a controller method in CommandApi)
                .AddEventHandlerWithSignalrMessaging<MyBookingsProjection>()
                .AddEventHandler<StatisticsOfBookingsProjection>()
                //TODO: add projection holding the available rooms and booked dates
                .WithPartitioningByStream(2)
        );
//...

My Bookings.Hubs.SubscriptionBuilderExtensions.cs

public static class SubscriptionBuilderExtensions
{
     public static SubscriptionBuilder AddEventHandlerWithSignalrMessaging<TInnerHandler>(
        this SubscriptionBuilder builder
    ) where TInnerHandler : class, IEventHandler
    {

        // This is what I tried: build a provider from the builder's service collection and resolve the hub service from it.
        var sp = builder.Services.BuildServiceProvider();
        var hubService = sp.GetRequiredService<IBookingsHubService>();

        //Since I'm not 100% sure whether getting the required service here could somehow create another "instance" in the registration in SubscriptionBuilder,
        //I also tried to pass the ServiceProvider and get the required service inside BookingsHubOnMongoDbProjected,
        //although I believe it should never make a difference, but I'm trying "everything" :D

        return builder.AddCompositionEventHandler<TInnerHandler, BookingsHubOnMongoDbProjected>(
            h => new BookingsHubOnMongoDbProjected(h, hubService)
            //h => new BookingsHubOnMongoDbProjected(h, sp)
        );
    }
}

My Bookings.Hubs.BookingsHubOnMongoDbProjected.cs

public class BookingsHubOnMongoDbProjected : IEventHandler
{
    readonly IEventHandler _inner;
    private readonly IBookingsHubService? _bookingsHubService;
    private readonly IHubContext<BookingsHub>? _hubContext; //testing with hubcontext directly    

    public string DiagnosticName { get; }

    /// <summary>
    /// Using BookingHubService (which uses IHubContext)
    /// </summary>
    /// <param name="inner"></param>
    /// <param name="bookingsHubService"></param>
    public BookingsHubOnMongoDbProjected(IEventHandler inner, IBookingsHubService bookingsHubService)
    {
        _inner = inner;
        _bookingsHubService = bookingsHubService;
        //_bookingsHubContext = bookingsHubContext;
        //_afterMongoDbAction = afterMongoDbAction;
        DiagnosticName = _inner.DiagnosticName;
    }

    /// <summary>
    /// Using IHubContext directly
    /// </summary>
    /// <param name="inner"></param>
    /// <param name="hubContext"></param>
    public BookingsHubOnMongoDbProjected(IEventHandler inner, IHubContext<BookingsHub> hubContext)
    {
        _inner = inner;
        _hubContext = hubContext;
        DiagnosticName = _inner.DiagnosticName;
    }

    /// <summary>
    /// Executes the inner handler, then sends the event message using SignalR
    /// </summary>
    /// <param name="context"></param>
    /// <returns></returns>
    public async ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context)
    {
        return await Execute();

        async Task<EventHandlingStatus> Execute()
        {
            try
            {
                //removes version prefix from message type
                var messageTypeName = context.MessageType.Split('.').ToList().Last();

                var status = await _inner.HandleEvent(context);
                var streamId = context.Stream.GetId();

                if (status == EventHandlingStatus.Success)
                {
                    //set status message to "success"

                    switch (messageTypeName)
                    {
                        case nameof(BookingEvents.V1.RoomBooked):
                            // await the sends so failures surface here instead of being silently dropped
                            if (_bookingsHubService is not null)
                                await _bookingsHubService.RoomBooked(streamId);
                            if (_hubContext is not null)
                                await _hubContext.Clients.Group("AllLoggedIn").SendAsync("LoggedIn", "test message from wrapped handler");
                            break;
                        case nameof(BookingEvents.V1.BookingChanged):
                            if (_bookingsHubService is not null)
                                await _bookingsHubService.BookingChanged(streamId);
                            // only one of the two fields is set, depending on the constructor used
                            if (_hubContext is not null)
                                await _hubContext.Clients.Group("AllLoggedIn").SendAsync("LoggedIn", "test message from wrapped handler");
                            break;
                        default:
                            break;
                    }

                }
                else if (status == EventHandlingStatus.Failure)
                {
                    //set status message to "failure"
                }
                return status;

            }
            catch (Exception e)
            {
                Debugger.Break();
                throw;
            }
        }
    }
}
alexeyzimarev commented 1 year ago

SignalR hubs register as transient, and event handlers always register as singletons. You can't have a singleton having a transient dependency as it effectively makes that dependency a singleton. It gets instantiated once when the application starts receiving events, and it doesn't connect to anything.
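
The general lifetime point (a singleton that takes a transient dependency keeps that one instance for its whole life) can be illustrated in isolation. This is a minimal sketch, assuming only the Microsoft.Extensions.DependencyInjection package; `TransientDependency`, `SingletonHandler` and `CaptiveDependencyDemo` are hypothetical names that do not come from Eventuous or SignalR.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical types, used only to illustrate service lifetimes.
public sealed class TransientDependency { }

public sealed class SingletonHandler
{
    public TransientDependency Dependency { get; }

    public SingletonHandler(TransientDependency dependency) => Dependency = dependency;
}

public static class CaptiveDependencyDemo
{
    // Returns two flags:
    //  - DirectResolvesDiffer: resolving the transient directly yields a new instance each time
    //  - CapturedStaysSame: the singleton keeps the single instance it was constructed with
    public static (bool DirectResolvesDiffer, bool CapturedStaysSame) Run()
    {
        var services = new ServiceCollection();
        services.AddTransient<TransientDependency>();
        services.AddSingleton<SingletonHandler>();

        using var provider = services.BuildServiceProvider();

        var first = provider.GetRequiredService<TransientDependency>();
        var second = provider.GetRequiredService<TransientDependency>();

        var handlerA = provider.GetRequiredService<SingletonHandler>();
        var handlerB = provider.GetRequiredService<SingletonHandler>();

        return (
            !ReferenceEquals(first, second),
            ReferenceEquals(handlerA.Dependency, handlerB.Dependency)
        );
    }
}
```

Calling `CaptiveDependencyDemo.Run()` should report that both flags are true: the transient is fresh on every direct resolve, but the copy captured by the singleton never changes.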

JimiSweden commented 1 year ago

Thanks Alexey :) Yes, I know that the hub is transient and handlers are singletons, and controllers being transient explains why the hub context works inside a controller. But how come I can use the hub context inside a singleton service that is called from a controller?

My idea for getting around the issue is to use Mediatr or something similar. What do you think is a good solution? Thanks!

alexeyzimarev commented 1 year ago

I don't really know, I haven't thought about it. Why do you need a query model to be sent to the client? Are the state and the new events collection not enough?

JimiSweden commented 1 year ago

Well, I don't need the query model to be sent. I wanted to broadcast a message, but only after the data is available in Mongo. For example:

I can for sure do this in other ways, but perhaps I'm using Eventuous in a "wrong way" here.

Regarding using Mediator or similar, I realise I would end up with the same issue of the hub context not being the same/synced. So I think I would have to use something like an observable queue, an event bus, or a separate API where SignalR acts on incoming messages.

And regarding the service registration (not my strongest area of in-depth knowledge), I realised I cannot use "builder.Services.BuildServiceProvider();" to get a service provider for getting a service, as this would be on a different "container/instance".
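
The "different container" observation can be reproduced without SignalR or Eventuous. This is a minimal sketch, assuming only the Microsoft.Extensions.DependencyInjection package; `ClientRegistry` and `SeparateContainerDemo` are hypothetical names, with `ClientRegistry` standing in for any singleton that tracks state (for example, a service that wraps a hub context). Whether this is the whole explanation for the missing clients is not confirmed in this thread.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-in for a stateful singleton service.
public sealed class ClientRegistry { }

public static class SeparateContainerDemo
{
    // Returns two flags:
    //  - SameWithinContainer: one provider always returns the same singleton
    //  - SameAcrossContainers: two providers built from the same registrations share it
    public static (bool SameWithinContainer, bool SameAcrossContainers) Run()
    {
        var services = new ServiceCollection();
        services.AddSingleton<ClientRegistry>();

        // The container the application would run with.
        using var appProvider = services.BuildServiceProvider();
        // A second container built "on the side", e.g. inside a registration helper.
        using var sideProvider = services.BuildServiceProvider();

        var fromApp1 = appProvider.GetRequiredService<ClientRegistry>();
        var fromApp2 = appProvider.GetRequiredService<ClientRegistry>();
        var fromSide = sideProvider.GetRequiredService<ClientRegistry>();

        return (
            ReferenceEquals(fromApp1, fromApp2),
            ReferenceEquals(fromApp1, fromSide)
        );
    }
}
```

`SeparateContainerDemo.Run()` should return `(true, false)`: each call to `BuildServiceProvider()` creates its own set of singletons, so anything resolved from the side container is not the instance the running application uses.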

JimiSweden commented 1 year ago

This is how I solved sending messages to SignalR (in an "ugly way"), since I couldn't figure out how to use the existing functionality in Eventuous to subscribe to $all "from now", aka "volatile subscriptions". I did try to figure out how to implement the abstract class EventSubscription, but didn't get it to work.

In Registrations.cs I subscribe using the "catch-up" AllStreamSubscription and use the MongoCheckpointStore as a "first filter" to avoid getting old messages. Then, to avoid handling old messages if the MongoCheckpointStore gets reset or deleted, I use a consume filter that only lets "recently" created events through.

services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
            "SubscriptionForSendningSignalRMessages",
            builder => builder
                .UseCheckpointStore<MongoCheckpointStore>() //to get latest checkpoint as a first filter to not act on old events.
                //Ugly filter.. to not get old messages, in case the checkpoint store is reset..
                //should be handled with a subscription only listening "from now" : TODO: How? (perhaps creating a standard subscription directly from ES client?
                //not sure how much this newing up datetime would impact performance in a production system.
                .AddConsumeFilterFirst(new MessageFilter(ctx => ctx.Created.ToUniversalTime() > DateTime.UtcNow.AddSeconds(-15))) //created seems to be utc always, but making sure it is...
                .AddEventHandler<BookingsSignalRHandler>()
            );
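
The recency check used in the filter above can be pulled out into a pure function, which makes the time window explicit and easy to unit test. This is a minimal sketch using only the base class library; `RecentEventFilter` and `IsRecent` are hypothetical names, and the 15-second window simply mirrors the value used above.

```csharp
using System;

public static class RecentEventFilter
{
    // True when the event was created less than `window` before `nowUtc`.
    // Both timestamps are expected to be in UTC; the caller supplies "now",
    // so the check does not depend on the system clock.
    public static bool IsRecent(DateTime createdUtc, DateTime nowUtc, TimeSpan window)
        => createdUtc > nowUtc - window;
}
```

Inside the filter it could then be called as `RecentEventFilter.IsRecent(ctx.Created.ToUniversalTime(), DateTime.UtcNow, TimeSpan.FromSeconds(15))`. Note that `ToUniversalTime()` treats a `DateTime` of unspecified kind as local time, so it is worth checking which kind `ctx.Created` actually carries.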

BookingsSignalRHandler.cs


/// <summary>
/// Important: don't use this without filtering out old events.
/// Preferably use it in a subscription where you subscribe from "now".
/// </summary>
public class BookingsSignalRHandler : EventHandler
{
    private readonly IBookingsHubService _bookingsHubService;

    public BookingsSignalRHandler(IBookingsHubService bookingsHubService)
    {
        _bookingsHubService = bookingsHubService;

        //On<BookingPaymentRecorded>(async ctx => await HandlePayment(ctx.Message, ctx.CancellationToken));

        On<V1.RoomBooked>(async ctx => await HandleRoomBooked(ctx, ctx.CancellationToken));

        On<V1.BookingChanged>(async ctx => await HandleBookingChanged(ctx, ctx.CancellationToken));

        On<V1.BookingCancelled>(async ctx => await HandleBookingCancelled(ctx, ctx.CancellationToken));

    }

    private async Task HandleBookingCancelled(MessageConsumeContext<V1.BookingCancelled> consumeContextMessage, CancellationToken cancellationToken)
    {
        throw new NotImplementedException();

    }

    private async Task HandleBookingChanged(MessageConsumeContext<V1.BookingChanged> ctx, CancellationToken cancellationToken)
    {
        await _bookingsHubService.BookingChanged(ctx.Stream.GetId(), ctx.Message);
    }

    private async Task HandleRoomBooked(MessageConsumeContext<V1.RoomBooked> ctx, CancellationToken cancellationToken)
    {
        throw new NotImplementedException();
    }
}

and the BookingsHubService.cs (just wrapping the IHubContext for now)

public interface IBookingsHubService //: IBookingsHub
{

    public Task RoomBooked(string bookingId);
    public Task BookingChanged(string bookingid, BookingEvents.V1.BookingChanged bookingChanged);
  //todo: bookingCancelled

}
public class BookingsHubService : IBookingsHubService
{
    private readonly IHubContext<BookingsHub> _hubContext;

    /// <summary>
    /// Note:  When client methods are called from outside of the Hub class,
    /// there's no caller associated with the invocation.
    /// Therefore, there's no access to the ConnectionId, Caller, and Others properties.
    /// (using Groups and "Personal Groups" solves parts of this) 
    /// </summary>
    /// <param name="hubContext"></param>
    public BookingsHubService(IHubContext<BookingsHub> hubContext)
    {
        _hubContext = hubContext;
    }

    public async Task RoomBooked(string bookingId)
    {
        await _hubContext.Clients.All.SendAsync("RoomBooked", bookingId);
    }

    public async Task BookingChanged(string bookingId, BookingEvents.V1.BookingChanged bookingChanged)
    {
            var message = new
            {
                Title = "Room reserved",
                bookingChanged.RoomId,
                bookingChanged.CheckInDate,
                bookingChanged.CheckOutDate,
            };
            await _hubContext.Clients.All.SendAsync("BookingChanged", message);
    }
} 
alexeyzimarev commented 1 year ago

If you need a volatile subscription, you can read one event from $all backwards, and then assign the commit position as the start for the NoOpCheckpointStore. It doesn't persist anything, just gives you the option to set the start position to whatever you want, and eventually get back the current position.
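
The first half of that suggestion (reading one event from $all backwards to find the current commit position) might look roughly like the sketch below. It assumes the EventStore.Client gRPC package and a reachable EventStoreDB instance; `AllStreamPosition` and `GetLastCommitPosition` are hypothetical names. How the resulting value is handed to NoOpCheckpointStore is left out on purpose, because its constructor and registration should be checked against the Eventuous version in use.

```csharp
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client;

public static class AllStreamPosition
{
    // Reads a single event from the end of $all and returns its commit position,
    // or null when the store has no events yet.
    public static async Task<ulong?> GetLastCommitPosition(
        EventStoreClient client,
        CancellationToken cancellationToken = default
    )
    {
        var lastEvents = client.ReadAllAsync(
            Direction.Backwards,
            Position.End,
            maxCount: 1,
            cancellationToken: cancellationToken
        );

        await foreach (var resolved in lastEvents)
        {
            // OriginalPosition is nullable; it is set for events read from $all.
            return resolved.OriginalPosition?.CommitPosition;
        }

        return null;
    }
}
```

A subscription that starts from this position would only see events appended afterwards, which is the "subscribe from now" behaviour asked about earlier in the thread.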

alexeyzimarev commented 1 year ago

Btw your domain model seems a bit weak. I know it is off topic, just want to give you feedback.

BookingChanged smells of CRUD, and you can look deeper into the domain to distill it. Why would someone "change" the booking? If they want to check in earlier, it's one thing; if they want to check out later, it's another. Changing the room is allocation. All of that would require engaging different constraints when it comes to availability and pricing.

I understand that it's more like a demo :) Still, for a good demo you'd benefit from showing the real potential of a proper domain model, where ChangeBooking would serve as an example of something you should not have there.