Closed lilbrianxp closed 8 years ago
I'm not sure if this is your situation: the topic `test` was not created manually. A non-existent topic is created automatically under the default kafka config, so if this matches your situation, you are recommended to create the topic manually.
The python client works fine probably because it auto-retries when the topic is not found.
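For what it's worth, a topic can be created manually with the kafka-topics.sh script that ships with kafka 0.8.x; the bin path, zookeeper address, and partition/replication counts below are assumptions for a local single-broker setup:

```shell
# create the "test" topic explicitly instead of relying on auto-creation
# (bin path and zookeeper address are assumptions for a local 0.8.x install)
bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic test
```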
Sorry for the late reply, and thank you for the information.
It works well on my side with 500 partitions
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "127.0.0.1", port = 9092 },
}
local key = "key"
local message = "halo world"
local p = producer:new(broker_list, { producer_type = "async", flush_time = 10000 })
ngx.sleep(0.1) -- timer_flush will trigger an immediate flush
local ok, err = p:send("test500", key, message)
if not ok then
ngx.say("send err:", err)
return
end
ngx.say("send ok:", ok)
p:flush()
local offset0 = p:offset()
for i = 1, 50000 do
local ok, err = p:send("test500", key, message)
if not ok then
ngx.say("send err:", err)
return
end
p:flush()
end
local offset1 = p:offset()
ngx.say("send num:", tonumber(offset1 - offset0))
works fine:
send ok:true
send num:50000
And your 'fix' looks like real magic to me, so could you test this code on your side?
local cjson = require "cjson"
local client = require "resty.kafka.client"
local broker_list = {
{ host = "127.0.0.1", port = 9092 },
}
-- usually we do not use this library directly
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("test")
if not brokers then
ngx.say("fetch_metadata failed, err:", partitions)
end
ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
thanks :)
@afsheenb
could you add some debug log? Or is it possible for me to get access to your environment? I cannot reproduce it on my side.
local cjson = require "cjson"
local function choose_partition(self, topic, key)
local brokers, partitions = self.client:fetch_metadata(topic)
if not brokers then
return nil, partitions
end
ngx.log(ngx.ERR, cjson.encode(partitions))
return self.partitioner(key, partitions.num, self.correlation_id)
end
So it looks like pulling from master and trying again 'fixed' my issue! I'll delete my comment, it's no longer relevant and I think may be confusing for others.
Hi lilbrianxp: I have the same problem, "not foundd topic", but the topic was created before. How do I resolve it? Thanks.
Using: Openresty 1.7.10.1 kafka server 2.10-0.8.2.1
@sunchenxian thanks for the report. Are you using the master code? If yes, could you try the debug code I listed before? I cannot reproduce the error on my side.
@doujiang24 Hi, I have tried the following debug code:
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{host="127.0.0.1",port=9093},
}
local cli = client:new( broker_list )
local brokers, partitions = cli:fetch_metadata( "test" )
if not brokers then
ngx.say( "fetch_metadata failed, err:", partitions )
end
ngx.say( "brokers: ", cjson.encode( brokers ), "; partitions: ", cjson.encode( partitions ) )
ngx.exit( 200 )
It prints this debug message:
brokers: [{"host":"localhost","port":9093},{"host":"localhost","port":9094}, {"host":"localhost","port":9095}]; partitions: {"0":{"id":0,"errcode":0,"replicas":[3,2,1],"isr":[3],"leader":3},"errcode":0,"num":1}
If I run the following code, it prints "not foundd topic", although I created the topic before.
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local brokers = {
{host="localhost",port=9093},
}
local message = "hello world"
local p = producer:new( brokers, {producer_type="sync"} )
local offset, err = p:send( "test", nil, message )
if not offset then
ngx.say( "err:" .. err )
end
ngx.exit(200)
waiting for your reply.
@sunchenxian
It seems like nginx cannot resolve localhost on your side, so the producer cannot fetch the metadata from the broker. Do you see any error info in /path/to/nginx/log/error.log?
@doujiang24 Hi, there are some errors in the error log file:
2015/06/09 10:08:18 [error] 31350#0: [lua] client.lua:150: _fetch_metadata(): all brokers failed in fetch topic metadata, context: ngx.timer, client: 127.0.0.1, server: 0.0.0.0:9100
2015/06/09 10:08:18 [error] 31350#0: [lua] client.lua:150: _fetch_metadata(): all brokers failed in fetch topic metadata, context: ngx.timer, client: 127.0.0.1, server: 0.0.0.0:9100
2015/06/09 10:08:18 [error] 31350#0: [lua] producer.lua:255: buffered messages send to kafka err: not foundd topic, retryable: true, topic: test, partition_id: -1, length: 1, context: ngx.timer, client: 127.0.0.1, server: 0.0.0.0:9100
I will try a domain instead of localhost to verify your guess.
Hi doujiang24: I have verified your guess. It returned the right information when I set host.name to "127.0.0.1" in all broker config files, but I don't know why the kafka server does not recognize "localhost"; I have set localhost in /etc/hosts. Thank you for all your replies.
I found that if the correct resolver is not configured in nginx, it also leads to "no foundd topic":
resolver 10.10.114.140;  # fails without this line
local broker_list = {
{ host = "xxx.xxx.xxx.xxxx", port = 9092 },
}
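For context, the resolver directive belongs in the nginx http (or server) block; cosocket-based libraries such as lua-resty-kafka resolve broker hostnames through it, not through /etc/hosts. A minimal sketch, with a placeholder DNS server IP:

```nginx
http {
    # cosocket name lookups (used by lua-resty-kafka) go through this
    # resolver, not through /etc/hosts
    resolver 10.10.114.140;   # placeholder: your DNS server's IP
    resolver_timeout 2s;      # optional: fail fast on DNS problems
}
```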
I bypassed the resolver directive and the kafka setting. Now it works whether kafka returns the IP address or the hostname:
local producer = require "resty.kafka.producer"  -- require assumed; omitted in the original snippet
local hostname2ip = {
["kafka-node0"] = "192.168.10.34",
["kafka-node1"] = "192.168.10.35",
["kafka-node2"] = "192.168.10.36"
}
-- sync producer_type
local p = producer:new(broker_list)
p.client:fetch_metadata('ngx-test')
for k, v in pairs(p.client.brokers) do
if hostname2ip[v.host] then
p.client.brokers[k].host = hostname2ip[v.host]
end
end
@chenyoufu It's really not a good idea. You'd better change the host.name in your kafka config or use your local dns.
@sunchenxian Actually, it's not that the kafka server does not recognize "localhost"; it's nginx that does not recognize localhost, because it does not use the config in /etc/hosts.
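The difference is easy to demonstrate: the operating system resolver (which the python client relies on) consults /etc/hosts, whereas nginx's resolver directive issues plain DNS queries and never reads that file. A small python sketch, assuming a standard /etc/hosts that contains a "127.0.0.1 localhost" entry:

```python
import socket

# gethostbyname() goes through the OS resolver, which consults
# /etc/hosts first on a standard Linux setup, so "localhost"
# resolves even when no DNS server knows the name.
ip = socket.gethostbyname("localhost")
print(ip)  # typically 127.0.0.1
```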
Yes, I also found that nginx cannot resolve the domain name.
Consider it resolved.
I found that if the correct resolver is not configured in nginx, it also leads to "no foundd topic":
resolver 10.10.114.140;  # fails without this line
local broker_list = { { host = "xxx.xxx.xxx.xxxx", port = 9092 }, }

Is the IP after resolver your own machine's IP? I'm running into the same problem now: if broker_list is configured with a domain name, I get "not found topic"; if it is configured with the resolved IP, writes succeed. How did you solve it?
Yes, I also found that nginx cannot resolve the domain name.

How did you solve it? Since the IP behind the domain name is not fixed, we cannot hard-code the IP; we can only use the domain name.
I added "resolver 1.1.1.1;" (a dns ip) to the nginx conf and it worked. Thanks so much for everyone's replies.
Hi,
Not sure why I'm getting this error; the topic does exist. I also tested it through the python client with the same topic and that worked.
.... ....
local broker_list = { { host = "127.0.0.1", port = 9092 }, }
Using: Openresty 1.7.2.1 kafka server 2.10-0.8.2.0
Also, I tested on 0.8.1 and 0.8.1.1. Thanks,