GokGokalp / MetroBus

Lightweight messaging wrapper of MassTransit
MIT License
72 stars 24 forks source link
consumer dotnet dotnet-core dotnetcore masstransit message-bus messaging rabbitmq service-bus

MetroBus


alt tag

Lightweight messaging wrapper of MassTransit

NuGet version NuGet version NuGet version NuGet version

NuGet Packages

> dotnet add package MetroBus
> dotnet add package MetroBus.Autofac
> dotnet add package MetroBus.Microsoft.Extensions.DependencyInjection

Supports:

Features:

Usage:

Initializing bus instance for Producer:

// For events: the event producer is exposed as an IBusControl.
IBusControl bus = MetroBusInitializer.Instance
    .UseRabbitMq(rabbitMqUri, rabbitMqUserName, rabbitMqPassword)
    .InitializeEventProducer();

// For commands (an alternative to the event producer above):
// the command producer is exposed as an ISendEndpoint created for queueName.
ISendEndpoint bus = MetroBusInitializer.Instance
    .UseRabbitMq(rabbitMqUri, rabbitMqUserName, rabbitMqPassword)
    .InitializeCommandProducer(queueName);

After initializing the bus instance, you can use the Send or Publish methods.

// For events: publish an anonymous object that carries the TEvent properties.
await bus.Publish<TEvent>(new
{
    SomeProperty = SomeValue
});

// For commands: send an anonymous object that carries the TCommand properties.
await bus.Send<TCommand>(new
{
    SomeProperty = SomeValue
});

using Consumer:

static void Main(string[] args)
{
    // rabbitMqUri, rabbitMqUserName, rabbitMqPassword and queueName are string values.
    IBusControl bus = MetroBusInitializer.Instance
                        .UseRabbitMq(rabbitMqUri, rabbitMqUserName, rabbitMqPassword)
                        .RegisterConsumer<TCommandConsumer>(queueName)
                        .RegisterConsumer<TEventConsumer>(queueName)
                        .Build();

    bus.Start();

    // Block here so the bus stays started until a line is read from the console.
    Console.ReadLine();

    //if you want to stop
    bus.Stop();
}

TCommandConsumer could look like the one below:

/// <summary>
/// Sample consumer for <c>TCommand</c> messages.
/// </summary>
public class TCommandConsumer : IConsumer<TCommand>
{
    /// <summary>
    /// Writes the <c>SomeProperty</c> value of the received message to standard output.
    /// </summary>
    /// <param name="context">Consume context whose <c>Message</c> is the received command.</param>
    public async Task Consume(ConsumeContext<TCommand> context)
    {
        //do something...
        TCommand received = context.Message;
        string text = $"{received.SomeProperty}";

        await Console.Out.WriteAsync(text);
    }
}

Initializing bus instance for Request/Response conversation:

// rabbitMqUri, rabbitMqUserName, rabbitMqPassword and queueName are string values.
IRequestClient<TRequest, TResponse> client = MetroBusInitializer.Instance.UseRabbitMq(rabbitMqUri, rabbitMqUserName, rabbitMqPassword)
                                                                    .InitializeRequestClient<TRequest, TResponse>(queueName);

// Send the request and await the TResponse produced by the consumer.
TResponse result = await client.Request(new TRequest
{
    Command = "Say hello!"
});

and the consumer for the Request/Response conversation could look like the one below:

/// <summary>
/// Sample consumer for the Request/Response conversation: receives a
/// <c>TRequest</c> and replies with a <c>TResponse</c>.
/// </summary>
public class TCommandConsumer : IConsumer<TRequest>
{
    /// <summary>
    /// Handles the incoming request and sends the response back to the requester.
    /// </summary>
    /// <param name="context">Consume context whose <c>Message</c> is the received request.</param>
    public async Task Consume(ConsumeContext<TRequest> context)
    {
        var command = context.Message;

        //do something...
        await Console.Out.WriteAsync($"{command.SomeProperty}");

        //and
        // Reply with the response type the request client is waiting for (TResponse, not TRequest).
        // NOTE(review): the Command property is illustrative — set whichever properties TResponse declares.
        await context.RespondAsync(new TResponse
            {
                Command = "Hello!"
            });
    }
}

using Consumer with Microsoft.Extensions.DependencyInjection:

new HostBuilder ()
    .ConfigureServices ((hostContext, services) =>
    {
        // Register the consumers with the container.
        services.AddMetroBus (x =>
        {
            x.AddConsumer<TCommandConsumer>();
            x.AddConsumer<TEventConsumer>();
        });

        // rabbitMqUri, rabbitMqUserName and rabbitMqPassword are string values.
        // No BuildServiceProvider call here: building a provider inside
        // ConfigureServices would create a second container that is never used.
        services.AddSingleton<IBusControl> (provider => MetroBusInitializer.Instance
                .UseRabbitMq (rabbitMqUri, rabbitMqUserName, rabbitMqPassword)
                .RegisterConsumer<TCommandConsumer>("foo.command.queue", provider)
                .RegisterConsumer<TEventConsumer>("foo.event.queue", provider)
                .Build ());

        services.AddHostedService<BusService> ();
    })
    // Blocks the calling thread until the RunConsoleAsync task completes.
    .RunConsoleAsync ().Wait ();

/// <summary>
/// Hosted service that forwards its start and stop calls to the injected <see cref="IBusControl"/>.
/// </summary>
public class BusService : IHostedService
{
    private readonly IBusControl _busControl;

    /// <summary>Stores the bus control that this service starts and stops.</summary>
    /// <param name="busControl">Bus control instance supplied by the caller.</param>
    public BusService(IBusControl busControl) => _busControl = busControl;

    /// <summary>Starts the bus, passing the cancellation token through.</summary>
    public Task StartAsync(CancellationToken cancellationToken) =>
        _busControl.StartAsync(cancellationToken);

    /// <summary>Stops the bus, passing the cancellation token through.</summary>
    public Task StopAsync(CancellationToken cancellationToken) =>
        _busControl.StopAsync(cancellationToken);
}

PS: Publisher and Consumer services must use the same TCommand or TEvent interfaces. This is important for MassTransit integration. Also, the rabbitMqUri parameter must start with the "rabbitmq://" prefix.

There are several options you can set via fluent interface: