mesos / kafka

Apache Kafka on Apache Mesos
Apache License 2.0

Missing broker task attribute 'endpoint' on broker startup / reconciliation #310

Open blakambr opened 7 years ago

blakambr commented 7 years ago

We have hit this issue multiple times: when the framework is restarted while brokers are starting up, reconciliation fails to register an endpoint on the affected broker because the framework does not recognise TASK_STARTING as a valid state.

This eventually leaves the broker state as follows when running:

{
    "id": "0",
    ...
    "task": {
        "id": "kafka-0-20a281db-069a-4885-96c2-53a6cc3db252",
        "slaveId": "c30ad8fa-8a52-45fb-bcf0-29e22140c8a3-S24",
        "executorId": "kafka-0-02690702-ae31-415e-8a0d-44d85d9636d1",
        "hostname": "localhost",
        "attributes": {},
        "state": "running"
    },
    ...
}

As the framework already knows the hostname and port that the broker will start up on (from the Mesos offer it receives), wouldn't it make sense to set the endpoint when launching the task rather than adding it after the task has started?

shangd commented 7 years ago

Here is an example log sequence when this issue happens:

2017-07-25 22:22:49,770 INFO           TaskReconciler] Reconciling 1/6 state of broker 22, task kafka-broker-22-c34b93bc-f76a-4595-b40d-486b8e9cfe76
2017-07-25 22:22:49,933 INFO      KafkaMesosScheduler] [statusUpdate] kafka-broker-22-c34b93bc-f76a-4595-b40d-486b8e9cfe76 TASK_STARTING slave:#3-S24 reason:REASON_RECONCILIATION message:Reconciliation: Latest task state
2017-07-25 22:22:49,980 WARN   BrokerLifecycleManager] Got unknown state TASK_STARTING for broker 22

2017-07-25 22:27:49,791 INFO           TaskReconciler] Reconciling 2/6 state of broker 22, task kafka-broker-22-c34b93bc-f76a-4595-b40d-486b8e9cfe76
2017-07-25 22:27:49,797 INFO      KafkaMesosScheduler] [statusUpdate] kafka-broker-22-c34b93bc-f76a-4595-b40d-486b8e9cfe76 TASK_STARTING slave:#3-S24 reason:REASON_RECONCILIATION message:Reconciliation: Latest task state
2017-07-25 22:27:49,799 WARN   BrokerLifecycleManager] Got unknown state TASK_STARTING for broker 22
2017-07-25 22:31:26,908 INFO      KafkaMesosScheduler] [statusUpdate] kafka-broker-22-c34b93bc-f76a-4595-b40d-486b8e9cfe76 TASK_RUNNING slave:#3-S24 data: myhost.com:10011
2017-07-25 22:31:26,908 WARN   BrokerLifecycleManager] Got unknown state TASK_RUNNING for broker 22

2017-07-25 22:32:49,795 INFO           TaskReconciler] Reconciling 3/6 state of broker 22, task kafka-broker-22-c34b93bc-f76a-4595-b40d-486b8e9cfe76
2017-07-25 22:32:49,799 INFO      KafkaMesosScheduler] [statusUpdate] kafka-broker-22-c34b93bc-f76a-4595-b40d-486b8e9cfe76 TASK_RUNNING slave:#3-S24 message:Reconciliation: Latest task state
2017-07-25 22:32:49,799 INFO   BrokerLifecycleManager] Finished reconciling of broker 22, task kafka-broker-22-c34b93bc-f76a-4595-b40d-486b8e9cfe76

https://github.com/mesos/kafka/blob/master/src/scala/main/ly/stealth/mesos/kafka/scheduler/BrokerLifecycleManager.scala#L79

First, there is no case matching the (Reconciling, TaskStarting) state, so reconciliation attempt 1 failed. Second, when the Mesos task changed from TASK_STARTING to TASK_RUNNING during reconciliation attempt 2, the status update containing the endpoint data was ignored (2017-07-25 22:31:26,908): it was not triggered by the reconciliation, so it did not match case Reconciling(broker, TaskRunning(_)). Finally, by the time attempt 3 succeeded, the framework had already missed its only chance to read the endpoint data from that earlier status update, so the running broker is left without an endpoint attribute.
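To make the sequence easier to follow, here is a toy model of the matching behaviour described above. It is only a sketch: ReconcileBugModel, Broker, Update, handle and replay are made-up names for illustration, not the framework's real classes, and the three cases are a simplification of what BrokerLifecycleManager does.

```scala
// Toy model of the status-update handling described above. All names are
// illustrative assumptions, not the framework's real API.
object ReconcileBugModel {
  sealed trait BrokerState
  case object Starting extends BrokerState
  case object Reconciling extends BrokerState
  case object Running extends BrokerState

  sealed trait TaskState
  case object TaskStarting extends TaskState
  case object TaskRunning extends TaskState

  // `data` carries "host:port" only on the executor's one-off TASK_RUNNING update.
  final case class Update(state: TaskState, fromReconciliation: Boolean, data: Option[String])
  final case class Broker(state: BrokerState, endpoint: Option[String])

  // Returns the new broker plus a flag saying whether the update was recognised.
  def handle(broker: Broker, update: Update): (Broker, Boolean) =
    (broker.state, update) match {
      // Normal start-up path: the executor reports the endpoint once.
      case (Starting, Update(TaskRunning, false, data)) =>
        (Broker(Running, data), true)
      // Reconciliation path: the replayed status carries no endpoint data.
      case (Reconciling, Update(TaskRunning, true, _)) =>
        (broker.copy(state = Running), true)
      // Everything else corresponds to the "Got unknown state" warning.
      case _ =>
        (broker, false)
    }

  // The sequence from the log: two reconciled TASK_STARTING updates, the
  // executor's TASK_RUNNING with the endpoint, then a reconciled TASK_RUNNING.
  val logSequence: List[Update] = List(
    Update(TaskStarting, fromReconciliation = true, data = None),
    Update(TaskStarting, fromReconciliation = true, data = None),
    Update(TaskRunning, fromReconciliation = false, data = Some("myhost.com:10011")),
    Update(TaskRunning, fromReconciliation = true, data = None)
  )

  // Applies the updates in order and returns the final broker.
  def replay(start: Broker, updates: List[Update]): Broker =
    updates.foldLeft(start)((b, u) => handle(b, u)._1)
}
```

Replaying logSequence against a broker that starts in Reconciling ends with the broker in Running but with no endpoint, which mirrors the state shown in the first comment.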

A potential fix could be adding the endpoint when the task was created without the need to read it from the status update: https://github.com/mesos/kafka/blob/master/src/scala/main/ly/stealth/mesos/kafka/scheduler/mesos/BrokerTaskManager.scala#L84

      val task = Broker.Task(
        id,
        task_.getSlaveId.getValue,
        task_.getExecutor.getExecutorId.getValue,
        offer.getHostname,
        attributes)

      task.endpoint = new Endpoint(task.hostname, reservation.port.toInt)
      task
   }

steveniemitz commented 7 years ago

This is a good catch, and thanks for the investigation.

If I recall, the reason the endpoint isn't set on task creation is to avoid advertising a broker that isn't ready to accept connections yet.

If you wanted, you could try adding case Reconciling(broker, TaskStarting(_)) => onStarting(broker, status) to the state machine and see if that fixes it. I think it would.
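As a rough illustration of why the extra case could help, here is a toy model with that case added. It assumes that onStarting puts the broker back on the normal start-up path; ReconcileFixModel and everything inside it are hypothetical names, not the framework's real classes.

```scala
// Toy model with the suggested extra case. All names are illustrative
// assumptions, not the framework's real API.
object ReconcileFixModel {
  sealed trait BrokerState
  case object Starting extends BrokerState
  case object Reconciling extends BrokerState
  case object Running extends BrokerState

  sealed trait TaskState
  case object TaskStarting extends TaskState
  case object TaskRunning extends TaskState

  final case class Update(state: TaskState, fromReconciliation: Boolean, data: Option[String])
  final case class Broker(state: BrokerState, endpoint: Option[String])

  def handle(broker: Broker, update: Update): Broker =
    (broker.state, update) match {
      // Suggested extra case: a reconciled TASK_STARTING moves the broker back
      // to Starting (assumed to be roughly what onStarting would do).
      case (Reconciling, Update(TaskStarting, true, _)) =>
        broker.copy(state = Starting)
      // Normal start-up path: the executor's update carries the endpoint.
      case (Starting, Update(TaskRunning, false, data)) =>
        Broker(Running, data)
      // Reconciliation path: no endpoint data available.
      case (Reconciling, Update(TaskRunning, true, _)) =>
        broker.copy(state = Running)
      // Anything else leaves the broker unchanged.
      case _ =>
        broker
    }

  // Same sequence as in the log above.
  val logSequence: List[Update] = List(
    Update(TaskStarting, fromReconciliation = true, data = None),
    Update(TaskStarting, fromReconciliation = true, data = None),
    Update(TaskRunning, fromReconciliation = false, data = Some("myhost.com:10011")),
    Update(TaskRunning, fromReconciliation = true, data = None)
  )

  // Applies the updates in order and returns the final broker.
  def replay(start: Broker, updates: List[Update]): Broker =
    updates.foldLeft(start)(handle)
}
```

In this model the first reconciled TASK_STARTING returns the broker to Starting, so the executor's later TASK_RUNNING update is handled by the normal path and the endpoint is recorded. Whether the real onStarting behaves this way would need to be checked against the source.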

shangd commented 7 years ago

My concern is that if for whatever reason the status update from the executor failed to be processed by the framework, the endpoint attribute could still end up missing.

2017-07-25 22:31:26,908 INFO      KafkaMesosScheduler] [statusUpdate] kafka-broker-22-c34b93bc-f76a-4595-b40d-486b8e9cfe76 TASK_RUNNING slave:#3-S24 data: myhost.com:10011

Currently the only way to populate the endpoint is to read it from the above message, which the executor sends only once, when a broker becomes ready. This is very fragile: when it fails, the endpoint is lost until the broker restarts. Neither reconciliation nor restarting the framework will recover the endpoint info.

I think adding the extra match case should work. In addition, I'd still like to generate the endpoint on task creation, since clients could look at the state attribute to decide whether the broker is ready.
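A minimal sketch of that idea, assuming the endpoint is derived from the offer hostname and the reserved port at launch time. EndpointAtCreation, launch and connectable are hypothetical names, the "starting" state string is an assumption ("running" is the value shown in the broker JSON above), and the real Broker.Task has more fields than this.

```scala
// Sketch: endpoint set at task creation, readiness decided from the state.
// All names here are illustrative assumptions, not the framework's real API.
object EndpointAtCreation {
  final case class Endpoint(hostname: String, port: Int)
  final case class Task(hostname: String, state: String, endpoint: Option[Endpoint])

  // The endpoint is known at launch time, so it never depends on a status update.
  def launch(offerHostname: String, reservedPort: Int): Task =
    Task(offerHostname, "starting", Some(Endpoint(offerHostname, reservedPort)))

  // Clients treat the endpoint as usable only once the task reports "running",
  // which avoids connecting to a broker that is not ready yet.
  def connectable(task: Task): Option[Endpoint] =
    if (task.state == "running") task.endpoint else None
}
```

With this shape a lost status update can delay the state change, but it can no longer leave a running broker without an endpoint.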