delgadom opened 6 years ago
Alternatively, I think these are slightly less hacky?
```python
class WorkerNotFoundError(Exception):
    pass


class PodNotFoundError(Exception):
    pass


def find_worker_processing_future(future, cluster):
    for worker in cluster.scheduler.workers.values():
        if future.key in [task.key for task in worker.processing]:
            return worker
    raise WorkerNotFoundError(
        'No worker found processing future {}'.format(future.key))


def find_pod_containing_worker(worker, cluster):
    for pod in cluster.pods():
        if pod.status.pod_ip == worker.host:
            return pod
    raise PodNotFoundError('No pod found running {}'.format(repr(worker)))


def get_pod_processing_future(future, cluster):
    return find_pod_containing_worker(
        find_worker_processing_future(future, cluster),
        cluster)
```
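The attribute access these helpers rely on can be sanity-checked without a live cluster using stand-in objects (all values below are made up for illustration):

```python
from types import SimpleNamespace

# Stand-in objects mimicking the attributes the lookup code touches:
# worker.processing -> tasks with a .key, worker.host, pod.status.pod_ip
fut = SimpleNamespace(key='task-abc')
worker = SimpleNamespace(
    host='10.0.0.7',
    processing=[SimpleNamespace(key='task-abc'),
                SimpleNamespace(key='task-xyz')],
)
pod = SimpleNamespace(status=SimpleNamespace(pod_ip='10.0.0.7'))

# The membership test used by find_worker_processing_future:
assert fut.key in [t.key for t in worker.processing]

# The match used by find_pod_containing_worker:
assert pod.status.pod_ip == worker.host
```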
Also, this kubernetes exec example seems to show promise, though I can't get the auth to work:
```python
def exec_command_on_pod(pod, cluster, exec_command):
    res = cluster.core_api.connect_get_namespaced_pod_exec(
        name=pod.metadata.name,
        namespace=pod.metadata.namespace,
        command=exec_command,
        stderr=True, stdin=False,
        stdout=True, tty=False)
    return res
```
@bolliger32 the second comment may be helpful. I want the last bit to work, but this gets us at least partway there...
```python
'''
To install the google cloud SDK, follow the instructions in the
google cloud docs: https://cloud.google.com/sdk/docs/. On
compute.rhg.com, simply make sure your installation is up
to date:

.. code-block:: bash

    sudo apt-get update && sudo apt-get upgrade -y google-cloud-sdk

Then, log in and set up your account:

.. code-block:: bash

    gcloud init

To install kubectl:

.. code-block:: bash

    sudo apt-get install kubectl

To set up credentials for directly executing commands on the pods:

.. code-block:: bash

    # Get credentials for the cluster. For compute.rhg.com, the
    # cluster is "jhub-cluster" and the project "rhg-project-1".
    # For compute.impactlab.org, the cluster is "impactlab-hub"
    # and the project "compute-impactlab".
    gcloud container clusters get-credentials jhub-cluster \
        --zone us-west1-a --project rhg-project-1

These steps only need to be executed once per machine.
'''

import os
from tempfile import TemporaryFile
import tarfile

from kubernetes.stream import stream as kubernetes_stream
from kubernetes import client as kube_client, config as kube_config

_kube_core_v1_api = None


def _reload_kubernetes_config():
    global _kube_core_v1_api
    _kube_core_v1_api = kube_client.CoreV1Api()
    kube_config.load_kube_config(
        config_file='/home/jovyan/.kube/config',
        client_configuration=_kube_core_v1_api.api_client.configuration,
        persist_config=True)


class WorkerNotFoundError(Exception):
    pass


class PodNotFoundError(Exception):
    pass


def find_worker_processing_future(future, cluster):
    '''
    Look up the worker on a cluster currently processing a future
    '''
    _reload_kubernetes_config()

    for worker in cluster.scheduler.workers.values():
        if future.key in [task.key for task in worker.processing]:
            return worker

    raise WorkerNotFoundError(
        'No worker found processing future {}'.format(future.key))


def find_pod_containing_worker(worker, cluster):
    '''
    Look up the pod on a cluster which hosts a worker
    '''
    _reload_kubernetes_config()

    for pod in cluster.pods():
        if pod.status.pod_ip == worker.host:
            return pod

    raise PodNotFoundError('No pod found running {}'.format(repr(worker)))


def get_pod_processing_future(future, cluster):
    '''
    Look up the pod on a cluster currently processing a future
    '''
    _reload_kubernetes_config()

    return find_pod_containing_worker(
        find_worker_processing_future(future, cluster),
        cluster)


def execute_command_on_pod(pod, command):
    '''
    Execute an arbitrary command on a pod

    Adapted from
    https://github.com/aogier/k8s-client-python/blob/master/examples/exec.py

    Parameters
    ----------
    pod : object
        A :py:class:`kubernetes.client.models.v1_pod.V1Pod` object
    command : list
        The command to execute, formatted as a list, with
        the program to execute first and the arguments after

    Examples
    --------

    .. code-block:: python

        >>> res = execute_command_on_pod(pod, ['ls', '/'])
        >>> print(res)
        bin
        boot
        dev
        etc
        home
        lib
        lib64
        media
        mnt
        opt
        proc
        root
        run
        sbin
        srv
        sys
        tmp
        usr
        var
    '''
    _reload_kubernetes_config()

    resp = kubernetes_stream(
        _kube_core_v1_api.connect_get_namespaced_pod_exec,
        name=pod.metadata.name,
        namespace=pod.metadata.namespace,
        command=command,
        stderr=True, stdin=False,
        stdout=True, tty=False)

    return resp


def copy_file_to_pod(pod, src, dst):
    '''
    Upload a file to the pod

    Adapted from
    https://github.com/aogier/k8s-client-python/blob/12f1443895e80ee24d689c419b5642de96c58cc8/examples/exec.py#L101

    Parameters
    ----------
    pod : object
        A :py:class:`kubernetes.client.models.v1_pod.V1Pod` object
    src : str
        The location of the file to upload on the local machine
    dst : str
        The destination of the file on the pod
    '''
    _reload_kubernetes_config()

    # Copying file client -> pod
    exec_command = ['tar', 'xvf', '-', '-C', os.path.dirname(dst)]
    resp = kubernetes_stream(
        _kube_core_v1_api.connect_get_namespaced_pod_exec,
        pod.metadata.name,
        pod.metadata.namespace,
        command=exec_command,
        stderr=True, stdin=True,
        stdout=True, tty=False,
        _preload_content=False)

    with TemporaryFile() as tar_buffer:
        with tarfile.open(fileobj=tar_buffer, mode='w') as tar:
            # Archive the file under the destination's basename so it
            # extracts with the desired name on the pod
            tar.add(name=src, arcname=os.path.basename(dst))

        tar_buffer.seek(0)
        commands = [tar_buffer.read()]

        while resp.is_open():
            resp.update(timeout=1)
            if resp.peek_stdout():
                print("STDOUT: %s" % resp.read_stdout())
            if resp.peek_stderr():
                print("STDERR: %s" % resp.read_stderr())
            if commands:
                c = commands.pop(0)
                resp.write_stdin(c.decode())
            else:
                break

        resp.close()


def copy_file_from_pod(pod, src, dst):
    '''
    Download a file from the pod

    Adapted from
    https://github.com/aogier/k8s-client-python/blob/12f1443895e80ee24d689c419b5642de96c58cc8/examples/exec.py#L101

    Parameters
    ----------
    pod : object
        A :py:class:`kubernetes.client.models.v1_pod.V1Pod` object
    src : str
        The location of the file on the pod
    dst : str
        The destination of the file on the local machine
    '''
    _reload_kubernetes_config()

    # Copying file pod -> client
    exec_command = [
        'tar', 'cf', '-', '-C', os.path.dirname(src), os.path.basename(src)]

    with TemporaryFile() as tar_buffer:
        resp = kubernetes_stream(
            _kube_core_v1_api.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            command=exec_command,
            stderr=True, stdin=True,
            stdout=True, tty=False,
            _preload_content=False)

        while resp.is_open():
            resp.update(timeout=1)
            if resp.peek_stdout():
                out = resp.read_stdout()
                tar_buffer.write(out.encode('ascii'))
            if resp.peek_stderr():
                print("STDERR: %s" % resp.read_stderr())

        resp.close()

        tar_buffer.flush()
        tar_buffer.seek(0)

        with tarfile.open(fileobj=tar_buffer, mode='r:') as tar:
            tar.extract(os.path.basename(src), os.path.dirname(dst))
```
This can be used to execute an arbitrary command on a worker currently processing a future, e.g.:

```python
In [1]: pod = get_pod_processing_future(running_futures[0], cluster)

In [2]: res = execute_command_on_pod(pod, ['cat', '/mytmpdir/geoclaw.err'])

In [3]: print(res)
/opt/conda/envs/worker/lib/python3.6/site-packages/numpy/core/_methods.py:29: RuntimeWarning: invalid value encountered in reduce
  return umr_minimum(a, axis, None, out, keepdims)
```
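The tar streaming that `copy_file_to_pod` / `copy_file_from_pod` perform over the exec channel can be exercised locally. This sketch (file names are hypothetical) packs a file under the destination's basename and unpacks it again, mirroring what happens on either side of the stream:

```python
import os
import tarfile
from tempfile import TemporaryFile, TemporaryDirectory

with TemporaryDirectory() as tmp:
    # A local file to "upload" under a different remote name
    src = os.path.join(tmp, 'local_name.txt')
    with open(src, 'w') as f:
        f.write('hello pod')

    dst = os.path.join(tmp, 'remote_name.txt')

    with TemporaryFile() as tar_buffer:
        # Pack side (the bytes copy_file_to_pod writes to the pod's stdin):
        # archive the file under the destination's basename
        with tarfile.open(fileobj=tar_buffer, mode='w') as tar:
            tar.add(name=src, arcname=os.path.basename(dst))

        tar_buffer.seek(0)

        # Unpack side (what `tar xvf - -C <dir>` does on the pod)
        with tarfile.open(fileobj=tar_buffer, mode='r:') as tar:
            tar.extract(os.path.basename(dst), path=tmp)

    with open(dst) as f:
        content = f.read()

print(content)
```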
System prerequisites to use the block of code above:

```bash
sudo apt-get update && sudo apt-get install google-cloud-sdk kubectl
pip install --upgrade parameterize_jobs rhg_compute_tools
rm -rf /home/jovyan/.kube/
```
Log into google cloud:

```bash
gcloud init
```

Log into your personal account (create a new profile). I hope to get rid of this soon and enable the use of service accounts, but for now this works. Use `rhg-project-1` / `us-west1-a` for the project/region for compute.rhg.com. Use `compute-impactlab` for the compute.impactlab.org project.
Then pull your credentials for the kubernetes cluster, e.g.:

```bash
gcloud container clusters get-credentials jhub-cluster --zone us-west1-a --project rhg-project-1
```
I've found I usually need to execute a `kubectl exec` command from the command line before the python version will work. Not sure why this is... it's super frustrating. But then this works fine for a while, and then you'll have to run the `get-credentials` and `kubectl exec` lines again. Obviously something to do with the command modifying the config or caching something, but not sure what's missing from our python setup. E.g. run the following:

```bash
kubectl exec dask-delgadom-08c6d5e7-1gfwsc --namespace rhodium-jupyter -- ls /
```

and then you should be good to go for a while, even with different pods. You'll periodically need to run the last two commands again to refresh.
Note that the upload/download functions currently require that the files be ASCII- and unicode-compatible; bytes are not supported. See this thread... apparently the kubernetes stream objects expect the data to be strings.
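As a possible workaround for the strings-only limitation (a sketch, not tested against a live cluster): base64-encode the binary payload into an ASCII-safe string before writing it to the stream, and decode it on the other end (e.g. by piping through `base64 -d` in the exec command). The encoding round-trip itself is straightforward:

```python
import base64

payload = bytes(range(256))  # arbitrary binary data, not valid ASCII

# Encode to an ASCII-safe str that the kubernetes stream can carry
ascii_safe = base64.b64encode(payload).decode('ascii')
assert all(ord(c) < 128 for c in ascii_safe)

# Decode on the receiving side (or via `base64 -d` on the pod)
recovered = base64.b64decode(ascii_safe)
assert recovered == payload
```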
Goals

Given a list of futures (partially completed) and a cluster, it should be easy to:

These should be tested on the RHG hub and, if stable/useful, submitted as PRs to dask_kubernetes.

What we have so far
There have to be faster ways of doing this, but I don't see them in the dask_kubernetes docs. They're admittedly very hacky though.
The next steps of executing things on the pods could be done from the command line, but using the kubernetes api would obviously be more elegant. Pods returned by `get_pod_from_ip` have a bunch of useful info. It's possible we could get the appropriately configured/authenticated kubernetes client from the dask cluster or client and execute things on this. The kubernetes package has some examples of calling `exec` using the kubernetes api.