moscajs / mosca

MQTT broker as a module
mosca.io

mongo persistence "write after end" error when server connection is closed abruptly #635

Closed jmholzinger closed 7 years ago

jmholzinger commented 7 years ago

I've been experiencing an "unrecoverable" error that seems to come from mosca's MongoDB backend/persistence layer. If the MongoDB server connection is ended abruptly, a "write after end" error is sometimes thrown, crashing the process with an unhandled 'error' event even though error listeners are registered on the server.

My goal is to recover if the MongoDB server goes down momentarily, so I would like to handle this error, but it is never passed up to an error handler I control, even when I pass the db connection to mosca myself. I want to avoid handling this error at the process level at all costs.
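(For clarity, this is the kind of catch-all, process-level handler I'm trying to avoid; it's only a sketch of what I mean, not something I want to ship:)

'use strict';

// Hypothetical last-resort handler -- exactly what I'd rather not rely on,
// since it swallows every uncaught error, not just the Mongo one.
process.on('uncaughtException', function (err) {
    console.error('uncaught exception:', err);
    // here I would have to tear down and rebuild the whole broker
    process.exit(1);
});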

Here's the stack trace I get when the process crashes:

events.js:160
      throw er; // Unhandled 'error' event
      ^

Error: write after end
    at writeAfterEnd (_stream_writable.js:193:12)
    at Socket.Writable.write (_stream_writable.js:244:5)
    at Socket.write (net.js:661:40)
    at Connection.write (C:\Users\jholzinger\WebstormProjects\mosca-crash\node_modules\mongodb-core\lib\connection\connection.js:521:53)
    at C:\Users\jholzinger\WebstormProjects\mosca-crash\node_modules\mongodb-core\lib\connection\pool.js:1274:26
    at waitForAuth (C:\Users\jholzinger\WebstormProjects\mosca-crash\node_modules\mongodb-core\lib\connection\pool.js:1135:39)
    at C:\Users\jholzinger\WebstormProjects\mosca-crash\node_modules\mongodb-core\lib\connection\pool.js:1143:5
    at C:\Users\jholzinger\WebstormProjects\mosca-crash\node_modules\mongodb-core\lib\connection\pool.js:1007:21
    at _combinedTickCallback (internal/process/next_tick.js:73:7)
    at process._tickCallback (internal/process/next_tick.js:104:9)

And here is the activity in the MongoDB logs just before the process crashes:

2017-04-28T17:49:37.702-0700 I COMMAND  [conn1] command administration.EventBroker command: getMore { getMore: 29475920916, collection: "EventBroker", batchSize: 1000 } originatingCommand: { find: "EventBroker", filter: { _id: { $gt: ObjectId('5903e31bd2d1d33cf865749c') } }, tailable: true, awaitData: true } planSummary: COLLSCAN cursorid:29475920916 keysExamined:0 docsExamined:0 numYields:0 nreturned:0 reslen:113 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 1999ms
2017-04-28T17:49:39.703-0700 I COMMAND  [conn5] command administration.EventBroker command: getMore { getMore: 29475920916, collection: "EventBroker", batchSize: 1000 } originatingCommand: { find: "EventBroker", filter: { _id: { $gt: ObjectId('5903e31bd2d1d33cf865749c') } }, tailable: true, awaitData: true } planSummary: COLLSCAN cursorid:29475920916 keysExamined:0 docsExamined:0 numYields:0 nreturned:0 reslen:113 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 2000ms
2017-04-28T17:49:42.706-0700 I COMMAND  [conn5] command administration.EventBroker command: getMore { getMore: 29475920916, collection: "EventBroker", batchSize: 1000 } originatingCommand: { find: "EventBroker", filter: { _id: { $gt: ObjectId('5903e31bd2d1d33cf865749c') } }, tailable: true, awaitData: true } planSummary: COLLSCAN cursorid:29475920916 keysExamined:0 docsExamined:0 numYields:0 nreturned:0 reslen:113 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 1999ms
2017-04-28T17:49:45.710-0700 I COMMAND  [conn5] command administration.EventBroker command: getMore { getMore: 29475920916, collection: "EventBroker", batchSize: 1000 } originatingCommand: { find: "EventBroker", filter: { _id: { $gt: ObjectId('5903e31bd2d1d33cf865749c') } }, tailable: true, awaitData: true } planSummary: COLLSCAN cursorid:29475920916 keysExamined:0 docsExamined:0 numYields:0 nreturned:0 reslen:113 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 2000ms

I'm personally using mongoose to open and manage the database connection like so:

'use strict';

var mongoose = require('mongoose');
var mosca = require('mosca');

var dbUrl = 'mongodb://localhost:8316/administration';

mongoose.connect(dbUrl);

mongoose.connection.on('connected', function () {
    var server = new mosca.Server({
        port: 1883,
        // https://github.com/mcollina/ascoltatori#mongodb
        backend: {
            type: 'mongo',
            pubsubCollection: 'EventBroker',
            db: mongoose.connection.db
        },
        // https://github.com/mcollina/mosca/wiki/Mosca-advanced-usage#mongo-persistence-options
        persistence: {
            factory: mosca.persistence.Mongo,
            connection: mongoose.connection.db
        }
    });

    server.on('ready', function () {
        console.log('MQTT Server ready.');
    });

    server.on('closed', function () {
        console.log('MQTT Server closed.');
    });

    server.on('error', function (err) {
        console.log(err);
    });

    mongoose.connection.on('error', function (err) {
        console.log(err);
    });

    mongoose.connection.db.on('error', function (err) {
        console.log(err);
    });
});

But the error is reproducible when creating the connection using the native driver:

'use strict';

var mongodb = require('mongodb');
var mosca = require('mosca');

var dbUrl = 'mongodb://localhost:8316/administration';

mongodb.MongoClient.connect(dbUrl, function(err, db) {
    if (err) {
        throw err;
    } else {
        var server = new mosca.Server({
            port: 1883,
            // https://github.com/mcollina/ascoltatori#mongodb
            backend: {
                type: 'mongo',
                pubsubCollection: 'EventBroker',
                db: db
            },
            // https://github.com/mcollina/mosca/wiki/Mosca-advanced-usage#mongo-persistence-options
            persistence: {
                factory: mosca.persistence.Mongo,
                connection: db
            }
        });

        server.on('ready', function () {
            console.log('MQTT Server ready.');
        });

        server.on('closed', function () {
            console.log('MQTT Server closed.');
        });

        server.on('error', function (err) {
            console.log(err);
        });

        db.on('error', function(err) {
            console.log(err);
        });
    }
});

As well as when using the built-in mosca functionality:

'use strict';

var mosca = require('mosca');

var dbUrl = 'mongodb://localhost:8316/administration';

var server = new mosca.Server({
    port: 1883,
    // https://github.com/mcollina/ascoltatori#mongodb
    backend: {
        type: 'mongo',
        pubsubCollection: 'EventBroker',
        url: dbUrl
    },
    // https://github.com/mcollina/mosca/wiki/Mosca-advanced-usage#mongo-persistence-options
    persistence: {
        factory: mosca.persistence.Mongo,
        url: dbUrl
    }
});

server.on('ready', function () {
    console.log('MQTT Server ready.');
});

server.on('closed', function () {
    console.log('MQTT Server closed.');
});

server.on('error', function (err) {
    console.log(err);
});

I've opened an issue with mongodb-core to see if they have any feedback, but I'm not convinced the problem is coming from their end. You can find that issue here: https://github.com/christkv/mongodb-core/issues/179

Any feedback related to this issue would be appreciated. Thanks.

mcollina commented 7 years ago

I think you should try attaching a debugger to see what's happening and where the error is firing.

I do not think it is a good idea to handle this error. Given the amount of messages Mosca can receive, if the connection to Mongo fails it is safer to fail early and restart. Long term, handling those types of error conditions leads to memory leaks, file descriptor leaks, and a lot of other very bad things.
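Something along these lines, for example (just a rough sketch; './broker.js' here is a hypothetical file holding the mosca setup from the snippets above):

'use strict';

// Minimal supervisor sketch: run the broker in a child process and respawn
// it whenever it dies, instead of trying to recover in-process.
var fork = require('child_process').fork;

function start() {
    var child = fork('./broker.js');
    child.on('exit', function (code) {
        console.log('broker exited with code ' + code + ', restarting in 1s');
        setTimeout(start, 1000);
    });
}

start();

In production you would typically let systemd, pm2 or similar do the restarting instead of hand-rolling it.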

jmholzinger commented 7 years ago

While I agree with your concerns, I'm inclined to disagree that the error should not be handled; MongoDB connections are supposed to be fault tolerant, which is why the driver offers an autoReconnect option.
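For example, with the 2.x native driver I would expect something like the following to ride out short outages (the exact option names are my reading of the driver docs, so treat this as a sketch):

'use strict';

var mongodb = require('mongodb');

var dbUrl = 'mongodb://localhost:8316/administration';

// server-level reconnect options as I understand them from the 2.x driver docs
var options = {
    server: {
        autoReconnect: true,
        reconnectTries: 30,
        reconnectInterval: 1000
    }
};

mongodb.MongoClient.connect(dbUrl, options, function (err, db) {
    if (err) {
        throw err;
    }
    // pass db to mosca's backend/persistence exactly as in the snippets above
});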

I will attempt to further debug this issue when I get a chance and report my findings here.

mcollina commented 7 years ago

Feel free to send a PR if you discover that we are missing something here.

jmholzinger commented 7 years ago

I wasn't able to pinpoint the problem area, but I was able to determine that the way mosca works won't suit my needs: when the connection does recover, the broker attempts to bind to the port again, causing another crash. Fortunately I was able to switch to aedes, and this problem is no longer occurring. At this point I don't think a PR is necessary; the library just doesn't seem to support my use case.
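For anyone landing here with the same problem, a minimal aedes setup with mongo-backed pub/sub and persistence looks roughly like this (a sketch rather than my exact code; I'm assuming the mqemitter-mongodb and aedes-persistence-mongodb companion packages):

'use strict';

var aedes = require('aedes');
var net = require('net');
var mqemitter = require('mqemitter-mongodb');            // pub/sub over mongo
var persistence = require('aedes-persistence-mongodb');  // retained/offline messages

var dbUrl = 'mongodb://localhost:8316/administration';

var broker = aedes({
    mq: mqemitter({ url: dbUrl }),
    persistence: persistence({ url: dbUrl })
});

var server = net.createServer(broker.handle);

server.listen(1883, function () {
    console.log('MQTT Server ready.');
});

broker.on('clientError', function (client, err) {
    console.log(err);
});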