Open expandboard opened 3 years ago
Based on the error you shown, seems the problem is happening here.
BTW by looking at the mongodb docs I cannot find what's wrong in that code. For sure we are missing some error handling with handleWriteError
but the bug is somewhere else
What version of aedes are you using? If you are using 0.45.0 this could be related to our last change where we removed the retain false in broker publish method, this in a cluster env could make multiuple aedes instances calling the bulk write to the same document (?)
@mcollina @getlarge any thoughts?
We could also think about adding a writeConcern
and retryWrites
options to buld constructor/execute method
@robertsLando, @expandboard metioned to use aedes v0.44 so the latest changes should not be the reason for this issue.
I cannot help regarding this persistence module and mongo bulkOperation, sorry. My naive approach would be to reproduce this situation and add some log and error catchers to know more about it.
I used example code in the current master branch of aedes git. I reviewed again. package-lock.json is:
"aedes": {
"version": "0.44.2",
"resolved": "https://registry.npmjs.org/aedes/-/aedes-0.44.2.tgz",
"integrity": "sha512-HBVP3rcwoMwkLafr+QfuBvBu+CJyzoiIf0MXWWV86VbtyWsz9iH3lO7kDCgAzK3Nz7l08osKC9Xg8/iowOCe6g==",
"requires": {
"aedes-packet": "^2.3.1",
"aedes-persistence": "^8.1.1",
"bulk-write-stream": "^2.0.1",
"end-of-stream": "^1.4.4",
"fastfall": "^1.5.1",
"fastparallel": "^2.4.0",
"fastseries": "^2.0.0",
"hyperid": "^2.0.5",
"mqemitter": "^4.4.0",
"mqtt-packet": "^6.7.0",
"readable-stream": "^3.6.0",
"retimer": "^2.0.0",
"reusify": "^1.0.4",
"uuid": "^8.3.1"
}
},
"aedes-cached-persistence": {
"version": "8.1.1",
"resolved": "https://registry.npmjs.org/aedes-cached-persistence/-/aedes-cached-persistence-8.1.1.tgz",
"integrity": "sha512-LBCzUu+22zGbSGICRsZhabzj4V0rNGtpIEvjyLQEhic6OnL062wF+qsgJTgQUph92qBwF894F0shgNlIPn0Z6Q==",
"requires": {
"aedes-persistence": "^8.1.2",
"fastparallel": "^2.4.0",
"multistream": "^4.0.1",
"qlobber": "^5.0.3"
}
},
"aedes-packet": {
"version": "2.3.1",
"resolved": "https://registry.npmjs.org/aedes-packet/-/aedes-packet-2.3.1.tgz",
"integrity": "sha512-LqBd57uc2rui2RbjycW17dylglejG26mM4ewVXGNDnVp/SUHFVEgm7d1HTmYrnSkSCNoHti042qgcTwv/F+BtQ==",
"requires": {
"mqtt-packet": "^6.3.0"
}
},
"aedes-persistence": {
"version": "8.1.3",
"resolved": "https://registry.npmjs.org/aedes-persistence/-/aedes-persistence-8.1.3.tgz",
"integrity": "sha512-VMCjEV+2g1TNJb/IlDEUy6SP9crT+QUhe2xc6UjyqrFNBNgTvHmOefXY7FxWrwmR2QA02vwg3+5p/JXkyg/Dkw==",
"requires": {
"aedes-packet": "^2.3.1",
"from2": "^2.3.0",
"qlobber": "^5.0.3"
}
},
"aedes-persistence-mongodb": {
"version": "7.0.1",
"resolved": "https://registry.npmjs.org/aedes-persistence-mongodb/-/aedes-persistence-mongodb-7.0.1.tgz",
"integrity": "sha512-NJlZsVb7i3sUTm7hj86E8PtFCR+Pf068kzNCQhlBH8MuWLgF8WB51GArkJYsgo32vu5UBarfN9lBsj2CpR72Hw==",
"requires": {
"aedes-cached-persistence": "^8.0.0",
"escape-string-regexp": "^1.0.5",
"mongodb": "^3.4.1",
"native-url": "^0.2.4",
"pump": "^3.0.0",
"qlobber": "^3.0.2",
"through2": "^3.0.0"
},
"dependencies": {
"qlobber": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/qlobber/-/qlobber-3.1.0.tgz",
"integrity": "sha512-B7EU6Hv9g4BeJiB7qtOjn9wwgqVpcWE5c4/86O0Yoj7fmAvgwXrdG1E+QF13S/+TX5XGUl7toizP0gzXR2Saug=="
}
}
}
Just pulled the code from the cluster example, and I'm seeing the same error:
.../node_modules/mongodb/lib/bulk/common.js:738
throw new TypeError('Update document requires atomic operators');
@KevinVitale IMO the problem is that upsert is not atomic and could create errors in cluster enviroments here:
https://github.com/moscajs/aedes-persistence-mongodb/blob/master/persistence.js#L269
I think you could give a try by using http://mongodb.github.io/node-mongodb-native/4.0/classes/bulkoperationbase.html#handlewriteerror
i run cluster example with MongoDB, broker can handle publish with NOT retain flag but when i set Retain flag true, it rises this error; /home/ioboard/mqttbroker/examples/clusters/node_modules/mongodb/lib/bulk/common.js:647 throw new TypeError('Update document requires atomic operators'); ^
TypeError: Update document requires atomic operators at FindOperators.updateOne (/home/ioboard/mqttbroker/examples/clusters/node_modules/mongodb/lib/bulk/common.js:647:13) at executeBulk (/home/ioboard/mqttbroker/examples/clusters/node_modules/aedes-persistence-mongodb/persistence.js:200:38) at MongoPersistence.storeRetained (/home/ioboard/mqttbroker/examples/clusters/node_modules/aedes-persistence-mongodb/persistence.js:185:3) at PublishState.storeRetained (/home/ioboard/mqttbroker/examples/clusters/node_modules/aedes/aedes.js:166:29) at makeCallTwo (/home/ioboard/mqttbroker/examples/clusters/node_modules/fastseries/series.js:150:3) at release (/home/ioboard/mqttbroker/examples/clusters/node_modules/fastseries/series.js:138:7) at resultList (/home/ioboard/mqttbroker/examples/clusters/node_modules/fastseries/series.js:125:3) at Aedes.series [as _series] (/home/ioboard/mqttbroker/examples/clusters/node_modules/fastseries/series.js:45:7) at Aedes.publish (/home/ioboard/mqttbroker/examples/clusters/node_modules/aedes/aedes.js:257:8) at Client.enqueuePublish (/home/ioboard/mqttbroker/examples/clusters/node_modules/aedes/lib/handlers/publish.js:60:21) Worker 204140 died with code: 1, and signal: null Starting a new worker Worker 206756 is online
what's wrong here?
System Information