kestra-io / plugin-spark

Apache License 2.0

Capture logs (stdout and stderr) from a Spark job #30

Closed anna-geller closed 1 year ago

anna-geller commented 1 year ago

Feature description

When running this flow:

id: spark-submit
namespace: dev

tasks:
  - id: spark
    type: io.kestra.plugin.spark.PythonSubmit
    runner: DOCKER
    docker:
      image: tabulario/spark-iceberg
      networkMode: host
      user: root
      entryPoint: 
      - /bin/sh
      - -c
    master: spark://localhost:8082 # 7077
    mainScript: |
      import sys
      from random import random
      from operator import add
      from pyspark.sql import SparkSession

      if __name__ == "__main__":
          spark = SparkSession \
              .builder \
              .appName("PythonPi") \
              .getOrCreate()

          partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
          n = 100000 * partitions

          def f(_: int) -> float:
              x = random() * 2 - 1
              y = random() * 2 - 1
              return 1 if x ** 2 + y ** 2 <= 1 else 0

          count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
          print("Pi is roughly %f" % (4.0 * count / n))

          spark.stop()

I would expect the printed output to be captured in the logs, but it isn't. It would be great to see the logs directly in the Kestra UI.
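
One cheap thing to rule out first is output buffering or the choice of stream. A minimal diagnostic sketch, assuming nothing about how the plugin reads the container output: the hypothetical helper `emit` writes the same line to both stdout and stderr and flushes immediately, so if either stream reaches the task logs, the line should show up.

```python
import sys


def emit(message: str) -> None:
    """Write the message to both stdout and stderr, flushing each right away."""
    # Streams are looked up at call time so redirection by the caller is honoured.
    for stream in (sys.stdout, sys.stderr):
        print(message, file=stream, flush=True)


if __name__ == "__main__":
    # Hypothetical marker line; in the flow above this would replace the plain print().
    emit("diagnostic: is this line visible in the task logs?")
```

If the line appears from only one of the two streams, that narrows the problem to how that stream is collected rather than to Spark itself.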

anna-geller commented 1 year ago

extra info: even when setting verbose: true, logs are not captured

loicmathieu commented 1 year ago

In fact I also had this strange behaviour when trying the Tabular spark images. It seems to be linked to these images (or maybe the example) as logs are correctly displayed with other images.

anna-geller commented 1 year ago

when I exec into that container though, the logs are captured, so it must be some Docker issue.
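
To compare the two situations (interactive exec vs. the task run) more precisely, it can help to record stdout and stderr separately for the same command. A minimal sketch using only the standard library; `run_and_capture` is a hypothetical helper, and the demo command is a stand-in for whatever is actually run inside the container.

```python
import subprocess
import sys
from typing import List, Tuple


def run_and_capture(cmd: List[str]) -> Tuple[int, str, str]:
    """Run cmd and return (exit code, stdout text, stderr text)."""
    result = subprocess.run(cmd, capture_output=True, text=True, check=False)
    return result.returncode, result.stdout, result.stderr


if __name__ == "__main__":
    # Stand-in command that writes one line to each stream.
    demo = [
        sys.executable,
        "-c",
        "import sys; print('to stdout'); print('to stderr', file=sys.stderr)",
    ]
    code, out, err = run_and_capture(demo)
    print(f"exit={code}")
    print(f"stdout: {out!r}")
    print(f"stderr: {err!r}")
```

Knowing which stream the Spark output lands on makes it easier to tell whether the task is dropping one stream or both.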

anna-geller commented 1 year ago

ok, spent almost 1h trying to set it up, giving up for now :)

id: spark-submit
namespace: dev

tasks:
  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      - id: local
        type: io.kestra.core.tasks.storages.LocalFiles
        inputs:
          log4j2.properties: |
            log4j.rootCategory=DEBUG, console
        outputs:
          - "*.properties"
      - id: spark
        type: io.kestra.plugin.spark.PythonSubmit
        configurations:
          spark.logConf: "true"
          spark.eventLog.enabled: "true"
          # log4j.logger.org.apache.spark.api.python.PythonGatewayServer: DEBUG
        verbose: true
        appFiles:
          log4j2.properties: "{{outputs.local.uris['log4j2.properties']}}"
        runner: DOCKER
        docker:
          image: tabulario/spark-iceberg
          networkMode: host
          user: root
          entryPoint: 
          - /bin/sh
          - -c
        master: spark://localhost:8082 # 7077
        mainScript: |
          import sys
          from random import random
          from operator import add
          from pyspark.sql import SparkSession

          if __name__ == "__main__":
              spark = SparkSession \
                  .builder \
                  .appName("PythonPi") \
                  .getOrCreate()
              spark.sparkContext.setLogLevel("DEBUG")
              partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
              n = 100000 * partitions

              def f(_: int) -> float:
                  x = random() * 2 - 1
                  y = random() * 2 - 1
                  return 1 if x ** 2 + y ** 2 <= 1 else 0

              count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
              print("Pi is roughly %f" % (4.0 * count / n))

              spark.stop()

I also tried mounting log4j2.properties:

log4j.rootCategory=DEBUG, console
log4j.logger.org.apache.spark.api.python.PythonGatewayServer=DEBUG
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[spark][%d{yyyy-MM-dd HH:mm:ss}][%p][%c][%m]%n
log4j.appender.publicFile.layout.ConversionPattern=[spark][%p][%d{yy/MM/dd HH:mm:ss}][%c][%m]%n

to the spark-iceberg container:

services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    networks:
      iceberg_net:
    depends_on:
      - rest
      - minio
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
      - /Users/anna/gh/KESTRA_REPOS/dev/spark/log4j2.properties:/home/iceberg/conf/log4j2.properties
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    ports:
      - 8888:8888
      - 8082:8080
      - 10000:10000
      - 10001:10001

based on these sources, this should be working now, but logs are still not captured despite the log4j settings and setting the log level on the Spark context.
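
One detail that may matter here: the mounted file is named log4j2.properties, but its keys (`log4j.rootCategory`, `log4j.appender.console`, and so on) follow the Log4j 1.x syntax. Spark moved to Log4j 2 in version 3.3, and Log4j 2 properties files use a different key scheme, so if these images ship Spark 3.3 or later (an assumption, not verified here), those keys are probably not picked up. A minimal sketch that renders a Log4j 2 style console configuration, modelled on the layout of Spark's `conf/log4j2.properties.template`; the helper name `render_log4j2_properties` is hypothetical, and the pattern is the one from the file above.

```python
def render_log4j2_properties(level: str = "debug") -> str:
    """Return a Log4j 2 properties file that logs to the console on stderr."""
    lines = [
        f"rootLogger.level = {level}",
        "rootLogger.appenderRef.stdout.ref = console",
        "",
        "appender.console.type = Console",
        "appender.console.name = console",
        "appender.console.target = SYSTEM_ERR",
        "appender.console.layout.type = PatternLayout",
        # Same pattern as in the Log4j 1.x file above.
        "appender.console.layout.pattern = [spark][%d{yyyy-MM-dd HH:mm:ss}][%p][%c][%m]%n",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Print the file content; it could be saved as log4j2.properties and mounted as before.
    print(render_log4j2_properties("debug"), end="")
```

Note that Log4j settings only affect the JVM-side Spark logs. The `print` call in the script writes to the driver's stdout directly, so a missing "Pi is roughly" line would not be explained by the logging configuration alone.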

anna-geller commented 1 year ago

also, this seems to be a general issue not bound to the tabular image @loicmathieu

here is an example flow using the bitnami image:

id: spark-bitnami-pi
namespace: dev

tasks:
  - id: spark
    type: io.kestra.plugin.spark.PythonSubmit
    verbose: true
    runner: DOCKER
    docker:
      image: bitnami/spark
      networkMode: host
      user: root
      entryPoint:
      - /bin/sh
      - -c
    master: spark://localhost:7077
    mainScript: |
      import sys
      from random import random
      from operator import add
      from pyspark.sql import SparkSession

      if __name__ == "__main__":
          spark = SparkSession \
              .builder \
              .appName("PythonPi") \
              .getOrCreate()
          spark.sparkContext.setLogLevel("DEBUG")
          partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
          n = 100000 * partitions

          def f(_: int) -> float:
              x = random() * 2 - 1
              y = random() * 2 - 1
              return 1 if x ** 2 + y ** 2 <= 1 else 0

          count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
          print("Pi is roughly %f" % (4.0 * count / n))

          spark.stop()

here is a simple Docker Compose file that starts a Spark master on local port 7077:

version: "3"

services:
  spark:
    image: bitnami/spark
    container_name: bitnami
    network_mode: host
    volumes:
      - /Users/anna/gh/KESTRA_REPOS/dev/spark/log4j2.properties:/opt/bitnami/spark/conf/log4j2.properties

same issue, no logs.

can you share more info on how to set up another Spark image that captures the logs for you?