confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Producer throws error message `Broker: Message size too large` when the event payload size is less than the max.message.bytes configuration on the topic #2214

Open vikasillumina opened 5 months ago

vikasillumina commented 5 months ago

Description

Producer throws error message `Broker: Message size too large` when the event payload size is less than the `max.message.bytes` configuration on the topic.

How to reproduce

We have not been able to reproduce this issue, and from our investigation we have concluded that the event payload we are trying to publish to Kafka is nowhere near the `max.message.bytes` configured on the topic. Here is the sample code:

    public virtual async Task<DeliveryResult<string, byte[]>[]> PublishMessagesAsync(
        IEnumerable<Message> messages,
        Action<MessageDelivery<string, byte[]>> onDelivery,
        CancellationToken cancellationToken = default)
    {
        var tasks = new List<Task<DeliveryResult<string, byte[]>>>();
        foreach (var m in messages)
        {
            if (m == null)
            {
                continue;
            }

            Task<DeliveryResult<string, byte[]>> task = null;

            // Serialize the content object to camel-cased JSON. Note that
            // contentBytes is not actually consumed below: the publish path
            // only uses the pre-populated m.ContentBytes.
            var contentJson = JsonConvert.SerializeObject(m.ContentObject, new JsonSerializerSettings
            {
                DateTimeZoneHandling = DateTimeZoneHandling.Utc,
                NullValueHandling = NullValueHandling.Ignore,
                ContractResolver = new Newtonsoft.Json.Serialization.DefaultContractResolver
                {
                    NamingStrategy = new Newtonsoft.Json.Serialization.CamelCaseNamingStrategy()
                }
            });
            var contentBytes = Encoding.Default.GetBytes(contentJson);

            if (m.ContentBytes != null && m.ContentBytes.Length > 0)
            {
                task = PublishRawAsync(m.KafkaMessageMetadata?.Topic, m.ContentBytes, m.Headers, m.KafkaMessageMetadata?.Key, cancellationToken);
            }
            else
            {
                throw new ArgumentException("message has no content populated");
            }

            if (onDelivery != null)
            {
                // Chain the delivery callback onto the produce task.
                task = task.ContinueWith(async continuation =>
                {
                    var deliveryReport = await continuation;
                    onDelivery(new MessageDelivery<string, byte[]>(m, deliveryReport));
                    return deliveryReport;
                }, cancellationToken).Unwrap();
            }

            tasks.Add(task);
        }

        return await Task.WhenAll(tasks);
    }

    public async Task<DeliveryResult<string, byte[]>> PublishRawAsync(
        string topic,
        byte[] content,
        HeadersWrapper messageHeaders,
        string messageKey = null,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrEmpty(topic))
        {
            throw new ArgumentNullException(nameof(topic), "Topic must be provided");
        }
        if (content == null || content.Length == 0)
        {
            throw new ArgumentException("content is null or empty", nameof(content));
        }

        if (string.IsNullOrEmpty(messageHeaders.producedby))
        {
            messageHeaders.producedby = _serviceName;
        }

        try
        {
            var message = new Message<string, byte[]>()
            {
                Value = content,
                Headers = CopyToKafkaHeaders(messageHeaders),
                Key = messageKey,
                Timestamp = new Timestamp(DateTime.UtcNow)
            };

            // Note: cancellationToken is not forwarded to ProduceAsync here.
            var dr = await _producer.ProduceAsync(topic, message);
            return dr;
        }
        catch (Exception ex)
        {
            _logger.Error(ex, $"An error occurred publishing event to topic {topic}.");
            throw;
        }
    }
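
To double-check the payload-size claim, a quick estimate can be logged just before the `ProduceAsync` call. A minimal sketch (`EstimateMessageSize` is a hypothetical helper written for this issue, not part of Confluent.Kafka; the broker also counts record and batch framing, so the result is only a lower bound):

    // Hypothetical helper: approximates a message's serialized size by summing
    // key, value, and header bytes. The broker's max.message.bytes check also
    // includes record/batch framing, so treat the result as a lower bound.
    private static long EstimateMessageSize(Message<string, byte[]> message)
    {
        long size = message.Value?.Length ?? 0;
        if (message.Key != null)
        {
            size += Encoding.UTF8.GetByteCount(message.Key);
        }
        if (message.Headers != null)
        {
            foreach (var header in message.Headers)
            {
                size += Encoding.UTF8.GetByteCount(header.Key);
                size += header.GetValueBytes()?.Length ?? 0;
            }
        }
        return size;
    }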

CLIENT CONFIGURATION

Producer configuration:

bootstrap.servers = AWS broker URL (3 brokers across 3 AZs)
client.id = service name
message.max.bytes = 4194304 (4 MB; this is a common-library setting, however the topic itself has a 1 MB max.message.bytes limit, see the screenshot under Topic configuration below)

Topic configuration: [screenshot showing the topic's 1 MB max.message.bytes limit]
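
For completeness, the client-side settings above map onto Confluent.Kafka's ProducerConfig roughly as follows (a sketch with placeholder values; the topic's 1 MB max.message.bytes is configured broker-side, not in this object):

    // Sketch of the producer configuration described above.
    var config = new ProducerConfig
    {
        BootstrapServers = "<aws-broker-1>,<aws-broker-2>,<aws-broker-3>", // placeholders
        ClientId = "<service-name>",                                      // placeholder
        MessageMaxBytes = 4194304                                         // 4 MB client-side limit
    };
    using var producer = new ProducerBuilder<string, byte[]>(config).Build();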

OPERATING SYSTEM:

Application runs in a Docker container hosted as a Kubernetes pod on Linux; OS specifics below:

PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
NAME="Debian GNU/Linux"
VERSION_ID="11"
VERSION="11 (bullseye)"
VERSION_CODENAME=bullseye
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"

Kernel: 5.10.205-195.807.amzn2.x86_64

PRODUCER LOGS:

Broker: Message size too large
   at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)

This is the only error message we could get from the SDK.
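
To get more detail out of the client than that single line, librdkafka debug logging can be enabled when the producer is built. A sketch (the Debug setting and the log/error handlers are standard Confluent.Kafka options; the broker URL is a placeholder):

    // Route librdkafka's message-level debug output and client errors to stdout
    // (or an application logger) to see per-message details behind the error.
    var config = new ProducerConfig
    {
        BootstrapServers = "<aws-broker-urls>", // placeholder
        Debug = "msg"                           // "broker,topic,msg" for even more detail
    };
    using var producer = new ProducerBuilder<string, byte[]>(config)
        .SetLogHandler((_, log) => Console.WriteLine($"{log.Level} {log.Facility}: {log.Message}"))
        .SetErrorHandler((_, err) => Console.WriteLine($"Producer error: {err.Reason}"))
        .Build();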

Our application doesn't batch events; it sends one event at a time via async tasks. We do submit many of these tasks in parallel, if that matters.
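
One thing we have not ruled out (an assumption on our part, not something we have confirmed): even when the application produces one message per call, librdkafka accumulates queued messages into batches internally, and the client-side message.max.bytes (4 MB here) also bounds how large those batches may grow, while the broker enforces the topic's 1 MB limit per batch. If that were the cause, aligning the client limit with the topic and flushing promptly should help, e.g.:

    // Hypothetical mitigation if internal batching is the culprit: keep the
    // client's message.max.bytes at or below the topic's limit and flush
    // batches promptly so they stay small.
    var config = new ProducerConfig
    {
        BootstrapServers = "<aws-broker-urls>", // placeholder
        MessageMaxBytes = 1048576,              // match the topic's 1 MB max.message.bytes
        LingerMs = 0                            // don't wait to accumulate larger batches
    };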

I looked through the SDK code but couldn't find anything that would explain this error other than the size check itself. I also searched the internet and didn't find any obvious suggestions or solutions.

Please let me know if there is anything else I can provide.

Thanks for your help in advance.

Checklist

Please provide the following information: