dotnetcore / CAP

A distributed transaction solution for microservices based on eventual consistency; also an event bus with the Outbox pattern
http://cap.dotnetcore.xyz
MIT License

Kafka subscribing #744

Closed KovtunV closed 3 years ago

KovtunV commented 3 years ago

Hi, when I use Kafka, I can't subscribe to a topic.

I have two webApi projects

  1. webApi1: ICapPublisher.Publish("testTopic", some object);
  2. webApi2:
    public class TestHandler : ICapSubscribe 
    {
        [CapSubscribe("testTopic")]
        public Task<ResponseModel> HandleAsync(RequestModel model)
        {
            return Task.FromResult(new ResponseModel(model.Age * 100));
        }
    } 

The topic name has to be unique (not used before) to reproduce this: if I don't get a message in webApi2 and then restart the project, it works. With RabbitMQ it works fine whatever I do. Is this a bug, or am I using Kafka incorrectly?

yang-xiaodong commented 3 years ago

Hello, I can't reproduce your issue. Do you see this error log the first time the topic is created?

fail: DotNetCore.CAP.Internal.ConsumerRegister[0]
      Broker: Unknown topic or partition
      Confluent.Kafka.ConsumeException: Broker: Unknown topic or partition
         at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
         at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)
         at DotNetCore.CAP.Kafka.KafkaConsumerClient.Listening(TimeSpan timeout, CancellationToken cancellationToken)
         at DotNetCore.CAP.Internal.ConsumerRegister.<>c__DisplayClass17_0.<Start>b__0()

KovtunV commented 3 years ago

Yes, I have this:

DotNetCore.CAP.Internal.ConsumerRegister: Error: Broker: Unknown topic or partition

Confluent.Kafka.ConsumeException: Broker: Unknown topic or partition
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)
   at DotNetCore.CAP.Kafka.KafkaConsumerClient.Listening(TimeSpan timeout, CancellationToken cancellationToken)
   at DotNetCore.CAP.Internal.ConsumerRegister.<>c__DisplayClass17_0.<Start>b__0()

It happens for a new topic name to which I haven't sent a message yet. As I wrote in the first point: "If I start webApi2 and then run webApi1 with publishing testTopic, I won't get a message."

yang-xiaodong commented 3 years ago

This is a known issue. Please refer to: https://github.com/confluentinc/confluent-kafka-dotnet/issues/1366
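
Since the error only shows up for a topic that has not received a message yet, one possible workaround is to create the topic before the subscriber starts consuming. The following is a minimal sketch, not something taken from CAP itself: it uses the Confluent.Kafka admin client directly, and the helper name `EnsureTopicExistsAsync`, the class name `KafkaTopicBootstrap`, the broker address, and the partition and replication settings are all assumptions for illustration. It also assumes the Kafka topic name matches the name passed to `[CapSubscribe]`.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Hypothetical helper, not part of CAP: pre-creates a Kafka topic so that
// a consumer subscribing to it does not hit "Unknown topic or partition".
public static class KafkaTopicBootstrap
{
    public static async Task EnsureTopicExistsAsync(string bootstrapServers, string topicName)
    {
        var config = new AdminClientConfig { BootstrapServers = bootstrapServers };

        using (var adminClient = new AdminClientBuilder(config).Build())
        {
            try
            {
                await adminClient.CreateTopicsAsync(new[]
                {
                    new TopicSpecification
                    {
                        Name = topicName,
                        NumPartitions = 1,     // assumption: adjust to your needs
                        ReplicationFactor = 1  // assumption: single-broker setup
                    }
                });
            }
            catch (CreateTopicsException ex)
                when (ex.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
            {
                // The topic already exists, so there is nothing to do.
            }
        }
    }
}
```

In webApi2 this could be awaited once during startup, before the CAP consumers begin listening, for example `await KafkaTopicBootstrap.EnsureTopicExistsAsync("localhost:9092", "testTopic");` where `localhost:9092` is a placeholder broker address. Whether this fully avoids the error depends on the broker configuration and the client version in use, so treat it as something to try rather than a confirmed fix.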