apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0

[Bug] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. #8714

Open JanYork opened 1 month ago

JanYork commented 1 month ago

Runtime platform environment

macOS, Node.js 18.17.1

RocketMQ version

RocketMQ 5.3.0

JDK Version

openjdk 17.0.11 2024-04-16
OpenJDK Runtime Environment Homebrew (build 17.0.11+0)
OpenJDK 64-Bit Server VM Homebrew (build 17.0.11+0, mixed mode, sharing)

Describe the Bug

I hit an error I cannot explain. The application had started and was idle, waiting for messages, when the error below was thrown. I caught it, but I am not sure whether it is safe to ignore: it looks like a bug rather than an expected condition, and it interrupts my program.

receive message error InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24) {
  code: 50001
}

/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81
        throw new InternalErrorException(status.code, status.message, requestId);
              ^
InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24)

Steps to Reproduce

Run the following code in a NestJS application and send a few messages; everything should work normally at first. Then leave the application running. The error does not appear quickly: it seems that some scheduled task changes state on the server side, so you have to wait a long time before it triggers.

import { Body, Controller, OnModuleInit, Post } from "@nestjs/common";
import { MessageOptions, Producer, SimpleConsumer } from "rocketmq-client-nodejs";

interface IProducerQuery {
  topic: string;
  tag: string;
  keys?: string;
  body: string;
  group?: string;
  num?: number;
}

@Controller()
export class AppController implements OnModuleInit {
  private producer: Producer;

  async onModuleInit() {
    await this.producerInit();
    await this.consumerInit();
  }

  async producerInit() {
    const producer = new Producer({
      endpoints: "127.0.0.1:8081"
    });
    this.producer = producer;

    await this.producer.startup()
      .catch((e) => {
        console.error("producer startup error", e);
        process.exit(1);
      })
      .then(() => {
        console.log("producer startup success!");
      });
  }

  async consumerInit() {
    const task_to_pool_consumer = new SimpleConsumer({
      consumerGroup: "task-to-pool",
      endpoints: "127.0.0.1:8081",
      subscriptions: new Map().set("task-processing", "task-to-pool")
    });
    await task_to_pool_consumer.startup()
      .catch((e) => {
        console.error("consumer startup error", e);
        process.exit(1);
      })
      .then(() => {
        console.log("task to pool consumer startup success!");
        this.listenForMessages(task_to_pool_consumer, 5);
      });

    const kitchens = ["kitchen1", "kitchen2", "kitchen3", "kitchen4", "kitchen5"];
    const kitchen_consumers: SimpleConsumer[] = [];
    for (const kitchen of kitchens) {
      const task_assign_consumer = new SimpleConsumer({
        consumerGroup: "task-assign",
        endpoints: "127.0.0.1:8081",
        subscriptions: new Map().set("task-processing", kitchen)
      });
      kitchen_consumers.push(task_assign_consumer);
    }
    await Promise.all(kitchen_consumers.map((consumer: SimpleConsumer) => {
      // Return the promise so Promise.all actually waits for every startup.
      return consumer.startup().catch((e) => {
        console.error("consumer startup error", e);
        process.exit(1);
      }).then(() => {
        console.log("kitchen consumer startup success!");
        this.listenForMessages(consumer, 5);
      });
    }));

    const task_to_db_consumer = new SimpleConsumer({
      consumerGroup: "task-to-db",
      endpoints: "127.0.0.1:8081",
      subscriptions: new Map().set("task-write", "task-to-db")
    });
    await task_to_db_consumer.startup()
      .catch((e) => {
        console.error("consumer startup error", e);
        process.exit(1);
      })
      .then(() => {
        console.log("task to db consumer startup success!");
        this.listenForMessages(task_to_db_consumer, 1);
      });

    const task_to_cache_consumer = new SimpleConsumer({
      consumerGroup: "task-to-cache",
      endpoints: "127.0.0.1:8081",
      subscriptions: new Map().set("task-write", "task-to-cache")
    });
    await task_to_cache_consumer.startup()
      .catch((e) => {
        console.error("consumer startup error", e);
        process.exit(1);
      })
      .then(() => {
        console.log("task to cache consumer startup success!");
        this.listenForMessages(task_to_cache_consumer, 1);
      });
  }

  async listenForMessages(consumer: SimpleConsumer, number: number) {
    while (true) {
      const messages = await consumer.receive(number).catch((e) => {
        console.error("receive message error", e);
        throw e;
      });

      if (!messages || messages.length === 0) {
        continue;
      }

      for (const message of messages) {
        console.log(`Received message: ${message.body.toString()}`);
        await consumer.ack(message).catch((e) => {
          console.error("ack message error", e);
          throw e;
        }).then(() => {
          console.log("ack message success!");
        });
      }
    }
  }

  @Post("production")
  async production(@Body() query: IProducerQuery) {
    const options: MessageOptions = {
      topic: query.topic,
      tag: query.tag,
      // Fall back to a timestamp key only when no key was supplied.
      keys: [query.keys ?? Date.now().toString()],
      body: Buffer.from(query.body)
    };
    if (query.group) {
      options.messageGroup = query.group;
    }

    if (query.num && query.num > 1) {
      for (let i = 0; i < query.num; i++) {
        options.keys = [Date.now().toString()];
        options.body = Buffer.from(query.body + i);

        await this.producer
          .send(options)
          .catch((e) => {
            console.error("producer send message error", e);
          })
          .then((r) => {
            console.log(`producer send message success! receipt -> ${r ? JSON.stringify(r) : "null"}`);
          });
      }
      return;
    }

    await this.producer
      .send(options)
      .catch((e) => {
        console.error("producer send message error", e);
      })
      .then((r) => {
        console.log(`producer send message success! receipt -> ${r ? JSON.stringify(r) : "null"}`);
      });
  }
}

After a long enough wait, the program fails with the following error:

receive message error InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24) {
  code: 50001
}

/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81
        throw new InternalErrorException(status.code, status.message, requestId);
              ^
InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24)

On the client side, the exception is thrown by StatusChecker.check(status) inside the receiveMessage method.

The message indicates that, on the proxy side, the "settings" object itself was null when ReceiveMessageActivity.receiveMessage called settings.getSubscription(), which caused the NullPointerException.

Why do the client's settings, including its subscription data, suddenly disappear? And why does this surface as a NullPointerException?
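
Until the root cause is clear, one way to keep the consumer loop alive is to treat this response as retryable instead of rethrowing it. The following is a minimal sketch, not part of rocketmq-client-nodejs: isInternalProxyError, backoffMs and receiveWithRetry are hypothetical helpers, and the only thing taken from the trace above is that the thrown error carries code 50001. Whether a plain retry is enough, or the consumer has to be restarted so that the proxy sees its settings again, is not established here.

```typescript
// Hypothetical helpers; none of these names come from rocketmq-client-nodejs.

/** True when the error looks like the proxy-side internal error from the trace (code 50001). */
function isInternalProxyError(e: unknown): boolean {
  return typeof e === "object" && e !== null && (e as { code?: unknown }).code === 50001;
}

/** Exponential backoff: baseMs, 2*baseMs, 4*baseMs, ... capped at maxMs. */
function backoffMs(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

/**
 * Calls `receive` until it succeeds, retrying only on code 50001.
 * Any other error, or running out of attempts, is rethrown to the caller.
 */
async function receiveWithRetry<T>(
  receive: () => Promise<T>,
  maxAttempts = 5,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await receive();
    } catch (e) {
      if (!isInternalProxyError(e) || attempt + 1 >= maxAttempts) {
        throw e;
      }
      await sleep(backoffMs(attempt));
    }
  }
}
```

In the listenForMessages loop above, this would mean calling receiveWithRetry(() => consumer.receive(number)) in place of consumer.receive(number). The sleep parameter is injectable only so the helper can be exercised without real delays.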

What Did You Expect to See?

Continuous, normal operation.

What Did You See Instead?

receive message error InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24) {
  code: 50001
}

/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81
        throw new InternalErrorException(status.code, status.message, requestId);
              ^
InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException. org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/rocketmq-client-nodejs@1.0.0/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24)

Additional Context

No response

JanYork commented 1 month ago

Proxy logs:

2024-09-19 19:53:36 INFO ClientHousekeepingScheduledThread1 - clear handle of this client when client unregister. group:task-to-pool, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=JanYorkMacBook-Pro.local@6593@1@m198e8vo, remoteAddress=169.254.95.59:54910, localAddress=169.254.95.59:8081}, clientId=JanYorkMacBook-Pro.local@6593@1@m198e8vo, language=HTTP, version=473, lastUpdateTimestamp=1726746690044]
2024-09-19 19:53:36 INFO ClientHousekeepingScheduledThread1 - remove grpc channel when client unregister. group:task-to-pool, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=JanYorkMacBook-Pro.local@6593@1@m198e8vo, remoteAddress=169.254.95.59:54910, localAddress=169.254.95.59:8081}, clientId=JanYorkMacBook-Pro.local@6593@1@m198e8vo, language=HTTP, version=473, lastUpdateTimestamp=1726746690044], removed:true
2024-09-19 19:53:36 INFO ClientHousekeepingScheduledThread1 - remove remoting channel when client unregister. clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=JanYorkMacBook-Pro.local@6593@1@m198e8vo, remoteAddress=169.254.95.59:54910, localAddress=169.254.95.59:8081}, clientId=JanYorkMacBook-Pro.local@6593@1@m198e8vo, language=HTTP, version=473, lastUpdateTimestamp=1726746690044]
2024-09-19 19:53:36 INFO GrpcClientSettingsManagerCleaner - remove unused grpc client settings. group:null, settings:client_type: SIMPLE_CONSUMER
access_point {
  scheme: IPv4
  addresses {
    host: "127.0.0.1"
    port: 8081
  }
}
backoff_policy {
  max_attempts: 17
  customized_backoff {
    next {
      seconds: 1
    }
    next {
      seconds: 5
    }
    next {
      seconds: 10
    }
    next {
      seconds: 30
    }
    next {
      seconds: 60
    }
    next {
      seconds: 120
    }
    next {
      seconds: 180
    }
    next {
      seconds: 240
    }
    next {
      seconds: 300
    }
    next {
      seconds: 360
    }
    next {
      seconds: 420
    }
    next {
      seconds: 480
    }
    next {
      seconds: 540
    }
    next {
      seconds: 600
    }
    next {
      seconds: 1200
    }
    next {
      seconds: 1800
    }
    next {
      seconds: 3600
    }
    next {
      seconds: 7200
    }
  }
}
request_timeout {
  seconds: 3
}
subscription {
  group {
    name: "task-to-pool"
  }
  subscriptions {
    topic {
      name: "task-processing"
    }
    expression {
      type: TAG
      expression: "task-to-pool"
    }
  }
  fifo: false
  receive_batch_size: 32
  long_polling_timeout {
    seconds: 30
  }
}
user_agent {
  language: NODE_JS
  version: "1.0.0"
  platform: "darwin"
  hostname: "JanYorkMacBook-Pro.local"
}

2024-09-19 19:53:49 ERROR GrpcConsumerThreadPool-7 - internal server error
java.lang.NullPointerException: Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null
        at org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
        at org.apache.rocketmq.proxy.grpc.v2.DefaultGrpcMessingActivity.receiveMessage(DefaultGrpcMessingActivity.java:117)
        at org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication.lambda$receiveMessage$14(GrpcMessagingApplication.java:278)
        at org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication$GrpcTask.run(GrpcMessagingApplication.java:442)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
2024-09-19 19:53:49 ERROR GrpcRequestExecutorThread-42 - telemetry on error
io.grpc.StatusRuntimeException: CANCELLED: client cancelled
        at io.grpc.Status.asRuntimeException(Status.java:530)
        at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291)
        at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
        at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
        at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
        at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
        at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
        at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
        at org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor$1.onCancel(GlobalExceptionInterceptor.java:65)
        at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
        at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
        at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
        at io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
        at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378)
        at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365)
        at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
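
Read together, these log lines suggest a sequence: housekeeping unregisters the client, GrpcClientSettingsManagerCleaner removes its settings at 19:53:36, and a receive request handled at 19:53:49 finds no settings. The sketch below is a toy model of that sequence in TypeScript, not the proxy's actual Java code; SettingsRegistry and its methods are hypothetical names. It only illustrates why an unguarded lookup fails after cleanup and what a guarded lookup would return instead.

```typescript
// Toy model only; names are hypothetical and do not mirror the RocketMQ proxy source.
interface ClientSettings {
  group: string;
  longPollingTimeoutSec: number;
}

class SettingsRegistry {
  private readonly byClientId = new Map<string, ClientSettings>();

  register(clientId: string, settings: ClientSettings): void {
    this.byClientId.set(clientId, settings);
  }

  // Models the cleaner dropping settings for a client it considers unregistered.
  removeUnused(clientId: string): boolean {
    return this.byClientId.delete(clientId);
  }

  // Unguarded: dereferences the entry without checking that it still exists.
  pollingTimeoutUnguarded(clientId: string): number {
    const settings = this.byClientId.get(clientId) as ClientSettings;
    return settings.longPollingTimeoutSec; // throws TypeError once the entry is gone
  }

  // Guarded: a missing entry yields undefined, which the caller can handle.
  pollingTimeoutGuarded(clientId: string): number | undefined {
    return this.byClientId.get(clientId)?.longPollingTimeoutSec;
  }
}
```

With a guarded lookup, a handler could answer a request from a client whose settings were cleaned up with an explicit, recoverable status rather than an internal error.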