polyseam / cndi

Self-Host Cloud-Native Apps with the Ease of PaaS
https://cndi.dev
Apache License 2.0
180 stars 7 forks source link

[Template]: spark #327

Open johnstonmatt opened 1 year ago

johnstonmatt commented 1 year ago

https://github.com/GoogleCloudPlatform/spark-on-k8s-operator

vipmarwah commented 2 months ago

The Spark operator has been taken over by Kubeflow.

repoURL: 'https://kubeflow.github.io/spark-operator' targetRevision: 2.0.0-rc.0

vipmarwah commented 2 months ago

The Spark operator allows Spark applications to be defined declaratively and run without needing to manage the Spark submission process. The architecture consist of:

  1. Spark Application Controller: handles creation, updates and deletion of spark application
  2. Submission runner: handles application submissions and run spark-submit
  3. Spark pod monitor : monitor pod status and send updates to controller.
  4. Mutating Admission Webhook: handles customizations for Spark driver and executor pods
vipmarwah commented 2 months ago

Spark Applications consist of a driver process and a set of executor processes. The driver process executes your main() function, resides on a node within the cluster, and is responsible for three key tasks: managing information about the Spark application, handling user inputs or program instructions, and coordinating the analysis, distribution, and scheduling of tasks across the executors. The driver process is the heart of a Spark Application and maintains all relevant information during the lifetime of the application. The executors are responsible for carrying out the tasks assigned to them by the driver. Each executor has two main responsibilities: executing the code assigned by the driver and reporting the status of the computation on that executor back to the driver node.

Basic structure of a SparkApplication to run the SparkPI program when SparkPI program is embedded within the Spark image:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi-python
  namespace: default
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: spark:3.5.2
  imagePullPolicy: IfNotPresent
  mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
  sparkVersion: 3.5.2
  driver:
    cores: 1
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    instances: 1
    cores: 1
    memory: 512m
vipmarwah commented 2 months ago

Basic structure of a SparkApplication to run the SparkPI program hosted in a GitHub public repository.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi-python
  namespace: default
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: spark:3.5.2
  imagePullPolicy: IfNotPresent
  mainApplicationFile: 'https://raw.githubusercontent.com/username/RepositoryName/BranchName/prog.py'
  sparkVersion: 3.5.2
  driver:
    cores: 1
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    instances: 1
    cores: 1
    memory: 512m
vipmarwah commented 2 months ago

If the GitHub repository is private, we need to define GitHub secrets in the cluster manifest and use git-sync as a sidecar to synchronize the repository into a local directory.

Define the github secret.

 apiVersion: v1
 kind: Secret
 metadata:
   name: github-auth-secret  # Name of the secret
   namespace: default        # Namespace where the secret will be created
 stringData:
   GITHUB_USER: $cndi_on_ow.seal_secret_from_env_var(GIT_USERNAME) 
   GITHUB_TOKEN: $cndi_on_ow.seal_secret_from_env_var(GIT_TOKEN)      

Structure of SparkApplication using gitsync as a sidecar

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi-python
  namespace: default
  annotations: 
    argocd.argoproj.io/sync-options: SkipDryRunOnMissingResource=true
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: spark:3.5.0
  imagePullPolicy: Always
  mainApplicationFile: local:///git/repo/pythonprogs/prog.py
  sparkVersion: 3.5.0
  volumes:
    - name: spark-local-dir     # Create volume to store git repository  
      emptyDir: {}
  driver:
    labels:
      version: 3.5.0
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-spark-operator-spark
    volumeMounts:
      - name: spark-local-dir
        mountPath: /git/repo
    sidecars:
    - name: git-sync-sidecar
      image: registry.k8s.io/git-sync/git-sync:v4.2.3
      args:
        - --repo= 'https://raw.githubusercontent.com/username/RepositoryName'
        - --root=/git/repo
      env:
        - name: GITSYNC_USERNAME
          valueFrom:
            secretKeyRef:
              name: github-auth-secret
              key: GITHUB_USER
        - name: GITSYNC_PASSWORD
          valueFrom:
            secretKeyRef:
              name: github-auth-secret
              key: GITHUB_TOKEN   
        - name: GITSYNC_BRANCH
          value: "main"
        - name: GITSYNC_REV
          value: "v1.0.0"
        - name: GITSYNC_WAIT
          value: "60"
        - name: GITSYNC_ONE_TIME
          value: "false"
      volumeMounts:
        - name: spark-local-dir
          mountPath: /git/repo
  executor:
    labels:
      version: 3.5.0
    instances: 1
    cores: 1
    coreLimit: 1200m
    memory: 512m

In the above program, git-sync will synchronize the Git repository every 60 seconds, downloading the entire repository into the local directory /git/repo under the volume spark-local-dir. Since the spark-local-dir volume is mounted on both the SparkApplication and the sidecar, the SparkApplication will access the latest version of the GitHub repository and execute the required program.

vipmarwah commented 2 months ago

If a data pipeline needs to be scheduled at a specific time, the SparkApplication should be replaced with a ScheduledSparkApplication, including the scheduling details. Below are the required changes.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: ScheduledSparkApplication
metadata:
  name: spark-pi-python
  namespace: default
