druid-io / druid-operator

Druid Kubernetes Operator
Other
205 stars 93 forks source link

Druid Kafka jobs are going to Pending state #295

Open ghost opened 2 years ago

ghost commented 2 years ago

We deployed Druid using the Kubernetes Druid operator. We are using a small cluster with a single instance each of the broker, coordinator, historical and router. Using the Druid UI, we created a Kafka ingestion job. For parallel ingestion, we created 3 tasks. One task is running and the other two tasks are going into a pending state. There are no errors in the log files.

AdheipSingh commented 2 years ago

This seems like a Druid configuration-specific issue. Please provide sufficient evidence on the operator/k8s side to debug.

ghost commented 2 years ago

@AdheipSingh Please find below the configuration for my Druid cluster:

# This spec only works on a single-node Kubernetes cluster (e.g. a typical dev
# setup using kind/minikube, or a single-node AWS EKS cluster),
# as it uses local disk as "deep storage".
#
apiVersion: "druid.apache.org/v1alpha1"
kind: "Druid"
metadata:
  name: tiny-cluster
spec:
  image: apache/druid:0.22.1
  # Optionally specify image pull secrets for all nodes. They can also be
  # specified per node (see the commented example under nodes.brokers).
  # imagePullSecrets:
  # - name: tutu
  startScript: /druid.sh
  # Labels and annotations set at the spec level (presumably applied to every
  # pod the operator creates - confirm against the operator docs).
  podLabels:
    environment: stage
    release: alpha
  podAnnotations:
    dummykey: dummyval
  # Spec-level readiness probe. Port 8088 matches the druid.port of the broker,
  # coordinator, historical and router node groups below; the middlemanagers
  # group uses druid.port 8080 and declares its own probes.
  readinessProbe:
    httpGet:
      path: /status/health
      port: 8088
  # Run with UID/GID 1000 and fsGroup 1000.
  securityContext:
    fsGroup: 1000
    runAsUser: 1000
    runAsGroup: 1000
  # `clusterIP: None` requests a headless ClusterIP Service.
  services:
    - spec:
        type: ClusterIP
        clusterIP: None
  # Path for the common configuration (presumably where the operator mounts
  # common.runtime.properties and related files - TODO confirm).
  commonConfigMountPath: "/opt/druid/conf/druid/cluster/_common"
  # Spec-level JVM flags as a literal block scalar (`|-` strips the trailing
  # newline). Node groups add their own heap sizes via extra.jvm.options.
  # -Djava.io.tmpdir points at /druid/data, the data-volume mount path.
  # NOTE(review): MaxDirectMemorySize=10240g is 10 TiB of direct memory -
  # confirm this is intended rather than e.g. 10240m.
  jvm.options: |-
    -server
    -XX:MaxDirectMemorySize=10240g
    -Duser.timezone=UTC
    -Dfile.encoding=UTF-8
    -Dlog4j.debug
    -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
    -Djava.io.tmpdir=/druid/data
  # Log4j2 XML configuration: a single Console appender writing to SYSTEM_OUT
  # with an ISO8601 timestamp pattern, and the root logger at level "info".
  log4j.config: |-
    <?xml version="1.0" encoding="UTF-8" ?>
    <Configuration status="WARN">
        <Appenders>
            <Console name="Console" target="SYSTEM_OUT">
                <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
            </Console>
        </Appenders>
        <Loggers>
            <Root level="info">
                <AppenderRef ref="Console"/>
            </Root>
        </Loggers>
    </Configuration>
  # Common Druid runtime properties, passed as a literal block scalar. The `#`
  # lines inside the scalar are part of the string (properties-file comments),
  # not YAML comments.
  # Summary of what is configured here:
  #   - ZooKeeper at tiny-cluster-zk-0.tiny-cluster-zk, base path /druid
  #   - Derby metadata store on localhost:1527
  #   - "local" deep storage under /druid/deepstorage (the deepstorage-volume
  #     mount path declared in volumeMounts)
  #   - only the druid-kafka-indexing-service extension is loaded
  #   - task logs written as files under /druid/data/indexing-logs
  common.runtime.properties: |

    # Zookeeper
    druid.zk.service.host=tiny-cluster-zk-0.tiny-cluster-zk
    druid.zk.paths.base=/druid
    druid.zk.service.compress=false

    # Metadata Store
    druid.metadata.storage.type=derby
    druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/druid/data/derbydb/metadata.db;create=true
    druid.metadata.storage.connector.host=localhost
    druid.metadata.storage.connector.port=1527
    druid.metadata.storage.connector.createTables=true

    # Deep Storage
    druid.storage.type=local
    druid.storage.storageDirectory=/druid/deepstorage
    #
    # Extensions
    #
    druid.extensions.loadList=["druid-kafka-indexing-service"]

    #
    # Service discovery
    #
    druid.selectors.indexing.serviceName=druid/overlord
    druid.selectors.coordinator.serviceName=druid/coordinator

    druid.indexer.logs.type=file
    druid.indexer.logs.directory=/druid/data/indexing-logs
    druid.lookup.enableLookupSyncOnStartup=false

  # JSON document (as a literal block scalar) mapping each Druid metric name to
  # the dimensions reported with it and its metric type ("timer", "count" or
  # "gauge"); some entries additionally set "convertRange": true.
  # NOTE(review): presumably consumed by a metrics-emitter extension, but
  # druid.extensions.loadList in this spec only loads
  # druid-kafka-indexing-service - confirm whether this map is actually used.
  metricDimensions.json: |-
    {
      "query/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer"},
      "query/bytes" : { "dimensions" : ["dataSource", "type"], "type" : "count"},
      "query/node/time" : { "dimensions" : ["server"], "type" : "timer"},
      "query/node/ttfb" : { "dimensions" : ["server"], "type" : "timer"},
      "query/node/bytes" : { "dimensions" : ["server"], "type" : "count"},
      "query/node/backpressure": { "dimensions" : ["server"], "type" : "timer"},
      "query/intervalChunk/time" : { "dimensions" : [], "type" : "timer"},

      "query/segment/time" : { "dimensions" : [], "type" : "timer"},
      "query/wait/time" : { "dimensions" : [], "type" : "timer"},
      "segment/scan/pending" : { "dimensions" : [], "type" : "gauge"},
      "query/segmentAndCache/time" : { "dimensions" : [], "type" : "timer" },
      "query/cpu/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer" },

      "query/count" : { "dimensions" : [], "type" : "count" },
      "query/success/count" : { "dimensions" : [], "type" : "count" },
      "query/failed/count" : { "dimensions" : [], "type" : "count" },
      "query/interrupted/count" : { "dimensions" : [], "type" : "count" },
      "query/timeout/count" : { "dimensions" : [], "type" : "count" },

      "query/cache/delta/numEntries" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/sizeBytes" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/hits" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/misses" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/evictions" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/hitRate" : { "dimensions" : [], "type" : "count", "convertRange" : true },
      "query/cache/delta/averageBytes" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/timeouts" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/errors" : { "dimensions" : [], "type" : "count" },

      "query/cache/total/numEntries" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/sizeBytes" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/hits" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/misses" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/evictions" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/hitRate" : { "dimensions" : [], "type" : "gauge", "convertRange" : true },
      "query/cache/total/averageBytes" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/timeouts" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/errors" : { "dimensions" : [], "type" : "gauge" },

      "ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/events/processed" : { "dimensions" : ["dataSource", "taskType", "taskId"], "type" : "count" },
      "ingest/events/messageGap" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "ingest/rows/output" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/persists/count" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/persists/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
      "ingest/persists/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },
      "ingest/persists/backPressure" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "ingest/persists/failed" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/handoff/failed" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
      "ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },

      "ingest/kafka/lag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "ingest/kafka/maxLag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "ingest/kafka/avgLag" : { "dimensions" : ["dataSource"], "type" : "gauge" },

      "task/success/count" : { "dimensions" : ["dataSource"], "type" : "count" },
      "task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count" },
      "task/running/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },

      "taskSlot/total/count" : { "dimensions" : [], "type" : "gauge" },
      "taskSlot/idle/count" : { "dimensions" : [], "type" : "gauge" },
      "taskSlot/busy/count" : { "dimensions" : [], "type" : "gauge" },
      "taskSlot/lazy/count" : { "dimensions" : [], "type" : "gauge" },
      "taskSlot/blacklisted/count" : { "dimensions" : [], "type" : "gauge" },

      "task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer" },
      "segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
      "segment/moved/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
      "segment/nuked/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },

      "segment/assigned/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/moved/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/dropped/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/deleted/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/unneeded/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/unavailable/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "segment/underReplicated/count" : { "dimensions" : ["dataSource", "tier"], "type" : "gauge" },
      "segment/cost/raw" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/cost/normalization" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/cost/normalized" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/loadQueue/size" : { "dimensions" : ["server"], "type" : "gauge" },
      "segment/loadQueue/failed" : { "dimensions" : ["server"], "type" : "gauge" },
      "segment/loadQueue/count" : { "dimensions" : ["server"], "type" : "gauge" },
      "segment/dropQueue/count" : { "dimensions" : ["server"], "type" : "gauge" },
      "segment/size" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "segment/overShadowed/count" : { "dimensions" : [], "type" : "gauge" },

      "segment/max" : { "dimensions" : [], "type" : "gauge"},
      "segment/used" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge" },
      "segment/usedPercent" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge", "convertRange" : true },
      "segment/pendingDelete" : { "dimensions" : [], "type" : "gauge"},

      "jvm/pool/committed" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge" },
      "jvm/pool/init" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge" },
      "jvm/pool/max" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge" },
      "jvm/pool/used" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge" },
      "jvm/bufferpool/count" : { "dimensions" : ["bufferpoolName"], "type" : "gauge" },
      "jvm/bufferpool/used" : { "dimensions" : ["bufferpoolName"], "type" : "gauge" },
      "jvm/bufferpool/capacity" : { "dimensions" : ["bufferpoolName"], "type" : "gauge" },
      "jvm/mem/init" : { "dimensions" : ["memKind"], "type" : "gauge" },
      "jvm/mem/max" : { "dimensions" : ["memKind"], "type" : "gauge" },
      "jvm/mem/used" : { "dimensions" : ["memKind"], "type" : "gauge" },
      "jvm/mem/committed" : { "dimensions" : ["memKind"], "type" : "gauge" },
      "jvm/gc/count" : { "dimensions" : ["gcName", "gcGen"], "type" : "count" },
      "jvm/gc/cpu" : { "dimensions" : ["gcName", "gcGen"], "type" : "count" },

      "ingest/events/buffered" : { "dimensions" : ["serviceName", "bufferCapacity"], "type" : "gauge"},

      "sys/swap/free" : { "dimensions" : [], "type" : "gauge"},
      "sys/swap/max" : { "dimensions" : [], "type" : "gauge"},
      "sys/swap/pageIn" : { "dimensions" : [], "type" : "gauge"},
      "sys/swap/pageOut" : { "dimensions" : [], "type" : "gauge"},
      "sys/disk/write/count" : { "dimensions" : ["fsDevName"], "type" : "count"},
      "sys/disk/read/count" : { "dimensions" : ["fsDevName"], "type" : "count"},
      "sys/disk/write/size" : { "dimensions" : ["fsDevName"], "type" : "count"},
      "sys/disk/read/size" : { "dimensions" : ["fsDevName"], "type" : "count"},
      "sys/net/write/size" : { "dimensions" : [], "type" : "count"},
      "sys/net/read/size" : { "dimensions" : [], "type" : "count"},
      "sys/fs/used" : { "dimensions" : ["fsDevName", "fsDirName", "fsTypeName", "fsSysTypeName", "fsOptions"], "type" : "gauge"},
      "sys/fs/max" : { "dimensions" : ["fsDevName", "fsDirName", "fsTypeName", "fsSysTypeName", "fsOptions"], "type" : "gauge"},
      "sys/mem/used" : { "dimensions" : [], "type" : "gauge"},
      "sys/mem/max" : { "dimensions" : [], "type" : "gauge"},
      "sys/storage/used" : { "dimensions" : ["fsDirName"], "type" : "gauge"},
      "sys/cpu" : { "dimensions" : ["cpuName", "cpuTime"], "type" : "gauge"},

      "coordinator-segment/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "historical-segment/count" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge" },

      "jetty/numOpenConnections" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/caffeine/total/requests" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/caffeine/total/loadTime" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/caffeine/total/evictionBytes" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/memcached/total" : { "dimensions" : ["[MEM] Reconnecting Nodes (ReconnectQueue)",
        "[MEM] Request Rate: All",
        "[MEM] Average Bytes written to OS per write",
        "[MEM] Average Bytes read from OS per read",
        "[MEM] Response Rate: All (Failure + Success + Retry)",
        "[MEM] Response Rate: Retry",
        "[MEM] Response Rate: Failure",
        "[MEM] Response Rate: Success"],
        "type" : "gauge" },
      "query/cache/caffeine/delta/requests" : { "dimensions" : [], "type" : "count" },
      "query/cache/caffeine/delta/loadTime" : { "dimensions" : [], "type" : "count" },
      "query/cache/caffeine/delta/evictionBytes" : { "dimensions" : [], "type" : "count" },
      "query/cache/memcached/delta" : { "dimensions" : ["[MEM] Reconnecting Nodes (ReconnectQueue)",
        "[MEM] Request Rate: All",
        "[MEM] Average Bytes written to OS per write",
        "[MEM] Average Bytes read from OS per read",
        "[MEM] Response Rate: All (Failure + Success + Retry)",
        "[MEM] Response Rate: Retry",
        "[MEM] Response Rate: Failure",
        "[MEM] Response Rate: Success"],
        "type" : "count" }
    }

  # Spec-level mounts: /druid/data (used above for java.io.tmpdir, the Derby
  # database and indexing logs) and /druid/deepstorage (the
  # druid.storage.storageDirectory for "local" deep storage).
  volumeMounts:
    - mountPath: /druid/data
      name: data-volume
    - mountPath: /druid/deepstorage
      name: deepstorage-volume
  volumes:
    # emptyDir: scratch space tied to the pod's lifetime.
    - name: data-volume
      emptyDir: {}
    # hostPath on the node's /tmp; this is why the spec only works on a
    # single-node cluster (see the comment at the top of the manifest).
    - name: deepstorage-volume
      hostPath:
        path: /tmp/druid/deepstorage
        type: DirectoryOrCreate
  # Expose the pod's own name and namespace as environment variables via
  # fieldRef.
  env:
    - name: POD_NAME
      valueFrom:
        fieldRef:
          fieldPath: metadata.name
    - name: POD_NAMESPACE
      valueFrom:
        fieldRef:
          fieldPath: metadata.namespace

  # Node groups of the cluster, keyed by an arbitrary group name; each entry
  # declares its Druid nodeType, port, replica count and runtime properties.
  nodes:
    brokers:
      # Optionally run the broker as a Deployment:
      # kind: Deployment
      nodeType: "broker"
      # Optionally specify image pull secrets for broker nodes only:
      # imagePullSecrets:
      # - name: tutu
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/broker"
      replicas: 1
      # Broker runtime properties (literal block scalar; `#` lines inside are
      # properties-file comments, not YAML comments).
      # NOTE(review): druid.processing.buffer.sizeBytes=1 sets a 1-byte
      # processing buffer - confirm this is intended.
      runtime.properties: |
        druid.service=druid/broker
        # HTTP server threads
        druid.broker.http.numConnections=5
        druid.server.http.numThreads=10
        # Processing threads and buffers
        druid.processing.buffer.sizeBytes=1
        druid.processing.numMergeBuffers=1
        druid.processing.numThreads=1
        druid.sql.enable=true
      # Fixed 512 MiB heap for the broker JVM.
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M

    # Coordinator node group; asOverlord.enabled=true below makes the same
    # process also act as the Overlord (service name druid/overlord).
    coordinators:
      # Optionally run the coordinator as a Deployment:
      # kind: Deployment
      nodeType: "coordinator"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/master/coordinator-overlord"
      replicas: 1
      # NOTE(review): druid.indexer.runner.type=local is the setting the reply
      # in this thread identifies as the cause of the pending Kafka tasks; it
      # recommends druid.indexer.runner.type=remote so that tasks are
      # scheduled onto the middlemanagers group defined below.
      runtime.properties: |
        druid.service=druid/coordinator

        # HTTP server threads
        druid.coordinator.startDelay=PT30S
        druid.coordinator.period=PT30S

        # Configure this coordinator to also run as Overlord
        druid.coordinator.asOverlord.enabled=true
        druid.coordinator.asOverlord.overlordService=druid/overlord
        druid.indexer.queue.startDelay=PT30S
        druid.indexer.runner.type=local
      # Fixed 512 MiB heap for the coordinator/overlord JVM.
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M

    # Historical node group: one replica, segment cache under
    # /druid/data/segments (inside the data-volume mount), capped at
    # 10737418240 bytes (10 GiB).
    historicals:
      nodeType: "historical"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/historical"
      replicas: 1
      # NOTE(review): inside a literal block scalar the `\"` sequences in
      # druid.segmentCache.locations are passed through verbatim; presumably
      # the properties-file loader unescapes them - verify.
      # druid.processing.buffer.sizeBytes=536870912 is 512 MiB.
      runtime.properties: |
        druid.service=druid/historical
        druid.server.http.numThreads=5
        druid.processing.buffer.sizeBytes=536870912
        druid.processing.numMergeBuffers=1
        druid.processing.numThreads=1

        # Segment storage
        druid.segmentCache.locations=[{\"path\":\"/druid/data/segments\",\"maxSize\":10737418240}]
        druid.server.maxSize=10737418240
      # Fixed 512 MiB heap for the historical JVM.
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M
    # MiddleManager node group: a 2-replica StatefulSet on port 8080, each
    # replica advertising druid.worker.capacity=4 task slots.
    middlemanagers:
      # Unlike the other node groups (8088), this group listens on 8080, so it
      # declares its own liveness/readiness probes below.
      druid.port: 8080
      kind: StatefulSet
      nodeType: middleManager
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/data/middleManager
      env:
        - name: DRUID_XMX
          value: 4096m
        - name: DRUID_XMS
          value: 4096m
        - name: AWS_REGION
          value: ap-southeast-1
      podDisruptionBudgetSpec:
        maxUnavailable: 1
      replicas: 2
      # Resource requests/limits are currently disabled:
      # resources:
      #   limits:
      #     cpu: 8
      #     memory: 8Gi
      #   requests:
      #     cpu: 8
      #     memory: 8Gi
      livenessProbe:
        initialDelaySeconds: 60
        periodSeconds: 5
        failureThreshold: 3
        httpGet:
          path: /status/health
          port: 8080
      readinessProbe:
        initialDelaySeconds: 60
        periodSeconds: 5
        failureThreshold: 3
        httpGet:
          path: /status/health
          port: 8080
      # MiddleManager runtime properties. Each forked peon gets a 2g heap and
      # 2g direct memory via druid.indexer.runner.javaOpts.
      # NOTE(review): druid.indexer.task.baseTaskDir=var/druid/task is a
      # relative path, while the PVC below is mounted at /var/druid - confirm
      # the task directory actually lands on the persistent volume.
      runtime.properties: |
          druid.service=druid/middleManager
          druid.worker.capacity=4
          druid.indexer.runner.javaOpts=-server -Xms2g -Xmx2g -XX:MaxDirectMemorySize=2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
          druid.indexer.task.baseTaskDir=var/druid/task
          # HTTP server threads
          druid.server.http.numThreads=500
          # Processing threads and buffers on Peons
          druid.indexer.fork.property.druid.processing.numMergeBuffers=2
          druid.indexer.fork.property.druid.processing.buffer.sizeBytes=32000000
          druid.indexer.fork.property.druid.processing.numThreads=2
      # 15Gi ReadWriteOnce claim per replica, mounted at /var/druid.
      # NOTE(review): the claim is named data-volume, the same name as the
      # spec-level emptyDir volume - confirm the operator accepts the
      # duplicate name.
      volumeClaimTemplates:
        -
          metadata:
            name: data-volume
          spec:
            accessModes:
              - ReadWriteOnce
            resources:
              requests:
                storage: 15Gi
      volumeMounts:
        -
          mountPath: /var/druid
          name: data-volume

    # Router node group: one replica on port 8088. It proxies queries to the
    # druid/broker service and, with managementProxy enabled, proxies
    # management requests to the coordinator/overlord for the unified web
    # console.
    routers:
      nodeType: "router"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/router"
      replicas: 1
      # Router runtime properties (literal block scalar). Keep the lines free
      # of trailing whitespace: inside a block scalar it becomes part of the
      # property value.
      runtime.properties: |
        druid.service=druid/router

        # HTTP proxy
        druid.router.http.numConnections=10
        druid.router.http.readTimeout=PT5M
        druid.router.http.numMaxThreads=10
        druid.server.http.numThreads=10

        # Service discovery
        druid.router.defaultBrokerServiceName=druid/broker
        druid.router.coordinatorServiceName=druid/coordinator

        # Management proxy to coordinator / overlord: required for unified web console.
        druid.router.managementProxy.enabled=true
      # Fixed 512 MiB heap for the router JVM.
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M
acherla commented 2 years ago

You have druid.indexer.runner.type=local; it should be set to druid.indexer.runner.type=remote to take advantage of the MM peons you configured. This will ensure your tasks are scheduled to the MM peons, which have a capacity of 4, so that your tasks don't go into a pending state.

FYI, the Coordinator logs will tell you why tasks aren't being scheduled.