LucaCanali / sparkMeasure

This is the development repository for sparkMeasure, a tool and library designed for efficient analysis and troubleshooting of Apache Spark jobs. It focuses on easing the collection and examination of Spark metrics, making it a practical choice for both developers and data engineers.
Apache License 2.0
690 stars 144 forks

The approach is to deliver a Scala Map rather than a JSON object. #26

Closed dwurry closed 4 years ago

dwurry commented 4 years ago

There are two additions in this pull request:

1) The aggregateStageMetrics query has been changed to add a series of "as" column names. This changes the standard-out report to print the plain metric name instead of sum(value). The user sees this:

    stageDuration => 458 (0.5 s)

instead of the full aggregate expression they currently see through 0.15:

    sum(stageDuration) => 476 (0.5 s)

This change was made specifically so that formatted output can have concise keys without special characters, like this:

"message" : {...
    "stageDuration" : "458",
     ...}

rather than this:

"message" : {...
    "sum(stageDuration)" : "458",
     ...}

2) There is a new reportMap() method that returns a Scala Map. From the Map the user can produce JSON, CSV, XML, or other formats, and can also modify it (add information to the map object).

reportMap() is then called AFTER runAndMeasure with the following use case:

2.1) The user runs the measured workload via sparkMeasure:

    val df: DataFrame =
      stageMetrics.runAndMeasure(fillDataFrame(spark, format,
        connectionProperties, query))

2.2) The user calls reportMap() to get the map:

    stageMetrics.reportMap()

2.3) The user is able to manipulate the map:

    map.put("id", "2")
    map.put("event", "queryStructure")
    map.put("queryName", name)

2.4) The user converts the map to the desired output:

    val msg = new org.apache.logging.log4j.message.ObjectMessage(map)
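Steps 2.2 through 2.4 can be sketched end to end in plain Scala. This is a hedged sketch, not the sparkMeasure API: it assumes reportMap() hands back a mutable Map of metric name to value, and stands in a literal map for what reportMap() would return; the JSON rendering is a stand-in for the ObjectMessage conversion.

```scala
import scala.collection.mutable

object ReportMapSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for stageMetrics.reportMap(): metric name -> value
    val map: mutable.LinkedHashMap[String, String] =
      mutable.LinkedHashMap("numStages" -> "1", "stageDuration" -> "458")

    // Step 2.3: the user enriches the map with custom context
    map.put("id", "2")
    map.put("event", "queryStructure")

    // Step 2.4: convert the map to the desired output, here a JSON-like string
    val msg = map
      .map { case (k, v) => s""""$k" : "$v"""" }
      .mkString("{ ", ", ", " }")
    println(msg)
  }
}
```

Because the aliased keys contain no parentheses, they drop straight into the output format without escaping.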

codecov-io commented 4 years ago

Codecov Report

Merging #26 into master will decrease coverage by 0.42%. The diff coverage is 0%.


@@           Coverage Diff           @@
##           master   #26      +/-   ##
=======================================
- Coverage   12.42%   12%   -0.43%     
=======================================
  Files           7     7              
  Lines         507   525      +18     
  Branches       29    36       +7     
=======================================
  Hits           63    63              
- Misses        444   462      +18
Impacted Files Coverage Δ
...main/scala/ch/cern/sparkmeasure/stagemetrics.scala 0% <0%> (ø)

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Powered by Codecov. Last update ab7a1b7...fda70c9.

dwurry commented 4 years ago

Please note while considering approval: this code changes the standard output! Note the absence of query terms and special characters below.

From this:

    Scheduling mode = FIFO
    Spark Context default degree of parallelism = 1
    Aggregated Spark stage metrics:
    numStages => 1
    sum(numTasks) => 1
    elapsedTime => 476 (0.5 s)
    sum(stageDuration) => 476 (0.5 s)
    sum(executorRunTime) => 445 (0.4 s)
    sum(executorCpuTime) => 395 (0.4 s)
    sum(executorDeserializeTime) => 9 (9 ms)
    sum(executorDeserializeCpuTime) => 8 (8 ms)
    sum(resultSerializationTime) => 6 (6 ms)
    sum(jvmGCTime) => 0 (0 ms)
    sum(shuffleFetchWaitTime) => 0 (0 ms)
    sum(shuffleWriteTime) => 0 (0 ms)
    max(resultSize) => 2237 (2.0 KB)
    sum(numUpdatedBlockStatuses) => 0
    sum(diskBytesSpilled) => 0 (0 Bytes)
    sum(memoryBytesSpilled) => 0 (0 Bytes)
    max(peakExecutionMemory) => 0
    sum(recordsRead) => 20000
    sum(bytesRead) => 3174055 (3.0 MB)
    sum(recordsWritten) => 0
    sum(bytesWritten) => 0 (0 Bytes)
    sum(shuffleTotalBytesRead) => 0 (0 Bytes)
    sum(shuffleTotalBlocksFetched) => 0
    sum(shuffleLocalBlocksFetched) => 0
    sum(shuffleRemoteBlocksFetched) => 0
    sum(shuffleBytesWritten) => 0 (0 Bytes)
    sum(shuffleRecordsWritten) => 0

To this:

    Scheduling mode = FIFO
    Spark Context default degree of parallelism = 1
    Aggregated Spark stage metrics:
    numStages => 1
    numTasks => 1
    elapsedTime => 458 (0.5 s)
    stageDuration => 458 (0.5 s)
    executorRunTime => 435 (0.4 s)
    executorCpuTime => 392 (0.4 s)
    executorDeserializeTime => 9 (9 ms)
    executorDeserializeCpuTime => 7 (7 ms)
    resultSerializationTime => 3 (3 ms)
    jvmGCTime => 0 (0 ms)
    shuffleFetchWaitTime => 0 (0 ms)
    shuffleWriteTime => 0 (0 ms)
    resultSize => 2237 (2.0 KB)
    numUpdatedBlockStatuses => 0
    diskBytesSpilled => 0 (0 Bytes)
    memoryBytesSpilled => 0 (0 Bytes)
    peakExecutionMemory => 0
    recordsRead => 20000
    bytesRead => 3174055 (3.0 MB)
    recordsWritten => 0
    bytesWritten => 0 (0 Bytes)
    shuffleTotalBytesRead => 0 (0 Bytes)
    shuffleTotalBlocksFetched => 0
    shuffleLocalBlocksFetched => 0
    shuffleRemoteBlocksFetched => 0
    shuffleBytesWritten => 0 (0 Bytes)
    shuffleRecordsWritten => 0

LucaCanali commented 4 years ago

Thanks @dwurry for the PR. It looks good to me. Before merging, can you please fix the match condition list so as to avoid the compilation warning "match may not be exhaustive"?

dwurry commented 4 years ago

Can you give me a file and line number to look at?


LucaCanali commented 4 years ago

    .../stagemetrics.scala:182: match may not be exhaustive.
    [warn] It would fail on the following input: (_, _)
    [warn]       .foreach {
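
The fix the warning asks for can be sketched in a few lines of plain Scala (illustrative names, not the actual sparkMeasure code): matching a (key, value) tuple only against specific keys is non-exhaustive, and a wildcard case covers the remaining inputs.

```scala
object ExhaustiveMatchSketch {
  // Rendering a metrics map: without the (k, v) catch-all case, the compiler
  // warns "match may not be exhaustive ... It would fail on the following
  // input: (_, _)".
  def render(metrics: Map[String, Long]): Seq[String] =
    metrics.toSeq.map {
      case ("stageDuration", v) => s"stageDuration => $v ms"
      case (k, v)               => s"$k => $v" // catch-all keeps the match exhaustive
    }

  def main(args: Array[String]): Unit =
    render(Map("stageDuration" -> 458L, "numTasks" -> 1L)).foreach(println)
}
```
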
dwurry commented 4 years ago

Fixed in the second commit now associated with the pull request. The warning didn't show up in the Maven build, so I got sbt working on Mac, then fixed and tested it. See stagemetrics.scala, reportMap(), line 185.

LucaCanali commented 4 years ago

Thanks David, merging to master. Cheers, L.