rabbitmq / rabbitmq-stream-dotnet-client

RabbitMQ client for the stream protocol
https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html
Other
122 stars 41 forks source link

Event bus [Proposal] #320

Closed Gsantomaggio closed 11 months ago

Gsantomaggio commented 1 year ago

ref: https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/issues/316

The idea of this implementation is to have the possibility to subscribe to a partial event.

so like:

  StreamEventsBusSingleton.Instance.Publish<ProducerConnectEvent>(new ProducerConnectEvent(this, _config));

and somewhere to subscribe to the event based on the type, like:

 StreamEventsBusSingleton.Instance.Subscribe<ProducerConnectEvent>(p =>
 {
   Console.WriteLine($"ProducerConnectEvent , Stream {p.Config.Stream}, event type {p.EventType}, event severity 
   {p.EventSeverity}" );
});

In this way we can implement different events with detailed information and (why not ) with the class instance that raised the event, for example:

    public class ProducerConnectEvent : IStreamEvent
    {
        public ProducerConnectEvent(RawProducer producer, RawProducerConfig config)
        {
            Producer = producer;
            Config = config;
        }

        public RawProducerConfig Config { get; }

        public EventTypes EventType => EventTypes.Connection;
        public EventSeverity EventSeverity => EventSeverity.Info;
        public RawProducer Producer { get; }
    }
codecov[bot] commented 1 year ago

Codecov Report

Attention: 3 lines in your changes are missing coverage. Please review.

Comparison is base (a515ea9) 92.66% compared to head (638c3de) 92.73%. Report is 1 commits behind head on main.

:exclamation: Current head 638c3de differs from pull request most recent head 906fa39. Consider uploading reports for the commit 906fa39 to get more accurate results

Additional details and impacted files ```diff @@ Coverage Diff @@ ## main #320 +/- ## ========================================== + Coverage 92.66% 92.73% +0.07% ========================================== Files 113 116 +3 Lines 9961 10195 +234 Branches 825 840 +15 ========================================== + Hits 9230 9454 +224 - Misses 555 562 +7 - Partials 176 179 +3 ``` | [Files](https://app.codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/pull/320?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=rabbitmq) | Coverage Δ | | |---|---|---| | [RabbitMQ.Stream.Client/EntityInfo.cs](https://app.codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/pull/320?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=rabbitmq#diff-UmFiYml0TVEuU3RyZWFtLkNsaWVudC9FbnRpdHlJbmZvLmNz) | `100.00% <100.00%> (ø)` | | | [RabbitMQ.Stream.Client/EventBus/StreamEvents.cs](https://app.codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/pull/320?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=rabbitmq#diff-UmFiYml0TVEuU3RyZWFtLkNsaWVudC9FdmVudEJ1cy9TdHJlYW1FdmVudHMuY3M=) | `100.00% <100.00%> (ø)` | | | [RabbitMQ.Stream.Client/IConsumer.cs](https://app.codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/pull/320?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=rabbitmq#diff-UmFiYml0TVEuU3RyZWFtLkNsaWVudC9JQ29uc3VtZXIuY3M=) | `100.00% <100.00%> (ø)` | | | [RabbitMQ.Stream.Client/IProducer.cs](https://app.codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/pull/320?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=rabbitmq#diff-UmFiYml0TVEuU3RyZWFtLkNsaWVudC9JUHJvZHVjZXIuY3M=) | `100.00% <100.00%> (ø)` | | | [RabbitMQ.Stream.Client/RawConsumer.cs](https://app.codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/pull/320?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=rabbitmq#diff-UmFiYml0TVEuU3RyZWFtLkNsaWVudC9SYXdDb25zdW1lci5jcw==) | `84.36% <100.00%> (ø)` | | | [RabbitMQ.Stream.Client/RawProducer.cs](https://app.codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/pull/320?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=rabbitmq#diff-UmFiYml0TVEuU3RyZWFtLkNsaWVudC9SYXdQcm9kdWNlci5jcw==) | `88.53% <100.00%> (+0.18%)` | :arrow_up: | | [RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs](https://app.codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/pull/320?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=rabbitmq#diff-UmFiYml0TVEuU3RyZWFtLkNsaWVudC9SYXdTdXBlclN0cmVhbUNvbnN1bWVyLmNz) | `94.30% <100.00%> (ø)` | | | [RabbitMQ.Stream.Client/RawSuperStreamProducer.cs](https://app.codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/pull/320?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=rabbitmq#diff-UmFiYml0TVEuU3RyZWFtLkNsaWVudC9SYXdTdXBlclN0cmVhbVByb2R1Y2VyLmNz) | `96.80% <100.00%> (+0.07%)` | :arrow_up: | | [RabbitMQ.Stream.Client/Reliable/Consumer.cs](https://app.codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/pull/320?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=rabbitmq#diff-UmFiYml0TVEuU3RyZWFtLkNsaWVudC9SZWxpYWJsZS9Db25zdW1lci5jcw==) | `100.00% <100.00%> (ø)` | | | [RabbitMQ.Stream.Client/Reliable/Producer.cs](https://app.codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/pull/320?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=rabbitmq#diff-UmFiYml0TVEuU3RyZWFtLkNsaWVudC9SZWxpYWJsZS9Qcm9kdWNlci5jcw==) | `84.00% <100.00%> (-0.03%)` | :arrow_down: | | ... and [5 more](https://app.codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/pull/320?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=rabbitmq) | | ... and [1 file with indirect coverage changes](https://app.codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/pull/320/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=rabbitmq)

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.

jonnepmyra commented 1 year ago

Looks good.

We will need to execute async code when the events are raised.

Do you think the we could have async subscriptions aswell? So the registered action would be a Func<IStreamEvent, Task>and that the Publish part of the lib would await our registered callback/subscriber?

Gsantomaggio commented 1 year ago

have async subscriptions aswell?

Yes

TroelsL commented 1 year ago

I think this proposal covers our operational needs. But I am not certain which events the "Reconnecting" state would entail. From the WIP code, it looks like there are two event types: Connection and Disconnection.

I assume that, if modelled as a state machine, a Producer would start off in Disconnected. It would likely only have a single transition from there to Connected. But would there only be these two states, or would Reconnecting be it's own, and if so: would there be a "Reconnection" event published?

Gsantomaggio commented 1 year ago

it looks like there are two event types: Connection and Disconnection.

@TroelsL this was just a proposal. Of course, we will add other types like:

etc...

Gsantomaggio commented 1 year ago

It is still a work in progress. The bus can handle the events from the RawProducer and the Producer classes and only connection/disconnection/reconnection.

So given:

rabbitmq-streams add_super_stream invoices --partitions 3

And:

 var streamSystem = await StreamSystem.Create(new StreamSystemConfig());
        await streamSystem.CreateStream(new StreamSpec("my-stream"));
        IEventBus bus = new StreamEventsBus();
        bus.Subscribe<RawProducerConnected>(async connected =>
        {
            myLogger.LogInformation(
                "The raw producer {ClientProvidedName} is connected to the stream {Stream}",
                connected.Parameters.ClientProvidedName, connected.Instance.Info.Stream);

            await Task.CompletedTask;
        });

        bus.Subscribe<RawProducerDisconnected>(async disconnected =>
        {
            myLogger.LogInformation(
                "The raw producer {ClientProvidedName} is disconnected from the stream {Stream}",
                disconnected.Parameters.ClientProvidedName, disconnected.Instance.Info.Stream);
            await Task.CompletedTask;
        });

        bus.Subscribe<ProducerReconnected>(async reconnected =>
        {
            var value = (reconnected.IsReconnection) ? "is in reconnection.." : "ended the reconnection";
            myLogger.LogInformation("The producer {ClientProvidedName} {Value} to the stream {Stream}", reconnected.Instance.Info.ClientProvidedName, value, reconnected.Instance.Info.Stream);
            await Task.CompletedTask;
        });

        var superStreamProducer = await Producer.Create(new ProducerConfig(streamSystem, SystemUtils.InvoicesExchange)
        {
            SuperStreamConfig = new SuperStreamConfig()
            {
                Routing = message => message.Properties.MessageId.ToString(),
                RoutingStrategyType = RoutingStrategyType.Hash
            },
            ClientProvidedName = "my_super_producer",
            Events = bus,
        });

        var standardProducer = await Producer.Create(new ProducerConfig(streamSystem, "my-stream")
        {
            ClientProvidedName = "my_producer",
            Events = bus,
        });

        for (ulong i = 0; i < 2000; i++)
        {
            var message = new Message(Encoding.Default.GetBytes("hello"))
            {
                Properties = new Properties() {MessageId = $"hello{i}"}
            };
            await superStreamProducer.Send(message);
            Thread.Sleep(1 * 1000);
        }
    }

The connection part for the super stream is:

[10:23:16] info: example.MyEventsTest[0] The raw producer my_super_producer#invoices-0 is connected to the stream invoices-0
[10:23:16] info: example.MyEventsTest[0] The raw producer my_super_producer#invoices-1 is connected to the stream invoices-1
[10:23:16] info: example.MyEventsTest[0] The raw producer my_super_producer#invoices-2 is connected to the stream invoices-2

for the standard stream is:

[10:23:16] info: example.MyEventsTest[0] The raw producer my_producer is connected to the stream my-stream

I case I kill the standard producer connection the events will be:

[10:23:45] info: example.MyEventsTest[0] The raw producer my_producer is disconnected from the stream my-stream
[10:23:45] info: example.MyEventsTest[0] The producer my_producer is in reconnection.. to the stream my-stream
[10:23:45] info: example.MyEventsTest[0] The raw producer my_producer is connected to the stream my-stream
[10:23:45] info: example.MyEventsTest[0] The producer my_producer ended the reconnection to the stream my-stream

The behaviour for the super stream is a bit different: the connection is re-created as soon a message is sent to the partition stream.

The basic idea is to define your bus with the subscriptions you want to listen to and handle it with different information.

The bus is an interface public interface IEventBus.So the user can write its own event logger in case the default one is not enough.

Gsantomaggio commented 11 months ago

closed in favour of https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/issues/336