doujiang24 / lua-resty-kafka

Lua kafka client driver for the Openresty based on the cosocket API
BSD 3-Clause "New" or "Revised" License
801 stars 274 forks source link

some msg lose with error buffered messages send to kafka err: not found topic #109

Open dean-river opened 3 years ago

dean-river commented 3 years ago

i use-resty-kafka-lua two recive http request and upload msg to kafka, i can recive some msg in kafka, but not all. some is lost. nginx error.log report some error like bellow (i change nginx error log level to info) , when i use logstash http input plugin and kafka output plugin there is no msg loss. i don't know why!

2021/05/13 16:59:27 [error] 41485#0: *134288 [lua] producer.lua:272: buffered messages send to kafka err: not found topic, retryable: true, topic: dp_live_push_origin_quality_raw, partition_id: -1, length: 1, context: ngx.timer, client: 10.71.52.217, server: 0.0.0.0:8888

this is the way i use lua-resty-kafka, every request will use send_msg, dp_live_mcu_push_origin_quality_raw and dp_live_push_origin_quality_raw topic all have 300 brokers

` local _M = {} _M.topics_broker_list = { test = { { host = "127.0.0.1", port = 9092 }, },

dp_live_mcu_push_origin_quality_raw = {
    { host = "dataarch-kafka-h1.idczw.hb1.kwaidc.com", port = 9092},
    { host = "dataarch-kafka-h2.idczw.hb1.kwaidc.com", port = 9092},
    { host = "dataarch-kafka-h3.idczw.hb1.kwaidc.com", port = 9092},
    { host = "dataarch-kafka-h4.idczw.hb1.kwaidc.com", port = 9092},
    { host = "dataarch-kafka-h5.idczw.hb1.kwaidc.com", port = 9092},
},

dp_live_push_origin_quality_raw = {
    { host = "dataarch-kafka-h1.idczw.hb1.kwaidc.com", port = 9092},
    { host = "dataarch-kafka-h2.idczw.hb1.kwaidc.com", port = 9092},
    { host = "dataarch-kafka-h3.idczw.hb1.kwaidc.com", port = 9092},
    { host = "dataarch-kafka-h4.idczw.hb1.kwaidc.com", port = 9092},
    { host = "dataarch-kafka-h5.idczw.hb1.kwaidc.com", port = 9092},
},

}

_M.log_type_2_topic = { test = "test", mcu = "dp_live_mcu_push_origin_quality_raw", srs = "dp_live_push_origin_quality_raw", }

_M.log_type_2_log_id = { test = "test", mcu = "logUploadKafkaTime", srs = "logUploadKafkaTime", }

-- return 0 ok -- return 1 data error -- return 2 server error function _M.send_msg(json_data) local cjson = require "cjson" local producer = require "resty.kafka.producer" local log_type = json_data["@log_type"]

if not log_type then
    return 1, "@log_type not exits"
end

local topic = _M.log_type_2_topic[log_type]
if not topic then
    return 1, "@log_type not config"
end

local broker_list = _M.topics_broker_list[topic]
local bp = producer:new(broker_list, { producer_type = "async" })
local json_str = cjson.encode(json_data)
local ok, err = bp:send(topic, nil, json_str)

if not ok then
    ngx.log(ngx.ERR, "send err=", err, "msg=", json_str)
    return 2, "send kafka error"
end

return 0, "ok"

end

`