apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0
21.18k stars 11.67k forks

[Bug] A null pointer error occurs when SimpleConsumer calls the get message method. #8819

Open JanYork opened 1 week ago

JanYork commented 1 week ago

Runtime platform environment

From the official docker image.

RocketMQ version

From the official docker image 5.3.0.

JDK Version

From the official docker image.

Describe the Bug

InternalErrorException: [request-id=undefined, response-code=50001] null. NullPointerException. org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/nest-mq/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/nest-mq/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/nest-mq/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at consumer (/Users/muyouzhi/Code/nest-mq/src/main.ts:40:22) {
  code: 50001
}

Steps to Reproduce

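import { SimpleConsumer } from 'rocketmq-client-nodejs';

// `producer` is assumed to be an already started producer instance; its setup is not shown in this report.
async function production(message: string) {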
  const receipt = await producer.send({
    topic: 'kitchen',
    messageGroup: '1',
    body: Buffer.from(
      JSON.stringify({
        message,
        now: Date(),
      }),
    ),
  });
  console.log(receipt);
}

async function consumer() {
  const simpleConsumer = new SimpleConsumer({
    consumerGroup: '1',
    endpoints: '127.0.0.1:8081',
    subscriptions: new Map().set('kitchen', '*'),
  });
  await simpleConsumer.startup();

  while (true) {
    const messages = await simpleConsumer.receive(10);

    if (!messages.length) {
      continue;
    }

    for (const message of messages) {
      console.log('message=%o', message);
      console.log('body=%o', message.body.toString());
      await simpleConsumer.ack(message);
    }
  }
}
function main() {
  production('hello world');
  consumer();
}
main();

This is a very basic operation. I did nothing but run the consumer as described in the documentation and keep receiving messages. It worked normally at first, but after running for 2-3 minutes this error occurred, and I don't know why. Can anyone tell me whether this is a bug? The maintainers are busy and I have not been able to solve this problem myself.

What Did You Expect to See?

Normal operation.

What Did You See Instead?

It worked normally at first, but after 2 to 3 minutes my consumer could no longer get messages and only returned the error shown above.

Additional Context

No response

kaustubhdeokar commented 1 week ago

Did you check this out? https://github.com/apache/rocketmq-client-nodejs/tree/master

JanYork commented 1 week ago

> Did you check this out? https://github.com/apache/rocketmq-client-nodejs/tree/master

I'm using the client from the rocketmq-client repository, which is based on gRPC, and I'm running RocketMQ version 5.3.0.

JanYork commented 1 week ago

> Did you check this out? https://github.com/apache/rocketmq-client-nodejs/tree/master

Thank you. I'd appreciate your help, so here is what I know so far:

I deployed RocketMQ 5.3.0 using docker-compose, with a NameServer, a Broker, and the RocketMQ Proxy for gRPC protocol support.

I used the Node.js client from Apache's official repository (rocketmq-client).

I followed the Node.js client documentation to create a method that consumes messages continuously and a method that produces messages.

At first it worked fine. The consumer loop ran for about 2-3 minutes and then failed with the error I mentioned.

I realized that the error comes from the server side, not the client side. It seems to be an error response returned by the broker/proxy.
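As a stopgap while the cause is unknown, the receive loop could be made to survive a failed call instead of rejecting on the first error. The following is only a minimal sketch of that idea, not a fix for the NullPointerException itself. The `ReceivingConsumer` interface, `receiveLoop`, `shouldStop`, and the delay values are hypothetical names and numbers chosen for illustration; the interface only mirrors the `receive` and `ack` calls used in the reproduction code.

```typescript
// Hypothetical minimal shape of a consumer; it mirrors only the two calls
// used in the reproduction code (receive and ack).
interface ReceivingConsumer<M> {
  receive(maxMessageNum: number): Promise<M[]>;
  ack(message: M): Promise<unknown>;
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Polls until shouldStop() returns true. A failed receive is logged and
// retried after a delay that doubles up to maxDelayMs, so one error does
// not end the loop.
async function receiveLoop<M>(
  consumer: ReceivingConsumer<M>,
  handle: (message: M) => Promise<void> | void,
  shouldStop: () => boolean,
  baseDelayMs = 500, // assumed value, tune as needed
  maxDelayMs = 10000, // assumed value, tune as needed
): Promise<void> {
  let delayMs = baseDelayMs;
  while (!shouldStop()) {
    let messages: M[];
    try {
      messages = await consumer.receive(10);
      delayMs = baseDelayMs; // reset the backoff after a successful call
    } catch (err) {
      console.error('receive failed, retrying in %d ms: %o', delayMs, err);
      await sleep(delayMs);
      delayMs = Math.min(delayMs * 2, maxDelayMs);
      continue;
    }
    for (const message of messages) {
      await handle(message);
      await consumer.ack(message);
    }
  }
}
```

With the consumer from the reproduction code, this would be called roughly as `receiveLoop(simpleConsumer, (m) => console.log('body=%o', m.body.toString()), () => false)`. If the proxy keeps returning 50001 on every call, the loop will only keep logging the error, which at least shows whether the failure is transient or permanent.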
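To make the server-side versus client-side distinction explicit in the application, the `code` field on the thrown error (50001 in the log above) can be inspected. The sketch below assumes that the response codes are grouped like HTTP status classes, with 4xxxx for client errors and 5xxxx for server errors; the 50001 seen here fits that pattern, but treat the ranges as an assumption. `errorCode` and `isServerSideError` are hypothetical helper names, not part of the client library.

```typescript
// Assumption: the thrown error carries a numeric `code` property,
// as in the log above (`code: 50001`).
function errorCode(err: unknown): number | undefined {
  if (typeof err === 'object' && err !== null && 'code' in err) {
    const code = (err as { code: unknown }).code;
    return typeof code === 'number' ? code : undefined;
  }
  return undefined;
}

// Assumption: codes in the 5xxxx range indicate a failure on the
// broker/proxy side rather than a bad request from the client.
function isServerSideError(err: unknown): boolean {
  const code = errorCode(err);
  return code !== undefined && code >= 50000 && code < 60000;
}
```

In a `catch` block around `simpleConsumer.receive(10)`, `isServerSideError(err)` could decide whether to retry later (server side) or to stop and fix the request (client side).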