bob403 / emqx_kafka_bridge

EMQX 3.0 plugin Kafka bridge send messgaes from broker to kafka .
Apache License 2.0
22 stars 24 forks source link

how to use emqx_kafa_bridge #1

Open nbatien069 opened 5 years ago

nbatien069 commented 5 years ago

I don't know how I can use this plugins. how to pub to EMQX and sub at kafka?

What do I do after install and load plugin?

bob403 commented 5 years ago
  1. change "kafka.host = 127.0.0.1" , "kafka.port = 9092" to your kafka server ip and port.
  2. cretate "Processing" topic in your kafka server.
  3. start emqx and load plugin emqx_kafka_bridge.
  4. pub message to emqx, then emqx will produce json message to "Processing" topic.
  5. consume "Processing" topic.
nbatien069 commented 5 years ago

ekaf_server:171 <0.1637.0>:connected/2 cant handle {metadata,req,#Port<0.42>}2018-11-27 16:00:28.463 [error] mosqpub/5655-ubuntu@10.10.0.31:47082 Generic server <0.1633.0> terminating Last message in was {inet_async,#Port<0.25>,50, {ok,<<16,33,0,6,77,81,73,115,100,112,3,2,0,60,0, 19,109,111,115,113,112,117,98,47,53,54,53, 53,45,117,98,117,110,116,117>>}} ** When Server state == {state,esockd_transport,#Port<0.25>, {{10,10,0,31},47082}, undefined,running,true, {pstate,external,

Fun,

                            {{10,10,0,31},47082},
                            nossl,4,<<"MQTT">>,<<>>,false,<0.1633.0>,
                            undefined,undefined,undefined,undefined,false,
                            #{},1048576,undefined,undefined,undefined,
                            undefined,false,false,true,true,
                            #{msg => 0,pkt => 0},
                            #{msg => 0,pkt => 0},
                            false,undefined,false},
                        {none,#{max_packet_size => 1048576,version => 4}},
                        undefined,true,undefined,undefined,undefined,
                        undefined,undefined,15000}

Reason for termination == {{timeout,{gen_fsm,sync_send_event,[<0.1637.0>,prepare]}}, [{gen_fsm,sync_send_event,2,[{file,"gen_fsm.erl"},{line,247}]}, {ekaf_lib,prepare,2,[{file,"src/ekaf_lib.erl"},{line,55}]}, {ekaf_lib,common_async,3,[{file,"src/ekaf_lib.erl"},{line,98}]}, {emqx_kafka_bridge,on_client_connected,4, [{file,"src/emqx_kafka_bridge.erl"},{line,55}]}, {emqxhooks,run,2,[{file,"src/emqx_hooks.erl"},{line,93}]}, {emqx_protocol,connack,1,[{file,"src/emqx_protocol.erl"},{line,466}]}, {emqx_connection,handle_packet,2, [{file,"src/emqx_connection.erl"},{line,328}]}, {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,637}]}]} 2018-11-27 16:00:28.467 [error] mosqpub/5655-ubuntu Generic server <0.1635.0> terminating Last message in was {'EXIT',<0.1633.0>, {timeout, {gen_fsm,sync_send_event, [<0.1637.0>,prepare]}}} ** When Server state == {state,15000,true,local,<<"mosqpub/5655-ubuntu">>, undefined,undefined,undefined,1,0,#{},false, {emqx_inflight,32,{0,nil}}, 20000,undefined, {mqueue,true,1000,0,0,none,infinity, {queue,[],[],0}},

{},100,300000,undefined,0,undefined,true,

                        undefined,0,0,
                        {1543,309223,461869},
                        0,undefined,undefined}

Reason for termination == {timeout,{gen_fsm,sync_send_event,[<0.1637.0>,prepare]}}

ekaf_server:130 <0.1637.0>:downtime/2 cant handle {metadata,resp, {metadata_response,1, [{broker,0,<<"ubuntu">>, 9092}], [{topic,<<"telehaha">>,0, [{partition,0,0,0, [{replica,0}], [{isr,0}], 0,[]}]}]}} ekaf_server:130 <0.1637.0>:downtime/2 cant handle {metadata,resp, {metadata_response,1, [{broker,0,<<"ubuntu">>, 9092}], [{topic,<<"telehaha">>,0, [{partition,0,0,0, [{replica,0}], [{isr,0}], 0,[]}]}]}} ekaf_server:130 <0.1637.0>:downtime/2 cant handle {metadata,resp, {metadata_response,1, [{broker,0,<<"ubuntu">>, 9092}], [{topic,<<"telehaha">>,0, [{partition,0,0,0, [{replica,0}], [{isr,0}], 0,[]}]}]}} ekaf_server:130 <0.1637.0>:downtime/2 cant handle {metadata,resp, {metadata_response,1, [{broker,0,<<"ubuntu">>, 9092}], [{topic,<<"telehaha">>,0, [{partition,0,0,0, [{replica,0}], [{isr,0}], 0,[]}]}]}} ekaf_server:130 <0.1637.0>:downtime/2 cant handle {metadata,resp, {metadata_response,1, [{broker,0,<<"ubuntu">>, 9092}], [{topic,<<"telehaha">>,0, [{partition,0,0,0, [{replica,0}], [{isr,0}], 0,[]}]}]}}

nbatien069 commented 5 years ago

this is my issuse

bob403 commented 5 years ago

why use '<<"ubuntu">>'? try to change hostname to ip address .