confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Produce/ProduceAsync never returns #1057

Open sershe84 opened 5 years ago

sershe84 commented 5 years ago

Description

I have an issue on 1.1 where, in a regular application, the ProduceAsync call never returns and the Produce call never invokes its callback, even though the messages are delivered to the topic with no issues. If I keep calling Produce with new messages regardless, they eventually fail with a local queue full error (again, even though the messages are sent and visible to a consumer of the topic). I've tried numerous workarounds, including ConfigureAwait, explicitly enabling config settings, explicitly calling Poll (included here) and not calling Poll, all to no avail. The error callback is not invoked either. I assume this works for someone, so I'm not sure what in my environment may be causing it. The code is below. The relevant anonymized log follows; the full log, FWIW, is all2.log.

The second column is the thread ID.


13:30:30,28228,Sending 3898 bytes
13:30:30,28228,[rdkafka#producer-1] Debug TOPIC : [thrd:app]: New local topic: <topic>
13:30:30,28228,[rdkafka#producer-1] Debug TOPPARNEW : [thrd:app]: NEW <topic> [-1] 000001F8661D4ED0 (at rd_kafka_topic_new0:325)
13:30:30,28228,Before wait
13:30:31,7392,[rdkafka#producer-1] Debug NOINFO : [thrd:main]: Topic <topic> partition count is zero: should refresh metadata
...
13:30:32,7392,[rdkafka#producer-1] Debug PARTCNT : [thrd:main]: Topic <topic> partition count changed from 0 to 4
... [the brief broker unavailability appears to be a red herring; the same happens when the broker is reachable immediately]
13:30:33,12920,[rdkafka#producer-1] Debug TOPPAR : [thrd:<ip.36>/2007]: <ip.36>/2007: <topic> [0] 1+0 msgs
13:30:33,12920,[rdkafka#producer-1] Debug PRODUCE : [thrd:<ip.36>/2007]: <ip.36>/2007: <topic> [0]: Produce MessageSet with 1 message(s) (3932 bytes, ApiVersion 2, MsgVersion 1)
13:30:33,12920,[rdkafka#producer-1] Debug TOPPAR : [thrd:<ip.36>/2007]: <ip.36>/2007: <topic> [0] 0+0 msgs
13:30:33,12920,[rdkafka#producer-1] Debug SEND : [thrd:<ip.36>/2007]: <ip.36>/2007: Sent ProduceRequest (v2, 4008 bytes @ 0, CorrId 2)
13:30:33,12920,[rdkafka#producer-1] Debug TOPPAR : [thrd:<ip.36>/2007]: <ip.36>/2007: <topic> [0] 0+0 msgs
13:30:33,12920,[rdkafka#producer-1] Debug RECV : [thrd:<ip.36>/2007]: <ip.36>/2007: Received ProduceResponse (v2, 67 bytes, CorrId 2, rtt 0.00ms)
13:30:33,12920,[rdkafka#producer-1] Debug MSGSET : [thrd:<ip.36>/2007]: <ip.36>/2007: <topic> [0]: MessageSet with 1 message(s) delivered
13:30:33,12920,[rdkafka#producer-1] Debug TOPPAR : [thrd:<ip.36>/2007]: <ip.36>/2007: <topic> [0] 0+0 msgs
13:30:40,28228,Calling poll
... [nothing except TOPPAR 0+0 logging for all partitions]
13:31:10,28228,After poll
... [same]

How to reproduce

Confluent.Kafka 1.1.0
librdkafka.redist 1.1.0
Windows Server 2016 Datacenter
.NET Framework 4.7.2 (461814)
C# console app

In my case this runs in a background thread (Thread t = new Thread(DoWork); t.IsBackground = true; t.Start();). Previously it ran in a complex async context. Next I'm going to try it without the thread; that will take some time.

I can't share the message generation code, but the payload is a byte[], so anything should suffice. The size is shown in the log above. Setting the message key doesn't help either.


        var kafkaConf = new ProducerConfig
        {
            BootstrapServers = brokerList,
            MessageMaxBytes = 768 * 1024 * 1024,
            EnableDeliveryReports = true,
            EnableBackgroundPoll = true,
        };
        kafkaConf.Debug = "all";
        var pb = new ProducerBuilder<byte[], byte[]>(kafkaConf);
        pb.SetErrorHandler((_, e) => LogDebug($"Kafka error from callback {e}"));
        pb.SetLogHandler((_, e) => LogDebug($"[{e.Name}] {e.Level} {e.Facility} : {e.Message}"));

        var kafka = pb.Build();

        // "event" is a reserved word in C#, so the payload variable needs another name.
        byte[] payload = ...;

        var msg = new Message<byte[], byte[]>
        {
            Value = payload
        };
        DeliveryResult<byte[], byte[]> result = null;
        try
        {
            LogDebug($"Sending {msg.Value.Length} bytes");
            var task = kafka.ProduceAsync(topic, msg);

            LogDebug("Before wait");
            // Wait up to 10 s for the delivery report; if it hasn't arrived, poll for up to 30 s.
            if (!task.Wait(TimeSpan.FromSeconds(10)))
            {
                LogDebug("Calling poll");
                kafka.Poll(TimeSpan.FromSeconds(30));
                LogDebug("After poll");
            }
            result = task.Result;
        }
        finally
        {
            LogDebug($"After {result}"); // never happens
        }

sershe84 commented 5 years ago

Update: the same thing happens when running from Main().

mhowlett commented 5 years ago

it may be related to this: https://rmoff.net/2018/08/02/kafka-listeners-explained/

sershe84 commented 5 years ago

The issue turned out to be caused by an incorrect version of librdkafka, pulled in by an unfortunate, unrelated legacy dependency, replacing the correct one. Replacing the native DLL with the correct 1.1 build resolved the issue. This was on x64, FWIW. I'm not sure how feasible it is to improve the handling of such cases, but given how bad (and hard to debug ;)) the consequences of the incompatibility are, I think an explicit version-check API could be added.
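An explicit check along these lines can be sketched at application startup. This is a minimal sketch, not something Confluent.Kafka itself provides: the LibrdkafkaVersionCheck class and the expected version are hypothetical. It relies on librdkafka's integer version format, 0xMMmmrrpp (major, minor, revision, pre-release byte; 0xff marks a final release). In an application, the loaded native version would presumably come from Confluent.Kafka's Library.Version property, and the expected version from the librdkafka.redist package being referenced.

```csharp
using System;

// Hypothetical helper (not part of Confluent.Kafka): compares the version reported
// by the loaded native librdkafka with the version the application was built against.
public static class LibrdkafkaVersionCheck
{
    // librdkafka encodes its version as 0xMMmmrrpp: major, minor, revision and a
    // pre-release byte (0xff for final releases), e.g. 0x010100ff for 1.1.0.
    public static (int Major, int Minor, int Revision) Decode(int version) =>
        ((version >> 24) & 0xff, (version >> 16) & 0xff, (version >> 8) & 0xff);

    // True when major.minor.revision of the loaded library equals the expected version.
    // System.Version("1.1.0") stores the third component in Build.
    public static bool Matches(int loadedVersion, Version expected)
    {
        var (major, minor, revision) = Decode(loadedVersion);
        return major == expected.Major
            && minor == expected.Minor
            && revision == expected.Build;
    }
}
```

At startup one might call LibrdkafkaVersionCheck.Matches(Library.Version, new Version("1.1.0")) and fail fast when it returns false, so a stray native DLL is reported immediately instead of surfacing later as missing delivery reports.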

rsaltrelli commented 4 years ago

@sershe84 I'm observing this same symptom (Produce() and ProduceAsync() never returning). Can you elaborate on how you fixed the problem? I'm just referencing the Confluent.Kafka v1.2.1 NuGet package on x64 Windows 10 so I'm not sure what or how I messed up.

sershe84 commented 4 years ago

See above. Confluent.Kafka is a wrapper around the native Kafka library (librdkafka); make sure the versions of both that you are using are exactly the same. You must be deploying some version of librdkafka and the DLLs that come with it, otherwise the managed library would error out before doing anything. In my case, that version was the wrong one; replacing it with the correct one (same package version) worked.

mhowlett commented 4 years ago

we should start enforcing that the binding version == the librdkafka version. not enforcing it is a feature, but it causes more problems than it solves.

rsaltrelli commented 4 years ago

I'm not convinced that this is the cause of my problem. I'm referencing the following packages.

So everything appears to match. I've cleaned my solution, cleared my NuGet cache, and rebuilt. The symptom persists: Produce() never returns. Are there any other potential causes to explore?

mhowlett commented 4 years ago

Produce should always return immediately; there's no reason for it to block, and I don't have any idea why it would. Can you paste a minimal example demonstrating the problem?

you don't need to reference librdkafka.redist explicitly (though it won't hurt).

rsaltrelli commented 4 years ago

My app is able to communicate with the broker. During startup, as part of a sanity check, I call IAdminClient.GetMetadata(streamName, TimeSpan.FromSeconds(1)), which succeeds and creates the topic with the default configuration. So the DLLs appear to be loading correctly; something else is in play here.

Below is the code that actually produces the message. I've tried both Produce() and ProduceAsync(), and in .NET Framework neither one returns or throws an exception. Again, this works perfectly in .NET Core.

protected override bool ExecutePublish(Event<T> evnt)
{
    try
    {
        var message = new Message<string, T>
        {
            Key = evnt.Key,
            Value = evnt.Payload,
            Timestamp = new Timestamp(evnt.Timestamp),
            Headers = new Headers
            {
                new Header("Id", Encoding.UTF8.GetBytes(evnt.Id)),
                new Header("Origin", Encoding.UTF8.GetBytes(evnt.Origin))
            }
        };

        // Producer.Produce(_streamName, message);
        Producer.ProduceAsync(_streamName, message).Wait();

        return true;
    }
    catch(Exception ex)
    {
        _logger.LogError(ex, "Exception occurred.");
        return false;
    }
}

public IProducer<string, T> CreateProducer<T>()
{
    _producerSchemaRegistry = _producerSchemaRegistry ?? CreateProducerSchemaRegistry();
    return new ProducerBuilder<string, T>(_config.Publisher.Kafka)
        .SetKeySerializer(new AvroSerializer<string>(_producerSchemaRegistry)/*.AsSyncOverAsync()*/)
        .SetValueSerializer(new AvroSerializer<T>(_producerSchemaRegistry)/*.AsSyncOverAsync()*/)
        .Build();
}

rsaltrelli commented 4 years ago

UPDATE: I created a new .NET Framework 4.6.2 console application from scratch that produces to Kafka and it works fine. I'm now working through the steps to figure out what is different between the console app and my web app.

rsaltrelli commented 4 years ago

UPDATE: I've been able to narrow it down to calling IProducer.Produce() from within an ASP.NET MVC controller action. Under those circumstances, Produce() never returns. Calling it in the same application but outside a controller action (e.g. in Global.asax.cs) returns successfully.

using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Web.Mvc;

namespace WebApplication1.Controllers
{
    public class HomeController : Controller
    {
        public ActionResult Index()
        {
            var payload = new Redacted
            {
                // Redacted
            };
            var message = new Message<string, Redacted>
            {
                Key = "key",
                Value = payload
            };

            var schemaRegistryConfig = new Dictionary<string, string>
            {
                { "schema.registry.url", "http://redacted:8081" },
            };
            var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
            var producerConfig = new Dictionary<string, string>
            {
                { "bootstrap.servers", "redacted:29092" },
                { "debug", "all" },
                { "log_level", "7" }
            };
            var producer = new ProducerBuilder<string, Redacted>(producerConfig)
                .SetKeySerializer(new AvroSerializer<string>(schemaRegistry).AsSyncOverAsync())
                .SetValueSerializer(new AvroSerializer<Redacted>(schemaRegistry).AsSyncOverAsync())
                .Build();

            producer.Produce(typeof(Redacted).FullName, message);

            return new EmptyResult();
        }
    }
}

However, if I convert the controller action to async and call ProduceAsync(), it does return.

using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Web;
using System.Web.Mvc;

namespace WebApplication1.Controllers
{
    public class HomeController : Controller
    {
        public async Task<ActionResult> Index()
        {
            var payload = new Redacted
            {
                // Redacted
            };
            var message = new Message<string, Redacted>
            {
                Key = "key",
                Value = payload
            };

            var schemaRegistryConfig = new Dictionary<string, string>
            {
                { "schema.registry.url", "http://redacted:8081" },
            };
            var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
            var producerConfig = new Dictionary<string, string>
            {
                { "bootstrap.servers", "redacted:29092" },
                { "debug", "all" },
                { "log_level", "7" }
            };
            var producer = new ProducerBuilder<string, Redacted>(producerConfig)
                .SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
                .SetValueSerializer(new AvroSerializer<Redacted>(schemaRegistry))
                .Build();

            await producer.ProduceAsync(typeof(Redacted).FullName, message);

            return new EmptyResult();
        }
    }
}

It seems that something within Produce() that handles sync-over-async doesn't play nicely with ASP.NET MVC.

rsaltrelli commented 4 years ago

UPDATE: I can't make the controller action async all the way down the stack (that would be too much refactoring given my other constraints), so I have to get sync-over-async to work. The following appears to do the trick. After talking to some colleagues, this seems to be a common problem in ASP.NET MVC and not unique to the Confluent.Kafka package.

using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Web;
using System.Web.Mvc;

namespace WebApplication1.Controllers
{
    public class HomeController : Controller
    {
        public ActionResult Index()
        {
            var payload = new Redacted
            {
                // Redacted
            };
            var message = new Message<string, Redacted>
            {
                Key = "key",
                Value = payload
            };

            var schemaRegistryConfig = new Dictionary<string, string>
            {
                { "schema.registry.url", "http://redacted:8081" },
            };
            var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
            var producerConfig = new Dictionary<string, string>
            {
                { "bootstrap.servers", "redacted:29092" },
                { "debug", "all" },
                { "log_level", "7" }
            };
            var producer = new ProducerBuilder<string, Redacted>(producerConfig)
                .SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
                .SetValueSerializer(new AvroSerializer<Redacted>(schemaRegistry))
                .Build();

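            // Task.Run executes ProduceAsync on a thread-pool thread, where the ASP.NET
            // request context is not captured, so blocking on the result here does not deadlock.
            Task.Run(() => producer.ProduceAsync(typeof(Redacted).FullName, message)).GetAwaiter().GetResult();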

            return View();
        }
    }
}
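The hang and the Task.Run workaround can both be reproduced without Kafka. The self-contained sketch below models the request context with a hypothetical QueueingContext, a deliberate simplification of the real ASP.NET request context: continuations posted to it only run when the owning thread drains its queue, which never happens while that thread is blocked. FetchAsync is a hypothetical stand-in for the schema registry HTTP call made inside the Avro serializer. Blocking on it directly times out, while blocking on Task.Run(...) completes, because the delegate runs on a thread-pool thread with no synchronization context to capture.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical simplification of a request context: Post() only queues the
// continuation; nothing runs it unless the owning thread drains the queue.
public sealed class QueueingContext : SynchronizationContext
{
    private readonly ConcurrentQueue<Action> _pending = new ConcurrentQueue<Action>();

    public override void Post(SendOrPostCallback d, object state) =>
        _pending.Enqueue(() => d(state));
}

public static class SyncOverAsyncDemo
{
    // Stand-in for an async HTTP call. The await has no ConfigureAwait(false),
    // so its continuation is posted back to the captured context.
    public static async Task<int> FetchAsync()
    {
        await Task.Delay(50);
        return 42;
    }

    // Installs a QueueingContext on the calling thread, blocks on the task
    // produced by start, and restores the previous context afterwards.
    private static bool BlockUnderContext(Func<Task<int>> start, TimeSpan timeout)
    {
        var previous = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(new QueueingContext());
        try
        {
            return start().Wait(timeout);
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    // Times out: FetchAsync captures the context, whose only thread is blocked in Wait.
    public static bool BlockDirectly(TimeSpan timeout) =>
        BlockUnderContext(() => FetchAsync(), timeout);

    // Completes: inside Task.Run there is no context to capture.
    public static bool BlockViaTaskRun(TimeSpan timeout) =>
        BlockUnderContext(() => Task.Run(() => FetchAsync()), timeout);
}
```

The same reasoning explains why making the action async end to end also works: nothing blocks the request thread, so the continuation can run. Task.Run costs an extra thread-pool thread per call, which is why async end to end is usually preferred when the call stack allows it.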

azam123 commented 4 years ago

Hi, thanks for sharing the solution. BTW, it worked for me when I made the call async end to end, from the controller down to the producer.

mhowlett commented 4 years ago

you shouldn't create a producer in the web handler - it's very expensive and very slow. Instead, create a singleton instance on startup (you'll probably need to make a wrapper) and use that. we should do an example...
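A minimal sketch of such a wrapper, using Lazy<T> so the client is built once on first use and then shared by all requests. SharedClient is a hypothetical name; in this thread, TClient would be IProducer<string, Redacted> and the factory would be the ProducerBuilder chain from the controller. The instance itself would typically live in a static field (for example initialized from Global.asax) or be registered as a singleton in a DI container.

```csharp
using System;
using System.Threading;

// Hypothetical wrapper: builds an expensive client once, on first use,
// and hands the same instance to every caller.
public sealed class SharedClient<TClient> : IDisposable where TClient : class, IDisposable
{
    private readonly Lazy<TClient> _instance;

    public SharedClient(Func<TClient> factory)
    {
        // ExecutionAndPublication: even if several requests arrive at once,
        // only one thread runs the factory and all of them see its result.
        _instance = new Lazy<TClient>(factory, LazyThreadSafetyMode.ExecutionAndPublication);
    }

    public TClient Instance => _instance.Value;

    public void Dispose()
    {
        // Only dispose what was actually created.
        if (_instance.IsValueCreated)
        {
            _instance.Value.Dispose();
        }
    }
}
```

For a producer, calling Flush(TimeSpan) before disposing on shutdown gives messages still in the local queue a chance to be delivered.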

if producer.Produce(typeof(Redacted).FullName, message); is blocking, it would be due to the HTTP call to schema registry that happens in the avro serializer, possibly something to do with sync over async. I would try to confirm that this is definitely the problem by using the schema registry client directly (and blocking on the result).

jcasement commented 4 years ago

I'm facing the same issue, but I've been using a Web API controller rather than an MVC controller (which I think is what the example above uses). Would anyone care to provide a sample of converting to an async call end to end, from the controller to ProduceAsync? Any help is greatly appreciated!! Just to be clear, I inherit from ApiController and have a POST like the following:


    [RoutePrefix("api")]
    public class KafkaController : ApiController
    {        
        IKafkaProducerRepository _repository;        

        public KafkaController()  
        {
            _repository = new KafkaProducerRepository();

        }

        /// POST -- Used for transmitting to a topic on Kafka.
        [Route("RMT")]
        public IHttpActionResult Post([FromBody]IEnumerable<RMTtopic> RMTList)
        {
            try
            {
                if (RMTList == null)
                {
                    return BadRequest();
                }               
                var result = _repository.POSTtopic(RMTList);

                if (result.Status == RepositoryActionStatus.Created || result.Status == RepositoryActionStatus.NothingModified)
                {
                    KafkaRepository.Repository.Entities.RootSearchObject returnRoot = new KafkaRepository.Repository.Entities.RootSearchObject();
                    KafkaRepository.Repository.Entities.RMTtopicdata returnPolicy = new KafkaRepository.Repository.Entities.RMTtopicdata();

                    foreach (PostTopicResult p in result.Entity)
                    {
                        var policies = p.RMTtopicdata;
                        returnRoot.result = p.result;
                        returnRoot.additionalDetail = p.additionalDetail;

                        foreach (RMTtopic ps in policies)
                            returnPolicy.rmtTopic.Add(ps);                        
                    }
                    returnRoot.policySearchData = returnPolicy;

                    return Ok(returnRoot);
                }

                return BadRequest();

            }
            catch (Exception)
            {
                return InternalServerError();
            }
        }
    }
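A minimal sketch of the async-all-the-way shape, with hypothetical types standing in for the question's repository and for IProducer<TKey, TValue>.ProduceAsync, so that it compiles without Kafka or Web API references. Each layer returns a Task and awaits the layer below, so no thread blocks waiting for a delivery report. In ASP.NET Web API 2, the action itself could then be declared as public async Task<IHttpActionResult> Post([FromBody] IEnumerable<RMTtopic> RMTList) and simply await the repository.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for IProducer<TKey, TValue>.ProduceAsync.
public interface IMessageSender
{
    Task<long> SendAsync(string topic, string value);
}

// Hypothetical async counterpart of KafkaProducerRepository.POSTtopic.
public sealed class TopicRepository
{
    private readonly IMessageSender _sender;

    public TopicRepository(IMessageSender sender) => _sender = sender;

    public async Task<int> PostTopicsAsync(IEnumerable<string> items, string topic)
    {
        // Start every send first, then await them together, rather than
        // waiting for each delivery before sending the next message.
        var sends = items.Select(item => _sender.SendAsync(topic, item)).ToList();
        long[] results = await Task.WhenAll(sends).ConfigureAwait(false);
        return results.Length;
    }
}

// In-memory fake used to exercise the repository without a broker.
public sealed class FakeSender : IMessageSender
{
    public List<string> Sent { get; } = new List<string>();

    public async Task<long> SendAsync(string topic, string value)
    {
        await Task.Yield(); // complete asynchronously, like a real network call
        lock (Sent)
        {
            Sent.Add(value);
            return Sent.Count - 1;
        }
    }
}
```

The repository uses ConfigureAwait(false) because it does not need the request context; the controller just awaits the repository, with no .Wait() or .Result anywhere in the chain.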

anchitj commented 4 months ago

Is this still an issue with the latest version?