Nowadays, no one considers implementing the communication layer using pure sockets. Some messaging libraries are developed such as ZeroMQ and its .NET Framework port called NetMQ. A number of problems solved by them is enormous: sending information over the network in a lightweight manner, making sure that a whole message is received or distributing messages to many subscribers. In the real world scenarios, it is not enough.
Let's consider how alternatives look like. The HTTP protocol provides different endpoints - every message is appropriately deserialized and routed to methods in your code. The messaging system in ZeroMQ does not contain any header information and, therefore, does not provide any similar functionality.
The NetmqRouter acts as an additional layer of abstraction between your code and communication layer. In the internal implementation, every message consists of two message parts: a header (containing name of the communication route aka address) and a message body.
Package name | Description | Link |
---|---|---|
Message.Router | A main routing package. Required. | nuget |
Message.Router.NetMQ | Support for NetMQ framework. Recommented. | nuget |
Message.Router.Json | General serializer for the JSON format. Recommented. | nuget |
Message.Router.Xml | General serializer for the Xml format. | nuget |
Here you can find all packages available on the Nuget website: link
This library uses reflection to analyze your code and configure the system behind the scenes. Annotate you endpoint with the Route attribute and as an argument pass name of the communication route. The library will automatically analyze if your endpoint method contains an argument that can be used as a message payload.
class ExampleSubscriber
{
[Route("TestRoute")]
public string Test(string value)
{
// your business logic here
}
}
Your endpoints can simply respond to any message using the return value. All scenarios are allowed :)
class ExampleSubscriber
{
[Route("TestRoute")]
[ResponseRoute("AnotherRoute")]
public string Test(string value)
{
// this endpoint will respond with text message
}
[Route("TestRoute")]
[ResponseRoute("AnotherRoute")]
public void Test(string value)
{
// this endpoint will respond with an event
}
}
Empty payloads are treated as a special type of messages, here called "events". Two scenarios are mainly interesting:
Please note, that you can use event approach along the message pattern, e.g. an endpoint can subscribe an event but return text message. All combinations are allowed.
class ExampleSubscriber
{
[Route("TestRoute")]
public void Test()
{
// event subscriber
}
[Route("TestRoute")]
[ResponseRoute("AnotherRoute")]
public void Test(string text)
{
// event emitter
}
[Route("TestRoute")]
[ResponseRoute("AnotherRoute")]
public void Test()
{
// event subscriber and emitter at the same time
}
}
You can annotate your class with the BaseRoute attribute to use subscribe to a specified route or family of routes.
[BaseRoute("BaseRoute")]
class ClassWithBaseRoute
{
[Route("IncomingRoute")]
[ResponseRoute("OutcomingRoute")]
public void Handler()
{
// this endpoint will:
// - subscribe messages from "BaseRoute/IncomingRoute" route,
// - emit messages to "OutcomingRoute" route.
}
}
If you want to do everything for yourself, send a simple message via the MessageRouter method:
// sending event
router.SendMessage("SomeRoute");
// sending text message
router.SendMessage("AnotherRoute", "Hello world!");
// available methods
public void SendMessage(string routeName);
public void SendMessage(string routeName, byte[] data);
public void SendMessage(string routeName, string text);
public void SendMessage(string routeName, object _object);
It is possible to register your own methods as subscribers to specified routes.
router.RegisterSubscriber("IncomingRoute", () => { });
router.RegisterSubscriber<string>("IncomingRoute", payload => { });
router.RegisterSubscriber("IncomingRoute", "OutcomingRoute", () => "Hello world");
router.RegisterSubscriber<string, string>("IncomingRoute", "OutcomingRoute", payload => "Hello " + payload);
Fluent APIs are sexy but always comes with additional performance cost. Here you do not need to be afraid but it is always good to take into consideration during profiling sessions. Below you can find some examples how to use MessageRouter fluent API:
var router = NetmqMessageRouter
.WithPubSubConnecton(publisherSocket, subscriberSocket)
.RegisterGeneralSerializer(new JsonObjectSerializer()) // requires nuget package!
.RegisterRoute("VectorRoute", typeof(Vector))
.RegisterRoute("VectorLengthRoute", typeof(double));
// registering a subscriber that process the data
router
.Subscribe("VectorRoute")
.WithResponse("VectorLengthRoute")
.WithHandler((Vector vector) =>
{
return Math.Sqrt(vector.X * vector.X + vector.Y * vector.Y);
});
// registering a subscriber that receives the calculation result
router
.Subscribe("VectorLengthRoute")
.WithHandler((double x) => Console.WriteLine(x));
router.StartRouting();
// sending your message
router
.SendMessage(new Vector() { X = 3, Y = 4 })
.To("VectorRoute");
This library contains preconfigured configuration layer to most popular communication patterns in NetMQ:
// basic example for the publisher-subscriber pattern
var publisherSocket = new PublisherSocket();
publisherSocket.Bind(Address);
var subscriberSocket = new SubscriberSocket();
subscriberSocket.Connect(Address);
var router = MessageRouter.WithPubSubConnecton(publisherSocket, subscriberSocket)
// available methods
public static MessageRouter WithPubSubConnecton(PublisherSocket publisherSocket, SubscriberSocket subscriberSocket);
public static MessageRouter WithPubSubConnecton(string publishAddress, string subscribeAddress);
public static MessageRouter WithPushPullConnection(PushSocket pushSocket, PullSocket pullSocket);
public static MessageRouter WithPushPullConnection(string pushAddress, string pullAddress);
public static MessageRouter WithPairConnection(PairSocket socket);
public static MessageRouter WithPairConnection(string address);
Do you have a special communication layer with custom business rules? Implement the IConnection interface and be free!
// declare your custom connection handler
public class PairConnection : IConnection
{
PairSocket Socket { get; }
private readonly object _socketLock = new object();
public PairConnection(PairSocket socket)
{
Socket = socket;
}
public void SendMessage(SerializedMessage message)
{
lock(_socketLock)
Socket.SendMessage(message);
}
// returns true if message was received successfully
public bool TryReceiveMessage(out SerializedMessage message)
{
lock(_socketLock)
return Socket.TryReceiveMessage(out message);
}
public void Connect(IEnumerable<string> routeNames) { }
public void Disconnect()
{
Socket?.Close();
Socket?.Dispose();
}
}
// create message router
var socket = new PairSocket(Address);
var connection = new PairConnection(socket);
var router = new MessageRouter(connection);
Your endpoints can return any type of message as long as the library can serialize it to the binary format. Use any serialization library you want - you can use JSON or XML protocols - or implement your own solution. Batteries are included for types:
router.RegisterTypeSerializerFor(new RawDataTypeSerializer());
router.RegisterTypeSerializerFor(new BasicTextTypeSerializer());
router.RegisterGeneralSerializerFor(new JsonObjectSerializer()); // requires nuget package!
Or use helpers provided by additional nuget packages:
router.RegisterJsonSerializer();
router.RegisterXmlSerializer();
It is possible to register a serializer designed for the specialized type. Below you can find an example implementation of text serializer:
// create the serializer
public class BasicTextTypeSerializer : ITypeSerializer<string>
{
private readonly Encoding _encoding;
/// <param name="encoding">Encoding that will be used for text serialization.</param>
public BasicTextTypeSerializer(Encoding encoding)
{
_encoding = encoding;
}
public BasicTextTypeSerializer() : this(Encoding.UTF8)
{
}
public byte[] Serialize(string text) => _encoding.GetBytes(text);
public string Deserialize(byte[] data) => _encoding.GetString(data);
}
// register the serializer
router.RegisterTypeSerializerFor(new BasicTextTypeSerializer());
It is possible to register a serializer designed for the group of types. Below you can find an example implementation of JSON serializer:
// create the serializer
public class JsonObjectSerializer : IGeneralSerializer<object>
{
private readonly Encoding _encoding;
/// <param name="encoding">Encoding that will be used for text serialization.</param>
public JsonObjectSerializer(Encoding encoding)
{
_encoding = encoding;
}
public JsonObjectSerializer() : this(Encoding.UTF8)
{
}
public byte[] Serialize(object _object)
{
var json = JsonConvert.SerializeObject(_object);
return _encoding.GetBytes(json);
}
public object Deserialize(byte[] data, Type targetType)
{
var json = _encoding.GetString(data);
return JsonConvert.DeserializeObject(json, targetType);
}
}
// register the serializer
router.RegisterGeneralSerializerFor(new JsonObjectSerializer());
There are four main sources (and therefore four types) of exceptions in the NetmqRouter library:
Please note that by declaring your own business logic (e.g. connection protocols, serializers or endpoints) you are creating code that potentially may throw an exception. All of them will be available in the InnerException field.
The ConfigurationException can occur in the code where your router is configured. Other exceptions are published in the special event:
router.OnException += exception =>
{
// handle any exception here
// good proposition: handle it using any logging library
Console.WriteLine(exception.Message);
};
[TestFixture]
public class MessagesRouterTests
{
private const string Address = "tcp://localhost:6000";
// will be serialized as JSON
class CustomPayload
{
public string Text { get; set; }
public int Number { get; set; }
public CustomPayload(string text, int number)
{
Text = text;
Number = number;
}
public override bool Equals(object obj)
{
return obj is CustomPayload o &&
this.Number == o.Number;
}
}
class ExampleSubscriber
{
public CustomPayload PassedValue;
[Route("TestRoute")]
public void Test(CustomPayload value)
{
PassedValue = value;
}
}
[Test]
public async Task RoutingTest()
{
var publisherSocket = new PublisherSocket();
publisherSocket.Bind(Address);
var subscriberSocket = new SubscriberSocket();
subscriberSocket.Connect(Address);
var subscriber = new ExampleSubscriber();
var router = MessageRouter
.WithPubSubConnecton(publisherSocket, subscriberSocket)
.RegisterTypeSerializer(new RawDataTypeSerializer())
.RegisterTypeSerializer(new BasicTextTypeSerializer())
.RegisterGeneralSerializer(new JsonObjectSerializer()) // requires nuget package!
.RegisterRoute("TestRoute", typeof(CustomPayload))
.RegisterSubscriber(subscriber)
.StartRouting();
router.SendMessage("TestRoute", new CustomPayload("Hello world", 123));
router.OnException += exception =>
{
// handle any exception
};
await Task.Delay(TimeSpan.FromSeconds(3));
router
.StopRouting()
.Disconnect();
var expectedValue = new CustomPayload("Hello world", 123);
Assert.AreEqual(expectedValue, subscriber.PassedValue);
}
}
There are several ways to improve your code quality. Please consider applying them to your system:
public interface IMessageRouter : IMessageRouterManager, IMessageRouterConfiguration, IMessageSender, IExceptionSource, IDisposable
{
}
public interface IMessageSender
{
void SendMessage(string routeName);
void SendMessage(string routeName, byte[] data);
void SendMessage(string routeName, string text);
void SendMessage(string routeName, object _object);
}
// your class containg route names
internal static class MessageRoutes
{
public static readonly string IncomingRoute = "IncomingRoute";
public static readonly string OutcomingRoute = "OutcomingRoute";
public static readonly string BananaRoute = "BananaRoute";
}
// your subscriber
class ExampleSubscriber
{
[Route(MessageRoutes.IncomingRoute)]
public string Test(string value)
{
// your business logic here
}
}
// somewhere in the code where you are sending a message
router.SendMessage(MessageRoutes.BananaRoute, "Hello world!");
This library uses a worker system in order to process messages. All workers are working in parallel in separate threads, processing data on different life stages:
If the serialization process or message handling logic is expensive, a single worker instance (per job type) might be not enough. Scale the performance easily by increasing number of workers. You are able to change how many workers will be started when called the StartRoute method - just tune the number of serialization and handler workers individually.
Default values:
router
// your router configuration goes here
.WithWorkerPool(numberOfSerializationWorkes: 2, numberOfHandlingWorkes: 6)
.StartRouting();
This library can be used in heavy environments because does not introduce any significant performance impact. Don't worry about your message, everything is well tested :)
Do you want to improve the architecture of your system and use this library? Congratulations, you made a good choice! At this moment, this framework is still under development and shouldn't be used in production but in nearest weeks it would be totally stable.