jhamman opened this issue 4 years ago
Thanks. I need to run in a few minutes, but my WIP is at https://github.com/TomAugspurger/prefect-demo.
I'm close-ish to getting something working, but the handling of DaskExecutor <-> Dask Cluster on our k8s cluster is a bit awkward. May be best to sync up sometime in the next couple days to talk things through.
Great! I'm around the next few days so just ping me when you're ready to chat.
Also, just dropping this here for reference. On a Pangeo hub, dropping the following bit into `run.py` will allow you to run the flow "manually":
```python
# Assumes a Pangeo hub with dask-gateway configured; import paths below
# match Prefect 0.13-era APIs.
from dask_gateway import Gateway
from prefect.engine.executors import DaskExecutor

gateway = Gateway()
options = gateway.cluster_options()
options.worker_cores = 8
options.worker_memory = 51
cluster = gateway.new_cluster(cluster_options=options)
cluster.scale(4)

executor = DaskExecutor(
    address=cluster.scheduler_address,
    client_kwargs={"security": cluster.security},
)
pipeline.flow.run(executor=executor)
```
@jhamman are you free around 12:30 Central / 10:30 Pacific?
I think Ryan was right about DaskGateway creating unnecessary difficulties. I'm able to get the DaskKubernetesExecutor working OK. Working to port this workflow now, and I'll make a PR.
Yes I am!
I was able to complete this flow using my prefect cloud account and our staging k8s cluster.
One-time setup, plus some stuff that's probably per-k8s-namespace setup. Note that some names may be different (previously I used the `pangeo-forge` bucket; now it's `pangeo-forge-scratch`).
Per-flow things
It's unclear how best to do this. For now, calling it in Python code works:

```python
>>> pipeline = TerraclimatePipeline(cache_location, target_location, variables, years)
>>> pipeline.flow.register(project_name="pangeo-forge")
```
I think that ideally this is done in the GitHub Action. The GitHub worker just needs a token to log into our Prefect Cloud account.
This probably just happens on pushes to master since this is the computationally expensive part?
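For concreteness, a minimal sketch of what that CI registration step could run (the script name `register_flow.py` is hypothetical; it assumes the repository secret is exported as `PREFECT__CLOUD__AUTH_TOKEN`, which Prefect's config picks up from the environment):

```python
# register_flow.py -- hypothetical CI registration step. Assumes the
# PREFECT__CLOUD__AUTH_TOKEN environment variable holds a Prefect Cloud token.
from recipe.pipeline import pipeline

if __name__ == "__main__":
    # Registers (or re-registers) the flow under the pangeo-forge project.
    pipeline.flow.register(project_name="pangeo-forge")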
```
$ prefect run flow --name=terraclimate --project=pangeo-forge
```
This is also done in the GitHub Action worker.
At this point, a bunch of stuff happens on the k8s cluster.
Random thoughts

- The flow needs `storage` and `environment` (/executor) baked into it when it's registered. How best to do that? Ideally recipe authors don't need to worry about it.
- I had to customize `worker_pod.yaml` and `job.yaml` just to set `serviceAccount` and `serviceAccountName`. Would be nice to avoid that (open issue at prefect).

@TomAugspurger - amazing! How should we go about pulling in your proof of concept here? As it stands, I think this repo is a good candidate location for a place to test out a full CI workflow. I can spend some cycles on the GitHub Actions side tomorrow and Friday if you want to push your modifications (storage/environment/register) to master here.
I think hooking up to the CI is the next step.
After that it'd be good to think through how we structure things like the environment and storage.
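As a starting point, here's a rough sketch of what baking those into the flow before registration might look like, using Prefect 0.13-era APIs (the specific arguments are illustrative, not what's in the repo):

```python
# Illustrative sketch only: attach storage and an environment to the flow
# before registering it, so recipe authors don't have to.
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker

pipeline.flow.storage = Docker(
    registry_url="pangeoforge",  # the DockerHub org from this thread
    python_dependencies=["gcsfs", "zarr"],
)
pipeline.flow.environment = DaskKubernetesEnvironment(
    # custom specs were needed just to set serviceAccount / serviceAccountName
    scheduler_spec_file="job.yaml",
    worker_spec_file="worker_pod.yaml",
)
pipeline.flow.register(project_name="pangeo-forge")
```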
@TomAugspurger - I think I'm just missing the storage and environment setups you had in your flow. Once I have those, I think we'll be all set.
Whoops, https://github.com/TomAugspurger/terraclimate-feedstock/commit/78b127eee021404dc0831b679710a17c3379bf99 has all that (and probably some other stuff).
I'm registering the flow with `python recipe/pipeline.py`, but that would probably belong in an external file. You just need to somehow get the Flow object at the top level of the module, I think.
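Something like this at the bottom of `recipe/pipeline.py` would do it (a sketch; the constructor arguments are whatever the recipe already uses):

```python
# Expose the Flow at module top level so external tooling can import it,
# e.g. `from recipe.pipeline import flow`.
pipeline = TerraclimatePipeline(cache_location, target_location, variables, years)
flow = pipeline.flow
```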
🎉 We're up and rolling on GitHub actions: https://github.com/pangeo-forge/terraclimate-feedstock/runs/1067909406?check_suite_focus=true
However, the job failed on the prefect side:
```
Task Run State Message:
Unexpected error: OSError('Forbidden: https://www.googleapis.com/storage/v1/b/pangeo-scratch/o/terraclimate-cache%2F8052597365796761289\nPrimary: /namespaces/pangeo-181919.svc.id.goog with additional claims does not have storage.objects.get access to the Google Cloud Storage object.')
```
Everything is in master here now. A few notes about what I did:

- Set up the `pangeoforge` DockerHub account: https://hub.docker.com/orgs/pangeoforge
- Added the `PREFECT_ACCESS_TOKEN`, `DOCKER_USER`, and `DOCKER_ACCESS_TOKEN` secrets.
- Set `JUPYTERHUB_USER=pangeoforge` and pointed the Docker storage object at the `pangeoforge` DockerHub account.

@TomAugspurger - I know you were working on the service account permissions yesterday. Any hints on what to do next?
Cool... I think I was surprised about not having to add any GCS-specific permissions to the pangeo-forge Google Service Account. Can you try poking around there?
Though that doesn't explain why it worked yesterday. Maybe it didn't actually work?
For reference, `pangeo` has these roles; `pangeo-forge` doesn't. Maybe add those roles?
@TomAugspurger - I'm at a loss I think. I now have things set up as:
Yet, I'm still getting permission errors. How do we debug this?
Looks like `fs = gcsfs.GCSFileSystem(token="cloud")` was needed.
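In other words, something along these lines (a sketch; the `ls` call is just a hypothetical way to confirm the credentials work):

```python
import gcsfs

# token="cloud" tells gcsfs to use the machine's Google cloud credentials
# (here, the pod's service account) instead of other fallbacks.
fs = gcsfs.GCSFileSystem(token="cloud")
fs.ls("pangeo-forge-scratch")  # should succeed if the permissions are right
```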
Hmmm, how do we specify this with fsspec?
I'm not sure, but possibly setting `GCSFS_DEFAULT_PROJECT` in https://github.com/dask/gcsfs/blob/e142fca992556479930083363e87b8f9509f6175/gcsfs/core.py#L80? I'm really not sure how this works on the hubs; I would think it's the same. That's assuming we aren't able to use `storage_options`.
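If `storage_options` does work, a sketch of what that could look like (the gs:// path here is made up):

```python
import fsspec

# Extra keyword arguments to fsspec.open are forwarded to the underlying
# filesystem, so token="cloud" reaches gcsfs.GCSFileSystem.
with fsspec.open("gs://pangeo-forge-scratch/example-key", mode="wb",
                 token="cloud") as f:
    f.write(b"test")
```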
After a few small fixes to the pipeline, we're there!
Amazing!
The `pangeo-scratch` bucket is globally r/w for all GCS Pangeo users. We should set up a new bucket and link it to the service account for pangeo-forge.
@TomAugspurger - is the prefect agent still alive? I tried testing the full dataset pipeline (all years, all vars) and the github action is getting a timeout.
> The `pangeo-scratch` bucket is globally r/w for all GCS Pangeo users. We should set up a new bucket and link it to the service account for pangeo-forge.
I've added a `pangeo-forge-scratch` bucket to GCS and given the pangeo-forge service account read/write access.
It is alive.
These are the logs from the kubernetes pod started by prefect.
```
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/cli/dask_worker.py:277: UserWarning: The --bokeh/--no-bokeh flag has been renamed to --dashboard/--no-dashboard.
"The --bokeh/--no-bokeh flag has been renamed to --dashboard/--no-dashboard. "
Task exception was never retrieved
future: <Task finished coro=<connect.<locals>._() done, defined at /srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/core.py:288> exception=CommClosedError()>
Traceback (most recent call last):
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/core.py", line 297, in _
handshake = await asyncio.wait_for(comm.read(), 1)
File "/srv/conda/envs/notebook/lib/python3.7/asyncio/tasks.py", line 435, in wait_for
await waiter
concurrent.futures._base.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/core.py", line 304, in _
raise CommClosedError() from e
distributed.comm.core.CommClosedError
... (the "Task exception was never retrieved" / CommClosedError traceback above repeats four more times) ...
Traceback (most recent call last):
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py", line 295, in _
await asyncio.wait_for(self.start(), timeout=timeout)
File "/srv/conda/envs/notebook/lib/python3.7/asyncio/tasks.py", line 449, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/srv/conda/envs/notebook/bin/dask-worker", line 11, in <module>
sys.exit(go())
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/cli/dask_worker.py", line 446, in go
main()
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/cli/dask_worker.py", line 432, in main
loop.run_sync(run)
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/ioloop.py", line 532, in run_sync
return future_cell[0].result()
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/cli/dask_worker.py", line 426, in run
await asyncio.gather(*nannies)
File "/srv/conda/envs/notebook/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py", line 301, in _
type(self).__name__, timeout
concurrent.futures._base.TimeoutError: Nanny failed to start in 60 seconds
```
I don't see those logs anywhere in the Prefect UI. Perhaps that's because this is from before the job really gets started, since the scheduler or workers failed to start.
I'll try to figure this out before the call today.
Fixed the permissions with:

```
$ gsutil iam ch serviceAccount:pangeo-forge@pangeo-181919.iam.gserviceaccount.com:roles/storage.objectAdmin gs://pangeo-forge-scratch
```
FYI, I modified the GCP settings yesterday. There was a "Bucket retention policy" set on `pangeo-forge-scratch` which prevented objects from being deleted. We instead want a lifecycle policy on the scratch bucket that cleans things up.
Presumably, we'll also want a non-scratch bucket.
```
$ gsutil lifecycle set lifecycle.json gs://pangeo-forge-scratch
Setting lifecycle configuration on gs://pangeo-forge-scratch/...
$ cat lifecycle.json
{
  "lifecycle": {
    "rule": [
      {
        "action": {
          "type": "Delete"
        },
        "condition": {
          "age": 7,
          "isLive": true
        }
      }
    ]
  }
}
```
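For reference, the same rule could presumably be set from Python with the google-cloud-storage client (a sketch, assuming credentials with admin rights on the bucket):

```python
from google.cloud import storage

client = storage.Client(project="pangeo-181919")
bucket = client.get_bucket("pangeo-forge-scratch")
# Delete live objects older than 7 days, mirroring lifecycle.json above.
bucket.add_lifecycle_delete_rule(age=7, is_live=True)
bucket.patch()  # persist the updated lifecycle configuration
```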
I've thrown this feedstock together to help us explore how to connect a CI system (e.g. GitHub Actions) to a Dask-Gateway Cluster / Prefect Cloud. The repository is laid out as follows:
- `recipe.pipeline` - A Python module ready with a `PangeoForge.Pipeline` object in it. Import as `from recipe.pipeline import pipeline`.
- `run.py` - A simple Python script that imports the pipeline described above and (for now) prints some info about the flow contained therein (sketched below).
- `.github.workflows.main` - A GitHub Action that installs a few dependencies and executes `run.py`.
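For reference, `run.py` might look roughly like this (a sketch based on the description above, not the file's exact contents):

```python
# run.py -- import the pipeline and print some info about its flow.
from recipe.pipeline import pipeline

if __name__ == "__main__":
    flow = pipeline.flow
    print(f"Flow: {flow.name}")
    print(f"Tasks: {sorted(t.name for t in flow.tasks)}")
```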
@TomAugspurger - hopefully this has the needed pieces to connect to what you were working on with Prefect Cloud.
cc @rabernat