rebus-org / Rebus

:bus: Simple and lean service bus implementation for .NET
https://mookid.dk/category/rebus
Other
2.26k stars 353 forks source link

Sample SubscriberGetsPublishedStrings code raised exception #1130

Closed jmojiwat closed 5 months ago

jmojiwat commented 5 months ago

I'm a long time fan of Rebus since v3. Recently I wanted to write an integration test as suggested here:

https://github.com/rebus-org/Rebus/wiki/How-to-create-integration-tests

[Test]
public async Task SubscriberGetsPublishedStrings()
{
    var network = new InMemNetwork();
    var subscriberStore = new InMemorySubscriberStore();

    using var publisherActivator = new BuiltinHandlerActivator();
    using var subscriberActivator = new BuiltinHandlerActivator();
    using var eventWasReceived = new ManualResetEvent(initialState: false);

    using var publisher = Configure.With(publisherActivator) // rebus v7+: Configure.OneWayClient()
        .Transport(t => t.UseInMemoryTransport(network, "publisher"))
        .Subscriptions(s => s.StoreInMemory(subscriberStore))
        .Start();

    subscriberActivator.Handle<string>(async message => eventWasReceived.Set());

    var subscriber = Configure.With(subscriberActivator)
        .Transport(t => t.UseInMemoryTransport(network, "subscriber"))
        .Subscriptions(s => s.StoreInMemory(subscriberStore))
        .Start();

    await subscriber.Subscribe<string>();

    await publisher.Publish("HEJ MED DIG MIN VEN");

    Assert.That(eventWasReceived.WaitOne(TimeSpan.FromSeconds(5)), Is.True, 
        "Did not receive the published event within 5 s timeout");
}

I copy-pasted the code but got an error:

System.InvalidOperationException Attempted to register primary -> Rebus.Subscriptions.ISubscriptionStorage, but a primary registration already exists: primary -> Rebus.Subscriptions.ISubscriptionStorage at Rebus.Injection.Injectionist.Register[TService](Func2 resolverMethod, Boolean isDecorator, String description) at Rebus.Injection.Injectionist.Register[TService](Func2 resolverMethod, String description) at Rebus.Config.StandardConfigurer1.Register(Func2 factoryMethod, String description) at Rebus.Persistence.InMem.InMemorySubscriptionStorageExtensions.StoreInMemory(StandardConfigurer`1 configurer, InMemorySubscriberStore subscriberStore)

I've also tried

Configure.OneWayClient() 

and received a similar error. Any pointers?

I'm using Rebus 8.1.0 and Rebus.TestHelpers 8.0.0.

mookid8000 commented 5 months ago

This is because the in-mem transport in Rebus 8 automatically registers itself as a subscription storage, thus removing the need for the subscriber store above. 🙂

IOW just delete subscriberStore and remove all uses of it, and you should be good.

OTOH if you WANT to manually handle subscriptions (e.g. for integrations testing one's own subscription storage implementation, etc.), you can make the in-mem transport behave the old way by passing registerSubscriptionStorage: false to it like this:

services.AddRebus(
    configure => configure
        .Transport(t => t.UseInMemoryTransport(network, "my-queue", registerSubscriptionStorage: false))
);