dotnet / aspire

Tools, templates, and packages to accelerate building observable, production-ready apps
https://learn.microsoft.com/dotnet/aspire
MIT License

Using custom schema registry for Kafka component requires custom hostname #3910

Open ErikBIMobject opened 7 months ago

ErikBIMobject commented 7 months ago

Visual Studio Enterprise: Version 17.10.0 Preview 4.0. Running Aspire version: 8.0.0-preview.6.24178.5

I'm trying to add a schema registry as a new resource, based on this Docker Compose file:

```yaml
version: '3.8'
services:

  broker:
    image: confluentinc/confluent-local:7.6.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092
      KAFKA_DEBUG: "true"
      KAFKA_SCHEMA_REGISTRY_URL: http://schema-registry:1223

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
```

The only way I can get the schema registry to find the Kafka server (I also posted a Stack Overflow question about this) is to use the broker's hostname in the schema registry variable "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", together with "KAFKA_ADVERTISED_LISTENERS" = "PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092" (the Kafka component sets this to localhost rather than broker). Please advise. Getting the schema registry up and running would be a major piece of our puzzle.

davidfowl commented 7 months ago

@g7ed6e can you take a look?

g7ed6e commented 7 months ago

The broker container needs to know which name clients use to reach it, so that it can include that name in its KAFKA_ADVERTISED_LISTENERS environment variable. I'm currently hitting the same issue while trying to integrate a KafkaUI container into the Aspire.Hosting.Kafka project. I need to try something similar to what Aspire.Hosting.MongoDB does to get MongoExpress to connect to MongoDB: MongoExpress builds its connection string from PrimaryEndpoint.ContainerHost and PrimaryEndpoint.Port, see https://github.com/dotnet/aspire/blob/bade408eecae2cb080b6de111c97bc290bd2c776/src/Aspire.Hosting.MongoDB/MongoDBBuilderExtensions.cs#L125.
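To make the "which name clients use" point concrete: a Kafka client only uses its bootstrap address for the first connection. The broker then replies with the address advertised for the listener the client came in on, and later connections go there. The sketch below parses a KAFKA_ADVERTISED_LISTENERS value into a listener-name to host:port map. ParseAdvertisedListeners is a hypothetical helper, and it assumes the simple NAME://host:port form used in this thread (no IPv6 literals).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: parse "NAME://host:port,NAME2://host:port" into a map.
// Assumes the simple form used in this thread (no IPv6 literals).
static Dictionary<string, (string Host, int Port)> ParseAdvertisedListeners(string value)
{
    var result = new Dictionary<string, (string Host, int Port)>(StringComparer.Ordinal);
    foreach (var entry in value.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries))
    {
        var schemeSplit = entry.IndexOf("://", StringComparison.Ordinal);
        if (schemeSplit < 0)
            throw new FormatException($"Missing '://' in listener '{entry}'.");

        var name = entry[..schemeSplit];
        var hostPort = entry[(schemeSplit + 3)..];
        var colon = hostPort.LastIndexOf(':');
        if (colon < 0)
            throw new FormatException($"Missing port in listener '{entry}'.");

        result[name] = (hostPort[..colon], int.Parse(hostPort[(colon + 1)..]));
    }
    return result;
}

// The value from the Docker Compose file in the original report.
var advertised = ParseAdvertisedListeners(
    "PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092");

// A client that bootstraps via the PLAINTEXT_HOST listener (container port 9092) is told
// to reconnect to broker:9092. A container on the same Docker network can resolve "broker",
// but a process on the host cannot, so one advertised address cannot serve both.
var (host, port) = advertised["PLAINTEXT_HOST"];
Console.WriteLine($"{host}:{port}");
```

With the Compose value from the original report, a host process that bootstraps on localhost:9092 is redirected to broker:9092, which only resolves inside the Docker network. That matches the behavior described later in this thread.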

davidfowl commented 7 months ago

This completely nerd-sniped me. I went down the Kafka rabbit hole: https://www.confluent.io/blog/kafka-listeners-explained/

I recommend reading https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/networking-overview to get a baseline understanding of networking in Aspire.

What we haven't documented is how endpoint references work between Aspire resources when you use C# object references, for example:

```csharp
// Add the kafka broker
var broker = builder.AddContainer("broker", "confluent-local", "7.6.0")
                    .WithImageRegistry("confluentinc")
                    .WithEndpoint(name: "primary", targetPort: 9092)
                    .WithEndpoint(targetPort: 9101);

// Add the schema registry
var schemaRegistry = builder.AddContainer("schema-registry", "cp-schema-registry", "7.6.1")
                            .WithImageRegistry("confluentinc")
                            .WithHttpEndpoint(name: "primary", targetPort: 8081);

broker.WithEnvironment(context =>
{
    var selfEndpoint = broker.GetEndpoint("primary");

    // Set the advertised listeners to the public port
    var advertisedListeners = builder.ExecutionContext.IsRunMode
        // This is a workaround for https://github.com/dotnet/aspire/issues/3735
        ? ReferenceExpression.Create($"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://{selfEndpoint.ContainerHost}:{selfEndpoint.Property(EndpointProperty.Port)}")
        : ReferenceExpression.Create($"PLAINTEXT://{selfEndpoint.Property(EndpointProperty.Host)}:29092,PLAINTEXT_HOST://{selfEndpoint.Property(EndpointProperty.Host)}:{selfEndpoint.Property(EndpointProperty.Port)}");

    context.EnvironmentVariables["KAFKA_ADVERTISED_LISTENERS"] = advertisedListeners;

    // Set the URL to the schema registry
    context.EnvironmentVariables["KAFKA_SCHEMA_REGISTRY_URL"] = schemaRegistry.GetEndpoint("primary");
});

schemaRegistry.WithEnvironment(context =>
{
    // Get the broker endpoint
    var brokerEndpoint = broker.GetEndpoint("primary");

    // Create a reference to the broker endpoint, doing it this way will ensure that it works in both run and publish mode
    var brokerConnectionReference = builder.ExecutionContext.IsRunMode
        // This is a workaround for https://github.com/dotnet/aspire/issues/3735
        ? ReferenceExpression.Create($"{brokerEndpoint.ContainerHost}:{brokerEndpoint.Property(EndpointProperty.Port)}")
        : ReferenceExpression.Create($"{brokerEndpoint.Property(EndpointProperty.Host)}:{brokerEndpoint.Property(EndpointProperty.Port)}");

    context.EnvironmentVariables["SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS"] = brokerConnectionReference;
    context.EnvironmentVariables["SCHEMA_REGISTRY_HOST_NAME"] = "schema-registry";
});
```

@g7ed6e We should update the advertised listeners setting in our Kafka resource to support multiple host names, so that both containers and host processes can reach the broker. I'm hoping that will just work 😄.
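One way to read "multiple host names" is one listener per audience: each listener binds its own port inside the container and advertises the address that audience can actually reach. The sketch below derives KAFKA_LISTENERS, KAFKA_ADVERTISED_LISTENERS and KAFKA_LISTENER_SECURITY_PROTOCOL_MAP from a single list so the three cannot drift apart. BuildListenerSettings and the HOST/DOCKER listener names are hypothetical, and 55000/55001 are placeholder host ports, not values Aspire would assign.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: derive the three broker settings from one listener list.
// A null AdvertisedHost marks a listener that is bound but never advertised
// (for example the KRaft CONTROLLER listener). All listeners use PLAINTEXT here.
static Dictionary<string, string> BuildListenerSettings(
    IEnumerable<(string Name, string BindHost, int BindPort, string? AdvertisedHost, int AdvertisedPort)> listeners)
{
    var list = listeners.ToList();
    return new Dictionary<string, string>
    {
        ["KAFKA_LISTENERS"] = string.Join(",",
            list.Select(l => $"{l.Name}://{l.BindHost}:{l.BindPort}")),
        ["KAFKA_ADVERTISED_LISTENERS"] = string.Join(",",
            list.Where(l => l.AdvertisedHost is not null)
                .Select(l => $"{l.Name}://{l.AdvertisedHost}:{l.AdvertisedPort}")),
        ["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] = string.Join(",",
            list.Select(l => $"{l.Name}:PLAINTEXT")),
    };
}

var settings = BuildListenerSettings(new[]
{
    ("PLAINTEXT",  "localhost", 29092, "localhost", 29092),            // clients inside the broker container
    ("CONTROLLER", "localhost", 29093, (string?)null, 0),              // KRaft controller, never advertised
    ("HOST",       "0.0.0.0",   9093,  "localhost", 55001),            // processes on the host (placeholder published port)
    ("DOCKER",     "0.0.0.0",   9092,  "host.docker.internal", 55000), // other containers (placeholder published port)
});

foreach (var (key, value) in settings)
    Console.WriteLine($"{key}={value}");
```

In an AppHost the advertised ports would come from endpoint references rather than constants; generating all three values from one list is simply a way to keep the listener names consistent.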

g7ed6e commented 7 months ago

@davidfowl Yes. Ideally I think we should use the container name, as this should also enable a cluster scenario with multiple broker nodes. I haven't found the generated Docker container name in the resource properties yet; maybe I missed an API. An alternative to pulling the generated container name would be to set it to a stable, fixed value, but a multi-broker cluster scenario would probably rely on naming conventions.

davidfowl commented 7 months ago

> @davidfowl Yes. Ideally I think we should use the container name as this should enable cluster with multiple broker nodes scenario too.

Docker networks aren't supported: https://github.com/dotnet/aspire/issues/850.

ErikBIMobject commented 7 months ago

Thank you for your answers! I'm glad it sparked some interest :) I checked your solution, @davidfowl, and it does work: the schema registry can connect to the broker. But now the API can no longer connect to the broker 🤷‍♂️ I need two different settings for the PLAINTEXT_HOST part of the KAFKA_ADVERTISED_LISTENERS variable. The schema registry needs PLAINTEXT_HOST://{selfEndpoint.ContainerHost}:{selfEndpoint.Property(EndpointProperty.Port)}, while the API needs PLAINTEXT_HOST://localhost:{selfEndpoint.Property(EndpointProperty.Port)}.

I had already looked at the Aspire networking guide, and I skimmed the Kafka listeners article, but I couldn't come up with a working solution. I'll dig a little more into the listeners documentation and see if I can come up with something.

g7ed6e commented 7 months ago

I updated #3882 to support both a project connection (one producer and one consumer) and a container connection (a KafkaUI container), using two different endpoints declared in KafkaServerResource.

mitchdenny commented 6 months ago

I'm just catching up on this. Does this mean we are going to need a custom WithReference for Kafka to make sure that it wires up correctly?

g7ed6e commented 6 months ago

For the time being, yes. The Kafka and schema registry containers both need to know each other's addresses. On the client side, the schema registry has a dedicated client from which we obtain serializers and deserializers; those are then passed to the Kafka client.
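For context on what those serializers add on the wire: Confluent's schema-registry-aware serializers prefix each message with a magic byte 0 and the 4-byte big-endian schema ID (Protobuf adds message-index bytes after that), and the deserializer reads the ID back to fetch the schema from the registry. The sketch below reproduces only that framing with hypothetical helpers Frame and ReadSchemaId; it does not contact a registry or use the Confluent.SchemaRegistry packages.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper: wrap an already-encoded payload in the Confluent wire format.
// Layout: [magic byte 0][schema ID, 4 bytes big-endian][payload...]
static byte[] Frame(int schemaId, ReadOnlySpan<byte> payload)
{
    var message = new byte[5 + payload.Length];
    message[0] = 0; // magic byte
    BinaryPrimitives.WriteInt32BigEndian(message.AsSpan(1, 4), schemaId);
    payload.CopyTo(message.AsSpan(5));
    return message;
}

// Hypothetical helper: read the schema ID back; a deserializer uses it
// to look up the writer schema in the registry.
static int ReadSchemaId(ReadOnlySpan<byte> message)
{
    if (message.Length < 5 || message[0] != 0)
        throw new FormatException("Not a schema-registry framed message.");
    return BinaryPrimitives.ReadInt32BigEndian(message.Slice(1, 4));
}

var framed = Frame(42, new byte[] { 0xAA, 0xBB });
Console.WriteLine(Convert.ToHexString(framed)); // 000000002AAABB
Console.WriteLine(ReadSchemaId(framed));        // 42
```

This is why both sides need the registry address: the producer's serializer registers or looks up the schema ID before framing, and the consumer's deserializer resolves that ID back to a schema.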

Compufreak345 commented 4 months ago


Hey, I found a solution that works (a configuration enhancement to what @davidfowl already provided) :) Give this a shot:

```csharp
// Add the kafka broker
IResourceBuilder<ContainerResource> broker = builder.AddContainer("broker", "confluent-local")
    .WithImageRegistry("confluentinc")
    .WithEndpoint(name: "internal", targetPort: 9092) // Endpoint for Docker-internal communication (schema registry)
    .WithEndpoint(name: "primary", targetPort: 9093) // Endpoint for Docker-external communication (applications running on localhost)
    .WithHttpEndpoint(name: "rest-api", targetPort: 8082) // Confluent REST API
    .WithEndpoint(targetPort: 9101);

// Add the schema registry
IResourceBuilder<ContainerResource> schemaRegistry = builder.AddContainer("schema-registry", "cp-schema-registry")
    .WithImageRegistry("confluentinc")
    .WithHttpEndpoint(name: "primary", targetPort: 8081); // HTTP endpoint, so the referenced URL uses http://

broker.WithEnvironment(context =>
{
    var internalEndpoint = broker.GetEndpoint("internal");
    var primaryEndpoint = broker.GetEndpoint("primary");

    // Set the advertised listeners to the public port
    var advertisedListeners = builder.ExecutionContext.IsRunMode
        // This is a workaround for https://github.com/dotnet/aspire/issues/3735
        ? ReferenceExpression.Create(
            $"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://{internalEndpoint.ContainerHost}:{internalEndpoint.Property(EndpointProperty.Port)},PLAINTEXT_LOCALHOST://localhost:{primaryEndpoint.Property(EndpointProperty.Port)}")
        : ReferenceExpression.Create(
            $"PLAINTEXT://{internalEndpoint.Property(EndpointProperty.Host)}:29092,PLAINTEXT_HOST://{internalEndpoint.Property(EndpointProperty.Host)}:{internalEndpoint.Property(EndpointProperty.Port)},PLAINTEXT_LOCALHOST://localhost:{primaryEndpoint.Property(EndpointProperty.Port)}");

    context.EnvironmentVariables["KAFKA_ADVERTISED_LISTENERS"] = advertisedListeners;

    // Each listener needs a bind address here and an entry in the security protocol map
    context.EnvironmentVariables["KAFKA_LISTENERS"] =
        "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_LOCALHOST://0.0.0.0:9093";
    context.EnvironmentVariables["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] =
        "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_LOCALHOST:PLAINTEXT";

    // Set the URL to the schema registry
    context.EnvironmentVariables["KAFKA_SCHEMA_REGISTRY_URL"] = schemaRegistry.GetEndpoint("primary");
});

schemaRegistry.WithEnvironment(context =>
{
    // Get the broker endpoint
    var brokerEndpoint = broker.GetEndpoint("internal");

    // Create a reference to the broker endpoint; doing it this way ensures that it works in both run and publish mode
    var brokerConnectionReference = builder.ExecutionContext.IsRunMode
        // This is a workaround for https://github.com/dotnet/aspire/issues/3735
        ? ReferenceExpression.Create($"{brokerEndpoint.ContainerHost}:{brokerEndpoint.Property(EndpointProperty.Port)}")
        : ReferenceExpression.Create(
            $"{brokerEndpoint.Property(EndpointProperty.Host)}:{brokerEndpoint.Property(EndpointProperty.Port)}");

    context.EnvironmentVariables["SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS"] = brokerConnectionReference;
    context.EnvironmentVariables["SCHEMA_REGISTRY_HOST_NAME"] = "schema-registry";
});
```
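The three broker settings in this solution have to agree: every advertised listener name must also appear in KAFKA_LISTENERS, and once KAFKA_LISTENER_SECURITY_PROTOCOL_MAP is set explicitly, every listener name needs an entry there, otherwise the broker will typically refuse to start. Adding PLAINTEXT_LOCALHOST therefore meant touching all three. The hypothetical FindListenerMismatches below is a sketch that compares listener names only (not Kafka's full validation) and assumes well-formed input; the advertised value uses placeholder host ports 55000/55001.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Listener names from "NAME://host:port,..." (KAFKA_LISTENERS / KAFKA_ADVERTISED_LISTENERS).
static HashSet<string> ListenerNames(string listeners) =>
    listeners.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
             .Select(l => l[..l.IndexOf("://", StringComparison.Ordinal)])
             .ToHashSet();

// Listener names from "NAME:PROTOCOL,..." (KAFKA_LISTENER_SECURITY_PROTOCOL_MAP).
static HashSet<string> MappedNames(string protocolMap) =>
    protocolMap.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
               .Select(m => m[..m.IndexOf(':')])
               .ToHashSet();

// Hypothetical check: returns human-readable problems; an empty list means the names line up.
static List<string> FindListenerMismatches(string listeners, string advertised, string protocolMap)
{
    var bound = ListenerNames(listeners);
    var mapped = MappedNames(protocolMap);
    var problems = new List<string>();
    foreach (var name in ListenerNames(advertised).Where(n => !bound.Contains(n)))
        problems.Add($"{name} is advertised but not in KAFKA_LISTENERS");
    foreach (var name in bound.Where(n => !mapped.Contains(n)))
        problems.Add($"{name} has no entry in KAFKA_LISTENER_SECURITY_PROTOCOL_MAP");
    return problems;
}

const string kafkaListeners =
    "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_LOCALHOST://0.0.0.0:9093";
const string kafkaProtocolMap =
    "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_LOCALHOST:PLAINTEXT";
// Example resolved run-mode value; 55000/55001 stand in for the assigned host ports.
const string kafkaAdvertised =
    "PLAINTEXT://localhost:29092,PLAINTEXT_HOST://host.docker.internal:55000,PLAINTEXT_LOCALHOST://localhost:55001";

Console.WriteLine(FindListenerMismatches(kafkaListeners, kafkaAdvertised, kafkaProtocolMap).Count); // 0
```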

This article helped me a lot to understand what's going on :)

davidfowl commented 2 months ago

We're making a change to enable a default Docker network so that container-to-container communication works. I'm not 100% sure whether it solves this problem with the schema registry, but once the change is in, it will be worth revisiting this issue.

davidfowl commented 1 month ago

@g7ed6e When you get a chance, can you see if you can make this work with the new changes in 9.0 to support a default container network?

t03apt commented 1 month ago

To be honest, I don't fully understand the conditional ReferenceExpression statements, so I simplified the setup and defined the ports up front. In case anyone is interested, here it is:

```csharp
using System.Net;
using System.Net.Sockets;

var publicPlainTextHostPort = FindFreePort();
var publicPlainTextInternalPort = FindFreePort();
var publicSchemaRegistryPort = FindFreePort();
var publicRestApiPort = FindFreePort();
var publicKafkaUIPort = FindFreePort();
var publicAKHQPort = FindFreePort();

var kafka = builder.AddContainer("kafka", "confluent-local", "7.7.1")
    .WithImageRegistry("confluentinc")
    .WithEndpoint(name: "primary", targetPort: 9092, port: publicPlainTextHostPort)
    .WithEndpoint(name: "internal", targetPort: 9093, port: publicPlainTextInternalPort)
    .WithHttpEndpoint(name: "restapi", targetPort: 8082, port: publicRestApiPort)
    .WithEnvironment("KAFKA_ADVERTISED_LISTENERS", $"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{publicPlainTextHostPort},PLAINTEXT_INTERNAL://host.docker.internal:{publicPlainTextInternalPort}".ToString())
    .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT")
    .WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093");

builder.AddContainer("schema-registry", "cp-schema-registry", "7.7.1")
    .WithImageRegistry("confluentinc")
    .WithHttpEndpoint(targetPort: 8081, port: publicSchemaRegistryPort)
    .WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "localhost")
    .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", $"host.docker.internal:{publicPlainTextInternalPort}".ToString())
    .WithEnvironment("SCHEMA_REGISTRY_LISTENERS", $"http://0.0.0.0:8081");

builder.AddContainer("kafka-ui", "kafka-ui", "v0.7.2")
    .WithImageRegistry("provectuslabs")
    .WithHttpEndpoint(targetPort: 8080, port: publicKafkaUIPort)
    .WithEnvironment("KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS", $"host.docker.internal:{publicPlainTextInternalPort}".ToString())
    .WithEnvironment("KAFKA_CLUSTERS_0_NAME", "kafka")
    .WithEnvironment("LOGGING_LEVEL_ROOT", "debug");

var akhqConfiguration = $@"akhq:
  connections:
    docker-kafka-server:
      properties:
        bootstrap.servers: ""host.docker.internal:{publicPlainTextInternalPort}""
      schema-registry:
        url: ""http://host.docker.internal:{publicSchemaRegistryPort}""";

builder.AddContainer("akhq", "akhq", "dev")
    .WithImageRegistry("tchiotludo")
    .WithHttpEndpoint(targetPort: 8080, port: publicAKHQPort)
    .WithEnvironment("AKHQ_CONFIGURATION", akhqConfiguration);

// Ask the OS for a free port by binding to port 0, then release it.
// Another process could still claim the port before the container binds it.
static int FindFreePort()
{
    var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    try
    {
        return ((IPEndPoint)listener.LocalEndpoint).Port;
    }
    finally
    {
        listener.Stop();
    }
}
```

I use a very similar setup for integration tests with Testcontainers.

See the Testcontainers sample:

```csharp
public async Task StartAsync(string sessionName, CancellationToken cancellationToken, bool withAKHQ = false, bool withKafkaUI = false)
{
    var publicPlainTextHostPort = FindFreePort();
    var publicPlainTextInternalPort = FindFreePort();

    var kafkaContainer = new ContainerBuilder()
        .WithImage("confluentinc/confluent-local:7.7.1")
        .WithName($"confluent-local-{sessionName}")
        .WithPortBinding(publicPlainTextHostPort, 9092)
        .WithPortBinding(publicPlainTextInternalPort, 9093)
        .WithEnvironment("KAFKA_ADVERTISED_LISTENERS", $"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{publicPlainTextHostPort},PLAINTEXT_INTERNAL://host.docker.internal:{publicPlainTextInternalPort}")
        .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT")
        .WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093")
        .Build();
    _containers.Add(kafkaContainer);
    await kafkaContainer.StartAsync(cancellationToken);

    KafkaConnectionString = new UriBuilder("PLAINTEXT", kafkaContainer.Hostname, kafkaContainer.GetMappedPublicPort(9092)).ToString();

    var schemaRegistryContainer = new ContainerBuilder()
        .WithImage("confluentinc/cp-schema-registry:7.7.1")
        .WithName($"cp-schema-registry-{sessionName}")
        .DependsOn(kafkaContainer)
        .WithPortBinding(8081, true)
        .WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "localhost")
        .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", $"host.docker.internal:{kafkaContainer.GetMappedPublicPort(9093)}")
        .WithEnvironment("SCHEMA_REGISTRY_LISTENERS", $"http://0.0.0.0:8081")
        .Build();
    _containers.Add(schemaRegistryContainer);
    await schemaRegistryContainer.StartAsync(cancellationToken);

    if (withAKHQ)
    {
        var akhqConfiguration = $@"akhq:
  connections:
    docker-kafka-server:
      properties:
        bootstrap.servers: ""host.docker.internal:{kafkaContainer.GetMappedPublicPort(9093)}""
      schema-registry:
        url: ""http://host.docker.internal:{schemaRegistryContainer.GetMappedPublicPort(8081)}""";

        var akhqContainer = new ContainerBuilder()
            .WithImage("tchiotludo/akhq:dev")
            .WithName($"akhq-{sessionName}")
            .WithEnvironment("AKHQ_CONFIGURATION", akhqConfiguration)
            .WithPortBinding(8080, true)
            .Build();
        _containers.Add(akhqContainer);
        await akhqContainer.StartAsync(cancellationToken);
    }

    if (withKafkaUI)
    {
        var kafkaUiContainer = new ContainerBuilder()
            .WithImage("provectuslabs/kafka-ui:v0.7.2")
            .WithName($"kafka-ui-{sessionName}")
            .WithPortBinding(8080, true)
            .WithEnvironment("KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS", $"host.docker.internal:{kafkaContainer.GetMappedPublicPort(9093)}")
            .WithEnvironment("KAFKA_CLUSTERS_0_NAME", "kafka")
            .WithEnvironment("LOGGING_LEVEL_ROOT", "debug")
            .Build();
        _containers.Add(kafkaUiContainer);
        await kafkaUiContainer.StartAsync(cancellationToken);
    }
}
```
davidfowl commented 1 month ago

A few thoughts:

  1. The reference expression exists to support capturing the expression information without using real values (it's a string with placeholders). This is important if you want it to work outside of local development because we can replace the host and port with the appropriate values in the right environment.
  2. Dynamic port allocation is done by the infrastructure if you don't specify a port, so that code isn't needed.
  3. Ideally, we wouldn't need to hardcode host.docker.internal; that's what the ContainerHost property is for.
  4. With Aspire 9, we have a default container network, so it should be possible to use container host names and the target port to do container to container communication.
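Point 1 in miniature: the expression is captured once as a template with named placeholders, and concrete hosts and ports are filled in only when the target environment is known. The toy resolver below is hypothetical; its {resource.bindings.endpoint.host} naming is only loosely modeled on the placeholders in the Aspire manifest and is not the ReferenceExpression API. The run-mode values assume the pre-9.0 path through host.docker.internal, and the deployment values assume a target where the broker is reachable by name on its target port, in line with point 4.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Hypothetical resolver: replace every {placeholder} with a value for the current environment.
// A placeholder without a value throws KeyNotFoundException instead of producing a broken string.
static string Resolve(string template, IReadOnlyDictionary<string, string> values) =>
    Regex.Replace(template, @"\{([^{}]+)\}", m => values[m.Groups[1].Value]);

// One template, captured once while the app model is built.
const string advertisedTemplate =
    "PLAINTEXT_HOST://{broker.bindings.internal.host}:{broker.bindings.internal.port}";

// Local run: other containers reach the broker through the host (example values).
var runValues = new Dictionary<string, string>
{
    ["broker.bindings.internal.host"] = "host.docker.internal",
    ["broker.bindings.internal.port"] = "55000",
};

// A deployment target might resolve the same endpoint to a service name and the target port (example values).
var deployValues = new Dictionary<string, string>
{
    ["broker.bindings.internal.host"] = "broker",
    ["broker.bindings.internal.port"] = "9092",
};

Console.WriteLine(Resolve(advertisedTemplate, runValues));    // PLAINTEXT_HOST://host.docker.internal:55000
Console.WriteLine(Resolve(advertisedTemplate, deployValues)); // PLAINTEXT_HOST://broker:9092
```

Hardcoding the resolved string, as in the FindFreePort approach, bakes the local-run values in; keeping the template is what lets the same app model produce the deployment values.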