banzaicloud / koperator

Oh no! Yet another Apache Kafka operator for Kubernetes
Apache License 2.0
784 stars 195 forks source link

Cluster expansion of the capacity problems #539

Closed lengrongfu closed 3 years ago

lengrongfu commented 3 years ago

Describe the bug Hi, My cluster started with only three nodes, and then expanded one node. However, during the expansion process, the client reported the following warning and disconnected after the timeout period. Is this normal?

[2021-01-16 16:36:10,522] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 1 (/10.14.245.96:8001) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:10,541] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 2 (/10.14.245.96:8002) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:10,656] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 1 (/10.14.245.96:8001) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:10,656] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 2 (/10.14.245.96:8002) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:10,791] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 0 (/10.14.245.96:8000) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:10,905] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 1 (/10.14.245.96:8001) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:10,905] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 2 (/10.14.245.96:8002) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:11,015] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 0 (/10.14.245.96:8000) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:11,126] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 0 (/10.14.245.96:8000) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:11,317] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 1 (/10.14.245.96:8001) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:11,421] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 2 (/10.14.245.96:8002) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:11,532] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 0 (/10.14.245.96:8000) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:11,944] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 0 (/10.14.245.96:8000) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:12,111] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 2 (/10.14.245.96:8002) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:12,256] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 1 (/10.14.245.96:8001) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:12,823] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 0 (/10.14.245.96:8000) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:13,234] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 2 (/10.14.245.96:8002) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:13,489] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 1 (/10.14.245.96:8001) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:13,750] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 0 (/10.14.245.96:8000) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-16 16:36:14,102] WARN [Consumer clientId=consumer-perf-consumer-15459-1, groupId=perf-consumer-15459] Connection to node 2 (/10.14.245.96:8002) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
WARNING: Exiting before consuming the expected number of messages: timeout (10000 ms) exceeded. You can use the --timeout option to increase the timeout.
lengrongfu commented 3 years ago

After the cluster is expanded, the envoy proxy is no longer available. Telnet cannot reach the service port exposed by envoy, but it can be reached by telnet before scale. How to locate this kind of problem.

stoader commented 3 years ago

please describe repro steps also attach your kafkacluster CR, logs and status for each pod in the kafka namespace

lengrongfu commented 3 years ago

Steps to reproduce:

  1. Create 3 brokers using the default CR.
  2. Use sangrenel for continuous production to meet BrokerOverLoaded alter rule.
  3. Make it automatically expand.
  4. Envoy cannot be accessed after successful expansion.

CR status:

Status:
  Alert Count:  0
  Brokers State:
    0:
      Configuration State:  ConfigInSync
      Graceful Action State:
        Cruise Control State:  GracefulUpscaleSucceeded
        Error Message:         CruiseControl not yet ready
      Rack Awareness State:    
    1:
      Configuration State:  ConfigInSync
      Graceful Action State:
        Cruise Control State:  GracefulUpscaleSucceeded
        Error Message:         CruiseControl not yet ready
      Rack Awareness State:    
    2:
      Configuration State:  ConfigInSync
      Graceful Action State:
        Cruise Control State:  GracefulUpscaleSucceeded
        Error Message:         CruiseControl not yet ready
      Rack Awareness State:    
    3:
      Configuration State:  ConfigInSync
      Graceful Action State:
        Task Started:            Sat, 16 Jan 2021 08:37:42 GMT
        Cruise Control State:    GracefulUpscaleSucceeded
        Cruise Control Task Id:  3530fa59-9ae2-4199-b9bb-86d146b924eb
        Error Message:           
      Rack Awareness State:      
    4:
      Configuration State:  ConfigInSync
      Graceful Action State:
        Task Started:            Sat, 16 Jan 2021 08:41:57 GMT
        Cruise Control State:    GracefulUpscaleSucceeded
        Cruise Control Task Id:  820bcd73-7751-4683-b54b-1bde959f1252
        Error Message:           
      Rack Awareness State:      
  Cruise Control Topic Status:   CruiseControlTopicReady
  Rolling Upgrade Status:
    Error Count:   0
    Last Success:  2021-01-15 09:16:13
  State:           ClusterReconciling
Events:            <none>

envoy-external1-kafka pod log :

[2021-01-16 08:40:42.305][7][info][main] [source/server/server.cc:328]   request header map: 608 bytes: :authority,:method,:path,:protocol,:scheme,accept,accept-encoding,access-control-request-method,authorization,cache-control,cdn-loop,connection,content-encoding,content-length,content-type,expect,grpc-accept-encoding,grpc-timeout,if-match,if-modified-since,if-none-match,if-range,if-unmodified-since,keep-alive,origin,pragma,proxy-connection,referer,te,transfer-encoding,upgrade,user-agent,via,x-client-trace-id,x-envoy-attempt-count,x-envoy-decorator-operation,x-envoy-downstream-service-cluster,x-envoy-downstream-service-node,x-envoy-expected-rq-timeout-ms,x-envoy-external-address,x-envoy-force-trace,x-envoy-hedge-on-per-try-timeout,x-envoy-internal,x-envoy-ip-tags,x-envoy-max-retries,x-envoy-original-path,x-envoy-original-url,x-envoy-retriable-header-names,x-envoy-retriable-status-codes,x-envoy-retry-grpc-on,x-envoy-retry-on,x-envoy-upstream-alt-stat-name,x-envoy-upstream-rq-per-try-timeout-ms,x-envoy-upstream-rq-timeout-alt-response,x-envoy-upstream-rq-timeout-ms,x-forwarded-client-cert,x-forwarded-for,x-forwarded-proto,x-ot-span-context,x-request-id
[2021-01-16 08:40:42.305][7][info][main] [source/server/server.cc:328]   request trailer map: 128 bytes: 
[2021-01-16 08:40:42.305][7][info][main] [source/server/server.cc:328]   response header map: 424 bytes: :status,access-control-allow-credentials,access-control-allow-headers,access-control-allow-methods,access-control-allow-origin,access-control-expose-headers,access-control-max-age,age,cache-control,connection,content-encoding,content-length,content-type,date,etag,expires,grpc-message,grpc-status,keep-alive,last-modified,location,proxy-connection,server,transfer-encoding,upgrade,vary,via,x-envoy-attempt-count,x-envoy-decorator-operation,x-envoy-degraded,x-envoy-immediate-health-check-fail,x-envoy-ratelimited,x-envoy-upstream-canary,x-envoy-upstream-healthchecked-cluster,x-envoy-upstream-service-time,x-request-id
[2021-01-16 08:40:42.305][7][info][main] [source/server/server.cc:328]   response trailer map: 152 bytes: grpc-message,grpc-status
[2021-01-16 08:40:42.405][7][info][main] [source/server/server.cc:448] admin address: 0.0.0.0:9901
[2021-01-16 08:40:42.406][7][info][main] [source/server/server.cc:583] runtime: layers:
  - name: base
    static_layer:
      {}
  - name: admin
    admin_layer:
      {}
[2021-01-16 08:40:42.407][7][info][config] [source/server/configuration_impl.cc:95] loading tracing configuration
[2021-01-16 08:40:42.407][7][info][config] [source/server/configuration_impl.cc:70] loading 0 static secret(s)
[2021-01-16 08:40:42.407][7][info][config] [source/server/configuration_impl.cc:76] loading 5 cluster(s)
[2021-01-16 08:40:42.412][7][info][config] [source/server/configuration_impl.cc:80] loading 5 listener(s)
[2021-01-16 08:40:42.498][7][info][config] [source/server/configuration_impl.cc:121] loading stats sink configuration
[2021-01-16 08:40:42.499][7][info][main] [source/server/server.cc:679] starting main dispatch loop
[2021-01-16 08:40:42.503][7][info][runtime] [source/common/runtime/runtime_impl.cc:421] RTDS has finished initialization
[2021-01-16 08:40:42.503][7][info][upstream] [source/common/upstream/cluster_manager_impl.cc:178] cm init: all clusters initialized
[2021-01-16 08:40:42.503][7][info][main] [source/server/server.cc:660] all clusters initialized. initializing init manager
[2021-01-16 08:40:42.503][7][info][config] [source/server/listener_manager_impl.cc:888] all dependencies initialized. starting workers
[2021-01-16 08:40:42.598][7][warning][main] [source/server/server.cc:565] there is no configured limit to the number of allowed active connections. Set a limit via the runtime key overload.global_downstream_max_connections
[2021-01-16 08:55:42.503][7][info][main] [source/server/drain_manager_impl.cc:70] shutting down parent after drain

kafka pod log:

2021-01-16 09:00:04,051] INFO [Log partition=__KafkaCruiseControlPartitionMetricSamples-7, dir=/kafka-logs/kafka] Incremented log start offset to 3348 due to segment deletion (kafka.log.Log)
[2021-01-16 09:01:04,051] INFO [Log partition=__KafkaCruiseControlPartitionMetricSamples-7, dir=/kafka-logs/kafka] Deleting segments LogSegment(baseOffset=2976, size=5672, lastModifiedTime=1610783960811, largestTime=1610783953387) (kafka.log.Log)
[2021-01-16 09:01:04,052] INFO Deleted log /kafka-logs/kafka/__KafkaCruiseControlPartitionMetricSamples-7/00000000000000002976.log.deleted. (kafka.log.LogSegment)
[2021-01-16 09:01:04,052] INFO Deleted offset index /kafka-logs/kafka/__KafkaCruiseControlPartitionMetricSamples-7/00000000000000002976.index.deleted. (kafka.log.LogSegment)
[2021-01-16 09:01:04,052] INFO Deleted time index /kafka-logs/kafka/__KafkaCruiseControlPartitionMetricSamples-7/00000000000000002976.timeindex.deleted. (kafka.log.LogSegment)
[2021-01-16 09:04:34,896] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2021-01-16 09:12:00,061] INFO [Log partition=__KafkaCruiseControlPartitionMetricSamples-5, dir=/kafka-logs/kafka] Incremented log start offset to 2480 due to leader offset increment (kafka.log.Log)
[2021-01-16 09:14:34,896] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2021-01-16 09:15:04,048] INFO [Log partition=__KafkaCruiseControlPartitionMetricSamples-5, dir=/kafka-logs/kafka] Found deletable segments with base offsets [2356] due to retention time 3600000ms breach (kafka.log.Log)
[2021-01-16 09:15:04,049] INFO [Log partition=__KafkaCruiseControlPartitionMetricSamples-5, dir=/kafka-logs/kafka] Rolled new log segment at offset 2480 in 1 ms. (kafka.log.Log)
[2021-01-16 09:15:04,049] INFO [Log partition=__KafkaCruiseControlPartitionMetricSamples-5, dir=/kafka-logs/kafka] Scheduling segments for deletion LogSegment(baseOffset=2356, size=2076, lastModifiedTime=1610784680810, largestTime=1610784673491) (kafka.log.Log)

[2021-01-16 09:16:04,050] INFO [Log partition=__KafkaCruiseControlPartitionMetricSamples-5, dir=/kafka-logs/kafka] Deleting segments LogSegment(baseOffset=2356, size=2076, lastModifiedTime=1610784680810, largestTime=1610784673491) (kafka.log.Log)
[2021-01-16 09:16:04,051] INFO Deleted log /kafka-logs/kafka/__KafkaCruiseControlPartitionMetricSamples-5/00000000000000002356.log.deleted. (kafka.log.LogSegment)
[2021-01-16 09:16:04,051] INFO Deleted offset index /kafka-logs/kafka/__KafkaCruiseControlPartitionMetricSamples-5/00000000000000002356.index.deleted. (kafka.log.LogSegment)
[2021-01-16 09:16:04,051] INFO Deleted time index /kafka-logs/kafka/__KafkaCruiseControlPartitionMetricSamples-5/00000000000000002356.timeindex.deleted. (kafka.log.LogSegment)

kafka-operator pod log:

{"level":"info","ts":"2021-01-16T09:17:28.850Z","logger":"controllers.KafkaCluster.generateCCTopic","msg":"CruiseControl topic has been created by CruiseControl","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol"}
{"level":"info","ts":"2021-01-16T09:17:28.944Z","logger":"controllers.KafkaCluster","msg":"CR status updated","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol","status":"CruiseControlTopicReady"}
{"level":"info","ts":"2021-01-16T09:17:28.945Z","logger":"controllers.KafkaCluster","msg":"Generating capacity config","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol"}
{"level":"info","ts":"2021-01-16T09:17:28.945Z","logger":"controllers.KafkaCluster","msg":"incoming network throughput is not set falling back to default value","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol"}
{"level":"info","ts":"2021-01-16T09:17:28.945Z","logger":"controllers.KafkaCluster","msg":"outgoing network throughput is not set falling back to default value","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol"}
{"level":"info","ts":"2021-01-16T09:17:28.945Z","logger":"controllers.KafkaCluster","msg":"The following brokerCapacity was generated","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol","brokerCapacity":{"brokerId":"0","capacity":{"DISK":{"/kafka-logs/kafka":"10737418240"},"CPU":"400","NW_IN":"125000","NW_OUT":"125000"},"doc":"Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."}}
{"level":"info","ts":"2021-01-16T09:17:28.946Z","logger":"controllers.KafkaCluster","msg":"incoming network throughput is not set falling back to default value","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol"}
{"level":"info","ts":"2021-01-16T09:17:28.946Z","logger":"controllers.KafkaCluster","msg":"outgoing network throughput is not set falling back to default value","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol"}
{"level":"info","ts":"2021-01-16T09:17:28.946Z","logger":"controllers.KafkaCluster","msg":"The following brokerCapacity was generated","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol","brokerCapacity":{"brokerId":"1","capacity":{"DISK":{"/kafka-logs/kafka":"10737418240"},"CPU":"400","NW_IN":"125000","NW_OUT":"125000"},"doc":"Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."}}
{"level":"info","ts":"2021-01-16T09:17:28.946Z","logger":"controllers.KafkaCluster","msg":"incoming network throughput is not set falling back to default value","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol"}
{"level":"info","ts":"2021-01-16T09:17:28.946Z","logger":"controllers.KafkaCluster","msg":"outgoing network throughput is not set falling back to default value","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol"}
{"level":"info","ts":"2021-01-16T09:17:28.946Z","logger":"controllers.KafkaCluster","msg":"The following brokerCapacity was generated","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol","brokerCapacity":{"brokerId":"2","capacity":{"DISK":{"/kafka-logs/kafka":"10737418240"},"CPU":"400","NW_IN":"125000","NW_OUT":"125000"},"doc":"Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."}}
{"level":"info","ts":"2021-01-16T09:17:28.946Z","logger":"controllers.KafkaCluster","msg":"incoming network throughput is not set falling back to default value","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol"}
{"level":"info","ts":"2021-01-16T09:17:28.946Z","logger":"controllers.KafkaCluster","msg":"outgoing network throughput is not set falling back to default value","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol"}
{"level":"info","ts":"2021-01-16T09:17:28.946Z","logger":"controllers.KafkaCluster","msg":"The following brokerCapacity was generated","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol","brokerCapacity":{"brokerId":"3","capacity":{"DISK":{"/kafka-logs/kafka":"10000000000"},"CPU":"150","NW_IN":"125000","NW_OUT":"125000"},"doc":"Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."}}
{"level":"info","ts":"2021-01-16T09:17:28.946Z","logger":"controllers.KafkaCluster","msg":"incoming network throughput is not set falling back to default value","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol"}
{"level":"info","ts":"2021-01-16T09:17:28.946Z","logger":"controllers.KafkaCluster","msg":"outgoing network throughput is not set falling back to default value","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol"}
{"level":"info","ts":"2021-01-16T09:17:28.946Z","logger":"controllers.KafkaCluster","msg":"The following brokerCapacity was generated","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol","brokerCapacity":{"brokerId":"4","capacity":{"DISK":{"/kafka-logs/kafka":"10000000000"},"CPU":"150","NW_IN":"125000","NW_OUT":"125000"},"doc":"Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."}}
{"level":"info","ts":"2021-01-16T09:17:28.946Z","logger":"controllers.KafkaCluster","msg":"Generated capacity config was successful with values: {\n    \"brokerCapacities\": [\n        {\n            \"brokerId\": \"0\",\n            \"capacity\": {\n                \"DISK\": {\n                    \"/kafka-logs/kafka\": \"10737418240\"\n                },\n                \"CPU\": \"400\",\n                \"NW_IN\": \"125000\",\n                \"NW_OUT\": \"125000\"\n            },\n            \"doc\": \"Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB.\"\n        },\n        {\n            \"brokerId\": \"1\",\n            \"capacity\": {\n                \"DISK\": {\n                    \"/kafka-logs/kafka\": \"10737418240\"\n                },\n                \"CPU\": \"400\",\n                \"NW_IN\": \"125000\",\n                \"NW_OUT\": \"125000\"\n            },\n            \"doc\": \"Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB.\"\n        },\n        {\n            \"brokerId\": \"2\",\n            \"capacity\": {\n                \"DISK\": {\n                    \"/kafka-logs/kafka\": \"10737418240\"\n                },\n                \"CPU\": \"400\",\n                \"NW_IN\": \"125000\",\n                \"NW_OUT\": \"125000\"\n            },\n            \"doc\": \"Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB.\"\n        },\n        {\n            \"brokerId\": \"3\",\n            \"capacity\": {\n                \"DISK\": {\n                    \"/kafka-logs/kafka\": \"10000000000\"\n                },\n                \"CPU\": \"150\",\n                \"NW_IN\": \"125000\",\n                \"NW_OUT\": \"125000\"\n            },\n            \"doc\": \"Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB.\"\n        },\n        {\n            \"brokerId\": \"4\",\n            \"capacity\": {\n                \"DISK\": {\n                    \"/kafka-logs/kafka\": \"10000000000\"\n                },\n                \"CPU\": \"150\",\n                \"NW_IN\": \"125000\",\n                \"NW_OUT\": \"125000\"\n            },\n            \"doc\": \"Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB.\"\n        }\n    ]\n}","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","component":"kafka-cruisecontrol"}
{"level":"info","ts":"2021-01-16T09:17:28.950Z","logger":"controllers.KafkaCluster","msg":"ensuring finalizers on kafkacluster","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka"}
{"level":"info","ts":"2021-01-16T09:17:28.991Z","logger":"controllers.KafkaCluster","msg":"CR status updated","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka","status":"ClusterRunning"}
{"level":"info","ts":"2021-01-16T09:17:28.991Z","logger":"controllers.KafkaCluster","msg":"Reconciling KafkaCluster","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka"}
{"level":"info","ts":"2021-01-16T09:17:29.056Z","logger":"controllers.KafkaCluster","msg":"could not update CR state: Operation cannot be fulfilled on kafkaclusters.kafka.banzaicloud.io \"kafka\": the object has been modified; please apply your changes to the latest version and try again","Request.Namespace":"kafka-dev/kafka","Request.Name":"kafka"}
{"level":"error","ts":"2021-01-16T09:17:29.056Z","logger":"controller","msg":"Reconciler error","reconcilerGroup":"kafka.banzaicloud.io","reconcilerKind":"KafkaCluster","controller":"KafkaCluster","name":"kafka","namespace":"kafka-dev","error":"could not update CR state: Operation cannot be fulfilled on kafkaclusters.kafka.banzaicloud.io \"kafka\": the object has been modified; please apply your changes to the latest version and try again","errorVerbose":"Operation cannot be fulfilled on kafkaclusters.kafka.banzaicloud.io \"kafka\": the object has been modified; please apply your changes to the latest version and try again\ncould not update CR state\ngithub.com/banzaicloud/kafka-operator/pkg/k8sutil.UpdateCRStatus\n\t/workspace/pkg/k8sutil/status.go:247\ngithub.com/banzaicloud/kafka-operator/controllers.(*KafkaClusterReconciler).Reconcile\n\t/workspace/controllers/kafkacluster_controller.go:110\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.3/pkg/internal/controller/controller.go:244\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.3/pkg/internal/controller/controller.go:218\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.3/pkg/internal/controller/controller.go:197\nk8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1\n\t/go/pkg/mod/k8s.io/apimachinery@v0.18.9/pkg/util/wait/wait.go:155\nk8s.io/apimachinery/pkg/util/wait.BackoffUntil\n\t/go/pkg/mod/k8s.io/apimachinery@v0.18.9/pkg/util/wait/wait.go:156\nk8s.io/apimachinery/pkg/util/wait.JitterUntil\n\t/go/pkg/mod/k8s.io/apimachinery@v0.18.9/pkg/util/wait/wait.go:133\nk8s.io/apimachinery/pkg/util/wait.Until\n\t/go/pkg/mod/k8s.io/apimachinery@v0.18.9/pkg/util/wait/wait.go:90\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1373","stacktrace":"github.com/go-logr/zapr.(*zapLogger).Error\n\t/go/pkg/mod/github.com/go-logr/zapr@v0.1.1/zapr.go:128\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.3/pkg/internal/controller/controller.go:246\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.3/pkg/internal/controller/controller.go:218\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.3/pkg/internal/controller/controller.go:197\nk8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1\n\t/go/pkg/mod/k8s.io/apimachinery@v0.18.9/pkg/util/wait/wait.go:155\nk8s.io/apimachinery/pkg/util/wait.BackoffUntil\n\t/go/pkg/mod/k8s.io/apimachinery@v0.18.9/pkg/util/wait/wait.go:156\nk8s.io/apimachinery/pkg/util/wait.JitterUntil\n\t/go/pkg/mod/k8s.io/apimachinery@v0.18.9/pkg/util/wait/wait.go:133\nk8s.io/apimachinery/pkg/util/wait.Until\n\t/go/pkg/mod/k8s.io/apimachinery@v0.18.9/pkg/util/wait/wait.go:90"}
lengrongfu commented 3 years ago

