awslabs / deequ

Deequ is a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets.
Apache License 2.0

CustomAggregator #572

Closed joshuazexter closed 4 months ago

joshuazexter commented 4 months ago

This pull request introduces the CustomAggregator, a tool designed for dynamic data aggregation based on user-specified conditions within Apache Spark DataFrames. This addition can perform customized metric calculations and aggregations, making it applicable wherever conditional data aggregation is required.

Core Features:

How It Can Be Used: To use the CustomAggregator, developers define an aggregation lambda over a DataFrame, construct a CustomAggregator with a metric name and instance, and then call computeStateFrom and computeMetricFrom.

Usage Examples: Included in the pull request are unit tests that demonstrate potential use cases:

**Content Engagement Metrics:**
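The `contentEngagementLambda` used below is not shown in the PR description. As a plain-Scala sketch (no Spark), the kind of logic it would encode is converting per-content-type engagement counts into fractional shares of the total; all names and numbers here are illustrative assumptions, not the PR's actual code:

```scala
// Hypothetical per-content-type engagement counts (e.g. likes + shares + views).
val engagementCounts: Map[String, Long] = Map("Video" -> 810L, "Article" -> 190L)

// Normalize counts into fractional shares of total engagement,
// mirroring the Map(Video -> ..., Article -> ...) metric shape shown below.
val totalEngagement = engagementCounts.values.sum.toDouble
val engagementShares: Map[String, Double] =
  engagementCounts.map { case (contentType, count) => contentType -> count / totalEngagement }

println(engagementShares)
```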

```scala
val analyzer = CustomAggregator(contentEngagementLambda, "ContentEngagement", "AllPlatforms")

val data = session.read.format("csv").option("header", "true").load("path_to_data_file")
val state = analyzer.computeStateFrom(data)
val metric = analyzer.computeMetricFrom(state)

println("Content Engagement Metrics: " + metric.value.get)
// Content Engagement Metrics: Map(Video -> 0.81, Article -> 0.18)
```

**Resource Utilization in Cloud Services:**

- Scenario Description: An IT administrator needs to monitor and analyze resource utilization across different cloud services to ensure efficient usage and cost management.
- Data: The organization collects usage data for each cloud service, including CPU hours, memory GBs used, and storage GBs used, stored in a DataFrame.
- Analysis Logic: The analyzer is used to aggregate and compute the total and percentage utilization of each resource type across services.
- Implementation Example:

```scala
val resourceUtilizationLambda: DataFrame => AggregatedMetricState = df => {
  val totalResources = df.groupBy("service_type")
    .agg(
      ((sum("cpu_hours") + sum("memory_gbs") + sum("storage_gbs")).cast("int") / df.count())
        .alias("percentageResources")
    )
    .collect()
    .map(row => row.getString(0) -> row.getDouble(1))
    .toMap
  val totalSum = totalResources.values.sum
  // Note: the original snippet passed resourceUtilizationLambda itself here,
  // a self-reference; the aggregated map appears to be the intended argument.
  AggregatedMetricState(totalResources, totalSum.toInt)
}
```

```scala
val analyzer = CustomAggregator(resourceUtilizationLambda, "ResourceUtilization", "CloudServices")

val data = session.read.format("csv").option("header", "true").load("path_to_usage_data_file")
val state = analyzer.computeStateFrom(data)
val metric = analyzer.computeMetricFrom(state)

println("Resource Utilization Metrics: " + metric.value.get)
// Resource Utilization Metrics: Map(Compute -> 0.51, Database -> 0.27, Storage -> 0.21)
```
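The state-then-metric split in these examples (computeStateFrom followed by computeMetricFrom) suggests a state that stores raw per-group totals and a metric step that normalizes them into the fractional map shown above. A minimal plain-Scala sketch of that idea follows; the case class, field names, and numbers are assumptions for illustration, not the PR's actual definitions:

```scala
// Hypothetical stand-in for the PR's AggregatedMetricState:
// raw per-group counts plus the grand total.
case class SketchState(counts: Map[String, Int], total: Int)

// Metric step: normalize counts into fractional shares per group.
def toMetric(state: SketchState): Map[String, Double] =
  state.counts.map { case (group, count) => group -> count.toDouble / state.total }

val state = SketchState(Map("Compute" -> 51, "Database" -> 28, "Storage" -> 21), 100)
println(toMetric(state))
```

Splitting state from metric this way keeps the expensive DataFrame pass in one step, so the cheap normalization can be recomputed or merged without rescanning the data.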



By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
rdsharma26 commented 4 months ago

Can we add a unit test that shows the usage of this analyzer along with other analyzers? See ColumnProfilerRunner and this readme.

rdsharma26 commented 4 months ago

Great PR description! Can you also add the output of the println statements?