BrighterCommand / Brighter

A framework for building messaging apps with .NET and C#.
https://www.goparamore.io/
MIT License

[Bug] Combining producer registries results in `AccessViolationException` instances for Kafka producers #3257

Closed dhickie closed 2 weeks ago

dhickie commented 4 weeks ago

Describe the bug

Brighter has recently introduced the option of combining multiple producer registries into a single registry, allowing applications to publish to multiple message transports.

This is done by creating the producer registries for different transports, and then combining them using the Merge() extension method. However, as the message producers inside those registries are passed by reference, the instances are shared between the original producer registry and the combined one.

Once the original registries are no longer referenced, the next GC run disposes them along with their underlying message producers. In the case of Kafka, the next time a disposed message producer is retrieved from the combined producer registry and used to publish a message, the Confluent Kafka SDK throws an AccessViolationException, as it is no longer looking at valid memory.

Other transports will likely fail in a similar way once their message producers have been disposed.
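The sharing problem can be illustrated without Brighter or Kafka. The following is a minimal sketch in which `FakeProducer`, `FakeRegistry` and `SharedProducerDemo` are hypothetical stand-ins (not Brighter types), and an explicit `Dispose()` call stands in for the GC-triggered disposal described above:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a transport's message producer (not a Brighter type).
public sealed class FakeProducer : IDisposable
{
    public bool IsDisposed { get; private set; }

    public void Send(string message)
    {
        // A managed stand-in can fail cleanly; a producer wrapping native
        // resources may instead touch memory that has already been freed.
        if (IsDisposed) throw new ObjectDisposedException(nameof(FakeProducer));
    }

    public void Dispose() => IsDisposed = true;
}

// Hypothetical stand-in for a producer registry that disposes the producers it holds.
public sealed class FakeRegistry : IDisposable
{
    private readonly Dictionary<string, FakeProducer> _producers;

    public FakeRegistry(Dictionary<string, FakeProducer> producers) => _producers = producers;

    public FakeProducer Lookup(string topic) => _producers[topic];

    // Copies references only: the merged registry holds the same producer objects.
    public FakeRegistry Merge(FakeRegistry other) =>
        new FakeRegistry(_producers.Concat(other._producers)
            .ToDictionary(pair => pair.Key, pair => pair.Value));

    public void Dispose()
    {
        foreach (var producer in _producers.Values) producer.Dispose();
    }
}

public static class SharedProducerDemo
{
    // Returns true when disposing an original registry also disposes the
    // producer that the merged registry hands out.
    public static bool MergedProducerIsDisposedAfterOriginalDisposal()
    {
        var kafka = new FakeRegistry(
            new Dictionary<string, FakeProducer> { ["orders"] = new FakeProducer() });
        var sns = new FakeRegistry(
            new Dictionary<string, FakeProducer> { ["alerts"] = new FakeProducer() });
        var combined = kafka.Merge(sns);

        kafka.Dispose(); // stands in for the disposal of the original registry

        return combined.Lookup("orders").IsDisposed;
    }
}
```

In this sketch the combined registry never disposes anything itself, yet the producer it returns for `"orders"` is already disposed, which mirrors the failure reported for the real Kafka producer.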

To Reproduce

To reproduce the issue for Kafka, create a Kafka producer registry:

var kafkaProducerRegistry = new KafkaProducerRegistryFactory(
    kafkaGatewayConfig,
    new KafkaPublication[]
    {
        new KafkaPublication()
        {
            Topic = new RoutingKey(kafkaConfig.KafkaTopic),
            MakeChannels = OnMissingChannel.Create
        }
    })
    .Create();

Then create another producer registry for some other transport (in this example, SNS):

var snsProducerRegistry = new SnsProducerRegistryFactory(
    snsGatewayConfig,
    new SnsPublication[]
    {
        new SnsPublication
        {
            Topic = new RoutingKey(snsConfig.SnsTopic),
            FindTopicBy = TopicFindBy.Convention,
            MakeChannels = OnMissingChannel.Assume
        }
    })
    .Create();

Merge the two registries into a single combined registry:

var combinedProducerRegistry = kafkaProducerRegistry.Merge(snsProducerRegistry);

Pass the combined registry to UseExternalBus():

var brighterBuilder = services.AddBrighter()
    .UseExternalBus(combinedProducerRegistry)
...

Run the application. The first time a message is published to Kafka after the GC has run, the application will crash with an AccessViolationException.

Workaround

The current workaround is to ensure that the original registries are never disposed by the GC. There are a few ways to do this, such as:
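One possibility, offered here as an assumption rather than something confirmed in this issue, is to keep the original registries reachable from a static field so that the GC never treats them as garbage. `RegistryRoots` below is a hypothetical helper, not part of Brighter:

```csharp
using System.Collections.Generic;

// Hypothetical helper (not part of Brighter): holds strong references in a static
// field so that the stored objects stay reachable for the lifetime of the process.
public static class RegistryRoots
{
    private static readonly List<object> Roots = new List<object>();

    // Stores the registry and returns it unchanged, so the call can wrap a Create().
    public static T Keep<T>(T registry) where T : class
    {
        lock (Roots)
        {
            Roots.Add(registry);
        }
        return registry;
    }

    // Number of objects currently kept alive by this helper.
    public static int Count
    {
        get
        {
            lock (Roots)
            {
                return Roots.Count;
            }
        }
    }
}
```

With a helper like this, each `.Create()` call in the reproduction could be wrapped in `RegistryRoots.Keep(...)` before the registries are merged. This only hides the shared-ownership problem and does not fix it.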

Possible fix

The issue ultimately arises from having to create producer registries that the user has no intention of using, when all they really want is the IAmAMessageProducer instances inside. One way of resolving this would be to require each transport package to provide both an implementation of IAmAProducerRegistryFactory and an implementation of a new interface called IAmAMessageProducerFactory. The latter would simply return a collection of message producers for a particular transport.

In the core package, we would then have a CombinedProducerRegistryFactory, which takes an arbitrary number of IAmAMessageProducerFactory instances, and calls their Create() methods to create and return a ProducerRegistry instance containing all the message producers.

In practice, this would look like:

var kafkaProducerFactory = new KafkaProducerFactory(
    kafkaGatewayConfig,
    new KafkaPublication[]
    {
        ...
    });
var snsProducerFactory = new SnsProducerFactory(
    snsGatewayConfig,
    new SnsPublication[]
    {
        ...
    });

var combinedProducerRegistry = new CombinedProducerRegistryFactory(
    kafkaProducerFactory,
    snsProducerFactory).Create();
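To make the ownership model concrete, here is a rough sketch of how the proposed types might fit together. Every name ending in `Sketch`, along with `InMemoryProducer` and `InMemoryProducerFactory`, is a hypothetical stand-in introduced for illustration; the real IAmAMessageProducer interface has more members, and the eventual Brighter signatures may differ:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for IAmAMessageProducer, reduced to what this sketch needs.
public interface IMessageProducerSketch : IDisposable
{
    string Topic { get; }
}

// Sketch of the proposed IAmAMessageProducerFactory: one implementation per transport.
public interface IMessageProducerFactorySketch
{
    IEnumerable<IMessageProducerSketch> Create();
}

// Stand-in for ProducerRegistry: the only owner of the producers it is given.
public sealed class ProducerRegistrySketch : IDisposable
{
    private readonly Dictionary<string, IMessageProducerSketch> _byTopic;

    public ProducerRegistrySketch(IEnumerable<IMessageProducerSketch> producers) =>
        _byTopic = producers.ToDictionary(producer => producer.Topic);

    public IMessageProducerSketch Lookup(string topic) => _byTopic[topic];

    public void Dispose()
    {
        foreach (var producer in _byTopic.Values) producer.Dispose();
    }
}

// Sketch of the proposed CombinedProducerRegistryFactory.
public sealed class CombinedProducerRegistryFactorySketch
{
    private readonly IMessageProducerFactorySketch[] _factories;

    public CombinedProducerRegistryFactorySketch(params IMessageProducerFactorySketch[] factories) =>
        _factories = factories;

    // Producers flow from the factories straight into a single registry, so no
    // intermediate registry exists that could dispose them later.
    public ProducerRegistrySketch Create() =>
        new ProducerRegistrySketch(_factories.SelectMany(factory => factory.Create()));
}

// Hypothetical transport used only to exercise the sketch.
public sealed class InMemoryProducer : IMessageProducerSketch
{
    public InMemoryProducer(string topic) => Topic = topic;
    public string Topic { get; }
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}

public sealed class InMemoryProducerFactory : IMessageProducerFactorySketch
{
    private readonly string[] _topics;
    public InMemoryProducerFactory(params string[] topics) => _topics = topics;

    public IEnumerable<IMessageProducerSketch> Create() =>
        _topics.Select(topic => new InMemoryProducer(topic)).ToList();
}
```

The design point is that the combined registry is the sole owner of the producers, so their lifetime is tied to the object that is actually passed to UseExternalBus().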

Further technical details

iancooper commented 4 weeks ago

I think this approach makes sense. Better to just end up with a CombinedProducerRegistryFactory that can cope with this, and retire the V9 merge approach. It solves the actual problem of heterogeneous producers rather than being a workaround.