apache / rocketmq-clients

RocketMQ Clients - Collection of Client Bindings for Apache RocketMQ
https://rocketmq.apache.org/
Apache License 2.0
302 stars 202 forks source link

[Bug] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. #836

Open JanYork opened 18 hours ago

JanYork commented 18 hours ago

Before Creating the Bug Report

Programming Language of the Client

Node.js

Runtime Platform Environment

Macos Node.js 18.17.1 openjdk 17.0.11 2024-04-16 OpenJDK Runtime Environment Homebrew (build 17.0.11+0) OpenJDK 64-Bit Server VM Homebrew (build 17.0.11+0, mixed mode, sharing)

RocketMQ Version of the Client/Server

RocketMQ 5.3.0

Run or Compiler Version

openjdk 17.0.11 2024-04-16 OpenJDK Runtime Environment Homebrew (build 17.0.11+0) OpenJDK 64-Bit Server VM Homebrew (build 17.0.11+0, mixed mode, sharing)

Describe the Bug

I got an inexplicable error. I just started and waited for messages, did nothing, and suddenly an error occurred. I caught it. I am not sure whether I need to ignore this error. It looks more like a bug or a problem that should not exist. It will interrupt my program.

`receive message error InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63) at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15) at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19) at processTicksAndRejections (node:internal/process/task_queues:95:5) at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12) at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24) { code: 50001 }

/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81 throw new InternalErrorException(status.code, status.message, requestId); ^ InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63) at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15) at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19) at processTicksAndRejections (node:internal/process/task_queues:95:5) at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12) at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24) `

Steps to Reproduce

Start and run the following code in the Nest program, and then try to send a few messages. It should be normal at this time. Then we wait patiently for a while. This error will not be triggered in a short time. It seems that there is a scheduled task that changes some status. In short, we have to wait for a long time.

import { Body, Controller, OnModuleInit, Post } from "@nestjs/common";
import { MessageOptions, Producer, SimpleConsumer } from "rocketmq-client-nodejs";

interface IProducerQuery {
  topic: string;
  tag: string;
  keys?: string;
  body: string;
  group?: string;
  num?: number;
}

@Controller()
export class AppController implements OnModuleInit {
  private producer: Producer;

  async onModuleInit() {
    await this.producerInit();
    await this.consumerInit();
  }

  async producerInit() {
    const producer = new Producer({
      endpoints: "127.0.0.1:8081"
    });
    this.producer = producer;

    await this.producer.startup()
      .catch((e) => {
        console.error("producer startup error", e);
        process.exit(1);
      })
      .then(() => {
        console.log("producer startup success!");
      });
  }

  async consumerInit() {
    const task_to_pool_consumer = new SimpleConsumer({
      consumerGroup: "task-to-pool",
      endpoints: "127.0.0.1:8081",
      subscriptions: new Map().set("task-processing", "task-to-pool")
    });
    await task_to_pool_consumer.startup()
      .catch((e) => {
        console.error("consumer startup error", e);
        process.exit(1);
      })
      .then(() => {
        console.log("task to pool consumer startup success!");
        this.listenForMessages(task_to_pool_consumer, 5);
      });

    const kitchens = ["kitchen1", "kitchen2", "kitchen3", "kitchen4", "kitchen5"];
    const kitchen_consumers: SimpleConsumer[] = [];
    for (const kitchen of kitchens) {
      const task_assign_consumer = new SimpleConsumer({
        consumerGroup: "task-assign",
        endpoints: "127.0.0.1:8081",
        subscriptions: new Map().set("task-processing", kitchen)
      });
      kitchen_consumers.push(task_assign_consumer);
    }
    await Promise.all(kitchen_consumers.map((consumer: SimpleConsumer) => {
      consumer.startup().catch((e) => {
        console.error("consumer startup error", e);
        process.exit(1);
      }).then(() => {
        console.log("kitchen consumer startup success!");
        this.listenForMessages(consumer, 5);
      });
    }));

    const task_to_db_consumer = new SimpleConsumer({
      consumerGroup: "task-to-db",
      endpoints: "127.0.0.1:8081",
      subscriptions: new Map().set("task-write", "task-to-db")
    });
    await task_to_db_consumer.startup()
      .catch((e) => {
        console.error("consumer startup error", e);
        process.exit(1);
      })
      .then(() => {
        console.log("task to db consumer startup success!");
        this.listenForMessages(task_to_db_consumer, 1);
      });

    const task_to_cache_consumer = new SimpleConsumer({
      consumerGroup: "task-to-cache",
      endpoints: "127.0.0.1:8081",
      subscriptions: new Map().set("task-write", "task-to-cache")
    });
    await task_to_cache_consumer.startup()
      .catch((e) => {
        console.error("consumer startup error", e);
        process.exit(1);
      })
      .then(() => {
        console.log("task to cache consumer startup success!");
        this.listenForMessages(task_to_cache_consumer, 1);
      });
  }

  async listenForMessages(consumer: SimpleConsumer, number: number) {
    while (true) {
      const messages = await consumer.receive(number).catch((e) => {
        console.error("receive message error", e);
        throw e;
      });

      if (!messages || messages.length === 0) {
        continue;
      }

      for (const message of messages) {
        console.log(`Received message: ${message.body.toString()}`);
        await consumer.ack(message).catch((e) => {
          console.error("ack message error", e);
          throw e;
        }).then(() => {
          console.log("ack message success!");
        });
      }
    }
  }

