Automattic / mongoose

MongoDB object modeling designed to work in an asynchronous environment.
https://mongoosejs.com
MIT License

cursor never finishes #10902

Closed simllll closed 3 years ago

simllll commented 3 years ago

Do you want to request a feature or report a bug? bug

What is the current behavior? PR https://github.com/Automattic/mongoose/pull/10878 breaks our setup: some streams now never finish.

If the current behavior is a bug, please provide the steps to reproduce. It's hard to tell what's causing this, because some streams do finish and some do not. It seems around 60-70% never finish.

What is the expected behavior? The stream should fire the finish event as it did before.

What are the versions of Node.js, Mongoose and MongoDB you are using? Note that "latest" is not a version. mongoose 6.0.11: not working; mongoose 6.0.10: works

simllll commented 3 years ago

It seems the only event that is fired is a "drain" event; after that, nothing happens anymore.

Update: in my setup I have a custom writable stream that I pipe the mongoose stream into. It seems the writable stream no longer gets the finish event. If I listen to the MongooseStream directly, it starts working again. I'm still a bit confused about that. Maybe when the destroy method is called the event does not "bubble up", but when the event is fired directly, things work?

await new Promise((resolve, reject) => {
  stream.on('finish', resolve); // not fired
  stream.on('error', reject); // not fired
  MongooseStream.on('close', resolve); // gets fired
  MongooseStream.on('finish', resolve); // not fired
  MongooseStream.on('error', reject); // not fired
});
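
One way to wait for a piped stream without wiring up individual 'finish' and 'close' listeners is Node's built-in stream.pipeline. The following is only a sketch with plain Node.js streams standing in for the Mongoose cursor and the custom writable; pipeAndWait and makeSink are hypothetical names, and whether this changes anything for the bug reported here has not been verified.

```javascript
const { Readable, Writable, pipeline } = require('stream');
const { promisify } = require('util');

// Promise wrapper around the callback-style stream.pipeline.
const pipelineAsync = promisify(pipeline);

// Hypothetical helper: resolves once `sink` has finished, rejects on an
// error or when a stream closes before it has properly ended.
async function pipeAndWait(source, sink) {
  await pipelineAsync(source, sink);
}

// Stand-in for the custom writable from the report (assumption: any
// standard Writable behaves the same way here).
function makeSink(chunks) {
  return new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });
}

async function demo() {
  // Normal case: the source ends, the sink finishes, the promise resolves.
  const chunks = [];
  await pipeAndWait(Readable.from(['a\n', 'b\n']), makeSink(chunks));

  // A source that is destroyed before it ends makes the promise reject
  // instead of leaving it pending forever.
  const stalled = new Readable({ read() {} });
  setImmediate(() => stalled.destroy());
  let rejected = false;
  try {
    await pipeAndWait(stalled, makeSink([]));
  } catch (err) {
    rejected = true;
  }
  return { chunks, rejected };
}
```

Compared with a hand-written Promise around 'finish' and 'error', this surfaces a premature close as a rejection, which makes a stalled stream easier to notice.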

maximilianschmid commented 3 years ago

same problem.

iovanom commented 3 years ago

I'm trying to reproduce this. I've created this test script:

const mongoose = require('mongoose');
const fs = require('fs');

const users = [
  { name: 'User1' },
  { name: 'User2' },
  { name: 'User3' },
  { name: 'User4' },
  { name: 'User5' },
  { name: 'User6' },
  { name: 'User7' },
  { name: 'User8' },
  { name: 'User9' },
  { name: 'User10' },
  { name: 'User11' },
  { name: 'User12' },
  { name: 'User13' },
  { name: 'User14' },
  { name: 'User15' },
  { name: 'User16' },
  { name: 'User17' },
  { name: 'User18' },
  { name: 'User19' },
  { name: 'User20' },
  { name: 'User21' },
  { name: 'User22' },
];

function transform(user) {
  return user.name.toUpperCase() + '\n';
}

mongoose.connect('mongodb://localhost:27017/test-mongoose', () => {
  const User = mongoose.model('User', new mongoose.Schema({ name: String }));
  User.create(users)
    .then(() => {
      const cursor = User.find().cursor({ batchSize: 3, transform });
      const write = fs.createWriteStream('./out.txt');
      const stream = cursor.pipe(write);
      stream.on('finish', () => {
        console.log('finish');
        mongoose.disconnect();
      });
      stream.on('error', error => {
        console.log('ERROR', error)
        mongoose.disconnect();
      });
    });
});

With this script I get 'finish' on the console, and out.txt contains the right data. Which version of Node do you use? @simllll, could you provide a test script that reproduces the issue?

simllll commented 3 years ago

I will try to modify your script to reproduce the issue.

My Node version is v16.11.1, but I also experience the same issue with Node 14.18.1.

simllll commented 3 years ago

Well, I tried narrowing it down, but now I'm at a point where I have no clue anymore.

What is so strange:

So it's super hard to narrow it down to something specific... but in its current form, the script does NOT work and it reproduces the bug.

UPDATE 1:

UPDATE 2: From time to time the script does finish. I tracked those runs down to this check:

const isNotClosedAutomatically = !_this.destroyed;
if (isNotClosedAutomatically) {

In those runs isNotClosedAutomatically is false, so neither destroy is called nor close is emitted. So I tested what happens if I just don't call destroy when it is defined:

if (isNotClosedAutomatically) {
  // call destroy method if exists to prevent emit twice 'close' by autoDestroy (gh-10876)
  // https://nodejs.org/api/stream.html#stream_readable_destroy_error
  if (_this.destroy) {
    console.log('DESTROY');
    // _this.destroy();
  } else {
    console.log('CLOSE');
    _this.emit('close');
  }

and everything starts working again on my side too. Couldn't it be that destroy is now called even though the stream is closed automatically? So Node is somehow already trying to close the stream, but it's not destroyed yet?
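
For reference, the `destroyed` flag that the check above reads can be observed on a plain Node.js Readable. This is a minimal sketch, not Mongoose's QueryCursor; countCloseEvents is a hypothetical name. It shows that the flag flips as soon as destroy() is called and that a second destroy() call does not emit 'close' again.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: destroys a plain Readable twice and reports what
// happened to the `destroyed` flag and the 'close' event.
function countCloseEvents() {
  return new Promise((resolve) => {
    const source = new Readable({ read() {} });
    let closeCount = 0;
    source.on('close', () => {
      closeCount += 1;
    });

    const before = source.destroyed; // flag before destroy()
    source.destroy();
    const after = source.destroyed; // flag is set synchronously
    source.destroy(); // already destroyed, so this call is ignored

    // 'close' is emitted asynchronously, so check on a later tick.
    setImmediate(() => resolve({ before, after, closeCount }));
  });
}

countCloseEvents().then((result) => console.log(result));
```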

here is the script (after update 1):

const mongoose = require("mongoose");
const { Writable } = require("stream");

const fs = require("fs");

const entries = [
  {
    _id: "6082b737529f2c41dc5c7322",
    someStringArray: [],
    andSomeString: "Club Diamond",
    anotherString: "Club Diamond",
    someBooleanvalue: false,
  },
  {
    _id: "6082b737529f2c41dc5c7330",
    arrayOfObjectIdsWithoutName: ["6082ade30ae0e22164be0cf7"],
    someStringArray: [],

    andSomeString: "Some String Value With Letters",
    anotherString: "Some String Value With Letters",
    someBooleanvalue: false,
  },
  {
    _id: "6082b737529f2c41dc5c732a",
    arrayOfObjectIdsWithoutName: ["6082ade30ae0e22164be0cf7"],
    someStringArray: [],

    andSomeString: "Some String Value With Letters",
    anotherString: "Some String Value With Letters",
    someBooleanvalue: false,
  },
  {
    _id: "6082b737529f2c41dc5c7320",
    arrayOfObjectIdsWithoutName: [
      "6082ade30ae0e22164be0cb8",
      "6082ade30ae0e22164be0cb9",
      "6082ade30ae0e22164be0cf6",
    ],
    someStringArray: [],
    andSomeString: "Club Standard",
    anotherString: "Club Standard",
    someBooleanvalue: false,
  },
  {
    _id: "6082b737529f2c41dc5c731e",
    arrayOfObjectIdsWithoutName: [
      "6082ade30ae0e22164be0ca7",
      "6082ade30ae0e22164be0ca8",
      "6082ade30ae0e22164be0ca9",
      "6082ade30ae0e22164be0caa",
      "6082ade30ae0e22164be0cab",
      "6082ade30ae0e22164be0c98",
      "6082ade30ae0e22164be0c99",
      "6082ade30ae0e22164be0cf7",
    ],
    someStringArray: [],

    andSomeString: "Eljkafskslsa fasdr",
    anotherString: "asfd dsalkjfaldskjfalkdsjfldsaf asfd",
    someBooleanvalue: false,
  },
  {
    _id: "6082b737529f2c41dc5c7331",
    arrayOfObjectIdsWithoutName: ["6082ade30ae0e22164be0cf7"],
    someStringArray: [],

    andSomeString: "Some String Value With Letters",
    anotherString: "Some String Value With Letters",
    someBooleanvalue: false,
  },
  {
    _id: "6082b737529f2c41dc5c7332",
    arrayOfObjectIdsWithoutName: ["6082ade30ae0e22164be0cf7"],
    someStringArray: [],

    andSomeString: "Some String Value With Letters",
    anotherString: "Some String Value With Letters",
    someBooleanvalue: false,
  },
  {
    _id: "6082b737529f2c41dc5c7324",
    arrayOfObjectIdsWithoutName: [
      "6082ade30ae0e22164be0c9e",
      "6082ade30ae0e22164be0c9b",
      "6082ade30ae0e22164be0c9d",
      "6082ade30ae0e22164be0c9e",
      "6082ade30ae0e22164be0ca4",
      "6082ade30ae0e22164be0ca0",
      "6082ade30ae0e22164be0cc1",
      "6082ade30ae0e22164be0cc2",
      "6082ade30ae0e22164be0cc3",
      "6082ade30ae0e22164be0cc4",
      "6082ade30ae0e22164be0ca5",
      "6082ade30ae0e22164be0cac",
      "6082ade30ae0e22164be0cf7",
    ],
    someStringArray: ["DE", "CH"],

    andSomeString: "asfdafdsfdsa z45",
    anotherString: "afdsfsd afdsfdsafdsa",
    someBooleanvalue: false,
  },
  {
    _id: "6082b737529f2c41dc5c731a",
    arrayOfObjectIdsWithoutName: [
      "6082ade30ae0e22164be0c9b",
      "6082ade30ae0e22164be0ca2",
      "6082ade30ae0e22164be0c99",
      "6082ade30ae0e22164be0c9d",
      "6082ade30ae0e22164be0c9e",
      "6082ade30ae0e22164be0ca3",
      "6082ade30ae0e22164be0ca4",
      "6082ade30ae0e22164be0ca0",
      "6082ade30ae0e22164be0ca5",
      "6082ade30ae0e22164be0ca6",
      "6082ade30ae0e22164be0cf7",
    ],
    someStringArray: ["ASDF"],
    andSomeString: "Premium AT",
    anotherString: "Premium AT",
    someBooleanvalue: false,
  },
  {
    _id: "6082b737529f2c41dc5c732d",
    arrayOfObjectIdsWithoutName: ["6082ade30ae0e22164be0cf7"],
    someStringArray: [],
    andSomeString: "Some String Value With Letters",
    anotherString: "Some String Value With Letters",
    someBooleanvalue: false,
  },
  {
    _id: "6082b737529f2c41dc5c732f",
    arrayOfObjectIdsWithoutName: ["6082ade30ae0e22164be0cf7"],
    someStringArray: [],
    andSomeString: "Some String Value With Letters",
    anotherString: "Some String Value With Letters",
    someBooleanvalue: false,
  },
  {
    _id: "6082b737529f2c41dc5c731b",
    arrayOfObjectIdsWithoutName: [
      "6082ade30ae0e22164be0ca7",
      "6082ade30ae0e22164be0ca8",
      "6082ade30ae0e22164be0ca9",
      "6082ade30ae0e22164be0caa",
      "6082ade30ae0e22164be0cab",
      "6082ade30ae0e22164be0c98",
      "6082ade30ae0e22164be0c99",
      "6082ade30ae0e22164be0cac",
      "6082ade30ae0e22164be0cf7",
      "6082ade30ae0e22164be0ca0",
      "6082ade30ae0e22164be0c9e",
      "6082ade30ae0e22164be0ca0",
      "6082ade30ae0e22164be0c9e",
    ],
    someStringArray: ["ASDF"],

    andSomeString: "Probepaket AT",
    anotherString: "Trial Package AT",
    someBooleanvalue: false,
  }
];

function transform(obj) {
  return JSON.stringify(obj) + "\n";
}

class StorageDestination extends Writable {
  constructor() {
    // To reproduce the issue more easily, set this to a very low value.
    // Setting it higher, or removing it, requires more data to reproduce the issue.
    super({ highWaterMark: 1 });
    this.currentWriteStream = this.createNewWriteStream();
  }

  createNewWriteStream() {
    // create new write stream
    const fileStream = fs.createWriteStream("./out.txt");

    return fileStream;
  }

  _write(data, encoding, callback) {
    // just pass it through to the actual writestream
    return this.currentWriteStream._write(data, encoding, callback);
  }
}

const storageDestination = new StorageDestination();

mongoose.connect("mongodb://localhost:12345/test-mongoose", async () => {
  const schema = new mongoose.Schema({
    arrayOfObjectIdsWithoutName: [mongoose.Schema.Types.ObjectId],
    someStringArray: [{ type: String, uppercase: true }],
    someBooleanvalue: Boolean,
    andSomeString: String,
    anotherString: String,
  });

  const Model = mongoose.model("TestMongooseBug10902", schema);
  await Model.deleteMany({});

  await Model.create(entries);

  const mongooseQuery = Model.find({}).lean().select(undefined);

  console.log("start");

  const cursor = mongooseQuery
    .batchSize(5000)
    .cursor({ batchSize: 5000, transform });
  const stream = cursor.pipe(storageDestination);
  stream.on("finish", () => {
    console.log("finish", storageDestination.outputSize);
    mongoose.disconnect();
  });
  stream.on("error", (error) => {
    console.log("ERROR", error);
    mongoose.disconnect();
  });
});

Tested with Node v14.18.1, v16.11.1, and v17.

iovanom commented 3 years ago

@simllll I've found the issue. The 'end' event is not emitted after destroy. I'm preparing a PR with the fix.
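
The effect of a missing 'end' event can be illustrated with plain Node.js streams. This is a sketch under the assumption that the cursor behaves like a standard Readable; it is not the Mongoose code, and runDemo is a hypothetical name. pipe() only calls end() on the destination when the source emits 'end', so a source that is destroyed without ending leaves the destination waiting and 'finish' never fires.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical demo: pipes a plain Readable into a plain Writable and
// records which events fire, either ending the source normally or
// destroying it before it has ended.
function runDemo({ destroyEarly }) {
  return new Promise((resolve) => {
    const events = [];
    const source = new Readable({ read() {} });
    const sink = new Writable({
      write(chunk, encoding, callback) {
        callback();
      },
    });

    source.on('end', () => events.push('source end'));
    source.on('close', () => events.push('source close'));
    sink.on('finish', () => events.push('sink finish'));

    source.pipe(sink);
    source.push('some data');

    if (destroyEarly) {
      source.destroy(); // no 'end', so pipe() never calls sink.end()
    } else {
      source.push(null); // normal end of data
    }

    // Give the streams time to settle, then report what was observed.
    setTimeout(() => resolve(events), 50);
  });
}
```

With `destroyEarly: true` only 'source close' is recorded, which matches the observation earlier in the thread that 'close' fires while 'finish' does not.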

iovanom commented 3 years ago

PR: https://github.com/Automattic/mongoose/pull/10906

iovanom commented 3 years ago

@vkarpov15 here is the PR for version 5: https://github.com/Automattic/mongoose/pull/10916 Thank you!