moscajs / mosca

MQTT broker as a module
mosca.io
3.2k stars 513 forks source link

Clients get repeated messages on endless loop #767

Open 0xmtn opened 5 years ago

0xmtn commented 5 years ago

Hi,

I'm using lua-mosquitto and node mqtt as a client to my Mosca broker with kafka backend. But I get repeated message like below:

0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    sub|blahblah9
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    sub|blahblah9
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    sub|blahblah9
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    sub|blahblah9
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    sub|blahblah9
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello2
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello3
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello3
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello3
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello3
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello3
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello3
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello3
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello3
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello3
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    hellohello3
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k    
0   /c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k

Client.lua:

mqtt = require("mosquitto")
client = mqtt.new()

client.ON_CONNECT = function()
    print("5b8bc0aa6be58c05415cbc44 connected")
    client:subscribe("/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k")
end

client.ON_MESSAGE = function(mid, topic, payload)
    print(mid, topic, payload)
    -- act, ext = payload:match("([^,]+)|([^,]+)")
    -- print(act, ext)
    -- if act == "sub" then client:subscribe("/c/5b8b215c855b7569eb94f0ed/d/5b8b26dd2ac7176a44765c0c/k/"..ext) end
end

client.ON_PUBLISH = function()
    -- client:disconnect()
end

client:connect("localhost", 1883)
--client:connect("test.mosquitto.org", 1883) -- It works well with mosquitto.org.
client:loop_forever()

Node MQtt Client:

var mqtt = require('mqtt');
var options = {
  port: 1883,
  host: 'mqtt://localhost',
  keepalive: 60,
  reconnectPeriod: 1000,
  protocolId: 'MQIsdp',
  protocolVersion: 3,
  clean: true,
  encoding: 'utf8'
};
var Client = mqtt.connect('mqtt://localhost', options);

Client.on('connect',  () => {
  console.log("Client Connected");
  //Client.subscribe("_c_5b8b215c855b7569eb94f0ed_d_5b92665fce3f081648ee86c9_k");
  Client.subscribe("/c/5b8b215c855b7569eb94f0ed/d/5b92665fce3f081648ee86c9/k");
});

Client.on('message', (p, a) => {
  console.log(p, a);
});

Mosca Broker.js

var mosca = require('mosca');
var crypto = require('crypto');
var fs = require('fs');

var backend = {
    type: "kafka",
    json: false,
    connectionString: "localhost:2181",
    clientId: "mosca",
    groupId: "mosca",
    defaultEncoding: "utf8",
  };

var moscaSettings = {
    interfaces: [
        { type: "mqtt", port: 1883 },
    ],
    id: "mosca",
    stats: false,
    publishNewClient: false,
    publishClientDisconnect: false,
    publishSubscriptions: false,
    backend: backend,
};
var server = new mosca.Server(moscaSettings);
server.on('ready', setup);
function setup() {
    console.log('Mosca server is up and running.');
}

server.on("error", function (err) {
    console.log(err);
});
server.on('clientConnected', function (client) {
    console.log('Client Connected \t:= ', client.id);
});
server.on('published', function (packet, client) {
  packet.payload = packet.payload.toString('utf8');
  console.log("Published :=", packet);
});
server.on('subscribed', function (topic, client) {
    console.log("Subscribed :=", topic, client.packet);
});
server.on('unsubscribed', function (topic, client) {
    console.log('unsubscribed := ', topic);
});
server.on('clientDisconnecting', function (client) {
    console.log('clientDisconnecting := ', client.id);
});
server.on('clientDisconnected', function (client) {
    console.log('Client Disconnected     := ', client.id);
});

Any ideas on how to solve this?

0xmtn commented 5 years ago

Hi @mcollina, did you have any chance to look at this?

user1m commented 5 years ago

@0xmtn did you have any luck with this? Having the same experience. Not sure if it's a Kafka setting or Mosca though. @mcollina any thoughts?

grath10 commented 5 years ago

@0xmtn @User1m meet the similar problem, could you give me some suggestion as to repeat the problem? I want to go deep into why this would happen. Thx!

user1m commented 5 years ago

@grath10 I ended up giving up on this lib but before that, I found a fork that was more actively maintained. if you want to continue w/ this lib then use the following instead: https://github.com/ConduitVC/ascoltatori https://github.com/ConduitVC/mosca hope that helps.

If it's of any help I talk about what I ended up going with here: https://github.com/User1m/kafka-pub-sub-investigation

grath10 commented 5 years ago

@User1m Thx again for sharing your practice with me. By the way, could you provide the substitute one for me to refer to? Any suggestion is appreciated!

user1m commented 5 years ago

@grath10 I went with just kafka-node