moleculerjs / moleculer

:rocket: Progressive microservices framework for Node.js
https://moleculer.services/
MIT License

NodeID conflict on restart when nodeID is set and using Kafka transporter #629

Closed ujwal-setlur closed 4 years ago

ujwal-setlur commented 4 years ago

Current Behavior

I have set the nodeID in the configuration file and am using the Kafka transporter. The first start of the service works fine. I have set disableBalancer: true, but the broker warns that the Kafka transporter has no built-in balancer (really?), so the broker's own balancer is turned back on. When I restart the service by issuing quit in the REPL console and running npm run dev again, I get this error:

[2019-12-07T21:42:55.325Z] INFO  api-1/BROKER: Moleculer v0.14.0-beta6 is starting...
[2019-12-07T21:42:55.327Z] INFO  api-1/BROKER: Namespace: <not defined>
[2019-12-07T21:42:55.327Z] INFO  api-1/BROKER: Node ID: api-1
[2019-12-07T21:42:55.328Z] INFO  api-1/METRICS: Metrics: Enabled
[2019-12-07T21:42:55.347Z] INFO  api-1/REGISTRY: Strategy: RoundRobinStrategy
[2019-12-07T21:42:55.349Z] INFO  api-1/BROKER: Serializer: JSONSerializer
[2019-12-07T21:42:55.376Z] INFO  api-1/BROKER: Registered 14 internal middleware(s).
[2019-12-07T21:42:55.379Z] INFO  api-1/BROKER: Transporter: KafkaTransporter
[2019-12-07T21:42:55.379Z] WARN  api-1/BROKER: The KafkaTransporter has no built-in balancer. Broker balancer is ENABLED.
[2019-12-07T21:42:55.494Z] INFO  api-1/API: 🚀  Apollo Server ready at http://localhost:3000/
[2019-12-07T21:42:55.497Z] INFO  api-1/TRANSIT: Connecting to the transporter...
[2019-12-07T21:42:55.708Z] INFO  api-1/TRANSPORTER: Kafka client is connected.

[2019-12-07T21:43:19.788Z] FATAL api-1/BROKER: ServiceBroker has detected a nodeID conflict, use unique nodeIDs. ServiceBroker stopped. undefined

Expected Behavior

Service should start up again.

Failure Information

Steps to Reproduce

  1. Set up a service with Moleculer 0.14 beta (v0.14.0-beta6 in the log above).
  2. Set a fixed nodeID.
  3. Use the Kafka transporter.
  4. Set disableBalancer: true. The Kafka transporter reports that it has no built-in balancer, so the broker balancer is turned back on.
  5. Start the service, quit from the REPL console, and run npm run dev again.

This doesn't happen with other transporters (amqp, stan)
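
The balancer fallback mentioned in step 4 can be modelled roughly as below. This is a minimal sketch, not Moleculer's actual source: `TransporterLike`, `BalancerDecision`, `resolveBalancer`, and the `QueueBackedTransporter` name are hypothetical and exist only for illustration. The sketch assumes the broker honours `disableBalancer` only when the transporter declares a built-in balancer, which is what the WARN line in the log suggests.

```typescript
// Simplified model of the fallback seen in the log. All names here are
// hypothetical and are not taken from Moleculer's internals.
interface TransporterLike {
    name: string;
    hasBuiltInBalancer: boolean;
}

interface BalancerDecision {
    disableBalancer: boolean; // effective value after the check
    message: string;          // what the broker would log
}

function resolveBalancer(requestedDisable: boolean, tx: TransporterLike): BalancerDecision {
    if (!requestedDisable) {
        return { disableBalancer: false, message: "Broker balancer is ENABLED." };
    }
    if (tx.hasBuiltInBalancer) {
        return { disableBalancer: true, message: "Broker balancer is DISABLED." };
    }
    // The transporter cannot balance on its own, so the request is overridden.
    return {
        disableBalancer: false,
        message: `The ${tx.name} has no built-in balancer. Broker balancer is ENABLED.`,
    };
}

const kafka: TransporterLike = { name: "KafkaTransporter", hasBuiltInBalancer: false };
const queueBacked: TransporterLike = { name: "QueueBackedTransporter", hasBuiltInBalancer: true };

console.log(resolveBalancer(true, kafka).message);
console.log(resolveBalancer(true, queueBacked).message);
```

Under this model the fallback only changes how requests are balanced. It does not by itself explain the nodeID conflict on restart.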

Reproduce code snippet

"use strict";
import { BrokerOptions, Errors } from "moleculer";

/**
 * Moleculer ServiceBroker configuration file
 *
 * More info about options: https://moleculer.services/docs/0.13/broker.html#Broker-options
 *
 * Overwrite options in production:
 * ================================
 *  You can overwrite any option with environment variables.
 *  For example to overwrite the "logLevel", use `LOGLEVEL=warn` env var.
 *  To overwrite a nested parameter, e.g. retryPolicy.retries, use `RETRYPOLICY_RETRIES=10` env var.
 *
 *  To overwrite broker’s deeply nested default options, which are not presented in "moleculer.config.ts",
 *  via environment variables, use the `MOL_` prefix and double underscore `__` for nested properties in .env file.
 *  For example, to set the cacher prefix to `MYCACHE`, you should declare an env var as `MOL_CACHER__OPTIONS__PREFIX=MYCACHE`.
 */
const brokerConfig: BrokerOptions = {
    // Namespace of nodes to segment your nodes on the same network.
    namespace: "",
    // Unique node identifier. Must be unique in a namespace.
    nodeID: process.env.MOLECULER_NODE_ID,

    // Enable/disable logging or use custom logger. More info: https://moleculer.services/docs/0.13/logging.html
    logger: true,
    // Log level for built-in console logger. Available values: trace, debug, info, warn, error, fatal
    logLevel: "info",
    // Log formatter for built-in console logger. Available values: default, simple, short. It can be also a `Function`.
    logFormatter: "default",
    // Custom object & array printer for built-in console logger.
    logObjectPrinter: null,

    // Define transporter.
    // More info: https://moleculer.services/docs/0.13/networking.html
    transporter: "kafka://kafka:9092",
    // transporter: {
    //     type: "AMQP",
    //     options: {
    //         url: "amqp://rabbitmq:5672",
    //     },
    // },
    // transporter: {
    //     type: "STAN",
    //     options: {
    //         url: "stan://nats-streaming:4222",
    //         clusterID: "byte-cluster",
    //     },
    // },

    // Define a serializer.
    // Available values: "JSON", "Avro", "ProtoBuf", "MsgPack", "Notepack", "Thrift".
    // More info: https://moleculer.services/docs/0.13/networking.html
    serializer: "JSON",

    // Number of milliseconds to wait before rejecting a request with a RequestTimeout error. Disabled: 0
    requestTimeout: 10 * 1000,

    // Retry policy settings. More info: https://moleculer.services/docs/0.13/fault-tolerance.html#Retry
    retryPolicy: {
        // Enable feature
        enabled: false,
        // Count of retries
        retries: 5,
        // First delay in milliseconds.
        delay: 100,
        // Maximum delay in milliseconds.
        maxDelay: 1000,
        // Backoff factor for delay. 2 means exponential backoff.
        factor: 2,
        // A function to check failed requests.
        check: (err: Errors.MoleculerRetryableError) => err && !!err.retryable,
    },

    // Limit of calling level. If it reaches the limit, the broker will throw a MaxCallLevelError. (Infinite loop protection)
    maxCallLevel: 100,

    // Number of seconds to send heartbeat packet to other nodes.
    heartbeatInterval: 5,
    // Number of seconds to wait before setting node to unavailable status.
    heartbeatTimeout: 15,

    // Track requests and wait for running requests before shutting down. More info: https://moleculer.services/docs/0.13/fault-tolerance.html
    tracking: {
        // Enable feature
        enabled: false,
        // Number of milliseconds to wait before shutting down the process
        shutdownTimeout: 5000,
    },

    // Disable built-in request & emit balancer. (Transporter must support it, as well.)
    disableBalancer: true,

    // Settings of Service Registry. More info: https://moleculer.services/docs/0.13/registry.html
    registry: {
        // Define balancing strategy.
        // Available values: "RoundRobin", "Random", "CpuUsage", "Latency"
        strategy: "RoundRobin",
        // Enable local action call preferring.
        preferLocal: true,
    },

    // Settings of Circuit Breaker. More info: https://moleculer.services/docs/0.13/fault-tolerance.html#Circuit-Breaker
    circuitBreaker: {
        // Enable feature
        enabled: false,
        // Threshold value. 0.5 means that 50% should be failed for tripping.
        threshold: 0.5,
        // Minimum request count. Below it, CB does not trip.
        minRequestCount: 20,
        // Number of seconds for time window.
        windowTime: 60,
        // Number of milliseconds to switch from open to half-open state
        halfOpenTime: 10 * 1000,
        // A function to check failed requests.
        check: (err: Errors.MoleculerRetryableError) => err && err.code >= 500,
    },

    // Settings of bulkhead feature. More info: https://moleculer.services/docs/0.13/fault-tolerance.html#Bulkhead
    bulkhead: {
        // Enable feature.
        enabled: false,
        // Maximum concurrent executions.
        concurrency: 10,
        // Maximum size of queue
        maxQueueSize: 100,
    },

    // Enable built-in parameter validation.
    validator: true,

    // Enable metrics function. More info: https://moleculer.services/docs/0.13/metrics.html
    metrics: true,

    // Register internal services ("$node"). More info: https://moleculer.services/docs/0.13/services.html#Internal-services
    internalServices: true,
    // Register internal middlewares. More info: https://moleculer.services/docs/0.13/middlewares.html#Internal-middlewares
    internalMiddlewares: true,

    // Watch the loaded services and hot reload if they changed. You can also enable it in Moleculer Runner with `--hot` argument
    hotReload: true,

    // Register custom middlewares
    middlewares: [],

    // Called after broker created.
    created(broker) {

    },

    // Called after broker started.
    started(broker) {

    },

    // Called after broker stopped.
    stopped(broker) {

    },

    // Register custom REPL commands.
    replCommands: null,
};

export default brokerConfig;

ujwal-setlur commented 4 years ago

It doesn't seem to matter whether I set this.hasBuiltInBalancer = true; or not. This is what I see from Kafka:

kafka             | [2019-12-07 22:11:07,580] INFO [GroupCoordinator 1001]: Preparing to rebalance group api-1 in state PreparingRebalance with old generation 4 (__consumer_offsets-28) (reason: Adding new member default-kafka-consumer-13c03448-b6ab-445b-a1cd-5fc0217ba42e with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka             | [2019-12-07 22:11:19,162] INFO [GroupCoordinator 1001]: Member default-kafka-consumer-35adefdc-4171-44e4-928b-b173c8e77b05 in group api-1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka             | [2019-12-07 22:11:19,177] INFO [GroupCoordinator 1001]: Stabilized group api-1 generation 5 (__consumer_offsets-28) (kafka.coordinator.group.GroupCoordinator)
kafka             | [2019-12-07 22:11:19,197] INFO [GroupCoordinator 1001]: Assignment received from leader for group api-1 for generation 5 (kafka.coordinator.group.GroupCoordinator)
kafka             | [2019-12-07 22:11:49,238] INFO [GroupCoordinator 1001]: Member default-kafka-consumer-13c03448-b6ab-445b-a1cd-5fc0217ba42e in group api-1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka             | [2019-12-07 22:11:49,238] INFO [GroupCoordinator 1001]: Preparing to rebalance group api-1 in state PreparingRebalance with old generation 5 (__consumer_offsets-28) (reason: removing member default-kafka-consumer-13c03448-b6ab-445b-a1cd-5fc0217ba42e on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
kafka             | [2019-12-07 22:11:49,240] INFO [GroupCoordinator 1001]: Group api-1 with generation 6 is now empty (__consumer_offsets-28) (kafka.coordinator.group.GroupCoordinator)

The AMQP transporter for RabbitMQ seems to be the most stable of them all! Looks like I will use RabbitMQ, but then I will need a separate event store. Oh well, Moleculer is still awesome!

AndreMaz commented 4 years ago

the AMQP transporter for RabbitMQ seems to be most stable of them all! Looks like I will use RabbitMQ, but will need to use a separate eventstore then. Oh well, moleculer is still awesome!

So this is happening with other transporters also? Not only with Kafka? Can you create a repro repo?

ujwal-setlur commented 4 years ago

This is happening with Kafka alone. I have other issues with the STAN transporter (my initial choice), but the AMQP transporter worked straight out of the box for me. I will try to create a repro repo either today or tomorrow.

ujwal-setlur commented 4 years ago

@AndreMaz sorry for the delay, but here is a reproduction repo:

https://github.com/ujwal-setlur/moleculer-629-reproduction

icebob commented 4 years ago

I think the problem is that Kafka also stores the internal protocol messages (like the INFO packet), so the new client receives the old packets from Kafka. We should check the topic settings in the Kafka transporter and configure them so that these packets are not stored.

ujwal-setlur commented 4 years ago

@icebob I agree with your theory. Essentially, when the node comes back up, it is getting its own messages.
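
The theory above can be illustrated with a small model. This is a sketch under stated assumptions, not the transporter's real code: the `InfoPacket` shape, the `classifyInfo` function, and the "run-1" / "run-2" instance IDs are hypothetical. It assumes each process start gets a fresh instance ID, and that a packet carrying the local nodeID but a different instance ID is treated as a conflict.

```typescript
// Simplified model of the suspected failure. The packet shape and all names
// are assumptions for illustration, not Moleculer's wire protocol.
interface InfoPacket {
    sender: string;     // nodeID of the broker that published the packet
    instanceID: string; // assumed to be regenerated on every process start
}

type Verdict = "own-packet" | "conflict" | "remote-node";

function classifyInfo(local: InfoPacket, incoming: InfoPacket): Verdict {
    if (incoming.sender !== local.sender) return "remote-node";
    // Same nodeID: either our own echo or another process claiming our ID.
    return incoming.instanceID === local.instanceID ? "own-packet" : "conflict";
}

// A log-style topic keeps messages after the publisher has exited.
const retainedTopic: InfoPacket[] = [];

// First run of "api-1" publishes its INFO packet, then the process quits.
const firstRun: InfoPacket = { sender: "api-1", instanceID: "run-1" };
retainedTopic.push(firstRun);

// The restarted process reuses the nodeID but has a new instance ID.
const secondRun: InfoPacket = { sender: "api-1", instanceID: "run-2" };
retainedTopic.push(secondRun);

// The restarted node replays the topic and meets the packet from its
// previous run, which looks like a different live node with the same ID.
const verdicts = retainedTopic.map((packet) => classifyInfo(secondRun, packet));
console.log(verdicts); // [ 'conflict', 'own-packet' ]
```

With a transporter that does not retain messages, the first run's packet would no longer be delivered after a restart, which would be consistent with AMQP and STAN not showing the problem.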

icebob commented 4 years ago

Fixed. Could you test it with npm i moleculerjs/moleculer#next?

icebob commented 4 years ago

@ujwal-setlur could you join our Discord chat? I would like to ask you about your project & experiences. Thanks in advance!

ujwal-setlur commented 4 years ago

Sure, I think I joined a week or so ago

ujwal-setlur commented 4 years ago

Joined

ujwal-setlur commented 4 years ago

BTW, I will test the bug fix this week

icebob commented 4 years ago

Released: https://github.com/moleculerjs/moleculer/releases/tag/v0.14.0-rc1