nats-io / nats.deno

Deno client for NATS, the cloud native messaging system
https://www.nats.io
Apache License 2.0

Memory leak in consumers framework API? #546

Closed albnnc closed 1 year ago

albnnc commented 1 year ago

My team was using js.pullSubscribe, but we found the new API comfortable to work with and decided to update our codebase. However, we were unable to migrate without trouble.

Consider the following code snippet:

```typescript
import { connect } from "https://deno.land/x/nats/src/mod.ts";
import { delay } from "https://deno.land/std/async/delay.ts";

const natsUrl = Deno.env.get("NATS_URL") ?? "";
const nc = await connect({ servers: natsUrl });
const js = nc.jetstream();
const consumer = await js.consumers.get("ENTITY", "ENTITY_CONSUMER");
while (!nc.isClosed()) {
  const msg = await consumer.next();
  msg?.nak();
  // Remove this line to see the leak occur faster.
  await delay(1_000);
}
```
Consumer Info ![image](https://github.com/nats-io/nats.deno/assets/20284259/97d5df1c-da31-451b-bcc2-5968d9507943)

Memory leaks dramatically, eventually making the app crash with a V8 error.

Is there any fundamental mistake in the code above? Thanks in advance.

aricart commented 1 year ago

I can verify this is happening.

aricart commented 1 year ago

@albnnc to unblock you, please use the consume API like this:

```typescript
const consumer = await js.consumers.get("messages", "myconsumer");
const iter = await consumer.consume({ max_messages: 1 });
for await (const m of iter) {
  console.log(m?.info.redeliveryCount);
  m?.nak();
}
```
aricart commented 1 year ago

The above does the same thing but doesn't trigger the leak: at any one point the consumer holds only one message, which is effectively what you are doing with next() in the loop.
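
To see why consume({ max_messages: 1 }) keeps memory bounded, here is a toy model (illustrative names only, not the nats.deno internals): the iterator produces a message only after the loop body has finished with the previous one, so at most one message is ever buffered.

```typescript
// Toy stand-in for consume({ max_messages: 1 }): messages are produced
// lazily, one per iteration, so the number buffered never exceeds one.
let buffered = 0;
let peakBuffered = 0;

async function* oneAtATime(total: number) {
  for (let seq = 1; seq <= total; seq++) {
    buffered++;
    peakBuffered = Math.max(peakBuffered, buffered);
    // nak() releases the message, loosely mirroring ack/nak in the real API.
    yield { seq, nak: () => { buffered--; } };
  }
}

for await (const m of oneAtATime(100)) {
  m.nak(); // handled before the generator resumes to produce the next one
}
console.log("peak buffered:", peakBuffered); // stays at 1
```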

aricart commented 1 year ago

@albnnc I have figured out where the issue is. While I make a release, the suggestion above will unblock you. Thank you for finding this.

albnnc commented 1 year ago

@aricart, thanks for the response!

What I actually need is to pull a batch of messages, limited to a certain maximum count, and process them while blocking further message delivery. With consumer.next() I could simply collect messages and, when needed, process them without calling next again. With consumer.consume({ ... }) I don't think that's possible. I think the following would do the trick:

```typescript
const consumer = await js.consumers.get("ENTITY", "ENTITY_CONSUMER");
const batchSizeMax = 1_000;
let batch: JsMsg[] = [];
while (!nc.isClosed()) {
  const iter = await consumer.fetch({ max_messages: batchSizeMax });
  for await (const msg of iter) {
    batch.push(msg);
    if (!msg.info.pending) {
      break;
    }
  }
  if (batch.length) {
    console.log(`Processing batch of size ${batch.length}`);
    console.log("redeliveryCount", batch[0].info.redeliveryCount);
    batch.forEach((v) => v.nak());
    batch = [];
  } else {
    await delay(1_000);
  }
}
```

However, this code leaks too (though much more slowly). Will it be fixed in the upcoming PR?

aricart commented 1 year ago

@albnnc the leak in fetch would also be fixed by the same PR - but I will make sure (running the tests right now)

If I understand correctly, what you want is to retrieve N messages that you can assign to workers and process concurrently. If that is the case, then fetch is what you want. See https://github.com/nats-io/nats.deno/blob/main/jetstream.md#fetching-batch-of-messages

aricart commented 1 year ago

@albnnc yes the PR fixes fetch as well.

albnnc commented 1 year ago

> If I understand correctly, what you want is to retrieve N messages that you can assign to workers and process concurrently. If that is the case, then fetch is what you want. See https://github.com/nats-io/nats.deno/blob/main/jetstream.md#fetching-batch-of-messages

Yeah, I saw that part of the docs. The notable part is that I need to start handling messages immediately, even when fewer than max_messages messages are available. However, the example from the docs waits for the expires timer. That's why I check msg.info.pending and break out of the fetch loop when there is no next message.
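
The early-exit idea can be factored into a small helper. This is a sketch with assumed names (only info.pending mirrors the real JsMsg field, and mockFetch stands in for consumer.fetch): drain the iterator into a batch, but stop as soon as a message reports zero pending instead of waiting for the expires timer.

```typescript
interface PendingMsg {
  seq: number;
  info: { pending: number };
}

// Collect up to `max` messages, returning early once a message reports
// info.pending === 0 (i.e. nothing else is waiting on the server).
async function collectBatch(
  iter: AsyncIterable<PendingMsg>,
  max: number,
): Promise<PendingMsg[]> {
  const batch: PendingMsg[] = [];
  for await (const msg of iter) {
    batch.push(msg);
    if (!msg.info.pending || batch.length >= max) break;
  }
  return batch;
}

// Mock fetch result: only 3 messages are available although max is 1_000.
async function* mockFetch(available: number): AsyncGenerator<PendingMsg> {
  for (let seq = 1; seq <= available; seq++) {
    yield { seq, info: { pending: available - seq } };
  }
}

const batch = await collectBatch(mockFetch(3), 1_000);
console.log("batch size:", batch.length); // 3, without waiting for a timer
```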

> @albnnc the leak in fetch would also be fixed by the same PR - but I will make sure (running the tests right now)

I can confirm this too. Will wait for the release, thanks!

aricart commented 1 year ago

@albnnc I have released new versions of all the clients fixing this issue. If you notice anything else, please holler. Thank you for finding this.