jkone27 / AliceMQ

A reactive client library for RabbitMQ and Google PubSub, with observables, acknowledge and reject capabilities
MIT License


AliceMQ

A reactive client library with support for RabbitMQ and experimental support for Google Pub/Sub (TBR), built on Reactive Extensions for .NET.

Local environment setup

For RabbitMQ:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

For the Google Pub/Sub version, you can run an emulator such as:

docker run --rm -ti -p 8681:8681 -e PUBSUB_PROJECT1=test-proj,topic1:subscription1 messagebird/gcloud-pubsub-emulator:latest
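
Google Cloud client libraries conventionally locate a local emulator through the PUBSUB_EMULATOR_HOST environment variable. The following is a minimal sketch, assuming the emulator container above is published on localhost:8681. Whether AliceMQ.PubSub picks this variable up on its own is an assumption and may depend on how the underlying client is configured.

```shell
# Point Pub/Sub clients at the local emulator instead of the real service.
# Assumption: the emulator container started above is reachable on localhost:8681.
export PUBSUB_EMULATOR_HOST=localhost:8681

# Print the value that child processes started from this shell will inherit.
echo "$PUBSUB_EMULATOR_HOST"
```

Run the application from the same shell session so that it inherits the variable.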

Contributions are welcome.

Mailman (Producer)

Usage of a mailman is dead simple:

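using AliceMQ.Core.Message;
using AliceMQ.Core.Types;
using AliceMQ.Rabbit.MailBox;
using AliceMQ.Rabbit.Mailman;
using Newtonsoft.Json; // JsonConvert, JsonSerializerSettings, MissingMemberHandling
// for the Google Pub/Sub version use instead: using AliceMQ.PubSub;
// (its parameters are slightly different)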

var source = new Source("A", "A.q");
var endPoint = new EndPoint();
var sink = new Sink(source);

var serialization = new JsonSerializerSettings
    {
        MissingMemberHandling = MissingMemberHandling.Error
    };

var p = new Mailman(endPoint, source.Exchange, s => JsonConvert.SerializeObject(s, serialization));

// the first published message creates the exchange if it does not exist yet
p.PublishOne(new Msg(-1), "");

Now let's look at the simplest form of consumer, which is just a thin layer over the underlying MQ system.

SimpleMailbox (Consumer of BasicDeliverEventArgs)

Consumer subscription is identical for every mailbox type: each one gives you an instance of IObservable (Rx).

using System;      // Console
using System.Text; // Encoding
using AliceMQ.Mailbox;

var mb = new SimpleMailbox(endPoint, sink);

using var d = mb.Subscribe(am =>
{
    Console.WriteLine("A - " + Encoding.UTF8.GetString(am.EventArgs.Body));
    am.Channel.BasicAck(am.EventArgs.DeliveryTag, false);
});

Mailbox<T> (Consumer of T)

Let's consider an example DTO class Msg. The typed consumer is built on top of the simple consumer and adds deserialization of the message body into an instance of a generic type T.

var sfm = new Mailbox<Msg>(endPoint, sink, s => JsonConvert.DeserializeObject<Msg>(s, serialization));

using var d = sfm.Subscribe(am =>
{
    if (am.IsOk<Msg>())
    {
        var msg = am.AsOk<Msg>().Message;
        Console.WriteLine("ok - " + msg.Bla);
        am.Confirm();
    }
    else
    {
        Console.WriteLine("error - " + am.AsError().Ex.Message);
        am.Reject();
    }
},
ex => Console.WriteLine("COMPLETE ERROR"),
() => Console.WriteLine("COMPLETE"));
