doujiang24 / lua-resty-kafka

Lua kafka client driver for the Openresty based on the cosocket API
BSD 3-Clause "New" or "Revised" License
804 stars 275 forks source link

Messages aren't dispatched to Kafka, but no error is shown. #2

Closed carminexx closed 9 years ago

carminexx commented 9 years ago

Hi, I tried you producer example, but it's not working. I tried to send a message, but kafka doesn't receive it. LUA gives me no errors, and both the "err" and the "resp" variable are nil. I'm sure I can connect to Kafka from that machine because I tested the connection with kafkacat and the sending of messages via CLI works.

doujiang24 commented 9 years ago

thanks for you try :) the sample code works fine on my side

would you please check the error.log file, there should be some valueable error log or try the last version, I fix some code

be sure you are using the producer, not the bufferproducer

bufferproducer may have a problem with the resty command-line in openresty Version 1.7.4.1, I will check this later

carminexx commented 9 years ago

Yes, I'm using the buffer producer; and the odd thing is that the error.log hasn't recorded any error or info log.

doujiang24 commented 9 years ago

sorry, I try many method, still can not reproduce your error on my side either

could you please provide your openresty version and the full nginx.conf & lua file?

doujiang24 commented 9 years ago

and the kafka version

I used openresty/1.7.4.1 and Kafka 0.8.1

carminexx commented 9 years ago

I'm using OpenResty 1.7.2.1 with LuaJIT; and an external Kafka server version 0.8.1.1

My routine is as follows:

function M.DispatchToKafka(jobname, workload)

    local cjson = require "cjson"
    local producer = require "resty.kafka.producer"

    local broker_list = {
        { host = config.GetKafkaIP(), port = config.GetKafkaPort() },
    }
    local messages = {
        workload
    }

    local p, err = producer:new(broker_list)

    local resp, err = p:send(jobname, messages)
    if not resp then
        ngx.say("send err:", err)
        return
    end
    return true
end

Note that .GetKafkaIP() returns a String, while .GetKafkaPort() an integer. Workload and Jobname are both strings.

gyllen commented 9 years ago

Yeah I am not getting the producer to work either. Awesome project though. I am very eager for this to be production ready.

doujiang24 commented 9 years ago

thanks, but I have try OpenResty 1.7.2.1 with LuaJIT and Kafka server 0.8.1.1 with your code, still works fine on my side

I have no idea what's the problem on your side, maybe you can try exactly the below sample, or step into the library code add some debug code to check where goes wrong if the below code still goes wrong

test.lua

local config = {}
config.GetKafkaIP = function ()
    return "127.0.0.1"
end
config.GetKafkaPort = function ()
    return 9092
end

local M = {}
function M.DispatchToKafka(jobname, workload)
    local cjson = require "cjson"
    local producer = require "resty.kafka.producer"

    local broker_list = {
        { host = config.GetKafkaIP(), port = config.GetKafkaPort() },
    }
    local messages = {
        workload
    }

    local p, err = producer:new(broker_list)
    local resp, err = p:send(jobname, messages)
    if not resp then
        ngx.say("send err:", err)
        return
    end
    return true
end

M.DispatchToKafka("test", "hello world")

nginx.conf

worker_processes  1;
error_log  logs/error.log  info;
events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  text/html;
    server {
        listen       80;
        access_log  logs/access.log;
        location /luafile {
            content_by_lua_file "/path/to/test.lua";
        }
    }
}
doujiang24 commented 9 years ago

@carminexx @gyllen I have update much in these days, could you please try again?

Now there is three module provided as noticed in the Readme doc.

It's better to try the test case code in t/client.t first if still got problems. Thanks :)

carminexx commented 9 years ago

Hi, I have tested the new version, with the standard producer (not buffer producer), and it works good!

Just a hint for newer releases: add an option to turn off "verbose" errors; else I get too many errors in case a connection doesn't work (for example, if the Kafka server suddenly goes down).

doujiang24 commented 9 years ago

Thanks, I just commit a patch, turn some verbose error log from ERR to INFO: when fetch the metadata from the broker_list, if some of them failed log INFO, still log ERR if the broker_list all failed.