  @Post("production")
  async production(@Body() query: IProducerQuery) {
    const options: MessageOptions = {
      topic: query.topic,
      tag: query.tag,
      keys: [query.keys as string] || [Date.now().toString()],
      body: Buffer.from(query.body)
    };
    query.group ? options.messageGroup = query.group : null;

    if (query.num && query.num > 1) {
      for (let i = 0; i < query.num; i++) {
        options.keys = [Date.now().toString()];
        options.body = Buffer.from(query.body + i);

        await this.producer
          .send(options)
          .catch((e) => {
            console.error("producer send message error", e);
          })
          .then((r) => {
            console.log(`producer send message success! receipt -> ${r ? JSON.stringify(r) : "null"}`);
          });
      }
      return;
    }

    await this.producer
      .send(options)
      .catch((e) => {
        console.error("producer send message error", e);
      })
      .then((r) => {
        console.log(`producer send message success! receipt -> ${r ? JSON.stringify(r) : "null"}`);
      });
  }
}

If nothing goes wrong, you should wait until the program has an error:

receive message error InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24) {
  code: 50001
}

/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81
        throw new InternalErrorException(status.code, status.message, requestId);
              ^
InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24)

This error comes from StatusChecker.check(status) in the receiveMessage method.

This error indicates that when calling RocketMQ to receive messages, Settings.getSubscription() returned null, causing a NullPointerException.

Why does the Subscription data in Settings suddenly disappear? Why does the NullPointerException exist?

What Did You Expect to See?

Continuous, continuous and normal operation.

What Did You See Instead?

receive message error InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24) {
  code: 50001
}

/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81
        throw new InternalErrorException(status.code, status.message, requestId);
              ^
InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24)

Additional Context

No response

JanYork commented 18 hours ago

@fengmk2 Hello, could I kindly ask you to spare some of your valuable time to take a look at this issue? It seems the error is being thrown deliberately, and I'm having some trouble understanding how to resolve it.

fengmk2 commented 16 hours ago

This is an error returned by the rocketmq server.

JanYork commented 15 hours ago

This is an error returned by the rocketmq server.

Can you tell me why this happens? I didn't do anything to the server, just started it. But this problem can be reproduced and is normal during operation. It only appears suddenly.

JanYork commented 15 hours ago

2024-09-19 19:53:36 INFO ClientHousekeepingScheduledThread1 - clear handle of this client when client unregister. group:task-to-pool, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=JanYorkMacBook-Pro.local@6593@1@m198e8vo, remoteAddress=169.254.95.59:54910, localAddress=169.254.95.59:8081}, clientId=JanYorkMacBook-Pro.local@6593@1@m198e8vo, language=HTTP, version=473, lastUpdateTimestamp=1726746690044] 2024-09-19 19:53:36 INFO ClientHousekeepingScheduledThread1 - remove grpc channel when client unregister. group:task-to-pool, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=JanYorkMacBook-Pro.local@6593@1@m198e8vo, remoteAddress=169.254.95.59:54910, localAddress=169.254.95.59:8081}, clientId=JanYorkMacBook-Pro.local@6593@1@m198e8vo, language=HTTP, version=473, lastUpdateTimestamp=1726746690044], removed:true 2024-09-19 19:53:36 INFO ClientHousekeepingScheduledThread1 - remove remoting channel when client unregister. clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=JanYorkMacBook-Pro.local@6593@1@m198e8vo, remoteAddress=169.254.95.59:54910, localAddress=169.254.95.59:8081}, clientId=JanYorkMacBook-Pro.local@6593@1@m198e8vo, language=HTTP, version=473, lastUpdateTimestamp=1726746690044] 2024-09-19 19:53:36 INFO GrpcClientSettingsManagerCleaner - remove unused grpc client settings. group:null, settings:client_type: SIMPLE_CONSUMER access_point { scheme: IPv4 addresses { host: "127.0.0.1" port: 8081 } } backoff_policy { max_attempts: 17 customized_backoff { next { seconds: 1 } next { seconds: 5 } next { seconds: 10 } next { seconds: 30 } next { seconds: 60 } next { seconds: 120 } next { seconds: 180 } next { seconds: 240 } next { seconds: 300 } next { seconds: 360 } next { seconds: 420 } next { seconds: 480 } next { seconds: 540 } next { seconds: 600 } next { seconds: 1200 } next { seconds: 1800 } next { seconds: 3600 } next { seconds: 7200 } } } request_timeout { seconds: 3 } subscription { group { name: "task-to-pool" } subscriptions { topic { name: "task-processing" } expression { type: TAG expression: "task-to-pool" } } fifo: false receive_batch_size: 32 long_polling_timeout { seconds: 30 } } user_agent { language: NODE_JS version: "1.0.0" platform: "darwin" hostname: "JanYorkMacBook-Pro.local" }

2024-09-19 19:53:49 ERROR GrpcConsumerThreadPool-7 - internal server error java.lang.NullPointerException: Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null at org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63) at org.apache.rocketmq.proxy.grpc.v2.DefaultGrpcMessingActivity.receiveMessage(DefaultGrpcMessingActivity.java:117) at org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication.lambda$receiveMessage$14(GrpcMessagingApplication.java:278) at org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication$GrpcTask.run(GrpcMessagingApplication.java:442) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:840) 2024-09-19 19:53:49 ERROR GrpcRequestExecutorThread-42 - telemetry on error io.grpc.StatusRuntimeException: CANCELLED: client cancelled at io.grpc.Status.asRuntimeException(Status.java:530) at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291) at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40) at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23) at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40) at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40) at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23) at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40) at org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor$1.onCancel(GlobalExceptionInterceptor.java:65) at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40) at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23) at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40) at io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96) at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378) at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365) at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923) at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:840)