kestra-io / plugin-spark

Fix/fixes and improvements #32

Closed loicmathieu closed 1 year ago

loicmathieu commented 1 year ago

Tested with multiple flows, and each time the logs are shown in the UI, so I consider that this fixes #30.

loicmathieu commented 1 year ago

SparkCLI example:

id: spark-cli
namespace: dev
tasks:
  - id: workingDirectory
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
    - id: local-file
      type: io.kestra.core.tasks.storages.LocalFiles
      inputs:
        pi.py: |
          import sys
          from random import random
          from operator import add
          from pyspark.sql import SparkSession

          if __name__ == "__main__":
              spark = SparkSession \
                  .builder \
                  .appName("PythonPi") \
                  .getOrCreate()

              partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
              n = 100000 * partitions

              def f(_: int) -> float:
                  # Pick a random point in the unit square; count it if it lands inside the unit circle.
                  x = random() * 2 - 1
                  y = random() * 2 - 1
                  return 1 if x ** 2 + y ** 2 <= 1 else 0

              count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
              print("Pi is roughly %f" % (4.0 * count / n))

              spark.stop()
    - id: hello
      type: io.kestra.plugin.spark.SparkCLI
      docker:
        networkMode: host
      commands:
        - spark-submit --name Pi --master spark://localhost:37077 pi.py
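
This flow assumes a standalone Spark master reachable at spark://localhost:37077; networkMode: host lets the task container reach it on localhost, and LocalFiles materializes pi.py in the WorkingDirectory so spark-submit can reference it by relative path. As a minimal sketch, such a master can be started locally with the bitnami/spark image (image, tag, and port mapping are assumptions for illustration, not part of this PR):

docker run -d --name spark-master \
  -e SPARK_MODE=master \
  -p 37077:7077 \
  bitnami/spark:3.4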
loicmathieu commented 1 year ago

PythonSubmit example:

id: hello-spark-submit
namespace: dev
tasks:
  - id: hello
    type: io.kestra.plugin.spark.PythonSubmit
    master: spark://localhost:37077
    runner: DOCKER
    docker:
      networkMode: host
      user: root
    args:
    - "10"
    mainScript: |
      import sys
      from random import random
      from operator import add
      from pyspark.sql import SparkSession

      if __name__ == "__main__":
          spark = SparkSession \
              .builder \
              .appName("PythonPi") \
              .getOrCreate()

          partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
          n = 100000 * partitions

          def f(_: int) -> float:
              # Pick a random point in the unit square; count it if it lands inside the unit circle.
              x = random() * 2 - 1
              y = random() * 2 - 1
              return 1 if x ** 2 + y ** 2 <= 1 else 0

          count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
          print("Pi is roughly %f" % (4.0 * count / n))

          spark.stop()
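
The single args entry is passed to the script as sys.argv[1], so this run samples over 10 partitions (10 × 100000 = 1,000,000 points) instead of the script's default of 2. On success the task logs should contain a line like Pi is roughly 3.141593 (the digits will vary from run to run).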