dapr / dotnet-sdk

Dapr SDK for .NET
Apache License 2.0

Publish message to Kafka topic (KRaft) with Dapr C# #1291

Open abdul-hafeel opened 2 months ago

abdul-hafeel commented 2 months ago

I'm new to both Dapr and Kafka. I want to use Dapr with Kafka (KRaft) in my .NET 8 desktop app, and I'm building a POC for that. Publishing a message fails with the following error:

'Dapr.DaprException: 'Publish operation failed: the Dapr endpoint indicated a failure'. ({"Status(StatusCode="Unavailable", Detail="Error connecting to subchannel.", DebugException="System.Net.Sockets.SocketException: No connection could be made because the target machine actively refused it.")"}).

docker-compose.yaml for Kafka image in Docker

version: "3.8"

services:
  kafka:
    image: docker.io/bitnami/kafka:3.7
    container_name: kafka
    ports:

volumes:
  kafka_data:
    driver: local

Program.cs

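using Dapr.Client;
using Microsoft.Extensions.DependencyInjection; // ServiceCollection, BuildServiceProvider, GetRequiredService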

class Program
{
    static async Task Main(string[] args)
    {
        var services = new ServiceCollection();
        ConfigureServices(services);
        var serviceProvider = services.BuildServiceProvider();
        var daprClient = serviceProvider.GetRequiredService<DaprClient>();

        var message = new HelloMessage { Text = "Hello, Kafka!" };
        await PublishMessageEvent(daprClient, message);
    }

    //const string topicName = "kafka-topic";

    const string PUBSUB_NAME = "kafka-pubsub";
    const string TOPIC_NAME = "kafka-topic";
    static async Task PublishMessageEvent(DaprClient daprClient, HelloMessage message)
    {
        try
        {
            await daprClient.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, message);

            Console.WriteLine("Event published successfully.");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error publishing event: {ex.Message}");
        }
    }
    static void ConfigureServices(IServiceCollection services)
    {
        string daprPort = Environment.GetEnvironmentVariable("DAPR_GRPC_PORT")!;
        var grpcEndpoint = $"http://localhost:{daprPort}";
        //var grpcEndpoint = $"http://localhost:52540";

        // var channel = GrpcChannel.ForAddress("http://localhost:5009/");
        // Register DaprClient instance using gRPC channel and DaprClientBuilder
        var daprClient = new DaprClientBuilder()
            .UseGrpcEndpoint(grpcEndpoint) // Specify the gRPC endpoint
            .Build();

        services.AddSingleton(daprClient);
    }
}

public class HelloMessage
{
    public string Text { get; set; } = default!;
}

=================================================

var channel = GrpcChannel.ForAddress("http://localhost:5009/");
new DaprClientBuilder()
            .UseGrpcEndpoint(channel) // Specify the gRPC endpoint
            .Build();

In the snippet above, UseGrpcEndpoint accepts only a string address, not a GrpcChannel, so passing the channel does not compile.
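For reference, a minimal sketch of building the client from a string address, with channel-level settings supplied through `UseGrpcChannelOptions` rather than a `GrpcChannel`. It assumes the Dapr.Client NuGet package and a sidecar started with `dapr run`. The fallback port `50001` and the message-size value are illustrative assumptions, not values taken from this setup.

```csharp
using System;
using Dapr.Client;
using Grpc.Net.Client;

// DAPR_GRPC_PORT is set by `dapr run`; it is null when the app is started without a sidecar.
// Assumption: fall back to 50001, Dapr's usual default gRPC port.
string daprPort = Environment.GetEnvironmentVariable("DAPR_GRPC_PORT") ?? "50001";

var daprClient = new DaprClientBuilder()
    // The endpoint is passed as a string address, not as a GrpcChannel.
    .UseGrpcEndpoint($"http://localhost:{daprPort}")
    // Channel-level settings go through GrpcChannelOptions (value below is illustrative).
    .UseGrpcChannelOptions(new GrpcChannelOptions
    {
        MaxReceiveMessageSize = 4 * 1024 * 1024
    })
    .Build();

Console.WriteLine($"Dapr client targets http://localhost:{daprPort}");
```

If nothing is listening on that port, for example because the app was launched without a Dapr sidecar, publishing would be expected to fail with a connection-refused error like the one quoted above.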

Kafka pubsub yaml (Dapr configuration)

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
  namespace: default
spec:
  type: pubsub.kafka
  version: v1
  metadata:

I have attached the code and configuration I tried. I have also turned off the firewall, but I still get the same error. Could you help me figure out what I'm missing? Thanks in advance.

philliphoff commented 1 month ago

@abdul-hafeel My guess would be it's one of two things: