NASA-IMPACT / veda-pforge-job-runner

Apache Beam + EMR Serverless Job Runner for Pangeo Forge Recipes

Failing: MUR SST #15

Closed ranchodeluxe closed 4 months ago

ranchodeluxe commented 8 months ago

kerchunk: https://github.com/developmentseed/pangeo-forge-staging/tree/mursst-kerchunk

Runs on LocalDirectBakery using prune option

local-runner-config.py at bottom

pip install git+https://github.com/pangeo-forge/pangeo-forge-recipes.git@main

EARTHDATA_USERNAME=blah EARTHDATA_PASSWORD="blah" EARTHDATA_PROTOCOL=https pangeo-forge-runner bake \
    --repo=https://github.com/developmentseed/pangeo-forge-staging \
    --ref="mursst-kerchunk" \
    -f local-runner-config.py \
    --feedstock-subdir="recipes/mursst" \
    --Bake.recipe_id=MUR-JPL-L4-GLOB-v4.1 --Bake.job_name=local_test \
    --prune
pangeo-forge-runner bake \
  --repo=https://github.com/ranchodeluxe/mursst-example \
  --ref="main" \
  -f config.py

Runs on LocalDirectBakery for all timesteps

Same as above, but without --prune:

pangeo-forge-runner bake \
  --repo=https://github.com/ranchodeluxe/mursst-example \
  --ref="main" \
  -f config.py

Runs on FlinkOperatorBakery for prune option

This has not been attempted yet because it depends on https://github.com/NASA-IMPACT/veda-pforge-job-runner/pull/28 (for the protocol parameter) and https://github.com/NASA-IMPACT/veda-pforge-job-runner/pull/25 (for passing Earthdata username and password)

curl -X POST \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
-H "Authorization: token blahblah" \
https://api.github.com/repos/NASA-IMPACT/veda-pforge-job-runner/actions/workflows/job-runner.yaml/dispatches \
-d '{"ref":"main", "inputs":{"protocol": "s3", "repo":"https://github.com/developmentseed/pangeo-forge-staging","ref":"mursst-kerchunk","prune":"1"}}'
curl -X POST \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
-H "Authorization: token blahblah" \
https://api.github.com/repos/NASA-IMPACT/veda-pforge-job-runner/actions/workflows/job-runner.yaml/dispatches \
-d '{"ref":"main", "inputs":{"repo":"https://github.com/ranchodeluxe/mursst-example","ref":"main","prune":"1"}}'

Runs on FlinkOperatorBakery for all timesteps

Not yet tested

curl -X POST \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
-H "Authorization: token blahblah" \
https://api.github.com/repos/NASA-IMPACT/veda-pforge-job-runner/actions/workflows/job-runner.yaml/dispatches \
-d '{"ref":"main", "inputs":{"repo":"https://github.com/ranchodeluxe/mursst-example","ref":"main","prune":"0"}}'

local-runner-config.py

c.Bake.bakery_class = "pangeo_forge_runner.bakery.local.LocalDirectBakery"

c.MetadataCacheStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
# Metadata cache should be per `{{job_name}}`, as kwargs changing can change metadata
c.MetadataCacheStorage.root_path = "./metadata"

c.TargetStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
c.TargetStorage.root_path = "./target"

c.InputCacheStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
c.InputCacheStorage.root_path = "./cache"
ranchodeluxe commented 8 months ago

The way Quinlan wrote this, the data is downloaded during task kickoff on the host, so we should rewrite it as I mentioned in this Slack thread: https://developmentseed.slack.com/archives/C04PY5QRHCM/p1698890912079009

It needs to be rewritten to build its inputs lazily anyhow, like this: https://github.com/ranchodeluxe/mursst-example/blob/main/feedstock/recipe.py
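
For illustration, a minimal sketch of what "lazy" means here, assuming a URL-template helper and a daily `ConcatDim` (the date range and URL template below are assumptions for the example, not the linked recipe's actual code):

```python
import pandas as pd
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern

# Hypothetical test window; the real recipe spans the full MUR SST archive.
dates = pd.date_range("2002-06-01", "2002-06-30", freq="D")

def make_url(time: pd.Timestamp) -> str:
    # Hypothetical URL template: URLs are only rendered when Beam iterates
    # the pattern, so nothing is fetched at task kickoff on the host.
    return (
        "https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/"
        "MUR-JPL-L4-GLOB-v4.1/"
        f"{time:%Y%m%d}090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc"
    )

pattern = FilePattern(make_url, ConcatDim("time", dates, nitems_per_file=1))
```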

ranchodeluxe commented 8 months ago

Local runs fail, and the hunch is that the parent process is being killed during rechunking because it's taking too much memory. Running it on the big Flink box to gauge cost.

ranchodeluxe commented 8 months ago

Flink run fails here and it seems there's something wrong with the data/logic: https://github.com/NASA-IMPACT/veda-pforge-job-runner/actions/runs/7469136161

abarciauskas-bgse commented 8 months ago

@ranchodeluxe should I create a new issue in this repo for the kerchunk recipe? I modified the kerchunk recipe in https://github.com/pangeo-forge/staged-recipes/pull/259 to use FilePattern in place of GranuleQuery and also the WriteCombinedReference from https://github.com/pangeo-forge/pangeo-forge-recipes/pull/660. It works locally for 30 timesteps.

However, it is working locally because I am using HTTPS so I think I would like to try running it on Flink so it can use direct S3 access. Right now, the protocol is hard-coded as HTTPS but I am thinking we should make it configurable as either an environment variable or (I think more ideal) part of the configuration. Do you know if there is any documentation on how to pass configuration parameters, either via the command line or config file?

ranchodeluxe commented 8 months ago

@ranchodeluxe should I create a new issue in this repo for the kerchunk recipe? I modified the kerchunk recipe in pangeo-forge/staged-recipes#259 to use FilePattern in place of GranuleQuery and also the WriteCombinedReference from pangeo-forge/pangeo-forge-recipes#660. It works locally for 30 timesteps.

Nice! You could just edit this existing one above and point the examples to the right repo and ref and that's about the same thing. But your call

However, it is working locally because I am using HTTPS so I think I would like to try running it on Flink so it can use direct S3 access. Right now, the protocol is hard-coded as HTTPS but I am thinking we should make it configurable as either an environment variable or (I think more ideal) part of the configuration. Do you know if there is any documentation on how to pass configuration parameters, either via the command line or config file?

For the time being we can just add a global input here and add it as an OS env var here (like the secrets are being handled for now). Then your recipe can conditionally check os.environ.get('WHATEVER'). Let me know if that makes sense. There still might be a use case for a traitlet option to pump arbitrary key/values per recipe into the OS environment, but we can sip on that idea for now like a fine wine.
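
A rough sketch of what that conditional check could look like in the recipe (the PROTOCOL variable name matches what the local EC2 tests below use; the bucket name and URL are illustrative assumptions):

```python
import os

# Set by the job runner / workflow input; default to HTTPS when absent.
protocol = os.environ.get("PROTOCOL", "https")

if protocol == "s3":
    # Direct S3 access -- only works in-region, e.g. from the Flink cluster.
    base_url = "s3://podaac-ops-cumulus-protected/MUR-JPL-L4-GLOB-v4.1/"
else:
    # HTTPS access via Earthdata Login -- works anywhere.
    base_url = (
        "https://archive.podaac.earthdata.nasa.gov/"
        "podaac-ops-cumulus-protected/MUR-JPL-L4-GLOB-v4.1/"
    )
```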

abarciauskas-bgse commented 8 months ago

Thanks @ranchodeluxe I:

  1. Updated the issue above to include the local run and future Flink run for kerchunk. One question I have: does it make sense to include "Runs on LocalDirectBakery for all timesteps"? Do we need that test if it's working for a few files? I feel like it makes sense to move to Flink at that point. Perhaps testing more than 2 files makes sense, but testing with the entire archive seems like what Flink is designed to handle.
  2. I opened #28 to pass the protocol configuration option (and updated the mursst-kerchunk branch)

As noted in the issue text, I think we need to merge #25 and #28 before we can run this with the flink cluster hooked up to this repo, is that right?

ranchodeluxe commented 8 months ago

As noted in the issue text, I think we need to merge #25 and #28 before we can run this with the flink cluster hooked up to this repo, is that right?

Yep, merged these blocking PRs.

1. Updated the issue above to include the local run and future flink run for kerchunk. One question I have is does it make sense to include "Runs on `LocalDirectBakery` for all timesteps". Do we need to include this test if its working for a few files, I feel like it makes sense to move to flink at that point. Perhaps testing more than 2 files makes sense, but testing with the entire archive seems like that is what flink is designed to handle.

I mean you're right that we shouldn't need to be running all timesteps on the LocalDirectBakery. The reason I had that listed is b/c this whole stack is so new/flakey and LocalDirectBakery is really the only way to get accurate feedback about whether something is working or not. I'll probably keep running things in full on there (EC2 machine with 8 cores so it's real fast 😉) until I know that the current blockers (below) to ETL'ing full datasets are resolved. The first one is the most important blocker:

  1. https://github.com/pangeo-forge/pangeo-forge-recipes/issues/667

  2. pangeo-forge-recipes only has the changes we need on main and a release needs to be cut for these changes. That cut should also include this pending bug fix IMHO

  3. https://github.com/NASA-IMPACT/veda-pforge-job-runner/issues/27

abarciauskas-bgse commented 8 months ago

Thanks @ranchodeluxe

this whole stack is so new/flakey and LocalDirectBakery is really the only way to get accurate feedback about whether something is working or not

That makes sense, and it seems like something we need to resolve to make Flink useful; otherwise why wouldn't we just use the output from "tests" for all timesteps on EC2? (Reuse and reproducibility, of course.) Thanks for opening https://github.com/NASA-IMPACT/veda-pforge-job-runner/issues/27 to address this.

ranchodeluxe commented 8 months ago

otherwise why wouldn't we just use the output from "tests" for all timesteps on EC2?

@abarciauskas-bgse: that's the backup plan 😉 Maybe we use AWS Batch. And when I say the "whole stack is new/flakey" I'm also talking about pangeo-forge-recipes, not just the Flink runner. That said, we still haven't seen anything fail on Flink that runs locally, so as long as we get good error feedback there's nothing wrong with Flink at the moment.

abarciauskas-bgse commented 7 months ago

I did some testing on EC2 to figure out what type of resources we might need, if we did use a local runner, to generate the kerchunk reference in a reasonable amount of time and without errors.

1. Getting setup

sudo apt-get update
sudo apt-get install python3-pip
python3 -m pip install --upgrade pip

pip install \
 fsspec \
 s3fs \
 boto3 \
 requests \
 apache-beam==2.52.0 \
 'pangeo-forge-runner>=0.9.1' \
 git+https://github.com/pangeo-forge/pangeo-forge-recipes.git@main

export PATH=$PATH:/home/ubuntu/.local/bin

1b. Add local-runner-config.py

# vi local-runner-config.py
c.Bake.bakery_class = "pangeo_forge_runner.bakery.local.LocalDirectBakery"

c.MetadataCacheStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
# Metadata cache should be per `{{job_name}}`, as kwargs changing can change metadata
c.MetadataCacheStorage.root_path = "./metadata"

c.TargetStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
c.TargetStorage.root_path = "./target"

c.InputCacheStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
c.InputCacheStorage.root_path = "./cache"

2. Updating the temporal range

Modify the recipe for different temporal ranges in order to evaluate the duration of the recipe run and the size of the output.

git clone https://github.com/developmentseed/pangeo-forge-staging
cd pangeo-forge-staging
git checkout mursst-kerchunk
cd ..
vi pangeo-forge-staging/recipes/mursst/recipe.py
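
The change itself is just widening the date range the recipe's file pattern iterates over; a hypothetical example of the kind of edit (variable name and dates are assumptions, not the branch's actual code):

```python
import pandas as pd

# 30-day test window; extend the end date to cover 61, 91, 122, ... days
# to reproduce the timing/size tables below.
dates = pd.date_range("2002-06-01", "2002-06-30", freq="D")
```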

3. Run the recipe

export EDU=aimeeb
export EDP="XXX"

time EARTHDATA_USERNAME=$EDU EARTHDATA_PASSWORD=$EDP PROTOCOL=s3 pangeo-forge-runner bake \
 --repo=./pangeo-forge-staging \
 --ref="mursst-kerchunk" \
 -f local-runner-config.py \
 --feedstock-subdir="recipes/mursst" \
 --Bake.recipe_id=MUR-JPL-L4-GLOB-v4.1 --Bake.job_name=local_test

notes:

Test 1: t3.medium, 2 vCPU and 4 GB RAM

| days | time (seconds) | size (MB) |
|------|----------------|-----------|
| 30 | 71 | 22 |
| 61 | 145 | 42 |
| 91 | 238 | 62 |
| 122 | 334 | "Channel closed prematurely" error (see full traceback below) |

Test 2: t3.xlarge, 4 vCPU and 16 GB RAM

| days | time (seconds) | size (MB) |
|------|----------------|-----------|
| 30 | 37 | 14 |
| 61 | 68 | 25 |
| 91 | 101 | 34 |
| 122 | 137 | 44 |
| 153 | 170 | 54 |
| 183 | 161 | 64 |

Test 3: t3.2xlarge, 8 vCPU and 32 GB RAM

| days | time (seconds) | size (MB) |
|------|----------------|-----------|
| 30 | 27 | 14 |
| 61 | 47 | 19 |
| 91 | 64 | 24 |
| 122 | 77 | 29 |
| 153 | 92 | 35 |
| 183 | 118 | 39 |

Final kerchunk size estimate

Every month adds at least 5mb size to reference file data. 70mb per year or 1400mb for 20 years.
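
A quick check of that estimate against the tables above (sizes copied verbatim; note that the two larger test series report somewhat different reference sizes for the same day counts):

```python
# Reference-file sizes (MB) keyed by days of data, from the tables above.
test2 = {30: 14, 61: 25, 91: 34, 122: 44, 153: 54, 183: 64}  # t3.xlarge
test3 = {30: 14, 61: 19, 91: 24, 122: 29, 153: 35, 183: 39}  # t3.2xlarge

def monthly_increments(sizes):
    """Growth in MB between consecutive ~30-day steps."""
    days = sorted(sizes)
    return [sizes[b] - sizes[a] for a, b in zip(days, days[1:])]

print(monthly_increments(test2))  # [11, 9, 10, 10, 10]
print(monthly_increments(test3))  # [5, 5, 5, 6, 4]
```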

Warnings and Errors

Concatenated coordinate time contains less than...

I saw quite a few of these:

/home/ubuntu/.local/lib/python3.10/site-packages/kerchunk/combine.py:269: UserWarning: Concatenated coordinate 'time' contains less than expectednumber of values across the datasets: [676285200]

which I'm concerned has to do with the different sizes

Unconsolidated metadata

Metadata does not appear to be consolidated when I open the dataset
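
For reference, a sketch of how the local reference output could be opened while being explicit about consolidation (the reference path is hypothetical; adjust to wherever the bake wrote it under ./target):

```python
import xarray as xr

# Hypothetical path to the kerchunk reference the bake wrote under ./target.
ref = "./target/MUR-JPL-L4-GLOB-v4.1/reference.json"

ds = xr.open_dataset(
    "reference://",
    engine="zarr",
    backend_kwargs={
        "storage_options": {
            "fo": ref,
            "remote_protocol": "https",  # or "s3" when running with direct access
        },
        # Kerchunk reference stores are not consolidated, so say so explicitly
        # rather than letting zarr warn about missing consolidated metadata.
        "consolidated": False,
    },
)
```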

Channel closed prematurely

Full traceback ```sh Target Storage is FSSpecTarget(LocalFileSystem(, root_path="./target") Input Cache Storage is CacheFSSpecTarget(LocalFileSystem(, root_path="./cache") Metadata Cache Storage is MetadataTarget(LocalFileSystem(, root_path="./metadata") Picked Git content provider. Cloning into '/tmp/tmpth2qfipk'... HEAD is now at df6ec0e Update recipe.py Parsing recipes... Baking only recipe_id='MUR-JPL-L4-GLOB-v4.1' Converting string literal type hint to Any: "beam.PCollection" Converting string literal type hint to Any: "beam.PCollection[zarr.storage.FSStore]" Converting string literal type hint to Any: "beam.PCollection" Converting string literal type hint to Any: "beam.PCollection" Converting string literal type hint to Any: "beam.PCollection" Converting string literal type hint to Any: "beam.PCollection" Converting string literal type hint to Any: "beam.PCollection" Converting string literal type hint to Any: "beam.PCollection" Converting string literal type hint to Any: "beam.PCollection" Converting string literal type hint to Any: "beam.PCollection" Using Any for unsupported type: typing.MutableMapping Converting string literal type hint to Any: "beam.PCollection" Converting string literal type hint to Any: "beam.PCollection" Converting string literal type hint to Any: "beam.PCollection" Converting string literal type hint to Any: "beam.PCollection[zarr.storage.FSStore]" Running job for recipe MUR-JPL-L4-GLOB-v4.1 ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== ==================== starting control server on port 45411 starting data server on port 36633 starting state server on port 44875 starting logging server on port 44713 Created Worker handler for environment ref_Environment_default_environment_1 (beam:env:harness_subprocess_python:v1, b'/usr/bin/python3 -m apache_beam.runners.worker.sdk_worker_main') Created Worker handler for environment ref_Environment_default_environment_1 (beam:env:harness_subprocess_python:v1, b'/usr/bin/python3 -m apache_beam.runners.worker.sdk_worker_main') Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 858491182 } message: "semi_persistent_directory: None" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:111" thread: "MainThread" Worker: severity: WARN timestamp { seconds: 1706148549 nanos: 865040779 } message: "Discarding unparseable args: [\'--direct_runner_use_stacked_bundle\', \'--pipeline_type_check\']" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/options/pipeline_options.py:367" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 869067668 } message: "Pipeline_options: {\'runner\': \'DirectRunner\', \'direct_num_workers\': 0, \'direct_running_mode\': \'multi_processing\', \'gcp_oauth_scopes\': [\'https://www.googleapis.com/auth/bigquery\', \'https://www.googleapis.com/auth/cloud-platform\', \'https://www.googleapis.com/auth/devstorage.full_control\', 
\'https://www.googleapis.com/auth/userinfo.email\', \'https://www.googleapis.com/auth/datastore\', \'https://www.googleapis.com/auth/spanner.admin\', \'https://www.googleapis.com/auth/spanner.data\', \'https://www.googleapis.com/auth/bigquery\', \'https://www.googleapis.com/auth/cloud-platform\', \'https://www.googleapis.com/auth/devstorage.full_control\', \'https://www.googleapis.com/auth/userinfo.email\', \'https://www.googleapis.com/auth/datastore\', \'https://www.googleapis.com/auth/spanner.admin\', \'https://www.googleapis.com/auth/spanner.data\'], \'requirements_file\': \'/tmp/tmpth2qfipk/recipes/mursst/requirements.txt\', \'pickle_library\': \'cloudpickle\', \'save_main_session\': True, \'sdk_worker_parallelism\': \'1\', \'environment_cache_millis\': \'0\'}" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:135" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 879352331 } message: "Creating state cache with size 104857600" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/statecache.py:234" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 879851341 } message: "Creating insecure control channel for localhost:45411." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:187" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 883674860 } message: "Control channel established." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:195" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 861497402 } message: "semi_persistent_directory: None" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:111" thread: "MainThread" Worker: severity: WARN timestamp { seconds: 1706148549 nanos: 868108987 } message: "Discarding unparseable args: [\'--direct_runner_use_stacked_bundle\', \'--pipeline_type_check\']" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/options/pipeline_options.py:367" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 872321844 } message: "Pipeline_options: {\'runner\': \'DirectRunner\', \'direct_num_workers\': 0, \'direct_running_mode\': \'multi_processing\', \'gcp_oauth_scopes\': [\'https://www.googleapis.com/auth/bigquery\', \'https://www.googleapis.com/auth/cloud-platform\', \'https://www.googleapis.com/auth/devstorage.full_control\', \'https://www.googleapis.com/auth/userinfo.email\', \'https://www.googleapis.com/auth/datastore\', \'https://www.googleapis.com/auth/spanner.admin\', \'https://www.googleapis.com/auth/spanner.data\', \'https://www.googleapis.com/auth/bigquery\', \'https://www.googleapis.com/auth/cloud-platform\', \'https://www.googleapis.com/auth/devstorage.full_control\', \'https://www.googleapis.com/auth/userinfo.email\', \'https://www.googleapis.com/auth/datastore\', \'https://www.googleapis.com/auth/spanner.admin\', \'https://www.googleapis.com/auth/spanner.data\'], \'requirements_file\': \'/tmp/tmpth2qfipk/recipes/mursst/requirements.txt\', \'pickle_library\': \'cloudpickle\', \'save_main_session\': True, \'sdk_worker_parallelism\': \'1\', \'environment_cache_millis\': \'0\'}" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:135" thread: "MainThread" 
Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 884671449 } message: "Creating state cache with size 104857600" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/statecache.py:234" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 885025024 } message: "Creating insecure control channel for localhost:45411." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:187" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 885364055 } message: "Initializing SDKHarness with unbounded number of workers." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:243" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 888333559 } message: "Control channel established." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:195" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 890057802 } message: "Python sdk harness starting." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:211" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 889965534 } message: "Initializing SDKHarness with unbounded number of workers." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:243" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 893246889 } message: "Python sdk harness starting." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:211" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 898192644 } message: "Creating insecure state channel for localhost:44875." instruction_id: "bundle_2" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:885" thread: "Thread-11" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 898584365 } message: "State channel established." instruction_id: "bundle_2" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:892" thread: "Thread-11" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 900778770 } message: "Creating client data channel for localhost:36633" instruction_id: "bundle_2" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py:770" thread: "Thread-11" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 903797388 } message: "Creating insecure state channel for localhost:44875." instruction_id: "bundle_1" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:885" thread: "Thread-12" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 904160976 } message: "State channel established." 
instruction_id: "bundle_1" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:892" thread: "Thread-12" Worker: severity: INFO timestamp { seconds: 1706148549 nanos: 907867431 } message: "Creating client data channel for localhost:36633" instruction_id: "bundle_1" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py:770" thread: "Thread-12" /home/ubuntu/.local/lib/python3.10/site-packages/kerchunk/combine.py:269: UserWarning: Concatenated coordinate 'time' contains less than expectednumber of values across the datasets: [675766800] warnings.warn( /home/ubuntu/.local/lib/python3.10/site-packages/kerchunk/combine.py:269: UserWarning: Concatenated coordinate 'time' contains less than expectednumber of values across the datasets: [675853200] warnings.warn( Killed Exception in thread beam_control_read: Traceback (most recent call last): File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner Exception in thread run_worker: Traceback (most recent call last): File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner self.run() File "/usr/lib/python3.10/threading.py", line 953, in run self._target(*self._args, **self._kwargs) File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/local_job_service.py", line 227, in run self.run() File "/usr/lib/python3.10/threading.py", line 953, in run self._target(*self._args, **self._kwargs) File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 120, in _read Failed to read inputs in the data plane. Traceback (most recent call last): File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs for elements in elements_iterator: File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 488, in __next__ return self._next() File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 480, in _next request = self._look_for_request() File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 462, in _look_for_request _raise_rpc_error(self._state) File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 162, in _raise_rpc_error raise rpc_error grpc.RpcError Exception in thread read_grpc_client_inputs: Traceback (most recent call last): File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner self.run() File "/usr/lib/python3.10/threading.py", line 953, in run self._target(*self._args, **self._kwargs) File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 669, in target=lambda: self._read_inputs(elements_iterator), raise RuntimeError( for data in self._input: File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 488, in __next__ RuntimeError: Worker subprocess exited with return code 137 return self._next() File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 480, in _next request = self._look_for_request() File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 462, in _look_for_request _raise_rpc_error(self._state) File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 162, in _raise_rpc_error raise rpc_error grpc.RpcError File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 652, in 
_read_inputs for elements in elements_iterator: File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 488, in __next__ return self._next() File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 480, in _next request = self._look_for_request() File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 462, in _look_for_request _raise_rpc_error(self._state) File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 162, in _raise_rpc_error raise rpc_error grpc.RpcError Worker: severity: INFO timestamp { seconds: 1706148873 nanos: 758861303 } message: "No more requests from control plane" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:274" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148873 nanos: 759168624 } message: "SDK Harness waiting for in-flight requests to complete" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:275" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148873 nanos: 759240150 } message: "Closing all cached grpc data channels." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py:803" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148873 nanos: 759358882 } message: "Closing all cached gRPC state handlers." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:904" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148873 nanos: 796697378 } message: "Done consuming work." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:287" thread: "MainThread" Worker: severity: INFO timestamp { seconds: 1706148873 nanos: 796885013 } message: "Python sdk harness exiting." 
log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:213" thread: "MainThread" Traceback (most recent call last): File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 505, in input_elements element = received.get(timeout=1) File "/usr/lib/python3.10/queue.py", line 179, in get raise Empty _queue.Empty During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/ubuntu/.local/bin/pangeo-forge-runner", line 8, in sys.exit(main()) File "/home/ubuntu/.local/lib/python3.10/site-packages/pangeo_forge_runner/cli.py", line 28, in main app.start() File "/home/ubuntu/.local/lib/python3.10/site-packages/pangeo_forge_runner/cli.py", line 23, in start super().start() File "/home/ubuntu/.local/lib/python3.10/site-packages/traitlets/config/application.py", line 474, in start return self.subapp.start() File "/home/ubuntu/.local/lib/python3.10/site-packages/pangeo_forge_runner/commands/bake.py", line 326, in start pipeline.run() File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 585, in run return self.runner.run_pipeline(self, self._options) File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 128, in run_pipeline return runner.run_pipeline(pipeline, options) File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline self._latest_run_result = self.run_via_runner_api( File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api return self.run_stages(stage_context, stages) File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 455, in run_stages bundle_results = self._execute_bundle( File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 783, in _execute_bundle self._run_bundle( File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1020, in _run_bundle result, splits = bundle_manager.process_bundle( File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1462, in process_bundle for result, split_result in executor.map(execute, zip(part_inputs, # pylint: disable=bad-option-value File "/usr/lib/python3.10/concurrent/futures/_base.py", line 621, in result_iterator yield _result_or_cancel(fs.pop()) File "/usr/lib/python3.10/concurrent/futures/_base.py", line 319, in _result_or_cancel return fut.result(timeout) File "/usr/lib/python3.10/concurrent/futures/_base.py", line 458, in result return self.__get_result() File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result raise self._exception File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs)) File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1454, in execute return bundle_manager.process_bundle( File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1372, in process_bundle for output in 
self._worker_handler.data_conn.input_elements( File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 508, in input_elements raise RuntimeError('Channel closed prematurely.') RuntimeError: Channel closed prematurely. ```

Next steps

abarciauskas-bgse commented 7 months ago

@ranchodeluxe @norlandrhagen last Friday I mentioned I was running into this error: https://github.com/fsspec/kerchunk/blob/063684618c053e93e3f1f25c4688ec2765c0d962/kerchunk/combine.py#L501-L506

It does appear there are a few days in 2023 where the MUR SST NetCDF data is chunked differently than all the other days. I started to go down a 🐇 🕳️ of kerchunk and pangeo-forge-recipes, which turned out to be a wild goose chase, because I could actually see the different chunk shapes if I just updated my version of xarray or used h5py or h5netcdf (see https://github.com/pydata/xarray/issues/8691).
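
For anyone reproducing this, a minimal way to read the on-disk chunk shape directly with h5py (the filename below is just an example; point it at any locally downloaded granule):

```python
import h5py

# Example granule filename -- substitute any locally downloaded MUR SST file.
path = "20230215090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc"

with h5py.File(path, "r") as f:
    # HDF5 chunk shape of the main variable, e.g. (1, 1023, 2047) or (1, 3600, 7200)
    print(f["analysed_sst"].chunks)
```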

Unfortunately, the different chunk shape is not an issue for just a few files. All of the data has chunk shape (1, 1023, 2047) except the following date ranges:

My understanding is we cannot create kerchunk references for data with variable chunk shapes, which I think is the reason for the Variable Chunking Zarr Enhancement Proposal (ZEP).

In lieu of support for variable chunking in Zarr, there are 2 resolutions I can think of:

  1. Create new versions of this data with the preferred chunk shape (the data type for analysed_sst is int16, so I believe this would result in ~4 MB chunks for shape (1, 1023, 2047) and ~51.8 MB chunks for shape (1, 3600, 7200); see the quick arithmetic check after this list). But this would negate the reason we are using kerchunk (not creating copies of the data).
  2. Create a kerchunk reference for each chunk shape. The second kerchunk reference would be a lot smaller than the first.
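
The chunk-size figures in option 1 are just int16 arithmetic (2 bytes per value); a quick check:

```python
# analysed_sst is int16 -> 2 bytes per value
small = 1 * 1023 * 2047 * 2 / 1e6   # ~4.2 MB per (1, 1023, 2047) chunk
large = 1 * 3600 * 7200 * 2 / 1e6   # ~51.8 MB per (1, 3600, 7200) chunk
print(f"{small:.1f} MB, {large:.1f} MB")
```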

Curious what you think, and also @sharkinsspatial

norlandrhagen commented 7 months ago

Thanks for the investigation @abarciauskas-bgse! Happy to chat about this tmrrw.

norlandrhagen commented 7 months ago

I wonder if there is an option 3 of reaching out to the data provider to see if we can get any clarification on why this is happening.

abarciauskas-bgse commented 7 months ago

@norlandrhagen Totally agree. I was thinking of reaching out to PO.DAAC to see if there are plans to backprocess so the new chunk shape is applied across the whole dataset, or to otherwise deliver a consistently chunked version.

norlandrhagen commented 7 months ago

Seems like that would be the best way forward, but having the variable chunking ZEP in place would be super helpful for cases like this.

abarciauskas-bgse commented 7 months ago

To recap from our meeting today, I think the next steps will be: