dotnet / aspire

Tools, templates, and packages to accelerate building observable, production-ready apps
https://learn.microsoft.com/dotnet/aspire
MIT License

Kafka WithDataVolume and WithDataBindMount don't persist messages #4909

Closed: eerhardt closed this issue 1 month ago.

eerhardt commented 3 months ago


Describe the bug

Calling WithDataVolume or WithDataBindMount on a Kafka resource doesn't persist data across runs of the application. We should make sure the right settings are in place so that the data is persisted across app runs.

Expected Behavior

When using WithDataVolume or WithDataBindMount, the messages that are produced should be preserved across app runs, so that when the app starts again, those messages can still be consumed.

Steps To Reproduce

Run the following test

    [Theory]
    [InlineData(true)]
    [InlineData(false)]
    [RequiresDocker]
    public async Task WithDataShouldPersistStateBetweenUsages(bool useVolume)
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
        var topic = "test-topic";
        string? volumeName = null;
        string? bindMountPath = null;

        try
        {
            var builder1 = CreateDistributedApplicationBuilder();
            var kafka1 = builder1.AddKafka("kafka");

            if (useVolume)
            {
                // Use a deterministic volume name so volumes left behind by failed deletions don't pile up on the machine
                volumeName = VolumeNameGenerator.CreateVolumeName(kafka1, nameof(WithDataShouldPersistStateBetweenUsages));

                // if the volume already exists (because of a crashing previous run), try to delete it
                DockerUtils.AttemptDeleteDockerVolume(volumeName);
                kafka1.WithDataVolume(volumeName);
            }
            else
            {
                bindMountPath = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
                kafka1.WithDataBindMount(bindMountPath);
            }

            using (var app = builder1.Build())
            {
                await app.StartAsync();
                try
                {
                    var hb = Host.CreateApplicationBuilder();

                    hb.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
                    {
                        [$"ConnectionStrings:{kafka1.Resource.Name}"] = await kafka1.Resource.ConnectionStringExpression.GetValueAsync(default)
                    });

                    hb.AddKafkaProducer<string, string>("kafka");

                    using (var host = hb.Build())
                    {
                        await host.StartAsync();

                        var producer = host.Services.GetRequiredService<IProducer<string, string>>();
                        for (var i = 0; i < 10; i++)
                        {
                            await producer.ProduceAsync(topic, new Message<string, string> { Key = "test-key", Value = $"test-value{i}" });
                        }
                    }
                }
                finally
                {
                    // Stops the container, or the Volume/mount would still be in use
                    await app.StopAsync();
                }
            }

            var builder2 = CreateDistributedApplicationBuilder();
            var kafka2 = builder2.AddKafka("kafka");

            if (useVolume)
            {
                kafka2.WithDataVolume(volumeName);
            }
            else
            {
                kafka2.WithDataBindMount(bindMountPath!);
            }

            using (var app = builder2.Build())
            {
                await app.StartAsync();
                try
                {
                    var hb = Host.CreateApplicationBuilder();

                    hb.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
                    {
                        [$"ConnectionStrings:{kafka2.Resource.Name}"] = await kafka2.Resource.ConnectionStringExpression.GetValueAsync(default)
                    });

                    hb.AddKafkaConsumer<string, string>("kafka", consumerBuilder =>
                    {
                        consumerBuilder.Config.GroupId = "aspire-consumer-group";
                        consumerBuilder.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
                    });

                    using (var host = hb.Build())
                    {
                        await host.StartAsync();

                        var consumer = host.Services.GetRequiredService<IConsumer<string, string>>();
                        consumer.Subscribe(topic);
                        for (var i = 0; i < 10; i++)
                        {
                            var result = consumer.Consume(cts.Token);

                            Assert.Equal("test-key", result.Message.Key);
                            Assert.Equal($"test-value{i}", result.Message.Value);
                        }
                    }
                }
                finally
                {
                    // Stops the container, or the Volume/mount would still be in use
                    await app.StopAsync();
                }
            }
        }
        finally
        {
            if (volumeName is not null)
            {
                DockerUtils.AttemptDeleteDockerVolume(volumeName);
            }

            if (bindMountPath is not null)
            {
                try
                {
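                    // The bind mount source is a directory, so File.Delete would always fail here
                    Directory.Delete(bindMountPath, recursive: true);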
                }
                catch
                {
                    // Don't fail test if we can't clean the temporary folder
                }
            }
        }
    }

Exceptions (if any)

  Message: 
Confluent.Kafka.ConsumeException : Subscribed topic not available: test-topic: Broker: Unknown topic or partition

  Stack Trace: 
Consumer`2.Consume(Int32 millisecondsTimeout)
Consumer`2.Consume(CancellationToken cancellationToken)
KafkaFunctionalTests.WithDataShouldPersistStateBetweenUsages(Boolean useVolume) line 169

.NET Version info

No response

Anything else?

No response

eerhardt commented 3 months ago

cc @stbau04 @g7ed6e

stbau04 commented 3 months ago

Looks like the Kafka data directory is wrong. It should probably be /tmp/kraft-combined-logs (https://hub.docker.com/r/confluentinc/confluent-local) instead of /var/lib/kafka/data. With that change, the test fails with a different exception:

  Message: 
Confluent.Kafka.ProduceException`2[[System.String, System.Private.CoreLib, Version=8.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=8.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]] : Local: Message timed out

  Stack Trace: 
Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)
KafkaFunctionalTests.WithDataShouldPersistStateBetweenUsages(Boolean useVolume) line 127
KafkaFunctionalTests.WithDataShouldPersistStateBetweenUsages(Boolean useVolume) line 135
--- End of stack trace from previous location ---

The changes are in draft PR https://github.com/dotnet/aspire/pull/4947.

g7ed6e commented 3 months ago

~Looking at that, it seems two volumes are required: one for data and one for secrets. I need to give this a try.~

Edit: It looks like a solution has been found, and the folder is no longer an issue.

joedyndale commented 2 months ago

This is still an issue for me, using Aspire.Hosting.Kafka 8.2.0 on macOS with OrbStack (a Docker alternative). I tried WithDataVolume(), WithVolume(), WithBindMount(), and similar methods, and none of them result in persisted messages.

AlexanderBartoshZ commented 1 month ago

Kafka does check /var/lib/kafka/data for writability, but it actually writes its data to /tmp/kraft-combined-logs. You can point the log directory at /var/lib/kafka/data instead by setting KAFKA_LOG_DIRS.

This workaround solves it for me (checked only on Docker Desktop so far):

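    var kafka = builder.AddKafka("kafka")
        .WithDataVolume("ape_kafka_volume_v2")
        .WithKafkaUI();

    // Find the data mount added by WithDataVolume (SingleOrDefault assumes it is the only mount)
    // and make Kafka write its logs there instead of /tmp/kraft-combined-logs.
    var volumeMount = kafka.Resource.Annotations.OfType<ContainerMountAnnotation>().SingleOrDefault();
    if (volumeMount != null)
    {
        kafka.WithEnvironment("KAFKA_LOG_DIRS", volumeMount.Target);
    }
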
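
A variant of the same workaround can set KAFKA_LOG_DIRS to the mount path directly instead of reading it back from the resource annotations. This is a minimal AppHost sketch, assuming (as stated in this thread) that WithDataVolume and WithDataBindMount mount the data at /var/lib/kafka/data in this Aspire.Hosting.Kafka version; the volume name "kafka-data" is a placeholder, and the approach has only been confirmed by the Docker Desktop report above.

```csharp
using Aspire.Hosting;

var builder = DistributedApplication.CreateBuilder(args);

// Assumption: the container path that WithDataVolume/WithDataBindMount mount to
// in this Aspire.Hosting.Kafka version (see the discussion above).
const string kafkaDataPath = "/var/lib/kafka/data";

builder.AddKafka("kafka")
    // "kafka-data" is a placeholder volume name; WithDataBindMount("<host path>")
    // could be used here instead for a bind mount.
    .WithDataVolume("kafka-data")
    // Make the broker write its logs to the mounted path instead of
    // /tmp/kraft-combined-logs, so they survive container restarts.
    .WithEnvironment("KAFKA_LOG_DIRS", kafkaDataPath)
    .WithKafkaUI();

builder.Build().Run();
```

Hard-coding the path avoids depending on SingleOrDefault finding exactly one mount on the resource, at the cost of silently breaking if a later version mounts the data somewhere else.
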
davidfowl commented 1 month ago

Thanks @AlexanderBartoshZ and @Alirexaa!