pangeo-forge / pangeo-forge-runner

Run pangeo-forge recipes on Apache Beam
https://pangeo-forge-runner.readthedocs.io
Apache License 2.0

Tracking Flink integration test failure #111

Closed norlandrhagen closed 7 months ago

norlandrhagen commented 8 months ago

Currently the integration test for apache-flink is failing. Opening this issue to track a fix.

Examining the output of the subprocess that called pangeo-forge-runner shows:

CompletedProcess(args=['pangeo-forge-runner', 'bake', '--repo', 'https://github.com/pforgetest/gpcp-from-gcs-feedstock.git', '--ref', '0.10.x', '--json', '-f', '/var/folders/mb/7d7yq_4j2qgdfm_j3j4tsyl40000gn/T/tmpwxx6buh0.json'], returncode=1, stdout=b'{"message": "Target Storage is FSSpecTarget(S3FileSystem(key=, secret=, client_kwargs=, root_path=\\"s3://gpcp/target/\\")\\n", "status": "setup"}\n{"message": "Input Cache Storage is CacheFSSpecTarget(S3FileSystem(key=, secret=, client_kwargs=, root_path=\\"s3://gpcp/input-cache/\\")\\n", "status": "setup"}\n{"message": "Metadata Cache Storage is MetadataTarget(S3FileSystem(key=, secret=, client_kwargs=, root_path=\\"s3://gpcp/metadata-cache/\\")\\n", "status": "setup"}\n{"message": "Picked Git content provider.\\n", "status": "fetching"}\n{"message": "Cloning into \'/var/folders/mb/7d7yq_4j2qgdfm_j3j4tsyl40000gn/T/tmp6kozi2ej\'...\\n", "status": "fetching"}\n{"message": "HEAD is now at 3c691af update pangeo-forge-recipes to 0.10.0\\n", "status": "fetching"}\n{\'TARGET_STORAGE\': FSSpecTarget(fs=, root_path=\'s3://gpcp/target/\'), \'INPUT_CACHE_STORAGE\': CacheFSSpecTarget(fs=, root_path=\'s3://gpcp/input-cache/\')}\n{\'StoreToZarr\': {\'target_root\': \'TARGET_STORAGE\'}, \'WriteCombinedReference\': {\'target_root\': \'TARGET_STORAGE\'}, \'OpenURLWithFSSpec\': {\'cache\': \'INPUT_CACHE_STORAGE\'}}\n{"message": "Parsing recipes...", "status": "running"}\n{"message": "Error during running: Command \'[\'kubectl\', \'apply\', \'--wait\', \'-f\', \'/var/folders/mb/7d7yq_4j2qgdfm_j3j4tsyl40000gn/T/tmpzoi7mym0\']\' returned non-zero exit status 1.", "exc_info": "Traceback (most recent call last):\\n File \\"/Users/nrhagen/micromamba/envs/pangeo-forge-recipes/bin/pangeo-forge-runner\\", line 8, in \\n sys.exit(main())\\n File \\"/Users/nrhagen/Documents/carbonplan/pangeo_forge/pangeo-forge-runner/pangeo_forge_runner/cli.py\\", line 28, in main\\n app.start()\\n File 
\\"/Users/nrhagen/Documents/carbonplan/pangeo_forge/pangeo-forge-runner/pangeo_forge_runner/cli.py\\", line 23, in start\\n super().start()\\n File \\"/Users/nrhagen/micromamba/envs/pangeo-forge-recipes/lib/python3.10/site-packages/traitlets/config/application.py\\", line 481, in start\\n return self.subapp.start()\\n File \\"/Users/nrhagen/Documents/carbonplan/pangeo_forge/pangeo-forge-runner/pangeo_forge_runner/commands/bake.py\\", line 225, in start\\n pipeline_options = bakery.get_pipeline_options(\\n File \\"/Users/nrhagen/Documents/carbonplan/pangeo_forge/pangeo-forge-runner/pangeo_forge_runner/bakery/flink.py\\", line 223, in get_pipeline_options\\n subprocess.check_call(cmd)\\n File \\"/Users/nrhagen/micromamba/envs/pangeo-forge-recipes/lib/python3.10/subprocess.py\\", line 369, in check_call\\n raise CalledProcessError(retcode, cmd)\\nsubprocess.CalledProcessError: Command \'[\'kubectl\', \'apply\', \'--wait\', \'-f\', \'/var/folders/mb/7d7yq_4j2qgdfm_j3j4tsyl40000gn/T/tmpzoi7mym0\']\' returned non-zero exit status 1.", "status": "failed"}\n', stderr=b'/Users/nrhagen/micromamba/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/__init__.py:79: UserWarning: This version of Apache Beam has not been sufficiently tested on Python 3.10. You may encounter bugs or missing features.\n warnings.warn(\nUnable to connect to the server: dial tcp: lookup prefect-dns-df188628.hcp.westeurope.azmk8s.io on 10.0.0.1:53: no such host\n')

Pared down:

command kubectl apply returned non-zero exit status 1
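Because the integration test invokes pangeo-forge-runner with --json in a subprocess, its stdout is a stream of JSON-lines status records like the dump above. A minimal sketch of pulling the last reported status out of such output (final_status is a hypothetical helper for illustration, not part of the test suite):

```python
import json

def final_status(stdout: bytes) -> str:
    """Return the last "status" field reported in pangeo-forge-runner's JSON-lines stdout."""
    status = "unknown"
    for line in stdout.splitlines():
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip interleaved non-JSON debug lines (like the repr dicts above)
        if isinstance(record, dict) and "status" in record:
            status = record["status"]
    return status

# Trimmed-down version of the captured stdout above
example = (
    b'{"message": "Parsing recipes...", "status": "running"}\n'
    b'{"message": "Error during running: ...", "status": "failed"}\n'
)
print(final_status(example))  # -> failed
```

A check like `final_status(proc.stdout) == "failed"` is how one might assert on the run outcome instead of eyeballing the raw CompletedProcess repr.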

ranchodeluxe commented 8 months ago

Overview

As discussed in #19, Flink Operator version 1.5.0 on EKS produces Zarr output that matches what the GCP Dataflow integration tests check against (though only sometimes, and inconsistently) when using:

pangeo-forge-runner bake \
    --repo=https://github.com/ranchodeluxe/gpcp-from-gcs-feedstock.git  \
    --ref="test/integration" \
    -f /Users/ranchodeluxe/apps/gpcp-from-gcs-feedstock/feedstock/runner_config.py \
    --FlinkOperatorBakery.job_manager_resources='{"memory": "2048m", "cpu": 1.0}' \
    --FlinkOperatorBakery.task_manager_resources='{"memory": "3072m", "cpu": 1.0}' \
    --FlinkOperatorBakery.flink_configuration='{"taskmanager.numberOfTaskSlots": "1", "taskmanager.memory.flink.size": "2048m", "taskmanager.memory.task.off-heap.size": "736m"}' \
    --FlinkOperatorBakery.parallelism=1 \
    --FlinkOperatorBakery.flink_version="1.15" \
    --FlinkOperatorBakery.beam_executor_resources='{"requests": {"cpu": 1.0, "memory": 2046}}' \
    --Bake.container_image='apache/beam_python3.9_sdk:2.51.0' \
    --Bake.job_name=recipe \
    --Bake.bakery_class="pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"
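A rough sanity check of the TaskManager memory budget in the flags above (a sketch only: Flink's real model also carves framework heap/off-heap, network, and managed memory out of taskmanager.memory.flink.size, and the container memory from task_manager_resources must additionally cover JVM metaspace and overhead):

```python
def mb(s: str) -> int:
    """Parse a Flink-style memory string like '2048m' into MiB."""
    assert s.endswith("m"), "sketch only handles the 'm' suffix used above"
    return int(s[:-1])

# Values copied from the --FlinkOperatorBakery.flink_configuration flag above
flink_configuration = {
    "taskmanager.memory.flink.size": "2048m",
    "taskmanager.memory.task.off-heap.size": "736m",
}

total = mb(flink_configuration["taskmanager.memory.flink.size"])
off_heap = mb(flink_configuration["taskmanager.memory.task.off-heap.size"])
remaining = total - off_heap  # upper bound on what's left for heap/managed/network
print(remaining)  # 1312
```

So the off-heap reservation for the Beam SDK harness leaves at most ~1312 MiB of the total Flink memory for everything else, which is why the flags bump these numbers together.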

Success Metrics

The best success metric I can find for job status comes from the logs of the JobManager (pod/recipe-<hash>). I'll hunt around to see whether Flink Operator upgrades or Flink Runner upgrades might fix this issue:

2023-10-18 22:21:02,503 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job BeamApp-flink-1018220614-fe7baf59 (355ee8392fa9cab4f7e2e5854a7d73b9) switched from state RUNNING to FINISHED.
2023-10-18 22:21:02,522 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 355ee8392fa9cab4f7e2e5854a7d73b9 reached terminal state FINISHED.
2023-10-18 22:21:02,542 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 355ee8392fa9cab4f7e2e5854a7d73b9 has been registered for cleanup in the JobResultStore after reaching a terminal state.
2023-10-18 22:21:02,546 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping the JobMaster for job 'BeamApp-flink-1018220614-fe7baf59' (355ee8392fa9cab4f7e2e5854a7d73b9).
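A sketch of scraping that terminal-state line out of the JobManager log, so a test could assert on it instead of a human reading pod logs (job_finished is a hypothetical helper; the 32-hex-character job id format matches the log above):

```python
import re
from typing import Optional

# Matches lines like:
#   Job 355ee8392fa9cab4f7e2e5854a7d73b9 reached terminal state FINISHED.
FINISHED = re.compile(r"Job ([0-9a-f]{32}) reached terminal state FINISHED")

def job_finished(log_text: str) -> Optional[str]:
    """Return the Flink job id if the JobManager log reports terminal state FINISHED."""
    match = FINISHED.search(log_text)
    return match.group(1) if match else None

log = "2023-10-18 22:21:02,522 INFO Job 355ee8392fa9cab4f7e2e5854a7d73b9 reached terminal state FINISHED."
print(job_finished(log))  # -> 355ee8392fa9cab4f7e2e5854a7d73b9
```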

Turnaround Time

Inconsistent at best. Sometimes I get it to happen in ~13 min and other times it seems to hang forever-eva šŸ˜ž Will keep on trying to get this to be faster so we can fix the integration test with some confidence

ranchodeluxe commented 8 months ago

> Inconsistent at best. Sometimes I get it to happen in ~13 min and other times it seems to hang forever-eva šŸ˜ž Will keep on trying to get this to be faster so we can fix the integration test with some confidence

Can't get any consistency out of these runs. They produce output (which only sometimes seems to be the complete output) but most of the time the TaskManager(s) stick around forever and the JobManager hardly ever produces the success output in the logs I want šŸ³

It's been a week, and it doesn't feel like the operator is where we want it to be. The newest stable version, 1.6.0, seems to require Flink 1.17, which has no compatible Flink Runner release for us to try out: https://repo.maven.apache.org/maven2/org/apache/beam/

I have a few more tricks up my sleeve to find the magical incantation to make this thing consistent and will try those

ranchodeluxe commented 8 months ago

I've got consistent and completing runs happening!!! āœØ šŸ¤– šŸš€ šŸŽø

Final Incantation

I was about to admit defeat to the Flink Gods šŸ‰ (jobs for varied time dimensions were mostly hanging) before I realized the Flink Kubernetes Operator logs were actually telling me they preferred Flink version 1.16. That Flink version only has job-server jars for apache-beam>=2.47.0:

[Screenshot: Screen Shot 2023-10-22 at 10 15 34 AM]
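The version constraint above can be captured as a tiny lookup (illustrative only: the mapping encodes just the two facts from this thread, that Flink 1.16 job-server jars exist for apache-beam>=2.47.0 and that Flink 1.17 had no compatible Beam release at the time, not Beam's full support matrix):

```python
from typing import Optional

# Minimum Beam SDK with a published beam-runners-flink job server for each
# Flink line discussed in this thread (NOT an authoritative table).
MIN_BEAM_FOR_FLINK = {"1.16": "2.47.0"}

def min_beam_sdk(flink_version: str) -> Optional[str]:
    """Return the minimum Beam SDK with a job server for this Flink line, if known."""
    return MIN_BEAM_FOR_FLINK.get(flink_version)

print(min_beam_sdk("1.16"))  # -> 2.47.0
print(min_beam_sdk("1.17"))  # -> None (no compatible release at the time)
```

This is why the incantation below pairs --FlinkOperatorBakery.flink_version="1.16" with apache/beam_python3.9_sdk:2.47.0.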

I then ran everything with a high parallelism:

pangeo-forge-runner bake \
    --repo=https://github.com/ranchodeluxe/gpcp-from-gcs-feedstock.git  \
    --ref="test/integration" \
    -f /Users/ranchodeluxe/apps/gpcp-from-gcs-feedstock/feedstock/runner_config.py \
    --FlinkOperatorBakery.job_manager_resources='{"memory": "6144m", "cpu": 1.0}' \
    --FlinkOperatorBakery.task_manager_resources='{"memory": "6144m", "cpu": 1.0}' \
    --FlinkOperatorBakery.flink_configuration='{"taskmanager.numberOfTaskSlots": "1", "taskmanager.memory.flink.size": "3072m", "taskmanager.memory.task.off-heap.size": "1024m", "taskmanager.memory.jvm-overhead.max": "4096m"}' \
    --FlinkOperatorBakery.parallelism=10 \
    --FlinkOperatorBakery.flink_version="1.16" \
    --Bake.container_image='apache/beam_python3.9_sdk:2.47.0' \
    --Bake.job_name=recipe \
    --Bake.bakery_class="pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"

Turnaround Time

Still not as fast as GCP Dataflow (especially when using only one process):

[Screenshot: Screen Shot 2023-10-22 at 10 56 09 AM]
yuvipanda commented 8 months ago

This is awesome work, @ranchodeluxe!

I spent a bunch of time trying to have the job manager set up automatically rather than uploaded from my local machine, using the example in https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/tree/master/examples/beam/with_job_server (although that is based on a different, now-deprecated Flink operator). I eventually gave up and couldn't get it to work. I would suggest that speedups can be achieved here by using larger nodes, or nodes with SSD disks for faster image pulls, rather than by trying to fix the JAR upload times.

But regardless, am super excited to systematize this.

ranchodeluxe commented 8 months ago

Finally got a relevant traceback since all stdout seems to be lost (will work on that):

  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/zarr/storage.py", line 1530, in rmdir
    if self.fs.isdir(store_path):
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/s3fs/core.py", line 1411, in _isdir
    return bool(await self._lsdir(path))
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/s3fs/core.py", line 706, in _lsdir
    async for c in self._iterdir(
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/s3fs/core.py", line 756, in _iterdir
    async for i in it:
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/paginate.py", line 30, in __anext__
    response = await self._make_request(current_kwargs)
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/client.py", line 366, in _make_api_call
    http, parsed_response = await self._make_request(
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/client.py", line 391, in _make_request
    return await self._endpoint.make_request(
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/endpoint.py", line 100, in _send_request
    while await self._needs_retry(
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/endpoint.py", line 262, in _needs_retry
    responses = await self._event_emitter.emit(
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/hooks.py", line 66, in _emit
    response = await resolve_awaitable(handler(**kwargs))
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/_helpers.py", line 15, in resolve_awaitable
    return await obj
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/retryhandler.py", line 107, in _call
    if await resolve_awaitable(self._checker(**checker_kwargs)):
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/_helpers.py", line 15, in resolve_awaitable
    return await obj
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/retryhandler.py", line 126, in _call
    should_retry = await self._should_retry(
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/retryhandler.py", line 165, in _should_retry
    return await resolve_awaitable(
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/_helpers.py", line 15, in resolve_awaitable
    return await obj
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/retryhandler.py", line 174, in _call
    checker(attempt_number, response, caught_exception)
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/botocore/retryhandler.py", line 247, in __call__
    return self._check_caught_exception(
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/botocore/retryhandler.py", line 416, in _check_caught_exception
    raise caught_exception
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/endpoint.py", line 181, in _do_get_response
    http_response = await self._send(request)
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/endpoint.py", line 285, in _send
    return await self.http_session.send(request)
  File "/opt/apache/beam-venv/beam-venv-worker-4-1/lib/python3.9/site-packages/aiobotocore/httpsession.py", line 247, in send
    raise ReadTimeoutError(endpoint_url=request.url, error=e)
botocore.exceptions.ReadTimeoutError: Read timeout on endpoint URL: "http://10.1.0.20:19555/gpcp?list-type=2&prefix=target%2Fgpcp%2F&delimiter=%2F&encoding-type=url"
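One possible mitigation for this kind of transient read timeout on the S3 listing (a sketch, not the fix adopted in this thread; the builtin TimeoutError stands in for botocore's ReadTimeoutError, and flaky_listing is a hypothetical stand-in for the S3 call):

```python
import time

def with_retries(fn, attempts=3, base_delay=0.01):
    """Call fn, retrying timeouts with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except TimeoutError:
            if attempt == attempts - 1:
                raise  # out of attempts; surface the original error
            time.sleep(base_delay * 2 ** attempt)

calls = {"n": 0}

def flaky_listing():
    """Simulate an S3 listing that times out twice before succeeding."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("read timeout on S3 listing")
    return ["target/gpcp/"]

print(with_retries(flaky_listing))  # -> ['target/gpcp/']
```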
ranchodeluxe commented 8 months ago

Well it's not pretty and not final but then again neither am I šŸ˜ šŸŽ‰

[Screenshot: Screen Shot 2023-10-24 at 7 39 55 PM]
cisaacstern commented 8 months ago

@ranchodeluxe i think this is fixed?

ranchodeluxe commented 7 months ago

yes, doneso... closing now