databricks / databricks-sdk-py

Databricks SDK for Python (Beta)
https://databricks-sdk-py.readthedocs.io/
Apache License 2.0

[ISSUE] WorkspaceClient.clusters.ensure_cluster_is_running - Timeout not configurable #691

Open tseader opened 3 months ago

tseader commented 3 months ago

Description When calling WorkspaceClient.clusters.ensure_cluster_is_running, the timeout is not configurable, and it is not cascaded to the successor function calls. This can intermittently cause failures due to slow cluster spin-up. I am seeing long compute-cluster startup times in my org's Databricks instance, so being able to configure this would be very helpful.

Reproduction

from databricks.sdk import WorkspaceClient

client = WorkspaceClient()
cluster_id = "1234"
client.clusters.ensure_cluster_is_running(cluster_id)
# ...and 20+ minutes elapse

Expected behavior I expect the ability to adjust the timeout period, e.g. via a method parameter or the client constructor.

Instead of:

    def ensure_cluster_is_running(self, cluster_id: str) -> None:
        """Ensures that given cluster is running, regardless of the current state"""
        timeout = datetime.timedelta(minutes=20)
        deadline = time.time() + timeout.total_seconds()
        while time.time() < deadline:
            try:
                state = compute.State
                info = self.get(cluster_id)
                if info.state == state.RUNNING:
                    return
                elif info.state == state.TERMINATED:
                    self.start(cluster_id).result()
                    return
                elif info.state == state.TERMINATING:
                    self.wait_get_cluster_terminated(cluster_id)
                    self.start(cluster_id).result()
                    return
                elif info.state in (state.PENDING, state.RESIZING, state.RESTARTING):
                    self.wait_get_cluster_running(cluster_id)
                    return
                elif info.state in (state.ERROR, state.UNKNOWN):
                    raise RuntimeError(f'Cluster {info.cluster_name} is {info.state}: {info.state_message}')
            except DatabricksError as e:
                if e.error_code == 'INVALID_STATE':
                    _LOG.debug(f'Cluster was started by other process: {e} Retrying.')
                    continue
                raise e
            except OperationFailed as e:
                _LOG.debug('Operation failed, retrying', exc_info=e)
        raise TimeoutError(f'timed out after {timeout}')

Would be good to have:

    def ensure_cluster_is_running(self, cluster_id: str, timeout_minutes: int = 20) -> None:
        """Ensures that given cluster is running, regardless of the current state"""
        timeout = datetime.timedelta(minutes=timeout_minutes)
        deadline = time.time() + timeout.total_seconds()
        while time.time() < deadline:
            try:
                state = compute.State
                info = self.get(cluster_id)
                if info.state == state.RUNNING:
                    return
                elif info.state == state.TERMINATED:
                self.start(cluster_id).result(timeout=timeout)  # ADDED TIMEOUT
                    return
                elif info.state == state.TERMINATING:
                    self.wait_get_cluster_terminated(cluster_id, timeout=timeout)  # ADDED TIMEOUT
                    self.start(cluster_id).result(timeout=timeout)  # ADDED TIMEOUT
                    return
                elif info.state in (state.PENDING, state.RESIZING, state.RESTARTING):
                    self.wait_get_cluster_running(cluster_id, timeout=timeout)  # ADDED TIMEOUT
                    return
                elif info.state in (state.ERROR, state.UNKNOWN):
                    raise RuntimeError(f'Cluster {info.cluster_name} is {info.state}: {info.state_message}')
            except DatabricksError as e:
                if e.error_code == 'INVALID_STATE':
                    _LOG.debug(f'Cluster was started by other process: {e} Retrying.')
                    continue
                raise e
            except OperationFailed as e:
                _LOG.debug('Operation failed, retrying', exc_info=e)
        raise TimeoutError(f'timed out after {timeout}')
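One caveat with the proposal above: passing the full `timeout` to every successor call means the total wait can exceed the caller's budget (each `wait_*`/`result` call gets a fresh 20-minute allowance). The cascading idea can be sketched generically by giving each step only the time remaining until one shared deadline. This is a minimal illustration with stand-in callables, not SDK code; `ensure_with_deadline` and its `steps` argument are hypothetical names.

```python
import datetime
import time


def ensure_with_deadline(steps, timeout: datetime.timedelta) -> None:
    """Run each step against one shared deadline (illustrative, not SDK API).

    Each entry in ``steps`` is a callable taking a ``timeout`` keyword,
    mirroring how the proposal threads ``timeout=`` into
    ``wait_get_cluster_*`` and ``start(...).result(...)``.
    """
    deadline = time.time() + timeout.total_seconds()
    for step in steps:
        remaining = deadline - time.time()
        if remaining <= 0:
            raise TimeoutError(f'timed out after {timeout}')
        # Cascade only the remaining budget, so the overall wait can
        # never exceed the caller-supplied timeout.
        step(timeout=datetime.timedelta(seconds=remaining))


# Demo: each successive step sees a strictly smaller budget.
budgets = []
steps = [lambda timeout: budgets.append(timeout)] * 2
ensure_with_deadline(steps, datetime.timedelta(minutes=45))
```

With this shape, a single `timeout_minutes=45` argument bounds the whole sequence rather than each individual wait.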

Is it a regression? Not that I'm aware of.

Debug Logs n/a

Other Information