I also found that the broker is alive and can be confirmed through the Telenet 9094 port, so I think the problem should be on the envoy lb.

stoader commented 3 years ago

please include the entire kafkacluster CR and the output of kubectl get svc -n kafka

lengrongfu commented 3 years ago
$ kubectl get svc -n kafka
                          AGE
envoy-loadbalancer-external1-kafka   LoadBalancer   172.20.4.36     10.14.245.103   8000:8864/TCP,8001:8498/TCP,8002:8737/TCP,8003:8347/TCP,8004:8036/TCP   30h
kafka-cruisecontrol-svc              ClusterIP      172.20.29.255   <none>          8090/TCP,9020/TCP                                                       30h
kafka-headless                       ClusterIP      None            <none>          29092/TCP,29093/TCP,9020/TCP                                            30h
kafka-operator-dev-alertmanager      ClusterIP      172.20.18.19    <none>          9001/TCP                                                                4d3h
kafka-operator-dev-operator          ClusterIP      172.20.4.105    <none>          443/TCP                                                                 4d3h
$ kubectl get kafkacluster -n kafka
NAME    CLUSTER STATE        CLUSTER ALERT COUNT   LAST SUCCESSFUL UPGRADE   UPGRADE ERROR COUNT   AGE
kafka   ClusterReconciling   0                     2021-01-16 09:46:31       0                     30h
adamantal commented 3 years ago

Hi!

I tried to reproduce the issue, but did not succeed.

A few things that I can think of:

Make it automatically expand.

Did you mean that after you've configured the cr you've waited for the broker to come alive? (so you did not refer to any autoscaling)

could not update CR state: Operation cannot be fulfilled on kafkaclusters.kafka.banzaicloud.io \"kafka\": the object has been modified;

The operator can handle the case of concurrent editing, but wait for the new brokers to come alive before modifying the KafkaCluster cr again to make it sure that this operation finishes successfully.

Could you please send the full Kafka cluster CR yaml for reference? I am curious about the output of the following command:

kubectl get kafkacluster kafka -n kafka -o yaml
lengrongfu commented 3 years ago

Thank you for your help @adamantal I will provide cr to you when I verify this problem again. Regarding the hint that the object has been modified, I think the created envoy service was modified by Kubernetes. I observed a phenomenon that the nodeportport of the envoy service has been reassigned. This process is modifying the envoy service, so it is considered modified by kafkacluster crOver.

lengrongfu commented 3 years ago

@adamantal Hi,The following is the cr yaml description obtained.

apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaCluster
metadata:
  annotations:
    meta.helm.sh/release-name: kafka-test
    meta.helm.sh/release-namespace: kafka-test
  creationTimestamp: "2021-01-20T16:24:08Z"
  finalizers:
  - finalizer.kafkaclusters.kafka.banzaicloud.io
  - topics.kafkaclusters.kafka.banzaicloud.io
  - users.kafkaclusters.kafka.banzaicloud.io
  generation: 5
  labels:
    app.kubernetes.io/managed-by: Helm
  managedFields:
  - apiVersion: kafka.banzaicloud.io/v1beta1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:meta.helm.sh/release-name: {}
          f:meta.helm.sh/release-namespace: {}
        f:labels:
          .: {}
          f:app.kubernetes.io/managed-by: {}
      f:spec:
        .: {}
        f:brokerConfigGroups:
          .: {}
          f:default:
            .: {}
            f:brokerAnnotations:
              .: {}
              f:app.kubernetes.io/part-of: {}
              f:prometheus.io/port: {}
              f:prometheus.io/scrape: {}
            f:kafkaHeapOpts: {}
            f:resourceRequirements:
              .: {}
              f:limits:
                .: {}
                f:cpu: {}
                f:memory: {}
              f:requests:
                .: {}
                f:cpu: {}
                f:memory: {}
            f:serviceAccountName: {}
            f:storageConfigs: {}
        f:clusterImage: {}
        f:cruiseControlConfig:
          .: {}
          f:clusterConfig: {}
          f:config: {}
          f:cruiseControlTaskSpec:
            .: {}
            f:RetryDurationMinutes: {}
          f:image: {}
          f:serviceAccountName: {}
          f:topicConfig:
            .: {}
            f:partitions: {}
            f:replicationFactor: {}
        f:envoyConfig:
          .: {}
          f:annotations:
            .: {}
            f:service.beta.kubernetes.io/cce-load-balancer-internal-vpc: {}
            f:service.beta.kubernetes.io/cce-load-balancer-lb-name: {}
            f:service.beta.kubernetes.io/cce-load-balancer-reserve-lb: {}
          f:image: {}
          f:serviceAccountName: {}
        f:envs: {}
        f:headlessServiceEnabled: {}
        f:ingressController: {}
        f:listenersConfig:
          .: {}
          f:externalListeners: {}
          f:internalListeners: {}
        f:monitoringConfig:
          .: {}
          f:jmxImage: {}
          f:pathToJar: {}
        f:oneBrokerPerNode: {}
        f:readOnlyConfig: {}
        f:rollingUpgradeConfig:
          .: {}
          f:failureThreshold: {}
        f:zkAddresses: {}
        f:zkPath: {}
    manager: Go-http-client
    operation: Update
    time: "2021-01-20T16:24:08Z"
  - apiVersion: kafka.banzaicloud.io/v1beta1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers: {}
      f:spec:
        f:brokers: {}
        f:disruptionBudget: {}
        f:envoyConfig:
          f:annotations:
            f:banzaicloud.com/last-applied: {}
        f:istioIngressConfig: {}
        f:vaultConfig:
          .: {}
          f:authRole: {}
          f:issuePath: {}
          f:pkiPath: {}
          f:userStore: {}
      f:status:
        .: {}
        f:alertCount: {}
        f:brokersState:
          .: {}
          f:0:
            .: {}
            f:configurationState: {}
            f:gracefulActionState:
              .: {}
              f:cruiseControlState: {}
              f:errorMessage: {}
            f:rackAwarenessState: {}
          f:1:
            .: {}
            f:configurationState: {}
            f:gracefulActionState:
              .: {}
              f:cruiseControlState: {}
              f:errorMessage: {}
            f:rackAwarenessState: {}
          f:2:
            .: {}
            f:configurationState: {}
            f:gracefulActionState:
              .: {}
              f:cruiseControlState: {}
              f:errorMessage: {}
            f:rackAwarenessState: {}
          f:3:
            .: {}
            f:configurationState: {}
            f:gracefulActionState:
              .: {}
              f:TaskStarted: {}
              f:cruiseControlState: {}
              f:cruiseControlTaskId: {}
              f:errorMessage: {}
            f:rackAwarenessState: {}
          f:4:
            .: {}
            f:configurationState: {}
            f:gracefulActionState:
              .: {}
              f:TaskStarted: {}
              f:cruiseControlState: {}
              f:cruiseControlTaskId: {}
              f:errorMessage: {}
            f:rackAwarenessState: {}
        f:cruiseControlTopicStatus: {}
        f:rollingUpgradeStatus:
          .: {}
          f:errorCount: {}
          f:lastSuccess: {}
        f:state: {}
    manager: manager
    operation: Update
    time: "2021-01-21T03:32:15Z"
  name: kafka-test
  namespace: kafka-test
  resourceVersion: "5796230"
  selfLink: /apis/kafka.banzaicloud.io/v1beta1/namespaces/kafka-test/kafkaclusters/kafka-test
  uid: 96d99018-6b43-416f-a2de-91f22ec51c33
