jupyterhub / zero-to-jupyterhub-k8s

Helm Chart & Documentation for deploying JupyterHub on Kubernetes
https://zero-to-jupyterhub.readthedocs.io
Other
1.52k stars 791 forks source link

Spark on Kubernetes integration #1030

Closed h4gen closed 3 years ago

h4gen commented 5 years ago

Hello everybody!

I am using pangeo as configuration for my JupyterHub but decided to post this issue here as I think it is not pangeo specific. As some of you maybe know the current version of Spark (2.4) introduces PySpark support for the new kubernetes functionality of spark. I tried to get it running on my cluster by adapting this tutorial. I know this issue is primarily a PySpark issue but I thought it is maybe interesting to discuss it here as I can imagine it is interesting for other JupyterHub users too. Here is what I did:

  1. Adding this configuration to my cluster:

    kind: Role
    apiVersion: rbac.authorization.k8s.io/v1
    metadata:
    namespace: default
    name: spark-role
    rules:
    - apiGroups: [“”]
    resources: [“pods”]
    verbs: [“get”, “watch”, “list”, “edit”, “create”, “delete”]
    ---
    kind: RoleBinding
    apiVersion: rbac.authorization.k8s.io/v1
    metadata:
    name: spark
    namespace: default
    subjects:
    - kind: User
    name: system:serviceaccount:pangeo:daskkubernetes
    apiGroup: “”
    roleRef:
    kind: Role
    name: spark-role
    apiGroup: “”

    This extends the rights of the daskkubernetes service account which is necessary for pangeo to interact with dask properly.

  2. Creating a user pod from the current Jupyter PySpark docker image, which supports Spark 2.4

  3. Getting my master ip with kubectl cluster-info

  4. Create an new SparkContext in the following way:

    sc = SparkContext(master='k8s://https://<Master-IP>')

    This is the output of the context: image When I do kubectl get po --all-namespaces I can not see a new Spark pod running.

  5. Try to create a dummy data frame with:

    sqlc = SQLContext(sc)
    l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
    rdd = sc.parallelize(l)
    people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
    schemaPeople = sqlc.createDataFrame(people)

The last line sadly stucks and when I interrupt it this is the output:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-6-f2a7a0f2bef0> in <module>
      3 rdd = sc.parallelize(l)
      4 people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
----> 5 schemaPeople = sqlc.createDataFrame(people)

/usr/local/spark/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    305         Py4JJavaError: ...
    306         """
--> 307         return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema)
    308 
    309     @since(1.3)

/usr/local/spark/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    744 
    745         if isinstance(data, RDD):
--> 746             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    747         else:
    748             rdd, schema = self._createFromLocal(map(prepare, data), schema)

/usr/local/spark/python/pyspark/sql/session.py in _createFromRDD(self, rdd, schema, samplingRatio)
    388         """
    389         if schema is None or isinstance(schema, (list, tuple)):
--> 390             struct = self._inferSchema(rdd, samplingRatio, names=schema)
    391             converter = _create_converter(struct)
    392             rdd = rdd.map(converter)

/usr/local/spark/python/pyspark/sql/session.py in _inferSchema(self, rdd, samplingRatio, names)
    359         :return: :class:`pyspark.sql.types.StructType`
    360         """
--> 361         first = rdd.first()
    362         if not first:
    363             raise ValueError("The first row in RDD is empty, "

/usr/local/spark/python/pyspark/rdd.py in first(self)
   1376         ValueError: RDD is empty
   1377         """
-> 1378         rs = self.take(1)
   1379         if rs:
   1380             return rs[0]

/usr/local/spark/python/pyspark/rdd.py in take(self, num)
   1358 
   1359             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1360             res = self.context.runJob(self, takeUpToNumLeft, p)
   1361 
   1362             items += res

/usr/local/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
   1049         # SparkContext#runJob.
   1050         mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1051         sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
   1052         return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
   1053 

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1253             proto.END_COMMAND_PART
   1254 
-> 1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
   1257             answer, self.gateway_client, self.target_id, self.name)

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in send_command(self, command, retry, binary)
    983         connection = self._get_connection()
    984         try:
--> 985             response = connection.send_command(command)
    986             if binary:
    987                 return response, self._create_connection_guard(connection)

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in send_command(self, command)
   1150 
   1151         try:
-> 1152             answer = smart_decode(self.stream.readline()[:-1])
   1153             logger.debug("Answer received: {0}".format(answer))
   1154             if answer.startswith(proto.RETURN_MESSAGE):

/opt/conda/lib/python3.6/socket.py in readinto(self, b)
    584         while True:
    585             try:
--> 586                 return self._sock.recv_into(b)
    587             except timeout:
    588                 self._timeout_occurred = True

Referring to the tutorial I think that the SparkContext needs more Information to run correctly. Sadly it does not throw any errors when creating it. Is there anybody with more spark/kubernetes knowledge interested in trying it and sharing insights? Edit: The main problem seems to be that referring to the pyspark api, there is no way to provide the necessary information to the SparkContext.

Thank you very much!

h4gen commented 5 years ago

Okay, I just figured out, that it is necessary to provide the self built docker images based on the executing environment (in this case the jupyter pyspark docker image) and to provide it to the SparkContext probably via the SparkConf object. I'll keep you posted.

consideRatio commented 5 years ago

Thanks for documenting what you learn @h4gen!

h4gen commented 5 years ago

So, some progress over here. I did the following to build the images and provide the necessary information to the SparkContext:

  1. Run the jupyter pyspark image:
    
    docker run -i --rm -e GRANT_SUDO=yes \
    -v /var/run/docker.sock:/var/run/docker.sock \ # This is important to expose the hosts docker daemon
    jupyter/pyspark-notebook:5b2160dfd919 # Tag with spark 2.4

2. Enter the running container to build executor images from environment

docker exec -it -u root bash


