Use Spark Plugins to extend Apache Spark with custom metrics and executor startup actions.
Author and contact: Luca.Canali@cern.ch

Spark plugins implement the org.apache.spark.api.plugin.SparkPlugin interface. They can be written in Scala or Java and can be used to run custom code at the startup of Spark executors and driver.
Plugins are activated with the configuration --conf spark.plugins=<list of plugin classes>
The plugin jars are distributed using --jars and --packages
Metrics implemented via plugins are exposed with namespace=plugin.<Plugin Class Name>
Deploy the plugins in this repository with --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3, or build the jar. For example: sbt +package
Demo plugins:
--packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3 --conf spark.plugins=ch.cern.DemoPlugin
--packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3 --conf spark.plugins=ch.cern.DemoMetricsPlugin
DemoMetricsPlugin implements the metric ch.cern.DemoMetricsPlugin.DriverTest42: a gauge reporting a constant integer value, for testing.

RunOSCommandPlugin
Runs an OS command at executor startup; the default action is /usr/bin/touch /tmp/plugin.txt.
Configure with: --conf spark.plugins=ch.cern.RunOSCommandPlugin
To change the command: --conf spark.cernSparkPlugin.command="command or script you want to run"
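Conceptually, the plugin's startup action amounts to running the configured command once per executor process. A minimal Python sketch of that behavior (a hypothetical helper for illustration, not the plugin's actual Scala code):

```python
import shlex
import subprocess

def run_startup_command(command: str) -> int:
    """Run an OS command once, as RunOSCommandPlugin does at executor
    startup. Returns the command's exit code."""
    # shlex.split handles quoted arguments the way a POSIX shell would
    return subprocess.run(shlex.split(command)).returncode

# The plugin's default action is equivalent to:
# run_startup_command("/usr/bin/touch /tmp/plugin.txt")
```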
bin/spark-shell --master yarn \
--packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3 \
--conf spark.plugins=ch.cern.RunOSCommandPlugin
To verify the plugin ran, check that /tmp/plugin.txt has been created on the executor machines.

PyroscopePlugin
Grafana Pyroscope is a tool for continuous profiling and Flame Graph visualization. This plugin integrates Apache Spark with Pyroscope.
For details see: How to profile Spark with Pyroscope
An example of how to put the configuration together and start Spark on a cluster with Pyroscope continuous profiling:
1. Start Pyroscope, either from the binary distribution:
./pyroscope -server.http-listen-port 5040
or with Docker:
docker run -it -p 5040:4040 grafana/pyroscope
2. Start Spark (spark-shell, PySpark, or spark-submit):
bin/spark-shell --master yarn \
--packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3,io.pyroscope:agent:0.13.0 \
--conf spark.plugins=ch.cern.PyroscopePlugin \
--conf spark.pyroscope.server="http://<myhostname>:5040"
Update the --packages versions to the latest available, and make spark.pyroscope.server match the server name and port used when starting Pyroscope.
This plugin adds the following Spark configurations:
--conf spark.pyroscope.server -> default "http://localhost:4040"; update to match the server name and port used by Pyroscope
--conf spark.pyroscope.applicationName -> default spark.conf.get("spark.app.id")
--conf spark.pyroscope.eventType -> default ITIMER; possible values: ITIMER, CPU, WALL, ALLOC, LOCK
Example of setting the configuration programmatically with PySpark:
from pyspark.sql import SparkSession

# Create the Spark session with the Pyroscope plugin configured
spark = (SparkSession.builder
.appName("Instrumented app").master("yarn")
.config("spark.executor.memory","16g")
.config("spark.executor.cores","4")
.config("spark.executor.instances", 2)
.config("spark.jars.packages", "ch.cern.sparkmeasure:spark-plugins_2.12:0.3,io.pyroscope:agent:0.13.0")
.config("spark.plugins", "ch.cern.PyroscopePlugin")
.config("spark.pyroscope.server", "http://<myhostname>:5040")
.getOrCreate()
)
CgroupMetrics
Configure with: --conf spark.plugins=ch.cern.CgroupMetrics
Optional configuration: --conf spark.cernSparkPlugin.registerOnDriver (default false)
Implemented using cgroup instrumentation of key system resource usage; intended mostly for Spark on Kubernetes.
Collects metrics using cgroup stats from the /sys/fs and /proc filesystems, covering CPU, memory, and network usage. See also the kernel documentation.
Note: the metrics are reported for the entire cgroup to which the executor belongs. This is mostly intended for Spark running on Kubernetes. In other cases the reported metrics may not be easily correlated with the executor's activity, as the cgroup may include additional processes, up to the entire system.
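To illustrate what the plugin reads, here is a hedged Python sketch of parsing a cgroup (v1) memory.stat file, whose counters back gauges such as MemoryRss and MemoryCache. The file path and sample values are illustrative assumptions; the plugin's own parsing is done in Scala.

```python
def parse_memory_stat(text: str) -> dict:
    """Parse the content of a cgroup (v1) memory.stat file, which holds
    lines of the form '<key> <value>', into a dict of integer counters."""
    stats = {}
    for line in text.splitlines():
        key, _, value = line.partition(" ")
        if value.strip().isdigit():
            stats[key] = int(value)
    return stats

# Sample content, as found under e.g. /sys/fs/cgroup/memory/.../memory.stat:
sample = "cache 310378496\nrss 104857600\nswap 0\n"
mem = parse_memory_stat(sample)
# mem["rss"] corresponds to the value reported by the MemoryRss gauge (bytes)
```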
Metrics implemented (gauges), with prefix ch.cern.CgroupMetrics:
CPUTimeNanosec: CPU time used by the processes in the cgroup.
MemoryRss: number of bytes of anonymous and swap cache memory.
MemorySwap: number of bytes of swap usage.
MemoryCache: number of bytes of page cache memory.
NetworkBytesIn: network traffic inbound.
NetworkBytesOut: network traffic outbound.
Example:
bin/spark-shell --master k8s://https://<K8S URL>:6443 --driver-memory 1g \
--num-executors 2 --executor-cores 2 --executor-memory 2g \
--conf spark.kubernetes.container.image=<registry>/spark:v330 \
--packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3 \
--conf spark.plugins=ch.cern.HDFSMetrics,ch.cern.CgroupMetrics \
--conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink" \
--conf "spark.metrics.conf.*.sink.graphite.host"=mytestinstance \
--conf "spark.metrics.conf.*.sink.graphite.port"=2003 \
--conf "spark.metrics.conf.*.sink.graphite.period"=10 \
--conf "spark.metrics.conf.*.sink.graphite.unit"=seconds \
--conf "spark.metrics.conf.*.sink.graphite.prefix"="youridhere"
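For reference, the Graphite sink configured above ships each gauge over TCP as one plaintext line of the form `path value timestamp`. A sketch of how such a line is assembled (the exact path layout, combining prefix, application id, instance, and the plugin namespace, is an assumption that depends on your Spark metrics configuration):

```python
import time

def graphite_line(prefix, app_id, instance, namespace, metric, value, ts=None):
    """Assemble one line of the Graphite plaintext protocol:
    '<metric.path> <value> <unix-timestamp>'."""
    if ts is None:
        ts = int(time.time())
    path = ".".join([prefix, app_id, instance, namespace, metric])
    return f"{path} {value} {ts}"

line = graphite_line("youridhere", "app-20240101-0001", "driver",
                     "plugin.ch.cern.CgroupMetrics", "CPUTimeNanosec",
                     123456789, ts=1700000000)
# -> 'youridhere.app-20240101-0001.driver.plugin.ch.cern.CgroupMetrics.CPUTimeNanosec 123456789 1700000000'
```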
Visualize the metrics using the Spark dashboard,
see Spark_Perf_Dashboard_v03_with_SparkPlugins
HDFSMetrics
This plugin measures HDFS extended statistics. In particular, it provides information on read locality and erasure coding usage (for HDFS 3.x).
Configure with: --conf spark.plugins=ch.cern.HDFSMetrics
Optional configuration: --conf spark.cernSparkPlugin.registerOnDriver (default true)
Collects extended HDFS metrics using Hadoop's GlobalStorageStatistics API, which builds on the extended statistics introduced in Hadoop 2.8.
Use this with Spark built with Hadoop 3.2 or higher (it does not work with Spark built with Hadoop 2.7).
Metrics implemented (gauges), with prefix ch.cern.HDFSMetrics:
bytesRead
bytesWritten
readOps
writeOps
largeReadOps
bytesReadLocalHost
bytesReadDistanceOfOneOrTwo
bytesReadDistanceOfThreeOrFour
bytesReadDistanceOfFiveOrLarger
bytesReadErasureCoded
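One practical use of these gauges is deriving a read-locality ratio: the fraction of bytes served from the local host rather than over the network. A hedged sketch (the gauge names come from the list above; how you collect them into a dict is up to your metrics pipeline):

```python
def read_locality(gauges: dict) -> float:
    """Fraction of HDFS bytes read from the local host, computed from
    the bytesRead and bytesReadLocalHost gauges."""
    total = gauges.get("bytesRead", 0)
    if total == 0:
        return 0.0
    return gauges.get("bytesReadLocalHost", 0) / total

sample = {"bytesRead": 1_000_000, "bytesReadLocalHost": 750_000}
# read_locality(sample) -> 0.75, i.e. 75% of the bytes were read locally
```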
Example:
bin/spark-shell --master yarn \
--packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3 \
--conf spark.plugins=ch.cern.HDFSMetrics \
--conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink" \
--conf "spark.metrics.conf.*.sink.graphite.host"=mytestinstance \
--conf "spark.metrics.conf.*.sink.graphite.port"=2003 \
--conf "spark.metrics.conf.*.sink.graphite.period"=10 \
--conf "spark.metrics.conf.*.sink.graphite.unit"=seconds \
--conf "spark.metrics.conf.*.sink.graphite.prefix"="youridhere"
Visualize the metrics using the Spark dashboard,
see Spark_Perf_Dashboard_v03_with_SparkPlugins
CloudFSMetrics
This plugin provides I/O statistics for cloud filesystems (s3a, gs, wasbs, oci, root, and any other storage system exposed as a Hadoop Compatible Filesystem).
Configure with:
--conf spark.plugins=ch.cern.CloudFSMetrics
--conf spark.cernSparkPlugin.cloudFsName=<name of the filesystem> (for example: "s3a", "gs", "wasbs", "root", "oci")
Optional configuration: --conf spark.cernSparkPlugin.registerOnDriver (default true)
Collects I/O metrics for Hadoop-compatible filesystems using Hadoop's GlobalStorageStatistics API.
Note: a related Spark configuration is --conf spark.executor.metrics.fileSystemSchemes=<filesystems to measure> (default: file,hdfs); however, in Spark (up to 3.1) this is implemented using Hadoop Filesystem getAllStatistics, which is deprecated in recent versions of Hadoop.
Metrics implemented (gauges), with prefix ch.cern.S3AMetricsGSS. List of metrics:
bytesRead
bytesWritten
readOps
writeOps
Example:
bin/spark-shell --master k8s://https://<K8S URL>:6443 --driver-memory 1g \
--num-executors 2 --executor-cores 2 --executor-memory 2g \
--conf spark.kubernetes.container.image=<registry>/spark:v311 \
--packages org.apache.hadoop:hadoop-aws:3.3.2,ch.cern.sparkmeasure:spark-plugins_2.12:0.3 \
--conf spark.plugins=ch.cern.CloudFSMetrics,ch.cern.CgroupMetrics \
--conf spark.cernSparkPlugin.cloudFsName="s3a" \
--conf spark.hadoop.fs.s3a.secret.key="<SECRET KEY HERE>" \
--conf spark.hadoop.fs.s3a.access.key="<ACCESS KEY HERE>" \
--conf spark.hadoop.fs.s3a.endpoint="https://<S3A URL HERE>" \
--conf spark.hadoop.fs.s3a.impl="org.apache.hadoop.fs.s3a.S3AFileSystem" \
--conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink" \
--conf "spark.metrics.conf.*.sink.graphite.host"=mytestinstance \
--conf "spark.metrics.conf.*.sink.graphite.port"=2003 \
--conf "spark.metrics.conf.*.sink.graphite.period"=10 \
--conf "spark.metrics.conf.*.sink.graphite.unit"=seconds \
--conf "spark.metrics.conf.*.sink.graphite.prefix"="youridhere"
Visualize the metrics using the Spark dashboard,
see Spark_Perf_Dashboard_v03_with_SparkPlugins
CloudFSMetrics27
This plugin provides I/O statistics for cloud filesystems (s3a, gs, wasbs, oci, root, and any other storage system exposed as a Hadoop Compatible Filesystem). Use this for Spark built with Hadoop 2.7.
Configure with:
--conf spark.plugins=ch.cern.CloudFSMetrics27
--conf spark.cernSparkPlugin.cloudFsName=<name of the filesystem> (for example: "s3a", "oci", "gs", "root")
Optional configuration: --conf spark.cernSparkPlugin.registerOnDriver (default true)
Metrics implemented (gauges) have the prefix ch.cern.CloudFSMetrics27.

Experimental plugins for I/O-time instrumentation
This section details a few experimental Spark plugins used to expose metrics for I/O-time instrumentation of Spark workloads on Hadoop-compliant filesystems.
These plugins use instrumented experimental/custom versions of the Hadoop client API for HDFS and other Hadoop-compliant filesystems.
S3ATimeInstrumentation
Instruments the Hadoop S3A client.
Note: this requires a custom S3A client implementation; see the experimental code at: HDFS and S3A custom instrumentation
Spark config:
--conf spark.plugins=ch.cern.experimental.S3ATimeInstrumentation
--jars hadoop-aws-3.2.0.jar
Metrics implemented (gauges), with prefix ch.cern.experimental.S3ATimeInstrumentation
:
S3AReadTimeMuSec
S3ASeekTimeMuSec
S3ACPUTimeDuringReadMuSec
S3ACPUTimeDuringSeekMuSec
S3AReadTimeMinusCPUMuSec
S3ASeekTimeMinusCPUMuSec
S3ABytesRead
S3AGetObjectMetadataMuSec
S3AGetObjectMetadataMinusCPUMuSec
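The *MinusCPU gauges are the difference between the elapsed time of an operation and the CPU time consumed during it, which approximates time spent blocked on I/O. A sketch of that arithmetic, useful when post-processing the raw gauges yourself (gauge names as listed above, values in microseconds):

```python
def io_wait_musec(elapsed_musec: int, cpu_musec: int) -> int:
    """Elapsed (wall-clock) time of an operation minus the CPU time
    consumed during it, in microseconds: an estimate of time spent
    blocked on I/O, as reported by gauges like S3AReadTimeMinusCPUMuSec."""
    return max(elapsed_musec - cpu_musec, 0)

# With S3AReadTimeMuSec = 5_000_000 and S3ACPUTimeDuringReadMuSec = 1_200_000:
# io_wait_musec(5_000_000, 1_200_000) -> 3_800_000 (about 3.8 s waiting on reads)
```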
Example:
bin/spark-shell --master k8s://https://<K8S URL>:6443 --driver-memory 1g \
--num-executors 2 --executor-cores 2 --executor-memory 2g \
--conf spark.kubernetes.container.image=<registry>/spark:v311 \
--jars <PATH>/hadoop-aws-3.2.0.jar \
--packages com.amazonaws:aws-java-sdk-bundle:1.11.880,ch.cern.sparkmeasure:spark-plugins_2.12:0.1 \
--conf spark.hadoop.fs.s3a.secret.key="<SECRET KEY HERE>" \
--conf spark.hadoop.fs.s3a.access.key="<ACCESS KEY HERE>" \
--conf spark.hadoop.fs.s3a.endpoint="https://<URL HERE>" \
--conf spark.hadoop.fs.s3a.impl="org.apache.hadoop.fs.s3a.S3AFileSystem" \
--conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink" \
--conf "spark.metrics.conf.*.sink.graphite.host"=mytestinstance \
--conf "spark.metrics.conf.*.sink.graphite.port"=2003 \
--conf "spark.metrics.conf.*.sink.graphite.period"=10 \
--conf "spark.metrics.conf.*.sink.graphite.unit"=seconds \
--conf "spark.metrics.conf.*.sink.graphite.prefix"="youridhere"
Visualize the metrics using the Spark dashboard,
see Spark_Perf_Dashboard_v03_with_SparkPlugins_Experimental
HDFSTimeInstrumentation
Instruments the Hadoop HDFS client.
Note: this requires a custom HDFS client implementation; see the experimental code at: HDFS and S3A custom instrumentation
Spark config:
--conf spark.plugins=ch.cern.experimental.HDFSTimeInstrumentation
--packages ch.cern.sparkmeasure:spark-plugins_2.12:0.1
Replace $SPARK_HOME/jars/hadoop-hdfs-client-3.2.0.jar with the jar built from this fork.
Metrics implemented (gauges), with prefix ch.cern.experimental.HDFSTimeInstrumentation:
HDFSReadTimeMuSec
HDFSCPUTimeDuringReadMuSec
HDFSReadTimeMinusCPUMuSec
HDFSBytesRead
HDFSReadCalls
Example:
bin/spark-shell --master yarn --num-executors 2 --executor-cores 2 \
--jars <PATH>/sparkplugins_2.12-0.1.jar \
--conf spark.plugins=ch.cern.experimental.HDFSTimeInstrumentation
... # NOTE: add here the spark.metrics.conf parameters, or configure the metrics sink in metrics.properties
Visualize the metrics with the Spark dashboard spark_perf_dashboard_spark3-0_v02_with_sparkplugins_experimental
Hadoop-XRootD Time Instrumentation
Instruments the Hadoop-XRootD connector.
Spark config:
--conf spark.plugins=ch.cern.experimental.ROOTTimeInstrumentation \
--packages ch.cern.sparkmeasure:spark-plugins_2.12:0.1 \
--conf spark.driver.extraClassPath=<path>/hadoop-xrootd-1.0.5.jar \
--conf spark.executor.extraClassPath=<path>/hadoop-xrootd-1.0.5.jar \
--files <path_to_krbtgt>#krbcache \
--conf spark.executorEnv.KRB5CCNAME="FILE:krbcache"
Metrics implemented (gauges), with prefix ch.cern.experimental.ROOTTimeInstrumentation:
ROOTBytesRead
ROOTReadOps
ROOTReadTimeMuSec
Visualize the metrics using the Spark dashboard,
see Spark_Perf_Dashboard_v03_with_SparkPlugins_Experimental
OCITimeInstrumentation
Instruments the Hadoop client for OCI object storage (the oci-hdfs-connector).
Spark config:
--conf spark.plugins=ch.cern.experimental.OCITimeInstrumentation
Copy oci-hdfs-connector-2.9.2.6.jar, built from this fork, into $SPARK_HOME/jars.
Copy and install the relevant oracle/oci-java-sdk release jars; for version 1.17.5: oci-java-sdk-full-1.17.5.jar and related third-party jars.
Example:
# Note: on K8S, rather add the plugin jar to the container image
--packages ch.cern.sparkmeasure:spark-plugins_2.12:0.1 \
--conf spark.hadoop.fs.oci.client.auth.pemfilepath="<PATH>/oci_api_key.pem" \
--conf spark.hadoop.fs.oci.client.auth.tenantId=ocid1.tenancy.oc1..TENNANT_KEY \
--conf spark.hadoop.fs.oci.client.auth.userId=ocid1.user.oc1.USER_KEY \
--conf spark.hadoop.fs.oci.client.auth.fingerprint=<fingerprint_here> \
--conf spark.hadoop.fs.oci.client.hostname="https://objectstorage.REGION_NAME.oraclecloud.com" \
Metrics implemented (gauges), with prefix ch.cern.experimental.OCITimeInstrumentation:
OCIReadTimeMuSec
OCISeekTimeMuSec
OCICPUTimeDuringReadMuSec
OCICPUTimeDuringSeekMuSec
OCIReadTimeMinusCPUMuSec
OCISeekTimeMinusCPUMuSec
OCIBytesRead