rabbitmq / rabbitmq-stream-dotnet-client

RabbitMQ client for the stream protocol
https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html
Other
122 stars 41 forks source link

Filter not set when creating a DeduplicatingProducer #382

Closed ArticOne closed 5 months ago

ArticOne commented 5 months ago

Describe the bug

When creating a DeduplicatingProducer with a filter, like this:

var deduplicatingProducerConfig = new DeduplicatingProducerConfig(
    streamSystem,
    "TestStream",
    "Producer-1")
{
    Filter = new ProducerFilter()
    {
        FilterValue = GetFilterValue // This is a method that returns a string which consumers filter on
    }
};

var producer = await DeduplicatingProducer.Create(deduplicatingProducerConfig);

private string GetFilterValue(Message message)
{
    return message.ApplicationProperties["LocationId"]?.ToString() ?? "";
}

And then consuming from the stream like this:

var consumer = await Consumer.Create( 
    new ConsumerConfig( 
        streamSystem,
        "TestStream")
    {
        Filter = new ConsumerFilter()
        {
            Values = ["-1", "9"],
            PostFilter = message =>
            {
                return new[] { "-1", "9" }.Contains(message.ApplicationProperties["LocationId"]);
            },
            MatchUnfiltered = false // Messages would be dispatched only if this property was set to true. In that case, the PostFilter func (set above) would be called
        },
        Reference = "Location1",
        OffsetSpec = new OffsetTypeOffset(offset),
        MessageHandler = async (stream, consumer, context, message) =>
        {
            // Handler code
        }
    }
);

messages would not be consumed unless MatchUnfiltered was set to true.

Reproduction steps

  1. On the producing side, create a DeduplicatingProducer by setting the Filter property of its DeduplicatingProducerConfig to a non null value
  2. On the consuming side, create a Consumer by setting the Filter property of its ConsumerFilter to a non null value while also setting the ConsumerFilter's MatchUnfiltered property to false

After these steps, published messages would not be consumed.

Expected behavior

When using a DeduplicatingProducer with the Filtering feature, I'm expecting the consumer to consume messages based on ConsumerFilter.Values only (not taking the bloom filter's false positives into account).

Additional context

Following is a screenshot of a Create static method in DeduplicatingProducer that returns a new DeduplicatingProducer.

When creating a regular Producer, which DeduplicatingProducer wraps, the Filter property is not passed from DeduplicatingProducerConfig to ProducerConfig.

Here's a link that leads to the said static method.

image

Gsantomaggio commented 5 months ago

Thank you, @ArticOne, for the detailed report. Yes you are right.

Gsantomaggio commented 5 months ago

@ArticOne do you have the chance to test https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/383 ? thank you

ArticOne commented 5 months ago

@ArticOne do you have the chance to test #383 ? thank you

Sure, I'll try it out a bit later.

ArticOne commented 5 months ago

Okay, so I manually tryed it out. Should be good now.

I published 90 messages with the old and new version of the producer, with each producing to its own stream like this:

var j = 0ul;

for (int i = 1; i <= 30; i++)
{
    var mes = new Message(Encoding.UTF8.GetBytes($"Data V1 - {i}"));
    mes.ApplicationProperties = new ApplicationProperties()
    {
        { "K1", "V1" }
    };
    await producer.Send(++j, mes);
}

for (int i = 1; i <= 30; i++)
{
    var mes = new Message(Encoding.UTF8.GetBytes($"Data V2 - {i}"));
    mes.ApplicationProperties = new ApplicationProperties()
    {
        { "K1", "V2" }
    };

    await producer.Send(++j, mes);
}

for (int i = 1; i <= 30; i++)
{
    var mes = new Message(Encoding.UTF8.GetBytes($"Data V3 - {i}"));
    mes.ApplicationProperties = new ApplicationProperties()
    {
        { "K1", "V3" }
    };

    await producer.Send(++j, mes);
}

On the consumer side I would enter the filter value ("V1", "V2" or "V3"):

I also ran the new unit test, it passes.

Additionally, I published the same 90 messages using a regular producer (not the DeduplicatingProducer one). Interestingly, when consuming from a stream that had messages only from this regular producer, fewer false positives have occured.

Gsantomaggio commented 5 months ago

fyi: https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/releases/tag/v1.8.6