spec:
  brokerConfigGroups:
    default:
      brokerAnnotations:
        app.kubernetes.io/part-of: kube-prometheus
        prometheus.io/port: "9020"
        prometheus.io/scrape: "true"
      kafkaHeapOpts: -Xmx4G -Xms4G
      resourceRequirements:
        limits:
          cpu: "4"
          memory: 5G
        requests:
          cpu: "2"
          memory: 4G
      serviceAccountName: kafka-operator
      storageConfigs:
      - mountPath: /kafka-logs
        pvcSpec:
          accessModes:
          - ReadWriteOnce
          resources:
            requests:
              storage: 20Gi
          storageClassName: cds-hp1
  brokers:
  - brokerConfigGroup: default
    id: 0
  - brokerConfigGroup: default
    id: 1
  - brokerConfigGroup: default
    id: 2
  - brokerConfig:
      image: test/kafka:2.13-2.6.0
      storageConfigs:
      - mountPath: /kafka-logs
        pvcSpec:
          accessModes:
          - ReadWriteOnce
          resources:
            requests:
              storage: 10G
          storageClassName: cds-hp1-prepaid
    id: 3
  - brokerConfig:
      image: test/kafka:2.13-2.6.0
      storageConfigs:
      - mountPath: /kafka-logs
        pvcSpec:
          accessModes:
          - ReadWriteOnce
          resources:
            requests:
              storage: 10G
          storageClassName: cds-hp1-prepaid
    id: 4
  clusterImage: test/kafka:2.13-2.6.0
  cruiseControlConfig:
    clusterConfig: |
      {
        "min.insync.replicas": 3
      }
    config: |
      # Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
      #
      # This is an example property file for Kafka Cruise Control. See KafkaCruiseControlConfig for more details.
      # Configuration for the metadata client.
      # =======================================
      # The maximum interval in milliseconds between two metadata refreshes.
      #metadata.max.age.ms=300000
      # Client id for the Cruise Control. It is used for the metadata client.
      #client.id=kafka-cruise-control
      # The size of TCP send buffer bytes for the metadata client.
      #send.buffer.bytes=131072
      # The size of TCP receive buffer size for the metadata client.
      #receive.buffer.bytes=131072
      # The time to wait before disconnect an idle TCP connection.
      #connections.max.idle.ms=540000
      # The time to wait before reconnect to a given host.
      #reconnect.backoff.ms=50
      # The time to wait for a response from a host after sending a request.
      #request.timeout.ms=30000
      # Configurations for the load monitor
      # =======================================
      # The number of metric fetcher thread to fetch metrics for the Kafka cluster
      num.metric.fetchers=1
      # The metric sampler class
      metric.sampler.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler
      # Configurations for CruiseControlMetricsReporterSampler
      metric.reporter.topic.pattern=__CruiseControlMetrics
      # The sample store class name
      sample.store.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore
      # The config for the Kafka sample store to save the partition metric samples
      partition.metric.sample.store.topic=__KafkaCruiseControlPartitionMetricSamples
      # The config for the Kafka sample store to save the model training samples
      broker.metric.sample.store.topic=__KafkaCruiseControlModelTrainingSamples
      # The replication factor of Kafka metric sample store topic
      sample.store.topic.replication.factor=2
      # The config for the number of Kafka sample store consumer threads
      num.sample.loading.threads=8
      # The partition assignor class for the metric samplers
      metric.sampler.partition.assignor.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.DefaultMetricSamplerPartitionAssignor
      # The metric sampling interval in milliseconds
      metric.sampling.interval.ms=120000
      metric.anomaly.detection.interval.ms=180000
      # The partition metrics window size in milliseconds
      partition.metrics.window.ms=300000
      # The number of partition metric windows to keep in memory
      num.partition.metrics.windows=1
      # The minimum partition metric samples required for a partition in each window
      min.samples.per.partition.metrics.window=1
      # The broker metrics window size in milliseconds
      broker.metrics.window.ms=300000
      # The number of broker metric windows to keep in memory
      num.broker.metrics.windows=20
      # The minimum broker metric samples required for a partition in each window
      min.samples.per.broker.metrics.window=1
      # The configuration for the BrokerCapacityConfigFileResolver (supports JBOD and non-JBOD broker capacities)
      capacity.config.file=config/capacity.json
      #capacity.config.file=config/capacityJBOD.json
      # Configurations for the analyzer
      # =======================================
      # The list of goals to optimize the Kafka cluster for with pre-computed proposals
      default.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal
      # The list of supported goals
      goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal
      # The list of supported hard goals
      hard.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
      # The minimum percentage of well monitored partitions out of all the partitions
      min.monitored.partition.percentage=0.95
      # The balance threshold for CPU
      cpu.balance.threshold=1.1
      # The balance threshold for disk
      disk.balance.threshold=1.1
      # The balance threshold for network inbound utilization
      network.inbound.balance.threshold=1.1
      # The balance threshold for network outbound utilization
      network.outbound.balance.threshold=1.1
      # The balance threshold for the replica count
      replica.count.balance.threshold=1.1
      # The capacity threshold for CPU in percentage
      cpu.capacity.threshold=0.8
      # The capacity threshold for disk in percentage
      disk.capacity.threshold=0.8
      # The capacity threshold for network inbound utilization in percentage
      network.inbound.capacity.threshold=0.8
      # The capacity threshold for network outbound utilization in percentage
      network.outbound.capacity.threshold=0.8
      # The threshold to define the cluster to be in a low CPU utilization state
      cpu.low.utilization.threshold=0.0
      # The threshold to define the cluster to be in a low disk utilization state
      disk.low.utilization.threshold=0.0
      # The threshold to define the cluster to be in a low network inbound utilization state
      network.inbound.low.utilization.threshold=0.0
      # The threshold to define the cluster to be in a low disk utilization state
      network.outbound.low.utilization.threshold=0.0
      # The metric anomaly percentile upper threshold
      metric.anomaly.percentile.upper.threshold=90.0
      # The metric anomaly percentile lower threshold
      metric.anomaly.percentile.lower.threshold=10.0
      # How often should the cached proposal be expired and recalculated if necessary
      proposal.expiration.ms=60000
      # The maximum number of replicas that can reside on a broker at any given time.
      max.replicas.per.broker=10000
      # The number of threads to use for proposal candidate precomputing.
      num.proposal.precompute.threads=1
      # the topics that should be excluded from the partition movement.
      #topics.excluded.from.partition.movement
      # Configurations for the executor
      # =======================================
      # The max number of partitions to move in/out on a given broker at a given time.
      num.concurrent.partition.movements.per.broker=10
      # The interval between two execution progress checks.
      execution.progress.check.interval.ms=10000
      # Configurations for anomaly detector
      # =======================================
      # The goal violation notifier class
      anomaly.notifier.class=com.linkedin.kafka.cruisecontrol.detector.notifier.SelfHealingNotifier
      # The metric anomaly finder class
      metric.anomaly.finder.class=com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomalyFinder
      # The anomaly detection interval
      anomaly.detection.interval.ms=10000
      # The goal violation to detect.
      anomaly.detection.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
      # The interested metrics for metric anomaly analyzer.
      metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_MAX,BROKER_PRODUCE_LOCAL_TIME_MS_MEAN,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MAX,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MAX,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_LOG_FLUSH_TIME_MS_MAX,BROKER_LOG_FLUSH_TIME_MS_MEAN
      ## Adjust accordingly if your metrics reporter is an older version and does not produce these metrics.
      #metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_50TH,BROKER_PRODUCE_LOCAL_TIME_MS_999TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_50TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_999TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_50TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_999TH,BROKER_LOG_FLUSH_TIME_MS_50TH,BROKER_LOG_FLUSH_TIME_MS_999TH
      # The zk path to store failed broker information.
      failed.brokers.zk.path=/CruiseControlBrokerList
      # Topic config provider class
      topic.config.provider.class=com.linkedin.kafka.cruisecontrol.config.KafkaTopicConfigProvider
      # The cluster configurations for the KafkaTopicConfigProvider
      cluster.configs.file=config/clusterConfigs.json
      # The maximum time in milliseconds to store the response and access details of a completed user task.
      completed.user.task.retention.time.ms=21600000
      # The maximum time in milliseconds to retain the demotion history of brokers.
      demotion.history.retention.time.ms=86400000
      # The maximum number of completed user tasks for which the response and access details will be cached.
      max.cached.completed.user.tasks=100
      # The maximum number of user tasks for concurrently running in async endpoints across all users.
      max.active.user.tasks=5
      # Enable self healing for all anomaly detectors, unless the particular anomaly detector is explicitly disabled
      self.healing.enabled=true
      # Enable self healing for broker failure detector
      #self.healing.broker.failure.enabled=true
      # Enable self healing for goal violation detector
      #self.healing.goal.violation.enabled=true
      # Enable self healing for metric anomaly detector
      #self.healing.metric.anomaly.enabled=true
      # configurations for the webserver
      # ================================
      # HTTP listen port
      webserver.http.port=9090
      # HTTP listen address
      webserver.http.address=0.0.0.0
      # Whether CORS support is enabled for API or not
      webserver.http.cors.enabled=false
      # Value for Access-Control-Allow-Origin
      webserver.http.cors.origin=http://localhost:8080/
      # Value for Access-Control-Request-Method
      webserver.http.cors.allowmethods=OPTIONS,GET,POST
      # Headers that should be exposed to the Browser (Webapp)
      # This is a special header that is used by the
      # User Tasks subsystem and should be explicitly
      # Enabled when CORS mode is used as part of the
      # Admin Interface
      webserver.http.cors.exposeheaders=User-Task-ID
      # REST API default prefix
      # (dont forget the ending *)
      webserver.api.urlprefix=/kafkacruisecontrol/*
      # Location where the Cruise Control frontend is deployed
      webserver.ui.diskpath=./cruise-control-ui/dist/
      # URL path prefix for UI
      # (dont forget the ending *)
      webserver.ui.urlprefix=/*
      # Time After which request is converted to Async
      webserver.request.maxBlockTimeMs=10000
      # Default Session Expiry Period
      webserver.session.maxExpiryTimeMs=60000
      # Session cookie path
      webserver.session.path=/
      # Server Access Logs
      webserver.accesslog.enabled=true
      # Location of HTTP Request Logs
      webserver.accesslog.path=access.log
      # HTTP Request Log retention days
      webserver.accesslog.retention.days=14
    cruiseControlTaskSpec:
      RetryDurationMinutes: 5
    image: test/cruise-control:2.5.13
    serviceAccountName: kafka-operator
    topicConfig:
      partitions: 12
      replicationFactor: 3
  disruptionBudget: {}
  envoyConfig:
    annotations:
      banzaicloud.com/last-applied: UEsDBBQACAAIAAAAAAAAAAAAAAAAAAAAAAAIAAAAb3JpZ2luYWyskkFvEzEQhf/LnO2Q3S1Vs0fKjQoqhLigCs16XyprXXs1ng0t0f535JBECT1ElTjafv78PWu29ATlnpWp3RLHmJTVp5jLMkM23mHRQXkxTB0kQpEXPr1zDjYk7m3HgaODWB8VEjnYzeioJZUJZN6GCJ2N/ARqCXGTXsp64PXAVpH1rTBBicOG7mAzGwrcIey68Tge3vHxUZAzGcKdz4oI+bzXeCZDO4WfTqilE5vZ0Llr4v74Np7PxUsyj+xwzjCUfkXIV6whiA6Z2h/FzH+HZJ/iIb3oOP5m70Ka+lJ2U5UfqMhQF5IbvhTIRwTo7k7pasilqJJCgBx2Bh97aulTId6GKSuEjiXOrCZfgqvrfrVaVjf2urtq7FV1vbZc97Cral3XcO8r1zQ0P8yG8ghX/nRMon877KmdpAFil2R2Z9TeLJdLQ6MkTS4Faunb7T0ZUpZH6P0xMpt/EdUporqMqF4j6lNEfRlRv0Y0p4jmMqKZH8rUBjhN8p+mTl/GErpL3H/Yz1vZzso67SY7nJ6023me/wQAAP//UEsHCDQdJ2+ZAQAA5QMAAFBLAQIUABQACAAIAAAAAAA0HSdvmQEAAOUDAAAIAAAAAAAAAAAAAAAAAAAAAABvcmlnaW5hbFBLBQYAAAAAAQABADYAAADPAQAAAAA=
      service.beta.kubernetes.io/cce-load-balancer-internal-vpc: "true"
      service.beta.kubernetes.io/cce-load-balancer-lb-name: envoy-lb-kafka-test
      service.beta.kubernetes.io/cce-load-balancer-reserve-lb: "true"
    image: test/envoy:v1.16.2
    serviceAccountName: kafka-operator
  envs:
  - name: JMX_PORT
    value: "8080"
  headlessServiceEnabled: true
  ingressController: envoy
  istioIngressConfig: {}
  listenersConfig:
    externalListeners:
    - accessMethod: LoadBalancer
      containerPort: 9094
      externalStartingPort: 8000
      name: ex
      type: plaintext
    internalListeners:
    - containerPort: 29092
      name: internal
      type: plaintext
      usedForInnerBrokerCommunication: true
    - containerPort: 29093
      name: controller
      type: plaintext
      usedForControllerCommunication: true
      usedForInnerBrokerCommunication: false
  monitoringConfig:
    jmxImage: test/jmx-javaagent:0.14.0
    pathToJar: /opt/jmx_exporter/jmx_prometheus_javaagent-0.14.0.jar
  oneBrokerPerNode: false
  readOnlyConfig: |
    auto.create.topics.enable=false
    cruise.control.metrics.topic.auto.create=true
    cruise.control.metrics.topic.num.partitions=1
    cruise.control.metrics.topic.replication.factor=2
  rollingUpgradeConfig:
    failureThreshold: 1
  vaultConfig:
    authRole: ""
    issuePath: ""
    pkiPath: ""
    userStore: ""
  zkAddresses:
  - zookeeper-client.zookeeper-dev:2181
  zkPath: /kafka-test
status:
  alertCount: 0
  brokersState:
    "0":
      configurationState: ConfigInSync
      gracefulActionState:
        cruiseControlState: GracefulUpscaleSucceeded
        errorMessage: CruiseControl not yet ready
      rackAwarenessState: ""
    "1":
      configurationState: ConfigInSync
      gracefulActionState:
        cruiseControlState: GracefulUpscaleSucceeded
        errorMessage: CruiseControl not yet ready
      rackAwarenessState: ""
    "2":
      configurationState: ConfigInSync
      gracefulActionState:
        cruiseControlState: GracefulUpscaleSucceeded
        errorMessage: CruiseControl not yet ready
      rackAwarenessState: ""
    "3":
      configurationState: ConfigInSync
      gracefulActionState:
        TaskStarted: Thu, 21 Jan 2021 03:09:13 GMT
        cruiseControlState: GracefulUpscaleSucceeded
        cruiseControlTaskId: cd5af38f-69e5-43ec-b58d-bf8225a40313
        errorMessage: ""
      rackAwarenessState: ""
    "4":
      configurationState: ConfigInSync
      gracefulActionState:
        TaskStarted: Thu, 21 Jan 2021 03:15:12 GMT
        cruiseControlState: GracefulUpscaleSucceeded
        cruiseControlTaskId: 9377785c-f00d-49c9-baba-f83f3c624a30
        errorMessage: ""
      rackAwarenessState: ""
  cruiseControlTopicStatus: CruiseControlTopicReady
  rollingUpgradeStatus:
    errorCount: 0
    lastSuccess: ""
  state: ClusterReconciling
lengrongfu commented 3 years ago

@adamantal Hi,I think it is a problem with this code that causes the service resource to be updated all the time, but I did not locate why this is happening. I can temporarily expand the broker normally through the following code modification; I hope you can find this The answer to the question will give you some inspiration. image

// envoy.go

// Reconcile implements the reconcile logic for Envoy
func (r *Reconciler) Reconcile(log logr.Logger) error {
    log = log.WithValues("component", componentName)

    log.V(1).Info("Reconciling")

    if r.KafkaCluster.Spec.ListenersConfig.ExternalListeners != nil && r.KafkaCluster.Spec.GetIngressController() == envoyutils.IngressControllerName {

        for _, eListener := range r.KafkaCluster.Spec.ListenersConfig.ExternalListeners {
            if eListener.GetAccessMethod() == corev1.ServiceTypeLoadBalancer {
                for _, res := range []resources.ResourceWithLogAndExternalListenerConfig{
                    r.loadBalancer,
                    r.configMap,
                    r.deployment,
                } {
                    o := res(log, eListener)
                    var goRun bool = true
                    switch o.(type) {
                    case *corev1.Service:
                        services := &corev1.Service{}
                        name := fmt.Sprintf(envoyutils.EnvoyServiceName, eListener.Name, r.KafkaCluster.GetName())
                        key := types.NamespacedName{
                            Name:      name,
                            Namespace: r.KafkaCluster.Namespace,
                        }
                        err := r.Get(context.Background(), key, services)
                        log.Info("start check envoy service.")
                        if err == nil {
                            existPorts := services.Spec.Ports
                            specPorts := (o.(*corev1.Service)).Spec.Ports
                            if checkEnvoyServiceIsUpdate(existPorts, specPorts, log) {
                                log.Info("skip update envoy service,service not need update.")
                                goRun = false
                            }
                        }
                    }
                    if !goRun {
                        continue
                    }
                    err := k8sutil.Reconcile(log, r.Client, o, r.KafkaCluster)
                    if err != nil {
                        return err
                    }
                }
            }
        }
    }

    log.V(1).Info("Reconciled")

    return nil
}

// checkEnvoyServiceIsUpdate 检查是否需要更新service
// true 不需要
// false 需要
func checkEnvoyServiceIsUpdate(existPorts []corev1.ServicePort, specPorts []corev1.ServicePort, log logr.Logger) bool {
    if len(existPorts) != len(specPorts) {
        return false
    }
    for i, existPort := range existPorts {
        if existPort.Name == "tcp-all-broker" {
            continue
        }
        marshal, _ := json.Marshal(existPort)
        log.Info("existPort :", "json", string(marshal))
        specPort := specPorts[i]
        marshal, _ = json.Marshal(specPort)
        log.Info("specPort : ", "json", string(marshal))
        if existPort.Name != specPort.Name ||
            existPort.Protocol != specPort.Protocol ||
            existPort.Port != specPort.Port ||
            existPort.TargetPort != specPort.TargetPort {
            log.Info("check fail,exist and spec not equal.")
            return false
        }
    }
    return true
}
adamantal commented 3 years ago

Thanks for debugging on your side @lengrongfu

We suspect that there is something in your environment that changes the spec upon submission, that's why it keeps reconciling (the operator reconciles the modified service after it detects the change, and the cycle goes on and on).

Have you managed to run the custom code snippet you pasted? I think it may give some information what fields are changed in the service, so if you have any operator logs, that would be nice. If the protocol port or spec is changed, the reconciler does its jobs, but if there's some webhook for example that only modifies the annotations of the service, then the reconcile is not needed.

Also you can also watch Kubernetes resources with the kubectl get --watch command which may give some insight.

idahoakl commented 3 years ago

I experienced a similar issue with the NodePort value constantly changing on the Envoy load balancer service after modifying any of the service definition within the KafkaCluster resource.

I tracked the issue down to the banzaicloud.com/last-applied, that is added to the Kubernetes service when it is created/updated, being considered as a user supplied annotation and since it isn't a stable annotation causes CheckIfObjectUpdated to return true. A log snippet emitted from https://github.com/banzaicloud/kafka-operator/blob/master/pkg/k8sutil/resource.go#L180 indicates that the patch difference is the banzaicloud.com/last-applied annotation.

"patch": "{\"metadata\":{\"annotations\":{\"banzaicloud.com/last-applied\":\"<scrubbed>\"

The annotation map returned from EnvoyConfig::GetAnnotations() is mutated during the create/update process which explains how the banzaicloud.com/last-applied annotation ends up back on the KafkaCluster resource.

A test build removing the banzaicloud.com/last-applied annotation from the KafkaCluster config before creating the load balancer resource resolves the issue. The banzaicloud.com/last-applied annotation is still added to the KafkaCluster resource due to the above mentioned call to EnvoyConfig::GetAnnotations() but it is no longer considered in the object updated logic and stops the load balancer resource update loop.

What is interesting is that on a small test cluster I didn't observe this behavior at all but on a sizable production cluster I could consistently reproduce the issue. I'm wondering if this has something to do with some caching happening on the Kubernetes event reader where the banzaicloud.com/last-applied on the KafkaCluster resource isn't updated in the cached events which then perpetuates the cycle whereas on a smaller cluster with fewer events this caching doesn't happen.

stoader commented 3 years ago

@idahoakl the envoyConfig.annotations should not be changed on KafkaCluster CR by kafka-operator during an create/update process as kafka-operator is not updating this field.

In order to help further investigate this could you provide the followings:

stoader commented 3 years ago

@idahoakl meanwhile we've found the root cause. This will be fixed in the next release.

adamantal commented 3 years ago

Based on the offline discussion with @stoader I created https://github.com/banzaicloud/kafka-operator/pull/551 where I included the fix for this bug. I also added some context so that it is hopefully clear.

idahoakl commented 3 years ago

@stoader I agree that the annotations should not be changed within the KafkaCluster CR.

This is running using v0.14.0 of the operator. Given that the only repro I was able to create is running within a production cluster I can't post the details here.

Glad to hear this will be fixed in the next release.

stoader commented 3 years ago

@idahoakl given that you're using kafka-operator in production would you mind adding yourself to https://github.com/banzaicloud/kafka-operator/blob/master/ADOPTERS.md