adrian-gierakowski opened this issue 4 years ago (status: Open)
Created a small GitHub repo with Travis on it with a similar test case: https://github.com/szabolcs-szilagyi/highland-node-12 https://travis-ci.org/szabolcs-szilagyi/highland-node-12
Set up Travis to build it daily, so the build badge above should go green once it's fixed.
Interestingly, if you use the `ratelimit` function before calling `batch` on the stream, with some settings it can finish. I think it might be a timing issue around setting some internal state. E.g. if in the test project I put:

```js
hl(mongoStream)
  .ratelimit(110, 0)
  .batch(300)
```

it will actually finish, while if I put 120 for the amount it will hang.
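For context, a complete, consumable version of that experiment might look something like this (a sketch only; `hl` and `mongoStream` come from the snippet above, while the `collection` setup and the rest of the pipeline are my assumptions):

```js
const hl = require("highland");

// Assumed setup: `collection` is an already-connected MongoDB collection.
const mongoStream = collection.find().stream();

hl(mongoStream)
  .ratelimit(110, 0)   // with 110 the pipeline finishes; with 120 it hangs
  .batch(300)
  .flatten()
  .collect()
  .toPromise(Promise)
  .then((docs) => console.log("done, got", docs.length, "docs"))
  .catch((err) => console.error(err));
```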
What I've found so far: it is able to pull data from the mongo stream 4 times, but then it forgets to resume/pull from it again. Tried to run it with the `--inspect` flag to see - after lots of console.logs - but of course, like all timing bugs, it runs perfectly when you start it in debug mode. :/
Tried to reproduce the bug with file streams, but those ran okay. Another case that might be related: when I stream HTTP requests using the `got` library, it also hangs (node@12, got@10).
I'm tempted to give up at this point and just start on a big project of replacing all highland usages with some other solutions. :(
I took a quick look at your test repo and saw you are using Highland 3, which, if I recall correctly, is missing some work around how resources are destroyed, which may be affecting this. What happens if you use the stable Highland 2.13.5? https://www.npmjs.com/package/highland
Also, what happens if you use `.toPromise(Promise)` instead of `.toCallback()`?
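To illustrate the two termination styles (a sketch; `stream` here stands for whatever pipeline precedes it, and `collect()` is there because both helpers expect a single-value stream):

```js
// Callback style (use one or the other; a Highland stream can only be consumed once)
stream.collect().toCallback((err, docs) => {
  if (err) return console.error(err);
  console.log("got", docs.length, "docs");
});

// Promise style -- pass in the Promise constructor to use
stream.collect().toPromise(Promise)
  .then((docs) => console.log("got", docs.length, "docs"))
  .catch((err) => console.error(err));
```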
Lastly, looking at the source:
https://github.com/caolan/highland/blob/3.0.0/lib/index.js#L2785-L2792
It has a clause to close the stream when the source stream closes. That suggests it may be the mongo node stream that is not triggering the completion, because it's also batching and will not close until it's satisfied. Could you try adding an event listener to the mongo stream to see if it dispatches a `finish` or `close` event?
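A minimal way to wire those listeners up (a sketch; `collection` and the `hl` pipeline are taken from earlier snippets in this thread):

```js
const cursorStream = collection.find().stream();

// "finish" is a writable-side event, so a readable cursor stream may never
// emit it; "end" and "close" are the readable-side signals worth logging too.
["close", "finish", "end", "error"].forEach((event) => {
  cursorStream.on(event, (...args) => console.log(`mongo stream: ${event}`, ...args));
});

// ...then consume cursorStream with hl(cursorStream).batch(...) as before.
```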
Hmm, with version 2.13.5 it finishes nicely without the issue. That is cool, now I just need to see why we started to use 3. :D
In regards to the `finish` and `close` events, I've added them, but they don't get triggered. (Updated the repo with the event listeners too.)
Edit: I use `toPromise` already in my test repo.
Ok, that's at least progress. I see your updated test repo and the use of `toPromise`. You're right that `close` and `finish` may not fire, but is `end` firing?
Also, based on how you're using `batch` and `flatMap` here, would `.take(11)` not suffice?
Indeed progress, thank you for the support :)
When the test runs on the non-batched stream it triggers all three event handlers, but on the second run, with the batched stream, none of them gets triggered.
How would the `.take(n)` method help? It gives a new stream with the first n items, but I don't see how we would be able to iterate through all the items in the original stream.
Thanks for that info! I'm going to dig in today and see if I can pinpoint whether it's a Highland issue specifically or something with the batched mongo stream.
As for `take`, I wasn't sure what the intention was looking at the example. Based on the use of `.toCallback` and the fact that this example batches everything into an array then flattens it out again, I was wondering if the goal was to get a finite stream out of an infinite stream.
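If a finite prefix were all that's needed, a `take`-based version might look like this (a sketch; `collection` is assumed and 11 is just the document count used in the thread's examples):

```js
hl(collection.find().stream())
  .take(11)        // ends the stream after the first 11 documents
  .collect()
  .toCallback((err, docs) => {
    if (err) return console.error(err);
    console.log("got", docs.length, "docs");
  });
```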
Made some progress.
```js
// Assumes: const { MongoClient } = require("mongodb"); const _ = require("highland");
// const assert = require("assert"); plus the repro repo's own url, opts, docs,
// timeout, fetchDocs, logStream and append helpers.
async function main () {
  const client = await MongoClient.connect(url, opts);
  const db = client.db("test");
  const col = db.collection("test");
  await col.removeMany();
  await col.insertMany(docs);

  // Safety valve: if the pipeline hasn't completed within six seconds, report
  // the timeout, close the client and bail out with a non-zero exit code.
  timeout(6000)
    .done(() => {
      console.error(new Error("APPLICATION TIMED OUT"));
      client.close();
      process.exit(1);
    });

  return _.of(col)
    .flatMap(fetchDocs)   // fetchDocs presumably wraps col.find().stream() in a Highland stream
    .batch(11)
    .consume(logStream)
    .flatMap(_)
    .reduce(append, [])
    .toCallback((err, xs) => {
      client.close();
      if (err) {
        console.error(err);
        process.exit(1);
      }
      assert.deepEqual(xs, docs, `DOCS WERE NOT RETURNED. INSTEAD FOUND "${xs}"`);
      console.log("Completed!", xs);
      process.exit(0);
    });
}
```
That works, the main difference being `_(cursor.stream())` -> `_.of(x).flatMap(() => _(cursor.stream()))`.
My next step is to create a new Node Readable stream that roughly simulates `cursor.stream()`. That should confirm whether this is a Highland or MongoDB API issue.
My progress is at https://github.com/eccentric-j/highland-batch-repro which includes a docker-compose file to make it easier to run.
Wow, interesting. Tried to use

```js
const mongoStream = hl.of(collection.find().stream()).flatMap(hl);
```

instead of

```js
const mongoStream = collection.find().stream();
```

in my tests, and indeed the streaming works if wrapped like this.
Tried `_(mockMongoStream(docs))` with a custom subclass of `Stream.Readable` and it worked without issue: https://travis-ci.com/eccentric-j/highland-batch-repro/builds/151668807. I also tried `_(_(docs).toNodeReadable({ objectMode: true }))` and it also worked without issue: https://travis-ci.com/eccentric-j/highland-batch-repro/builds/151663566.
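The mock described above might look roughly like this (my own sketch; the actual `mockMongoStream` in the repro repo may differ):

```js
const { Readable } = require("stream");

// Hypothetical stand-in for cursor.stream(): emits one doc per _read() call,
// then signals end-of-stream by pushing null.
function mockMongoStream(docs) {
  let i = 0;
  return new Readable({
    objectMode: true,
    read() {
      this.push(i < docs.length ? docs[i++] : null);
    },
  });
}
```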
This has me leaning towards the issue residing in MongoDB's `cursor.stream()` method. The next step is to use the mongo stream and try consuming it in a batching Node Transform stream, to see if I can reproduce the error without using Highland.
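That experiment could take roughly this shape (a sketch, not the actual follow-up code; `collection` is assumed to be a connected MongoDB collection):

```js
const { Transform, Writable, pipeline } = require("stream");

// Object-mode Transform that groups incoming docs into arrays of `size`,
// flushing any remainder at the end -- roughly what Highland's .batch(n) does.
function batcher(size) {
  let buf = [];
  return new Transform({
    objectMode: true,
    transform(doc, _enc, cb) {
      buf.push(doc);
      if (buf.length >= size) {
        this.push(buf);
        buf = [];
      }
      cb();
    },
    flush(cb) {
      if (buf.length) this.push(buf);
      cb();
    },
  });
}

pipeline(
  collection.find().stream(),
  batcher(11),
  new Writable({
    objectMode: true,
    write(batch, _enc, cb) {
      console.log("got a batch of", batch.length);
      cb();
    },
  }),
  (err) => console.log(err ? `failed: ${err}` : "completed without hanging")
);
```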
Has anyone managed to make any further progress on this?
@adrian-gierakowski unfortunately no. Personally, I started to move away from using highland, using async/await, `for await...of` and Node.js' built-in `pipeline` method instead.
There is a helper lib that a colleague of mine made, called `aih`; have a look into the `lib` folder and you can see some good examples of how you can use `await` with streams.
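As an illustration of that style (a sketch of the general approach, not code from the `aih` lib; `collection` and the batch size are assumptions):

```js
// Requires a Node version where readable streams are async-iterable
// (stable since Node 12).
async function readInBatches(collection, size) {
  const batches = [];
  let batch = [];
  for await (const doc of collection.find().stream()) {
    batch.push(doc);
    if (batch.length >= size) {
      batches.push(batch);
      batch = [];
    }
  }
  if (batch.length) batches.push(batch);
  return batches;
}
```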
The following test deadlocks (`toCallback` never gets called):

(test code attached in the original issue; not reproduced here)

Save the above in `tests/batch_deadlock.js`, install the mongo driver with `npm i mongodb`, and finally run it with `npx nodeunit tests/batch_deadlock.js`. It should deadlock when running with node version >= 10, but passes when run with node 8.x. I've also left a few comments in the code which show some tweaks that make the deadlock go away.
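For readers without the original attachment, a minimal nodeunit test in the shape this thread describes might look like the following. This is my own reconstruction from the snippets above, not the author's original file, and the connection URL, batch size and document set are all assumptions:

```js
// tests/batch_deadlock.js -- sketch only
const hl = require("highland");
const { MongoClient } = require("mongodb");

const docs = Array.from({ length: 11 }, (_, i) => ({ i }));

exports.batchDeadlock = async function (test) {
  const client = await MongoClient.connect("mongodb://localhost:27017");
  const collection = client.db("test").collection("test");
  await collection.deleteMany({});
  await collection.insertMany(docs);

  // Deadlocks on node >= 10: toCallback is never invoked. Removing .batch()
  // (or running on node 8.x) makes the test pass.
  hl(collection.find().stream())
    .batch(3)
    .flatten()
    .collect()
    .toCallback((err, xs) => {
      client.close();
      test.ifError(err);
      test.equal(xs.length, docs.length);
      test.done();
    });
};
```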