lithops-cloud / lithops

A multi-cloud framework for big data analytics and embarrassingly parallel jobs, that provides an universal API for building parallel applications in the cloud ☁️🚀
http://lithops.cloud
Apache License 2.0
317 stars 105 forks source link

Race condition in storage monitor after re-running task #1285

Closed tomwhite closed 6 months ago

tomwhite commented 6 months ago

I am seeing the following error which looks like a race condition:

``` tests/utils.py:42: in run computed_result = result.compute( ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/site-packages/xarray/core/dataset.py:1013: in compute return new.load(**kwargs) ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/site-packages/xarray/core/dataset.py:845: in load evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute( ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/site-packages/cubed_xarray/cubedmanager.py:65: in compute return compute(*data, **kwargs) ../cubed/cubed/core/array.py:282: in compute plan.execute( ../cubed/cubed/core/plan.py:212: in execute executor.execute_dag( ../cubed/cubed/runtime/executors/lithops.py:292: in execute_dag execute_dag( ../cubed/cubed/runtime/executors/lithops.py:247: in execute_dag for _, stats in map_unordered( ../cubed/cubed/runtime/executors/lithops.py:136: in map_unordered finished, pending = lithops_function_executor.wait( ../cubed/cubed/runtime/executors/lithops_retries.py:153: in wait done, pending = self.executor.wait( ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/site-packages/lithops/executors.py:456: in wait raise e ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/site-packages/lithops/executors.py:431: in wait wait(fs=futures, ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/site-packages/lithops/wait.py:165: in wait raise e ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/site-packages/lithops/wait.py:139: in wait _get_executor_data(fs, executor_data, pbar=pbar, ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/site-packages/lithops/wait.py:296: in _get_executor_data list(pool.map(get_status, fs_to_wait_on)) ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/concurrent/futures/_base.py:619: in result_iterator yield _result_or_cancel(fs.pop()) ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/concurrent/futures/_base.py:317: in _result_or_cancel return fut.result(timeout) ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/concurrent/futures/_base.py:449: in result return self.__get_result() ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/concurrent/futures/_base.py:401: in __get_result raise self._exception ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/concurrent/futures/thread.py:58: in run result = self.fn(*self.args, **self.kwargs) ../../miniconda3/envs/cubed-benchmarks-lithops-gcp/lib/python3.11/site-packages/lithops/wait.py:290: in get_status f.status(throw_except=throw_except, internal_storage=exec_data.internal_storage) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = , throw_except = False, internal_storage = check_only = False def status(self, throw_except=True, internal_storage=None, check_only=False): """ Return the status returned by the call. If the call raised an exception, this method will raise the same exception If the future is cancelled before completing then CancelledError will be raised. :param check_only: Return None immediately if job is not complete. Default False. :param throw_except: Reraise exception if call raised. Default true. :param internal_storage: Storage handler to poll cloud storage. Default None. :return: Result of the call. :raises CancelledError: If the job is cancelled before completed. :raises TimeoutError: If job is not complete after `timeout` seconds. """ if self._state == ResponseFuture.State.New: raise ValueError("task not yet invoked") if self.success or self.done: return self._call_status if self._call_status is None or self._call_status['type'] == '__init__': if internal_storage is None: internal_storage = InternalStorage(self._storage_config) check_storage_path(internal_storage.get_storage_config(), self._storage_path) self._call_status = internal_storage.get_call_status(self.executor_id, self.job_id, self.call_id) self._status_query_count += 1 if check_only: return self._call_status while self._call_status is None: time.sleep(self.GET_RESULT_SLEEP_SECS) self._call_status = internal_storage.get_call_status(self.executor_id, self.job_id, self.call_id) self._status_query_count += 1 self._host_status_done_tstamp = time.time() self.stats['host_status_done_tstamp'] = self._host_status_done_tstamp or time.time() self.stats['host_status_query_count'] = self._status_query_count self.activation_id = self._call_status['activation_id'] if 'logs' in self._call_status: self.logs = zlib.decompress(base64.b64decode(self._call_status['logs'].encode())).decode() job_key = create_job_key(self.executor_id, self.job_id) log_file = os.path.join(LOGS_DIR, job_key + '.log') header = "Activation: '{}' ({})\n[\n".format(self.runtime_name, self.activation_id) tail = ']\n\n' output = self.logs.replace('\r', '').replace('\n', '\n ', self.logs.count('\n') - 1) with open(log_file, 'a') as lf: lf.write(header + ' ' + output + tail) with open(FN_LOG_FILE, 'a') as lf: lf.write(header + ' ' + output + tail) for key in self._call_status: if any(key.startswith(ss) for ss in ['func', 'host', 'worker']): self.stats[key] = self._call_status[key] > self.stats['worker_exec_time'] = round(self.stats['worker_end_tstamp'] - self.stats['worker_start_tstamp'], 8) E KeyError: 'worker_end_tstamp' ```

It's triggered by running a large Cubed workload on Lithops, but I don't think it's Cubed-specific. It looks like the following is what's happening:

  1. A task hits its timeout.
  2. Cubed schedules a retry task using the same FunctionExecutor.
  3. The retry task completes (quickly).
  4. Lithops stops the storage monitor.
  5. A status report for the timed out task fails with the above exception.

Here's a part of the Lithops logs showing these events. (I have patched Lithops to not raise the exception, but instead print "unknown" when the worker_end_tstamp key is missing.)

2024-03-26 09:28:12,451 [DEBUG] future.py:233 -- ExecutorID 8b468a-0 | JobID M002 - Got status from call 04275 - Activation ID: 57b9c78d2ccf4bef9ca1a72d57472376 - Time: 173.92 seconds
2024-03-26 09:28:12,454 [WARNING] future.py:258 -- ExecutorID 8b468a-0 | JobID M002 - CallID: 04275 - There was an exception - Activation ID: 57b9c78d2ccf4bef9ca1a72d57472376 - FSTimeoutError
2024-03-26 09:28:12,455 [INFO] invokers.py:107 -- ExecutorID 8b468a-0 | JobID M004 - Selected Runtime: cubed-runtime-dev - 2048MB
2024-03-26 09:28:12,455 [DEBUG] gcp_functions.py:462 -- Runtime key: gcp_functions/3.1.2/lithops-387809/europe-west2/lithops-worker-cubed-runtime-dev-312-fb77adaeb7
2024-03-26 09:28:12,455 [DEBUG] storage.py:470 -- Runtime metadata found in local memory cache
2024-03-26 09:28:12,456 [DEBUG] job.py:242 -- ExecutorID 8b468a-0 | JobID M004 - Serializing function and data
2024-03-26 09:28:12,458 [DEBUG] serialize.py:73 -- Referenced Modules: cubed, functools
2024-03-26 09:28:12,463 [DEBUG] module_dependency.py:109 -- Module 'cubed' is already installed in the runtime, skipping
2024-03-26 09:28:12,463 [DEBUG] module_dependency.py:109 -- Module 'functools' is already installed in the runtime, skipping
2024-03-26 09:28:12,463 [DEBUG] serialize.py:118 -- Modules to transmit: None
2024-03-26 09:28:12,464 [DEBUG] job.py:284 -- ExecutorID 8b468a-0 | JobID M004 - Function and modules found in local cache
2024-03-26 09:28:12,464 [DEBUG] job.py:316 -- ExecutorID 8b468a-0 | JobID M004 - Data per activation is < 8.0KiB. Passing data through invocation payload
2024-03-26 09:28:12,464 [INFO] invokers.py:172 -- ExecutorID 8b468a-0 | JobID M004 - Starting function invocation: <lambda>() - Total: 1 activations
2024-03-26 09:28:12,464 [DEBUG] invokers.py:177 -- ExecutorID 8b468a-0 | JobID M004 - Worker processes: 1 - Chunksize: 1
2024-03-26 09:28:12,464 [DEBUG] invokers.py:451 -- ExecutorID 8b468a-0 | JobID M004 - Reached maximum 1000 workers, queuing 1 function activations
2024-03-26 09:28:12,464 [INFO] invokers.py:208 -- ExecutorID 8b468a-0 | JobID M004 - View execution logs at /private/var/folders/9j/h1v35g4166z6zt816fq7wymc0000gn/T/lithops-tom/logs/8b468a-0-M004.log
2024-03-26 09:28:12,464 [INFO] wait.py:105 -- ExecutorID 8b468a-0 - Waiting for any of 4 function activations to complete
2024-03-26 09:28:12,465 [DEBUG] gcp_functions.py:387 -- ExecutorID 8b468a-0 | JobID M004 - Invoking function
2024-03-26 09:28:12,595 [DEBUG] invokers.py:371 -- ExecutorID 8b468a-0 | JobID M004 - Calls 00000 invoked (0.130s) - Activation ID: 10760232389452430
2024-03-26 09:28:16,787 [DEBUG] monitor.py:142 -- ExecutorID 8b468a-0 - Pending: 0 - Running: 4 - Done: 9998
2024-03-26 09:28:22,235 [DEBUG] monitor.py:142 -- ExecutorID 8b468a-0 - Pending: 0 - Running: 3 - Done: 9999
2024-03-26 09:28:22,528 [DEBUG] future.py:233 -- ExecutorID 8b468a-0 | JobID M004 - Got status from call 00000 - Activation ID: e95e551467d84b25935a56f5913e6bcc - Time: 4.78 seconds
2024-03-26 09:28:22,529 [INFO] wait.py:105 -- ExecutorID 8b468a-0 - Waiting for any of 3 function activations to complete
2024-03-26 09:29:09,457 [DEBUG] monitor.py:142 -- ExecutorID 8b468a-0 - Pending: 0 - Running: 0 - Done: 10002
2024-03-26 09:29:09,460 [DEBUG] monitor.py:455 -- ExecutorID 8b468a-0 - Storage job monitor finished
2024-03-26 09:29:09,817 [DEBUG] future.py:233 -- ExecutorID 8b468a-0 | JobID M001 - Got status from call 04260 - Activation ID: f33131977bf94701a786050bc14fe709 - Time: unknown seconds
2024-03-26 09:29:09,818 [WARNING] future.py:258 -- ExecutorID 8b468a-0 | JobID M001 - CallID: 04260 - There was an exception - Activation ID: f33131977bf94701a786050bc14fe709 - TimeoutError

Notice the last three lines, where first the job monitor finishes, then there is a status update from call 04260 with the missing key.

I'm not sure how to reproduce this on a small example. The workaround in https://github.com/lithops-cloud/lithops/compare/master...tomwhite:lithops:missing-worker-exec-time helps the job run to completion, but doesn't fix the underlying problem.

JosepSampe commented 6 months ago

Hi @tomwhite, thanks for reporting. Note that the issue of a missing worker_end_tstamp has been recently fixed in here

How does job monitoring and TimeoutError work?

  1. When you invoke functions, they have the ability to raise the TimeoutError by themselves 5 seconds before reaching the execution_timeout.
  2. The problem arised when some functions were stopped/killed in the cloud with no further notice and for no specific reason, or functions that for some reason take extremely long to complete compared to other functions from the same map. In this case, Lithops was keeping waiting indefinitely for those functions to finish. (I had experienced this across multiple cloud providers when invoking thousands of functions, as in your case. So, I guess this is a sort of cloud functions issue).
  3. To prevent this error, I integrated in the Lithops client the ability to raise a TimeoutError if a function exceeds the execution_timeout and no status object is found in the storage backend. In this scenario, Lithops generates a fake call_status. You can view this logic here. This prevents Lithops from waiting indefinitely for functions that have been terminated by the cloud provider for no reason, or functions that take extremely long to finish in comparison to other functions from the same map.
tomwhite commented 6 months ago

Hi @JosepSampe, thanks for the explanation! I'll try out the fix and see if it helps in this case.

JosepSampe commented 6 months ago

Not sure if it is also related to the issue you have experienced, but you can see a basic "retry failed invocations" example here that should work properly. ( I Don't know if you are using a more sophisticated mechanism in Cubed to retry a task using the same FunctionExecutor())

tomwhite commented 6 months ago

Note that the issue of a missing worker_end_tstamp has been recently fixed in here

That seems to fix the issue - thanks!

tomwhite commented 6 months ago

( I Don't know if you are using a more sophisticated mechanism in Cubed to retry a task using the same FunctionExecutor())

I've opened #1289.