3. Install docker in container following [this example](https://www.digitalocean.com/community/tutorials/how-to-install-and-use-docker-on-ubuntu-18-04)

4. Build executor images from env following [the Spark on Kubernetes example](https://spark.apache.org/docs/latest/running-on-kubernetes.html#configuration)

cd $SPARK_HOME ./bin/docker-image-tool.sh -r -t my-tag build ./bin/docker-image-tool.sh -r -t my-tag push

Feel free to skip these steps and use my pre-built images from docker hub for testing out yourself (assuming I made no mistakes so far):

idalab/spark idalab/spark-r idalab/spark-py


5. This information ist then provided to the `SparkConf` on the user pod:

conf.setMaster('k8s://https://') conf.set('spark.kubernetes.container.image', 'idalab/spark-py:spark') conf.set('spark.submit.deployMode', 'cluster') conf.set('spark.executor.instances', 2) conf.setAppName('spark-k8s-test')


6.  And the `conf` is provided to the `SparkContext`:

sc = SparkContext(conf=conf)


Now I get the following error:

Exception Traceback (most recent call last)

in ----> 1 sc = SparkContext(conf=conf) /usr/local/spark/python/pyspark/context.py in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls) 113 """ 114 self._callsite = first_spark_call() or CallSite(None, None, None) --> 115 SparkContext._ensure_initialized(self, gateway=gateway, conf=conf) 116 try: 117 self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, /usr/local/spark/python/pyspark/context.py in _ensure_initialized(cls, instance, gateway, conf) 296 with SparkContext._lock: 297 if not SparkContext._gateway: --> 298 SparkContext._gateway = gateway or launch_gateway(conf) 299 SparkContext._jvm = SparkContext._gateway.jvm 300 /usr/local/spark/python/pyspark/java_gateway.py in launch_gateway(conf) 92 93 if not os.path.isfile(conn_info_file): ---> 94 raise Exception("Java gateway process exited before sending its port number") 95 96 with open(conn_info_file, "rb") as info: Exception: Java gateway process exited before sending its port number ``` Short google query leads to the asumption that this is a sudo problem in the user pod. Will investigate further the next days. Cheers!
h4gen commented 5 years ago

Hello everyone,

I further investigated the error and came up with a running configuration. I am running the following code on my user pod to get spark running:

from pyspark import *
import os

# Config Spark
conf.setMaster('k8s://https://<MASTER_IP>:443') # The port is important. Otherwise it won't run.
conf.set('spark.kubernetes.container.image', 'idalab/spark-py:spark') # Provide the image for the executor pods
conf.set('spark.submit.deployMode', 'client') # Only client mode is possible 
conf.set('spark.executor.instances', '2') # Set the number of executer pods
conf.setAppName('pyspark-shell')
conf.set('spark.driver.host', '<USER_POD_IP>') # This is the IP of the user pod in the K8s cluster
os.environ['PYSPARK_PYTHON'] = 'python3' # Needs to be explicitly provided as env. Otherwise workers run Python 2.7
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3'  # Same

# Create context
sc = SparkContext(conf=conf) 

So far everything I tried worked pretty nice. I will update this if I encounter any further problems.

Cheers!

consideRatio commented 5 years ago

Wieeee thank you so luch @h4gen! This may help me out personally a lot in the future!

h4gen commented 5 years ago

A few further thoughts on this.

  1. Right now the cluster admin has to provide the user with the <MASTER_IP> of the cluster to be able to connect to create the executor pods. Is there a more convenient way of asking for this IP for the user? As far as I understand the pods are assigned a DNS name like jupyter-username. Is there something similar for the K8s master? Or would it be possible for the hub to expose the <MASTER_IP> as environment variable in the user pod?

  2. Currently the UI is not accessible. As far as I can see pangeo uses the nbserverproxy to expose the dask dashboard to the user. I think this would be also easy to do for the Jupyter Spark image. The question is whether the link for the dashboard in the SparkContext can be manipulated by configuration to make it more easily accessible for the user. Right now the user would have to type-in the correct link himself (which is annoying to explain for a lot of users).

consideRatio commented 5 years ago

some loose thoughts, no clear answer: writing from mobile

The hub.extraConfig can expand the dictionary found in c.KubeSpawner.environment, this will influence the env of spawned user pods if u need to find it programmatically.

but u van also use the charts singleuser.extraEnv to set it directly from the chart config.

if you have a k8s service pointing to some pod, u can reach it with mysvc.mynamespace.svc.cluster.local ar URI btw.

note that the the jupyter-username is a pod name, and u cannot access that as a network identifier like google.se. if u would need that u would need to create a service for each user pod pointing to a pod with certain labels.

i think the master may always be reached with a fixed ip on GKE and other managed k8s procided by some cloud provider btw

dsludwig commented 5 years ago

Kubernetes creates the environment variable KUBERNETES_SERVICE_HOST, which contains the master IP (the IP for the kubernetes service, actually).

See: https://kubernetes.io/docs/concepts/services-networking/service/#discovering-services

To get the pod IP, it is probably most convenient to use Kubernetes' downward API: https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information/#use-pod-fields-as-values-for-environment-variables

env:
        - name: MY_POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP

(You could also just call hostname -i, or write the equivalent Python code).

consideRatio commented 5 years ago

Slam dunk @dsludwig ! :D

betatim commented 5 years ago

This is a cool write up with lots of useful bits of information. Could you be persuaded to write it up once more now that you know what "the answer" is and then post it on https://discourse.jupyter.org/c/jupyterhub/z2jh-k8s? Our thinking is that the discourse forum is a lot more discoverable (and better indexed by google?) than GitHub issues. Something to ponder.

h4gen commented 5 years ago

@dsludwig Thank you for the information. That was exactly what I was looking for. @betatim Sure thing! Will do it the next days.

h4gen commented 5 years ago

Before I do the documentation I would like to ask again if somebody can give me a hint regarding the Spark UI problem. I assumed it would be working with nbserverextension, but it does not :( I sum up all Information, I have:

  1. I create the context which gives me the link to the UI.
  2. The link created by Spark is obviously not accessible on the hub as it points to <POD_IP>:4040
  3. I checked on a local version of the user image if the UI is accessible. That works fine!
  4. As I want to access the port of the POD via browser, I installed nbserverextension in the pod to make it accessible via .../user/proxy/4040 but this does not seem to work
  5. Other ports are accessible via this method so I assume nbserverextension is working correctly.
  6. This is the output of npnetstat -pl:
    Active Internet connections (only servers)
    Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
    tcp        0      0 localhost:51084         0.0.0.0:*               LISTEN      23/python
    tcp        0      0 localhost:42415         0.0.0.0:*               LISTEN      26/python
    tcp        0      0 localhost:58607         0.0.0.0:*               LISTEN      26/python
    tcp        0      0 localhost:39601         0.0.0.0:*               LISTEN      23/python
    tcp        0      0 localhost:34996         0.0.0.0:*               LISTEN      27/python
    tcp        0      0 localhost:41208         0.0.0.0:*               LISTEN      27/python
    tcp        0      0 0.0.0.0:8888            0.0.0.0:*               LISTEN      7/python
    tcp        0      0 localhost:58553         0.0.0.0:*               LISTEN      23/python
    tcp        0      0 jupyter-hagen:45243     0.0.0.0:*               LISTEN      74/java
    tcp        0      0 localhost:38241         0.0.0.0:*               LISTEN      23/python
    tcp        0      0 localhost:35746         0.0.0.0:*               LISTEN      27/python
    tcp        0      0 localhost:38050         0.0.0.0:*               LISTEN      26/python
    tcp        0      0 localhost:52964         0.0.0.0:*               LISTEN      26/python
    tcp        0      0 localhost:60869         0.0.0.0:*               LISTEN      26/python
    tcp        0      0 localhost:59910         0.0.0.0:*               LISTEN      27/python
    tcp        0      0 jupyter-hagen:42343     0.0.0.0:*               LISTEN      74/java
    tcp        0      0 localhost:47911         0.0.0.0:*               LISTEN      26/python
    tcp        0      0 0.0.0.0:4040            0.0.0.0:*               LISTEN      74/java
    tcp        0      0 localhost:35305         0.0.0.0:*               LISTEN      27/python
    tcp        0      0 localhost:40810         0.0.0.0:*               LISTEN      23/python
    tcp        0      0 localhost:36362         0.0.0.0:*               LISTEN      23/python
    tcp        0      0 localhost:43627         0.0.0.0:*               LISTEN      74/java
    tcp        0      0 localhost:45067         0.0.0.0:*               LISTEN      27/python
    tcp        0      0 localhost:57547         0.0.0.0:*               LISTEN      26/python
    Active UNIX domain sockets (only servers)
    Proto RefCnt Flags       Type       State         I-Node   PID/Program name     Path

    One can see that the exposed Local Address has another format than the other ones which are accessible.

  7. This output has the environment variable '_JAVA_OPTIONS' set to "-Djava.net.preferIPv4Stack=true" as I thought it would be an IPv6 problem which seems to be the standard for java but it did not resolve the issue.

Any ideas on this? I'm a bit lost. Thank you!

ryanlovett commented 5 years ago

@h4gen Can you submit this as an issue to nbserverproxy so we can discuss it there? A couple of simple things to try:

easel commented 5 years ago

Not entirely sure if pangeo is sufficiently similar for this to be useful or not, but I'll throw it up here for the record. I've got Spark 2.4 on Kubernetes running nicely with standard Toree kernel and version 0.7 of the jupyterhub helm charts:

Create a config.yaml:

singleuser:
  serviceAccountName: spark
  image:
    name: jupyter/all-spark-notebook
    tag: 14fdfbf9cfc1
  extraEnv:
    SPARK_OPTS: >-
      --deploy-mode=client
      --master k8s://https://kubernetes.default.svc
      --conf spark.kubernetes.namespace=`cat /var/run/secrets//kubernetes.io/serviceaccount/namespace`
      --conf spark.driver.pod.name=${HOSTNAME}
      --conf spark.driver.host=`hostname -i`
      --conf spark.driver.port=19998
      --conf spark.kubernetes.container.image=docker.io/7thsense/spark:spark24  
      --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token
      --conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt 

Then install. helm upgrade jhub --install jupyterhub/jupyterhub --namespace jhub --version 0.7.0 --values config.yaml

I've elided the spark RBAC setup but it's the usual stuff from the spark-on-kubernetes docs.

At least for me, figuring out how to use the service account credentials (and that they were needed) and getting the pod context configuration (namespace, pod name, ip) into the environment variable were tricky since most of the examples I've seen were using downward config maps which don't seem to "fit" into extraVars.

h4gen commented 5 years ago

@easel This is super helpful! A few questions regarding your setup:

  1. Did you also create your executor images from your environment as is described here or do you have a more convenient way?
  2. Is your Spark UI working with this setup?
  3. How exactly do you evaluate your SPARK_OPTS at container start? I think this is pretty elegant.
easel commented 5 years ago

Hi @h4gen, answers per above:

  1. I just built the images per the "spark on Kubernetes" docs at https://spark.apache.org/docs/latest/running-on-kubernetes.html#docker-images on my Mac -- the url's above are in a public repo though and you could just use them.

  2. Spark ui works but you have to forward the port into the single-user pod, something like kubectl port-forward pod/jupyter-erik 4040:4040. It would be cool to get it wired back into the hub or an ingress somehow but I haven't done anything like that.

  3. I may not fully understand your question. It just happens "magically", the extraVars pushes environment variables into the kernel and Toree picks it up. I guess maybe I got the idea from https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#in-an-apache-toree-scala-notebook ?

h4gen commented 5 years ago

Thanks for the answers @easel. Few more questions:

  1. Do you have to forward every uder pod by hand or is there a way to forward all automatically? Maybe jupyterhub/nbserverproxy#57 Is also interesting for you.

  2. I see now how the variables are set. But weirdly it does not work for me because the shell commands are not evaluated but set as string. Example: I set:

      SPARK_OPTS: >-
        --deploy-mode=client
        --master=k8s://https://kubernetes.default.svc
        --conf spark.driver.host=$(hostname -i)
        --conf spark.kubernetes.container.image=idalab/spark-py:spark
        --conf spark.ui.proxyBase=${JUPYTERHUB_SERVICE_PREFIX}proxy/4040
    ...

    but on the pod this results in an environment variabel like:

    'SPARK_OPTS': '--deploy-mode=client --master=k8s://https://kubernetes.default.svc --conf spark.driver.host=$(hostname -i) --conf spark.kubernetes.container.image=idalab/spark-py:spark --conf spark.ui.proxyBase=${JUPYTERHUB_SERVICE_PREFIX}proxy/4040

    Does anybody know why this is the case?

Thank you!

h4gen commented 5 years ago

@betatim Posted consolidated results here.

metrofun commented 5 years ago

Having the same issue with spark.driver.host. How to set this dynamic value before the Docker's ENDPOINT executes? I am using KubeSpawner, if it is relevant.

stevenstetzler commented 5 years ago

@metrofun The best way I found to set this dynamically is to write to a spark-defaults.conf file when the container starts. In your JupyterHub configuration:

jupyterhub:
  singleuser:
    lifecycleHooks:
      postStart:
        exec:
          command:
            - "/bin/sh"
            - "-c"
            - |
              echo "spark.driver.host=$(hostname -i)" >> $SPARK_HOME/conf/spark-defaults.conf
stevenstetzler commented 5 years ago

@h4gen were you able to find a way to automatically set SPARK_PUBLIC_DNS? I am running into the same issue where

jupyterhub:
  singleuser:
    extraEnv:
      SPARK_PUBLIC_DNS: my.url.com${JUPYTERHUB_SERVICE_PREFIX}proxy/4040/jobs/

does not evaluate correctly, and setting environment variables in the postStart lifecycleHooks for the container doesn't seem to work either (it will result in an empty environment variable):

jupyterhub:
  singleuser:
    lifecycleHooks:
      postStart:
        exec:
          command:
            - "/bin/sh"
            - "-c"
            - |
              export SPARK_PUBLIC_DNS="my.url.com${JUPYTERHUB_SERVICE_PREFIX}proxy/4040/jobs/"

I was thinking of opening a separate issue about setting environment variables that require other environment variables, but wanted to see if you had a solution first.

Edit: Of course the second solution won't work as that will only set the environment variable in the postStart shell. My workaround is to set SPARK_PUBLIC_DNS in the single user's Entrypoint script. I've opened an issue about this to see if this functionality is possible with extraEnv: https://github.com/jupyterhub/zero-to-jupyterhub-k8s/issues/1255

jasonsmithio commented 4 years ago

@h4gen have you updated this at all using updates over the past year or so?

h4gen commented 4 years ago

Hi, @TheJaySmith . Sorry no updates from my side. We switched to kubeflow.

consideRatio commented 3 years ago

@h4gen thank you soo much for writing this up publically, too much work is repeated when we work in isolation! :heart:

At this point, I'll go ahead and close this issue as its becoming stale and doesn't involve an action point - it is still very findable with search engines though.

Lots of love from Sweden!