spec:
  schedule: "@every 10m"
  concurrencyPolicy: Replace   
  # concurrencyPolicy values can be : 
  # Allow (If the previous job hasn't finished and it's time for the next run, both jobs will run in parallel)  
  # Replace (If a new job is scheduled while the previous job is still running, Kubernetes will stop the previous job and start the new one)
  # Forbid ( If the previous job is still running, the next scheduled job will not start until the current one finishes)
  template:
    type: Python
    pythonVersion: "3"
    # same as defined above in spark application
    ..........
    ..........
    ..........
    volumes:
      .........
      .........
    driver:
      labels:
        .........
      cores: 1
      ..........
      ..........
      serviceAccount: spark-spark-operator-spark
      volumeMounts:
        ..........
        ..........
      initContainers:
      - name: git-sync-sidecar
        image: registry.k8s.io/git-sync/git-sync:v4.2.3
        args:
          - --repo= 'https://raw.githubusercontent.com/username/RepositoryName'
          - --root=/git/repo
        env:
          - name: GITSYNC_USERNAME
            valueFrom:
              secretKeyRef:
                name: github-auth-secret
                key: GITHUB_USER
          - name: GITSYNC_PASSWORD
            valueFrom:
              secretKeyRef:
                name: github-auth-secret
                key: GITHUB_TOKEN   
          - name: GITSYNC_BRANCH
            value: "main"
          - name: GITSYNC_REV
            value: "v1.0.0"
          - name: GITSYNC_ONE_TIME #If GITSYNC_ONE_TIME is set to false, the driver pod will remain active continuously syncing the GitHub repository, preventing the executor from executing the submitted job.
            value: "true"

        volumeMounts:  #volume should be same as volume mounted under driver
          ..........
          ..........
    executor:
      labels:
        .........
      instances: 1
      .........
      .........
vipmarwah commented 2 months ago

The schedule field in a ScheduledSparkApplication uses the cron syntax to define when the Spark application should run. Cron expressions consist of five fields that specify the minute, hour, day of the month, month, and day of the week when the job should execute. Here's the basic format:


| | | | | | | | | +--- Day of the week (0-7) (Sunday=0 or 7, Monday=1, etc.) | | | +----- Month (1-12) | | +------- Day of the month (1-31) | +--------- Hour (0-23) +----------- Minute (0-59)

e.g schedule: " " (runs every minute) schedule: "30 15 " (runs every day 3:30 PM) schedule: "0 10 1" (runs every monday 10 AM) schedule: "0 2 1 " (runs every month 2 AM)

vipmarwah commented 2 months ago

PVC is needed to manage data storage that persists across the lifecycle of Spark jobs. I provided an example of using a PVC to store logs for Spark.

  1. Declare a storage class as below. Since the Spark application uses the Spark user for job executions, we need to assign permissions to the Spark user when declaring a storage class. Uid for spark application is : 185. Permissions for files and directories can be set as needed. For demonstration purposes, we assigned 0777.
    apiVersion: storage.k8s.io/v1
    kind: StorageClass
    metadata:
    name:  my-new-sc
    namespace: default
    annotations: 
    argocd.argoproj.io/sync-options: SkipDryRunOnMissingResource=true
    provisioner: file.csi.azure.com
    reclaimPolicy: Delete
    volumeBindingMode: WaitForFirstConsumer
    mountOptions:
    - dir_mode=0777
    - file_mode=0777
    - uid=185
    - gid=185
    parameters:
    skuName: Standard_LRS
  2. PVC declaration
    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
    name: spark-pvc
    namespace: default
    annotations: 
    argocd.argoproj.io/sync-options: SkipDryRunOnMissingResource=true
    spec:
    storageClassName: my-new-sc
    accessModes:
    - ReadWriteMany
    volumeMode: Filesystem
    resources:
    requests:
      storage: 5Gi

    To utilize this persistent volume in the SparkApplication/ScheduledSparkApplication, it needs to be mounted in the Spark driver as shown in the example below.

    
    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: ScheduledSparkApplication
    metadata:
    name: spark-pi-python
    namespace: default
    annotations: 
    argocd.argoproj.io/sync-options: SkipDryRunOnMissingResource=true
    spec:
    schedule: "@every 5m"
    concurrencyPolicy: Allow
    template:
    type: Python
    pythonVersion: "3"
    mode: cluster
    .........................
    .........................   
    sparkConf:  # Enable the SparkApplication to log events
      spark.eventLog.enabled: "true"
      spark.eventLog.dir: "/mnt/spark-logs" # it will log the events in mounted volume.
    volumes:     # Create a volume using the PVC created above. 
      - name: spark-data
        persistentVolumeClaim:
          claimName: spark-pvc   # Ensure to use the same name as the PVC created earlier
    driver:
      ...........
      ...........    
      volumeMounts:
        - name: spark-data
          mountPath: /mnt/spark-logs
vipmarwah commented 1 month ago

Note: Template works well with instance type Standard_D2_v3, the default Standard_D2s_v3 sometimes hang due to limited resources error . Image