Automattic / mongoose

MongoDB object modeling designed to work in an asynchronous environment.
https://mongoosejs.com
MIT License
26.88k stars 3.83k forks source link

catching errors on streams #14304

Open simllll opened 7 months ago

simllll commented 7 months ago

Prerequisites

Mongoose version

8.1.1

Node.js version

20.x

MongoDB server version

6.x

Typescript version (if applicable)

5.1.6

Description

when using a mongoose stream, I cannot catch an initial error; it always kills the process with an "uncaught exception".

following (pseudo) code:

// Attach the 'error' handler immediately after creating the stream so no
// early emission is missed (fixed typo: someCollectionl -> someCollection).
const stream = someCollection.find().sort({'_id': -1}).stream({transform})
stream.on('error', err => console.error(err));

throws with

uncaughtException MongoServerError: Encountered non-retryable error during query :: caused by :: PlanExecutor error during aggregation :: caused by :: Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting.
   at Connection.onMessage (/home/simon/Dev/hokify/hokify/node_modules/.pnpm/mongodb@6.3.0_snappy@7.2.2/node_modules/mongodb/src/cmap/connection.ts:421:18)
    at MessageStream.<anonymous> (/home/simon/Dev/hokify/hokify/node_modules/.pnpm/mongodb@6.3.0_snappy@7.2.2/node_modules/mongodb/src/cmap/connection.ts:251:56)
    at MessageStream.emit (node:events:517:28)
    at MessageStream.emit (node:domain:489:12)
    at processIncomingData (/home/simon/Dev/hokify/hokify/node_modules/.pnpm/mongodb@6.3.0_snappy@7.2.2/node_modules/mongodb/src/cmap/message_stream.ts:179:12)
    at MessageStream._write (/home/simon/Dev/hokify/hokify/node_modules/.pnpm/mongodb@6.3.0_snappy@7.2.2/node_modules/mongodb/src/cmap/message_stream.ts:68:5)
    at writeOrBuffer (node:internal/streams/writable:392:12)
    at _write (node:internal/streams/writable:333:10)
    at MessageStream.Writable.write (node:internal/streams/writable:337:10)
    at Socket.ondata (node:internal/streams/readable:777:22)
    at Socket.emit (node:events:517:28)
    at Socket.emit (node:domain:489:12)
    at addChunk (node:internal/streams/readable:335:12)
    at readableAddChunk (node:internal/streams/readable:308:9)
    at Socket.Readable.push (node:internal/streams/readable:245:10)
    at TCP.onStreamRead (node:internal/stream_base_commons:190:23) {
  ok: 0,
  code: 292,
  codeName: 'QueryExceededMemoryLimitNoDiskUseAllowed',

I played around with it and couldn't find any way how to catch this error.

Steps to Reproduce

run a query that results in QueryExceededMemoryLimitNoDiskUseAllowed by simply using some big collection and putting a sort on it.

Expected Behavior

the error should be catchable.

IslandRhythms commented 7 months ago
const mongoose = require('mongoose');

// Schema/model used only to seed the test collection.
const testSchema = new mongoose.Schema({
  studentId: String
});

const Test = mongoose.model('Test', testSchema);

/**
 * Repro attempt: seed a large collection, then stream a sorted query so the
 * server raises QueryExceededMemoryLimitNoDiskUseAllowed (code 292).
 */
async function run() {
  await mongoose.connect('mongodb://localhost:27017');
  const iterations = 1000000;
  console.log(`begin creating ${iterations} documents`)
  console.log(mongoose.connection.collections);
  for (let i = 0; i < iterations; i++) {
    await Test.create({ studentId: (''+i).repeat(100) });
  }

  console.log('run query');
  // Fix: .sort() with no arguments is a no-op, so the query can never exceed
  // the in-memory sort limit. Sort on a real field to trigger the error.
  const stream = mongoose.connection.collection('tests').find().sort({ studentId: -1 }).stream();
  console.log('stream', stream);
  let i = 0;
  stream.on('data', () => { console.log('Doc', ++i); });
  stream.on('error', err => console.log(err));
  stream.on('end', () => console.log('Done'))
}

// Fix: don't leave run() floating — an early rejection (e.g. connect failure)
// would otherwise crash with an unhandled promise rejection.
run().catch(err => {
  console.error(err);
  process.exit(1);
});
vkarpov15 commented 7 months ago

The issue is the transform. The following code prints "Caught" before the "Sort exceeded memory limit" error, which is expected:

const mongoose = require('mongoose');
const { Transform } = require('stream');

// Minimal model; only used by the (disabled) seeding loop below.
const testSchema = new mongoose.Schema({
  studentId: String
});

const Test = mongoose.model('Test', testSchema);

/**
 * Demonstrates that errors from a sorted cursor stream ARE catchable via the
 * 'error' event — as long as the { transform } option is NOT passed.
 * Uncommenting the option reproduces the uncaught-error bug.
 */
async function run() {
  await mongoose.connect('mongodb://localhost:27017');
  const iterations = 10000;
  /* Seeding pass — enable once to populate the collection:
  console.log(`begin creating ${iterations} documents`)
  for (let i = 0; i < iterations; i++) {
    await Test.create({ studentId: ('' + i).repeat(5000) });
    console.log(i);
  }*/

  // Identity transform: forwards each chunk unchanged.
  const transform = new Transform({
    transform(chunk, _encoding, done) {
      done(null, chunk);
    },
  });

  console.log('run query');
  const cursorStream = mongoose.connection
    .collection('tests')
    .find()
    .sort({ studentId: -1 })
    .stream(/*{ transform }*/);

  let docCount = 0;
  cursorStream.on('data', () => { console.log('Doc', ++docCount); });
  cursorStream.on('error', (err) => console.log('Caught', err));
  cursorStream.on('end', () => console.log('Done'));
}

run();

However, uncomment the transform option and you get an uncaught error:

run query
node:events:492
      throw er; // Unhandled 'error' event
      ^

MongoServerError: Executor error during find command :: caused by :: Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting.
...
Emitted 'error' event on ReadableCursorStream instance at:
    at emitErrorNT (node:internal/streams/destroy:151:8)
    at emitErrorCloseNT (node:internal/streams/destroy:116:3)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
  ok: 0,
  code: 292,
  codeName: 'QueryExceededMemoryLimitNoDiskUseAllowed',
  [Symbol(errorLabels)]: Set(0) {}
}

Node.js v18.17.1

We're looking into how to fix this.

vkarpov15 commented 7 months ago

The issue looks like the underlying MongoDB node driver just uses pipe() to pass in the transform, which doesn't propagate errors. As a workaround, you can try the following:

  // Workaround: the driver wires the { transform } option through pipe(), and
  // Node's pipe() does NOT forward 'error' events from the source stream to
  // the destination. So pipe manually and listen on BOTH streams.
  const stream = mongoose.connection.collection('tests').find().sort({ studentId: -1 }).stream(/*{ transform }*/);
  const transformed = stream.pipe(transform);
  let i = 0;
  transformed.on('data', () => { console.log('Doc', ++i); });
  transformed.on('error', err => console.log('Caught', err));
  transformed.on('end', () => console.log('Done'));
  // Server-side errors (e.g. the sort memory limit) surface on the source
  // cursor stream, not on `transformed` — this handler is what catches them.
  stream.on('error', err => console.log('Caught', err));
vkarpov15 commented 7 months ago

I opened a PR in the MongoDB Node driver repo; we will see if we can get that merged. In the meantime, use the workaround from my previous post. More info on transformed stream error handling here.