apache / pulsar-client-node

Apache Pulsar NodeJS Client
https://pulsar.apache.org/
Apache License 2.0

Incorrect log message "...Messages were not acked within X time..." for MultiTopicConsumer #371

Closed fabianwikstrom closed 4 months ago

fabianwikstrom commented 7 months ago

Hello. When running a multi-topic consumer, we get log messages indicating that our messages were not acked in time, even though they were. Specifically, we get messages like this:

{"status":"info","time":1711978097503,"pid":22140,"hostname":"ip-10-225-225-162.us-west-2.compute.internal","msg":"[Muti Topics Consumer: TopicName - persistent://public/default/test-EsfccmYHfC-TopicsConsumerFakeName-7daedcc224 - Subscription - sub-test-EsfccmYHfC]: 1 Messages were not acked within 10000 time"}

I've attached a minimal reproducible script below. Interestingly, if we switch to a single-topic consumer, the log messages go away. We believe this could be related to the C++ client that this library wraps, whose implementation differs between the single-topic consumer and the multi-topic consumer.

The script below sets ackTimeoutMs to 10 seconds but consumes and acknowledges all messages within about 5 seconds, so it should not produce the log message.
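The timing argument above can be checked with a rough sketch. It assumes the worst case in which the listener handles messages strictly one after another (an assumption; if deliveries overlap, each ack arrives sooner). The helper name `worstCaseAckDelaysMs` is hypothetical; the constants are the values used in the script.

```typescript
// Values taken from the script in this report.
const ACK_TIMEOUT_MS = 10_000; // ackTimeoutMs
const HANDLE_MS = 1_000; // the setTimeout delay inside handleMessage
const SEND_COUNT = 5; // SEND_NUMBER

// Hypothetical helper. Assumes strictly sequential handling, so message i
// is acknowledged (i + 1) * handleMs after the first delivery.
function worstCaseAckDelaysMs(count: number, handleMs: number): number[] {
  return Array.from({ length: count }, (_, i) => (i + 1) * handleMs);
}

const delays = worstCaseAckDelaysMs(SEND_COUNT, HANDLE_MS);
const slowest = Math.max(...delays);

// Even the slowest ack lands well inside the ack timeout.
console.log(delays, slowest < ACK_TIMEOUT_MS);
```

Under that assumption the slowest acknowledgement arrives after 5 seconds, half of the configured timeout, so no message should be reported as unacked.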

Pulsar Client Version: 1.10.0

As a final note/nit, there's a spelling error in the log message: "Muti Topics Consumer" should read "Multi Topics Consumer".

import { faker } from '@faker-js/faker';
import Pulsar from 'pulsar-client';

process.env.ENVIRONMENT = 'development';
process.env.PULSAR_SERVICE_URL = 'pulsar://localhost:6650';
const PULSAR_TOPIC = `test-${faker.string.alpha(10)}`;

const PULSAR_SUBSCRIPTION = `sub-${PULSAR_TOPIC}`;

const SEND_NUMBER = 5;

async function handleMessage(
  message: Pulsar.Message,
  consumer: Pulsar.Consumer,
): Promise<void> {
  console.log('Received message: ', message.getData().toString());
  await new Promise((resolve) => setTimeout(resolve, 1000));
  await consumer.acknowledgeId(message.getMessageId());
}

async function main() {
  const client = new Pulsar.Client({
    serviceUrl: process.env.PULSAR_SERVICE_URL as string,
    log: (level, _file, _line, message) => {
      switch (level) {
        case Pulsar.LogLevel.DEBUG:
          console.debug(message);
          break;
        case Pulsar.LogLevel.INFO:
          console.info(message);
          break;
        case Pulsar.LogLevel.WARN:
          console.warn(message);
          break;
        case Pulsar.LogLevel.ERROR:
          console.error(message);
          break;
      }
    },
  });

  console.log('Topic: ', PULSAR_TOPIC);
  console.log('Subscription: ', PULSAR_SUBSCRIPTION);

  // Create the main consumer

  const counter = new Map<string, number>();

  const subscriptionType = 'Shared';
  const ackTimeoutMs = 10_000;
  const nAckRedeliverTimeoutMs = 2_000;
  const batchIndexAckEnabled = false;

  const consumerWithListener1 = await client.subscribe({
    topics: [PULSAR_TOPIC], // using topic: PULSAR_TOPIC the log message will not be produced
    subscription: PULSAR_SUBSCRIPTION,
    subscriptionType,
    ackTimeoutMs,
    nAckRedeliverTimeoutMs,
    batchIndexAckEnabled,
    listener: (message, consumer) => handleMessage(message, consumer),
  });

  // Send messages
  const producer = await client.createProducer({ topic: PULSAR_TOPIC });

  for (let i = 0; i < SEND_NUMBER; i += 1) {
    const msg = `test-message-${i}`;
    counter.set(msg, 0);
    await producer.send({ data: Buffer.from(msg) });
  }

  // Sleep 50 seconds to wait for the messages to be processed
  await new Promise((resolve) => setTimeout(resolve, 50000));

  await producer.close();
  await consumerWithListener1.close();
  process.exit(0);
}

void main();
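
The script above switches between the two behaviours by editing one line (topics: [PULSAR_TOPIC] versus topic: PULSAR_TOPIC). A minimal sketch of making that an explicit A/B toggle follows. The `ReproSubscribeOptions` type and `buildSubscribeOptions` helper are hypothetical and mirror only the fields the script already passes to client.subscribe(); they are not part of pulsar-client.

```typescript
// Hypothetical local type: covers only the options used in the repro script.
type Mode = 'single' | 'multi';

interface ReproSubscribeOptions {
  topic?: string; // single-topic consumer: log message not observed
  topics?: string[]; // multi-topic consumer: log message observed
  subscription: string;
  subscriptionType: 'Shared';
  ackTimeoutMs: number;
  nAckRedeliverTimeoutMs: number;
  batchIndexAckEnabled: boolean;
}

// Hypothetical helper: builds identical options for both modes, differing
// only in whether `topic` or `topics` is set.
function buildSubscribeOptions(mode: Mode, topicName: string): ReproSubscribeOptions {
  const common = {
    subscription: `sub-${topicName}`,
    subscriptionType: 'Shared' as const,
    ackTimeoutMs: 10_000,
    nAckRedeliverTimeoutMs: 2_000,
    batchIndexAckEnabled: false,
  };
  return mode === 'single'
    ? { ...common, topic: topicName }
    : { ...common, topics: [topicName] };
}

const single = buildSubscribeOptions('single', 'test-topic');
const multi = buildSubscribeOptions('multi', 'test-topic');
console.log(single, multi);
```

In the script, the result would be spread into the existing subscribe call together with the listener, so that everything except the topic field stays identical between the two runs.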

shibd commented 7 months ago

This is a bug in the C++ client; please refer to: https://github.com/apache/pulsar-client-cpp/pull/423

fabianwikstrom commented 7 months ago

@shibd Thank you! Really appreciate the quick turnaround :)