doujiang24 / lua-resty-kafka

Lua Kafka client driver for OpenResty, based on the cosocket API
BSD 3-Clause "New" or "Revised" License

"buffered messages send to kafka err: closed" when using a dynamic key #116

Open samcrawford opened 2 years ago

samcrawford commented 2 years ago

Hi. I'm using this module on a reasonably busy web server (20-100 requests per second) in order to write a log of source IPs and timestamps into Kafka (Confluent Cloud).

If I write all of the messages with a single key (e.g. "a") then everything works fine, except that all of the messages end up in a single partition in Kafka. This will cause issues for our consuming application as we scale up.

So we tried to use a dynamic key, which I understand from the source code is hashed and then used in a modulo operation to determine the destination partition. I tried using key = ngx.var.remote_addr as I figured that would give a reasonably even distribution.
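The hash-then-modulo mapping described above can be sketched as follows. This is an illustrative Python sketch, not the library's actual code: it assumes a CRC32-style hash of the key, reduced modulo the partition count, which is one common default partitioner design; the hash lua-resty-kafka actually uses may differ.

```python
import zlib


def pick_partition(key: str, num_partitions: int) -> int:
    # Hash the key, then map the hash onto a 0-based partition id.
    # (CRC32 is an assumption here, standing in for the client's hash.)
    return zlib.crc32(key.encode()) % num_partitions


# With a per-request key such as the client IP, messages spread across
# partitions instead of all landing in one, as a fixed key like "a" would.
counts = {}
for i in range(1, 101):
    p = pick_partition("10.0.0.%d" % i, 10)
    counts[p] = counts.get(p, 0) + 1

print(counts)  # distribution of 100 sample IPs over 10 partitions
```

The key point is that the mapping is deterministic per key, so the same source IP always lands in the same partition, while distinct IPs spread roughly evenly.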

However, as soon as I enable this configuration, I start to see these errors in the log:

2021/12/13 18:30:07 [error] 1145636#1145636: *101851722 [lua] producer.lua:271: buffered messages send to kafka err: closed, retryable: true, topic: connections, partition_id: 0, length: 2, context: ngx.timer, client: 1.2.3.4, server: 0.0.0.0:443

The error is always for the same partition ID. If I delete the topic on the Kafka server, and recreate it, then the error reappears but for a different partition ID. I see messages successfully go into all of the other partitions, but not the one reporting the error. This suggests to me an off-by-one error somewhere.

My full configuration is below. As soon as I change key = ngx.var.remote_addr to key = "a" then everything works perfectly with no errors (but we have the undesirable behaviour that all messages go into a single partition).

My configuration:

local cjson = require "cjson"
local producer = require "resty.kafka.producer"
local broker_list = {
    {
          host = "xxxxxxx.confluent.cloud",
          port = 12345,
          sasl_config = {
                mechanism = "PLAIN",
                user = "XXXXXXXXXXXX",
                password = "XXXXXXXXXXXXXXXXXX",
          },
    },
}

-- As soon as I change this to local key = "a"
-- then everything works fine, but we only use a single partition
local key = ngx.var.remote_addr

local message = cjson.encode({ device_id = tonumber(device_id), timestamp = ngx.time(), public_ip = ngx.var.remote_addr })

local bp = producer:new(broker_list, { ssl = true, ssl_verify = false, socket_timeout = 10000, request_timeout = 5000, producer_type = "async" })

local ok, err = bp:send("connections", key, message)
if not ok then
    ngx.log(ngx.ERR, "Failed to write to Kafka: ", err)
end
samcrawford commented 2 years ago

The error only occurs when the topic has >= 10 partitions; with fewer than 10 partitions everything is fine.

doujiang24 commented 2 years ago

However, as soon as I enable this configuration, I start to see logs of these errors in the log:

2021/12/13 18:30:07 [error] 1145636#1145636: *101851722 [lua] producer.lua:271: buffered messages send to kafka err: closed, retryable: true, topic: connections, partition_id: 0, length: 2, context: ngx.timer, client: 1.2.3.4, server: 0.0.0.0:443

Have you seen any other errors? This error message doesn't look useful.

samcrawford commented 2 years ago

No other errors at all I'm afraid. The error was 100% reproducible when the topic had >=10 partitions.