AbsaOSS / spot

Aggregate and analyze Spark history, export to Elasticsearch, visualize and monitor with Kibana.
Apache License 2.0

Spot logo

What is Spot?

Spot is a set of tools for monitoring and performance tuning of Spark applications. The main idea is to continuously apply statistical analysis on repeating (production) runs of the same applications. This enables comparison of target metrics (e.g. time, cluster load, cloud cost) between different code versions and configurations. Furthermore, ML models and optimization techniques can be applied to configure new application runs automatically [Future].

One of the primary use cases considered is ETL (Extract Transform Load) in batch mode. Enceladus is an example of one such project. Such an application runs repeatedly (e.g. thousands of runs per hour) on new data instances which vary greatly in size and processing complexity. For this reason, a uniform setup would not be optimal for the entire spectrum of runs. In contrast, the statistical approach allows for the categorization of cases and an automatic setup of configurations for new runs.

Spot relies on metadata available in Spark History and therefore does not require additional instrumentation of Spark apps. This enables collection of statistics of production runs without compromising their performance.

Spot consists of the following modules:

| Module | Short description |
| --- | --- |
| Crawler | The crawler performs collection and initial processing of Spark history data. The output is stored in Elasticsearch and can be visualized with Kibana for monitoring. |
| Regression | (Future) The regression models use the stored data to interpolate time vs. config values. |
| Setter | (Future) The Setter module suggests config values for new runs of Spark apps based on the regression model. |
| Enceladus | The Enceladus module provides integration capabilities for Spot usage with Enceladus. |
| Yarn | The module contains its own crawler which provides data collection from the YARN API. The data can be visualized with the provided Kibana dashboards. (Future) The YARN data is merged with data from other sources (Spark, Enceladus) for a more complete analysis. |
| Kibana | A collection of Kibana dashboards and alerts which provide visualization and monitoring for the Spot data. |

A detailed description of each module can be found in the Modules section.

The diagram below shows the current Spot architecture.

Spot architecture

Monitoring examples

In this section we provide examples of plots and analysis which demonstrate how Spot is applied to monitor and tune the performance of Spark jobs.

Example: Cluster usage over time

Cluster usage

This plot shows how many CPU cores were allocated for Spark apps by each user over time. Similar plots can be obtained for memory used by executors, the amount of shuffled data and so on. The series can also be split by application name or other metadata. Kibana time series, used in this example, does not account for the duration of allocation. This is planned to be addressed using custom plots in the future.

Example: Characteristics of a particular Spark application

When an application runs repeatedly, statistics of its runs can be used to focus code optimization on the most critical and common cases. Such statistics can also be used to compare app versions. The next two plots show histograms of run duration in milliseconds (attempt.duration) and size of input data in bytes (attempts.aggs.stages.inputBytes.max). Filters on max values are applied to both plots in order to keep a reasonable scale.

Time histogram

Size histogram

The next figure shows statistics of configurations used in different runs of the same app.

Configurations

When too many executors are allocated to a relatively small job, or partitioning is not working properly (e.g. unsplittable data formats), some of the executors remain idle for the entire run. In such cases the resource allocation can be safely decreased in order to reduce the cluster load. The next histogram illustrates such a case.

Zero tasks

Example: Dynamic VS Fixed resource allocation

Dynamic Resource Allocation

The plot above shows the relationship between run duration, input size and total CPU core allocation for Enceladus runs on a particular dataset. The left sub-plot corresponds to a fixed resource allocation, which was the default. Due to the great variation of input size in data pipelines, fixed allocation often leads to either 1) extended time in case of under-allocation or 2) wasted resources in case of over-allocation. The right sub-plot demonstrates how Dynamic Resource Allocation, set as a new default, solves this issue. Here the number of cores is adjusted based on the input size, and as a result the total job duration stabilizes and efficiency improves.

Example: Small files issue

Small files issue

When shuffle operations are present, Spark creates 200 partitions by default regardless of the data size. Excessive fragmentation into small files compromises HDFS performance. The plot, produced by Spot, shows how the number of output partitions depends on the input size with the old and new configurations. As can be seen in the plot, Adaptive Execution creates a reasonable number of partitions, proportional to the data size. Based on this analysis, enabled by Spot, it was set as a new default for Enceladus.

Example: Parallelism

Here we demonstrate the application of selected metrics from parallel algorithms theory to the Spark execution model.

The diagram below shows the execution timeline of a Spark app. For simplicity, we assume each executor has a single CPU core. The duration of the run on m executors is denoted T(m). The allocation time of each executor is represented by a dotted orange rectangle. Tasks on the executors are shown as green rectangles. The tasks are organized in stages, which may overlap. Tasks in each stage are executed in parallel.

The parts of the driver program which do not overlap with stages are pictured as red rectangles. In our analysis, we assume these parts make up the sequential part of the program, which has a fixed duration. This includes the code which is not parallelizable (on executors): startup and scheduling overheads, Spark's query optimizations, external API calls, custom driver code, etc. In other words, this is the part of the program during which there are no tasks running on the executors. The rest of the run duration corresponds to the parallel part, i.e. when tasks can be executed.

Spark parallelism

Total allocated core time is the sum, over all executors, of the product of each executor's allocation time and the number of cores allocated to it.
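Written out from the definition above, the total allocated core time for a run on m executors can be expressed as follows. The symbols $T_{\text{alloc}}$, $t_i$ (allocation time of executor $i$) and $c_i$ (number of cores allocated to executor $i$) are names assumed here for illustration and are not necessarily the ones used by Spot.

```latex
T_{\text{alloc}} = \sum_{i=1}^{m} t_i \, c_i
```

Under the single-core simplification used in the diagram ($c_i = 1$), this reduces to the sum of the executor allocation times.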

Knowing the duration of the sequential part and the total duration of all of the tasks, we can also estimate the duration of a (hypothetical) run on a single executor. The next plot shows an example of how Spot visualizes the described metrics. Here, the values averaged over multiple runs are shown for two types of Enceladus apps.

Parallelism per job

The efficiency and speedup are estimated using the following formulas:
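For reference, the standard definitions from parallel algorithms theory are sketched below. Here $T_{\text{seq}}$ is the duration of the sequential part, $t_j$ is the duration of task $j$, and $T_{\text{alloc}}$ is the total allocated core time. These symbol names, and the use of $T_{\text{alloc}}$ in place of $m \cdot T(m)$ when executors are allocated for different lengths of time, are assumptions made for illustration and may differ from the exact expressions Spot uses.

```latex
\begin{align}
T(1) &\approx T_{\text{seq}} + \sum_{j} t_j \\
S(m) &= \frac{T(1)}{T(m)} \\
E(m) &= \frac{S(m)}{m} = \frac{T(1)}{m \, T(m)} \approx \frac{T(1)}{T_{\text{alloc}}}
\end{align}
```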

Please note that in this analysis we focus on parallelism on executors; the possible parallelism of the driver part on multiple driver cores requires a separate investigation.
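The metrics of this section can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not Spot's implementation: the `Executor` record, the function names and the sample numbers are hypothetical, and efficiency is taken as the estimated single-executor duration divided by the total allocated core time.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Executor:
    """Hypothetical per-executor record; field names are illustrative."""
    allocation_ms: float  # how long the executor was allocated
    cores: int            # CPU cores allocated to the executor


def total_allocated_core_time(executors: List[Executor]) -> float:
    """Sum of (allocation time * cores) over all executors."""
    return sum(e.allocation_ms * e.cores for e in executors)


def estimate_single_executor_run(sequential_ms: float,
                                 task_durations_ms: List[float]) -> float:
    """Estimated T(1): sequential part plus total duration of all tasks."""
    return sequential_ms + sum(task_durations_ms)


def speedup(t1_ms: float, tm_ms: float) -> float:
    """Standard speedup: T(1) / T(m)."""
    return t1_ms / tm_ms


def efficiency(t1_ms: float, allocated_core_ms: float) -> float:
    """Assumed efficiency: T(1) / total allocated core time."""
    return t1_ms / allocated_core_ms


# Made-up run: 4 single-core executors allocated for 10 s each,
# a 2 s sequential part and four 6 s tasks; the run itself took 10 s.
executors = [Executor(allocation_ms=10_000, cores=1) for _ in range(4)]
tasks_ms = [6_000, 6_000, 6_000, 6_000]

t_alloc = total_allocated_core_time(executors)
t1 = estimate_single_executor_run(2_000, tasks_ms)
s = speedup(t1, 10_000)
e = efficiency(t1, t_alloc)
```

In this made-up run the executors are busy for only part of their allocation, so the efficiency comes out below 1 even though the speedup is greater than 1.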

The next two histograms display the efficiency and speedup of multiple runs for a sample Spark app with different inputs and configurations.

Efficiency hist

Speedup hist

Further analysis of such metrics may include dependencies on particular configuration values.

Modules

Crawler

The Crawler module aggregates Spark history data and stores it in Elasticsearch for further analysis by tools such as Kibana. The Spark History data are merged from several APIs (attempts, executors, stages) into a single raw JSON document.

Information from external services (currently: Menas) is added for supported applications (currently: Enceladus). The raw documents are stored in a separate Elasticsearch collection. Aggregations for each document are stored in another separate collection. The aggregations are performed in the following way: custom aggregations (e.g. min, max, mean, non-zero) are calculated for each value (e.g. completed tasks) across the elements of each array in the original raw document (e.g. executors). Custom calculated values are also added, e.g. total CPU allocation, estimated efficiency and speedup.

Some of the records can be inconsistent due to external services (e.g. a Spark History Server error) and raise exceptions during processing. Such exceptions are handled, and the corresponding records are stored in a separate collection along with the error messages.
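The per-array aggregation described above can be sketched as follows. This is a minimal illustration, not the Crawler's actual code: the function name `aggregate_array`, the output layout and the sample document are hypothetical, and "non-zero" is interpreted here as the count of non-zero values, which is an assumption.

```python
from statistics import mean
from typing import Any, Dict, List


def aggregate_array(items: List[Dict[str, Any]]) -> Dict[str, Dict[str, float]]:
    """Compute min, max, mean and non-zero count for each numeric field
    across the elements of one array of a raw document."""
    values_by_key: Dict[str, List[float]] = {}
    for item in items:
        for key, value in item.items():
            # Keep numeric values only; bool is a subclass of int, so skip it.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                values_by_key.setdefault(key, []).append(value)
    return {
        key: {
            "min": min(vals),
            "max": max(vals),
            "mean": mean(vals),
            # Assumed meaning of "non-zero": how many elements are non-zero.
            "non_zero": sum(1 for v in vals if v != 0),
        }
        for key, vals in values_by_key.items()
    }


# Hypothetical raw document with an "executors" array.
raw_doc = {
    "executors": [
        {"id": "1", "completedTasks": 12, "totalCores": 2},
        {"id": "2", "completedTasks": 0, "totalCores": 2},
        {"id": "3", "completedTasks": 6, "totalCores": 2},
    ]
}

agg = {"executors": aggregate_array(raw_doc["executors"])}
```

With this interpretation, a non-zero count of completed tasks lower than the number of executors indicates executors that stayed idle for the whole run.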

Regression

(Future) The regression models use the stored data to interpolate time vs. config values.

Setter

(Future) The Setter module suggests config values for new runs of Spark apps based on the regression model.

Enceladus

The Enceladus module provides integration capabilities for Spot usage with Enceladus.

YARN

The module contains its own crawler which provides data collection from the YARN API. The data can be visualized with the provided Kibana dashboards. (Future) The YARN data is merged with data from other sources (Spark, Enceladus) for a more complete analysis.

Kibana

A collection of Kibana dashboards and alerts which provide visualization and monitoring for the Spot data.

Deployment

Multicluster configuration

It is possible to monitor multiple clusters (each with its own Spark History server) with Spot. In this scenario a separate Spot crawler process needs to run for each Spark History server (and optionally Menas). Each process writes to its own set of indexes within the same Elasticsearch instance. If the index names follow the defined pattern (spot_<raw/agg/err>_<clustername>_<id>), the data can be visualized in Kibana using the setup provided in the Kibana directory. There, the data can be filtered by history_host.keyword if required.
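As an illustration of the naming scheme, the sketch below builds and checks the three index names for one cluster. It assumes the pattern is spot_<raw/agg/err>_<clustername>_<id> with underscore separators and lowercase alphanumeric cluster names and ids; the helper names and the sample cluster name are hypothetical.

```python
import re
from typing import Dict

INDEX_KINDS = ("raw", "agg", "err")

# Assumed pattern: spot_<raw/agg/err>_<clustername>_<id>
INDEX_PATTERN = re.compile(
    r"^spot_(?P<kind>raw|agg|err)_(?P<cluster>[a-z0-9]+)_(?P<id>[a-z0-9]+)$"
)


def index_names(cluster: str, index_id: str) -> Dict[str, str]:
    """Return the raw/agg/err index names for one cluster."""
    return {kind: f"spot_{kind}_{cluster}_{index_id}" for kind in INDEX_KINDS}


def matches_pattern(name: str) -> bool:
    """Check whether an index name follows the assumed pattern."""
    return INDEX_PATTERN.match(name) is not None


names = index_names("clustera", "1")
```

Each crawler process would then be configured with its own set of three names, so that the shared Kibana setup can pick up all clusters.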

Run Crawler

cd spot/crawler

python3 crawler.py [options]

| Option | Default | Description |
| --- | --- | --- |
| --min_end_date | None | Optional. Minimal completion date of the Spark job in the format YYYY-MM-DDThh:mm:ss. The crawler processes Spark jobs completed after the later of a) the max completion date among already processed jobs (stored in the database) and b) this option. In the first run, when there are no processed jobs in the database and this option is not specified, the crawler starts with the earliest completed job in Spark History. |

This will start the main loop of the crawler. It gets newly completed apps, processes them and stores them in the database. When all the new apps are processed, the crawler sleeps for sleep_seconds (see config.ini) before the next iteration. To exit the loop, kill the process.
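The start-date rule of the --min_end_date option can be sketched as follows. This is an illustration of the rule as described, not the crawler's code: the function names are hypothetical, and returning None stands for "start with the earliest completed job in Spark History".

```python
from datetime import datetime
from typing import Optional

# Format of --min_end_date: YYYY-MM-DDThh:mm:ss
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"


def parse_min_end_date(value: Optional[str]) -> Optional[datetime]:
    """Parse the option value; None means the option was not given."""
    return datetime.strptime(value, DATE_FORMAT) if value else None


def effective_min_end_date(latest_processed: Optional[datetime],
                           option: Optional[datetime]) -> Optional[datetime]:
    """Return the later of the two dates, ignoring missing ones.

    None means neither date is available, i.e. the crawler would start
    with the earliest completed job.
    """
    candidates = [d for d in (latest_processed, option) if d is not None]
    return max(candidates) if candidates else None


# Sample values are made up for the demonstration.
option = parse_min_end_date("2020-01-01T00:00:00")
latest = datetime(2020, 3, 15, 12, 30, 0)

start_first_run = effective_min_end_date(None, None)
start_with_option = effective_min_end_date(None, option)
start_later_runs = effective_min_end_date(latest, option)
```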

Import Kibana Demo Dashboard

The Kibana directory contains objects which can be imported into Kibana. For example, there is a demo dashboard demonstrating basic statistics of Spark applications.

Configure Alerts

To trigger an alert in Kibana when a critical error occurs in Spot (e.g. Spark History server is in an incorrect state) the example queries can be used.

The Kibana alerts can be configured to use an AWS SNS topic as a destination, which can then be configured to send notifications via email, etc. In addition, an encrypted SNS topic can be used (recommended) which requires additional configuration of an IAM role, as documented in the referenced tutorial. An example of generating an alert message used together with the example query is provided.

YARN integration

Spot can import and visualize monitoring metrics from the YARN API. The import is performed in a separate yarn_crawler.py process. This process should be run on a host where it can access the YARN API and Elasticsearch. It uses the same configuration file, config.ini, as the main crawler.py process; some settings are shared, and others are added specifically for YARN. The relevant parameters are:

The Kibana directory contains dashboards which visualize the data collected from YARN. A description of the available metrics can be found in the YARN documentation.

It is planned to enrich the Spark job metadata with the YARN metadata in the future. For instance, this would add exact details which are not available from Spark History alone, e.g. vCoresSeconds and memorySeconds.