doujiang24 / lua-resty-kafka

Lua Kafka client driver for OpenResty, based on the cosocket API
BSD 3-Clause "New" or "Revised" License
801 stars 274 forks source link

consumer #110

Open ltanme opened 3 years ago

ltanme commented 3 years ago

`

--- Generated by EmmyLua(https://github.com/EmmyLua) --- Created by admin. --- DateTime: 2021/6/10 10:16

local cjson = require("cjson")
local consumer = require("resty.kafka.consumer")
local client = require("resty.kafka.client")

-- Kafka brokers handed to consumer:new() by the timer callback.
local broker_list = {
    { host = "172.20.154.101", port = 9092 },
    { host = "172.20.154.103", port = 9092 },
    { host = "172.20.154.104", port = 9092 },
}

-- Period of the recurring timer, in seconds.
local delay = 3

--- Timer callback: fetch messages from the "vlog-topic" topic and log them.
-- Declared with `local function` so the handler does not leak into the
-- global environment.
-- @param premature truthy when the timer fires prematurely (see the
--        ngx.timer documentation); in that case nothing is fetched.
local function check(premature)
    if premature then
        return
    end

    local con = consumer:new(broker_list)

    -- Alternative ways of choosing the start offsets, kept from the original
    -- author for reference:
    --local offsets = { [0] = 15028, [1] = 35070 }
    --local offsets, err = con:fetch_offset("vlog-topic")
    --ngx.log(ngx.NOTICE,cjson.encode(err), cjson.encode(offsets))

    -- Hard-coded start offsets; presumably keyed by partition id (0-9).
    -- NOTE(review): these never advance, so every tick requests the same
    -- range again -- confirm this is intended.
    local offsets = {
        [0] = 0, [1] = 0, [2] = 0, [3] = 0, [4] = 10,
        [5] = 10, [6] = 0, [7] = 0, [8] = 0, [9] = 0,
    }

    ngx.log(ngx.NOTICE,"offsets----", cjson.encode(offsets))

    -- The second return value is logged as the failure detail when `messages`
    -- is falsy; it gets its own name instead of shadowing `offsets`.
    local messages, err = con:fetch("vlog-topic", offsets)
    if not messages then
        ngx.log(ngx.ERR, "fetch err:", err)
        return
    end

    ngx.log(ngx.NOTICE,"消息----",cjson.encode(messages))
end

-- Only worker 0 registers the recurring timer, so a single worker process
-- runs `check` every `delay` seconds.
if ngx.worker.id() == 0 then
    local created, reason = ngx.timer.every(delay, check)
    if not created then
        ngx.log(ngx.ERR, "failed to create timer: ", reason)
        return
    end
end

`