Effortlessly send messages anywhere on the network using Reactive Extensions (RX). Uses NetMQ as the transport layer.
Fast! Runs at >120,000 messages per second on localhost (by comparison, Tibco runs at 100,000 on the same machine).
The API is a drop-in replacement for Subject<T> from Reactive Extensions (RX).
As a refresher, to use Subject<T> in Reactive Extensions (RX):
var subject = new Subject<int>();
subject.Subscribe(message =>
{
    // If we get an error "Cannot convert lambda ... not a delegate type", install Reactive Extensions from NuGet.
    Console.Write(message); // Prints "42".
});
subject.OnNext(42);
The new API starts with a drop-in replacement for Subject<T>:
var subject = new SubjectNetMQ<int>("tcp://127.0.0.1:56001");
subject.Subscribe(message =>
{
    Console.Write(message); // Prints "42".
});
subject.OnNext(42); // Sends 42.
This is great for a demo, but is not recommended for any real-life application.
For those of us familiar with Reactive Extensions (RX), Subject<T> is a combination of a publisher and a subscriber. In a real-life application, we should separate the publisher from the subscriber, because this lets us create the connection earlier, which makes the transport setup more deterministic:
var publisher = new PublisherNetMq<int>("tcp://127.0.0.1:56001");
var subscriber = new SubscriberNetMq<int>("tcp://127.0.0.1:56001");
subscriber.Subscribe(message =>
{
    Console.Write(message); // Prints "42".
});
publisher.OnNext(42); // Sends 42.
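To make the publisher/subscriber split concrete, here is a minimal sketch using only the standard Subject<T> from Reactive Extensions, with no network transport involved. It shows that a Subject<T> is both an IObserver<T> (the publishing half) and an IObservable<T> (the subscribing half). The class and variable names (SplitDemo, publisherSide, subscriberSide) are made up for illustration and are not part of this library.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects; // NuGet: System.Reactive (Reactive Extensions)

public static class SplitDemo
{
    // Pushes two values through the publishing half of a Subject<int>
    // and returns what the subscribing half received.
    public static List<int> Run()
    {
        var subject = new Subject<int>();

        // The same object, seen through its two roles.
        IObserver<int> publisherSide = subject;     // we call OnNext on this
        IObservable<int> subscriberSide = subject;  // we call Subscribe on this

        var received = new List<int>();
        using (subscriberSide.Subscribe(value => received.Add(value)))
        {
            publisherSide.OnNext(42);
            publisherSide.OnNext(43);
        }
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run())); // Prints "42,43".
    }
}
```

Code written against IObserver<T> or IObservable<T> alone never needs to know whether the other half lives in the same object, which is what makes the split into a separate publisher and subscriber natural.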
If we want to run in separate applications:
// Application 1 (subscriber)
var subscriber1 = new SubscriberNetMq<int>("tcp://127.0.0.1:56001");
subscriber1.Subscribe(message =>
{
    Console.Write(message); // Prints "42".
});
// Application 2 (subscriber)
var subscriber2 = new SubscriberNetMq<int>("tcp://127.0.0.1:56001");
subscriber2.Subscribe(message =>
{
    Console.Write(message); // Prints "42".
});
// Application 3 (publisher)
var publisher = new PublisherNetMq<int>("tcp://127.0.0.1:56001");
publisher.OnNext(42); // Sends 42.
Currently, serialization is performed using ProtoBuf. It will handle simple types such as int without annotation, but if we want to send more complex classes, we have to annotate like this:
// For Protobuf support, include NuGet package protobuf-net from Marc Gravell.
[ProtoContract]
public struct MyMessage
{
    [ProtoMember(1)]
    public int Num { get; set; }
    [ProtoMember(2)]
    public string Name { get; set; }
}
var publisher = new PublisherNetMq<MyMessage>("tcp://127.0.0.1:56001");
var subscriber = new SubscriberNetMq<MyMessage>("tcp://127.0.0.1:56001");
subscriber.Subscribe(message =>
{
    Console.Write(message.Num); // Prints "42".
    Console.Write(message.Name); // Prints "Bill".
});
publisher.OnNext(new MyMessage { Num = 42, Name = "Bill" });
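To see what the annotations buy us, the following sketch round-trips an annotated type through protobuf-net directly, without any networking. It is only an illustration of the serializer, not a description of how this library calls it internally. SampleMessage and RoundTripDemo are hypothetical names, and a class is used here rather than a struct purely to keep the example simple.

```csharp
using System;
using System.IO;
using ProtoBuf; // NuGet: protobuf-net

[ProtoContract]
public class SampleMessage // hypothetical type, for illustration only
{
    [ProtoMember(1)]
    public int Num { get; set; }

    [ProtoMember(2)]
    public string Name { get; set; }
}

public static class RoundTripDemo
{
    // Serializes the message to an in-memory buffer and reads it back.
    public static SampleMessage RoundTrip(SampleMessage input)
    {
        using (var stream = new MemoryStream())
        {
            Serializer.Serialize(stream, input);
            stream.Position = 0; // rewind before reading
            return Serializer.Deserialize<SampleMessage>(stream);
        }
    }

    public static void Main()
    {
        var copy = RoundTrip(new SampleMessage { Num = 42, Name = "Bill" });
        Console.WriteLine(copy.Num + " " + copy.Name); // Prints "42 Bill".
    }
}
```

The numbers passed to ProtoMember are the field tags written on the wire, so they should stay stable once publishers and subscribers are deployed separately.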
The NuGet package 0.9.3 is designed for .NET 4. It depends on Reactive Extensions v2.2.5 (this version is difficult to find, but the packages can be downloaded manually from NuGet).
The NuGet package 0.9.4-rc7 is designed for .NET Core 1.1, .NET 4.5, and .NET Standard 1.6. If you want to build it for other platforms, please let me know. If you have trouble loading this, load the Git branch for the 0.9.3 release.
As of v0.9.4-rc7, this package will build for:
As this library supports .NET Standard 1.6 (which is a subset of .NET Core 1.1), this library should be compatible with:
This library is tested on Windows and Linux. If it passes its unit tests on any given platform, then it should perform nicely on different architectures such as Mac.
project.lock.json file). You can either do this from the command line, or by right-clicking on the solution and choosing Restore NuGet packages.
NOTE: Not compatible with .NET Core 1.0 or .NET Core 1.0.1. Install .NET Core 1.1 or above to avoid potential compile errors.
To check out the demos, see:
NetMQ.ReactiveExtensions.SamplePublisher
NetMQ.ReactiveExtensions.SampleSubscriber
NetMQ.ReactiveExtensions.Tests
.Where(), .Select(), .Buffer(), .Throttle(), etc.
.OnNext(), .OnException(), and .OnCompleted().
See the Wiki for more documentation.
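As a rough illustration of how the operators listed above compose, here is a sketch that filters and transforms a stream before subscribing to it. It uses a plain in-memory Subject<int> as a stand-in source so that it runs without a network; the assumption is that a subscriber from this library can be used in the same position, since it is described as a drop-in replacement. OperatorDemo is a made-up name. Note that in standard Reactive Extensions the error notification on IObserver<T> is named OnError.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;     // NuGet: System.Reactive (Where, Select)
using System.Reactive.Subjects; // Subject<T>

public static class OperatorDemo
{
    // Keeps even numbers, multiplies them by 10, and logs every notification.
    public static List<string> Run()
    {
        var source = new Subject<int>(); // stand-in for a networked source (assumption)
        var log = new List<string>();

        using (source
            .Where(n => n % 2 == 0)
            .Select(n => n * 10)
            .Subscribe(
                n => log.Add("next " + n),
                ex => log.Add("error " + ex.Message),
                () => log.Add("completed")))
        {
            source.OnNext(1); // filtered out by Where
            source.OnNext(2);
            source.OnNext(4);
            source.OnCompleted();
        }
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
        // Prints "next 20", "next 40", "completed" on separate lines.
    }
}
```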