banzaicloud / spark-metrics

Spark metrics related custom classes and sinks (e.g. Prometheus)
Apache License 2.0

Custom group key #46

Closed onzo-mateuszzakarczemny closed 4 years ago

onzo-mateuszzakarczemny commented 4 years ago
Q A
Bug fix? no
New feature? yes
API breaks? no
Deprecations? no
Related tickets #39
License Apache 2.0

What's in this PR?

Added a new group-key property to support custom push gateway grouping keys. Labels are now added to each metric separately and are independent of the PG group key. What's more, the code was refactored so it no longer re-implements a custom PushGateway just to override the timestamp. This is a big refactor, but I think the code is now cleaner and has better test coverage. Branch based on https://github.com/banzaicloud/spark-metrics/pull/45

Why?

The current implementation of the group key contains the instance, which leads to a separate metrics group being created per Spark job run. This can cause OOM errors in the push gateway. See #39
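For illustration, a configuration using a stable grouping key like the one this PR enables might look as follows in the Spark metrics properties file. The exact property names and value format are assumptions based on this PR's description, not confirmed syntax:

```properties
# Hypothetical metrics.properties fragment. A fixed group key means all runs
# push into one Pushgateway metric group, instead of a new group per job run
# (which is what caused the OOM described in #39). Property names assumed.
*.sink.prometheus.class=com.banzaicloud.spark.metrics.sink.PrometheusSink
*.sink.prometheus.pushgateway-address=prometheus-pushgateway:9091
*.sink.prometheus.group-key=cluster=abc,environment=test
```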

onzo-mateuszzakarczemny commented 4 years ago

@stoader I rebased to the latest master and added DeduplicatorDecorator + test case to handle duplicated metrics.

stoader commented 4 years ago

@onzo-mateuszzakarczemny While testing the changes in this PR I noticed the following error in spark-master:

spark-master      | java.lang.IllegalArgumentException: Collector already registered that provides name: HiveExternalCatalog_fileCacheHits                                                                       
spark-master      |     at io.prometheus.client.CollectorRegistry.register(CollectorRegistry.java:54)                                                                                                            
spark-master      |     at com.banzaicloud.spark.metrics.DeduplicatedCollectorRegistry.register(DeduplicatedCollectorRegistry.scala:15)                                                                          
spark-master      |     at io.prometheus.client.Collector.register(Collector.java:139)                                                                                                                           
spark-master      |     at com.banzaicloud.spark.metrics.sink.PrometheusSink.start(PrometheusSink.scala:217)                                                                                                     
spark-master      |     at org.apache.spark.metrics.MetricsSystem$$anonfun$start$3.apply(MetricsSystem.scala:103)                                                                                                
spark-master      |     at org.apache.spark.metrics.MetricsSystem$$anonfun$start$3.apply(MetricsSystem.scala:103)                                                                                                
spark-master      |     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)                                                                                                        
spark-master      |     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)                                                                                                                    
spark-master      |     at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:103)                                                                                                                 
spark-master      |     at org.apache.spark.deploy.master.Master.onStart(Master.scala:162)                                                                                                                       
spark-master      |     at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:122)                                                                                                     
spark-master      |     at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)                                                                                                                          
spark-master      |     at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)                                                                                                                             
spark-master      |     at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)                                                                                                           
spark-master      |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)                                                                                                       
spark-master      |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)                                                                                                       
spark-master      |     at java.lang.Thread.run(Thread.java:748)

Apparently, spark-master registers multiple collectors for the same type of metric families. Can we handle this by ignoring duplicates?
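One way to ignore duplicates, sketched below as a hypothetical standalone class (this is not the actual DeduplicatedCollectorRegistry from this PR, just the idea): track registered collector names and make a second registration of the same name a no-op instead of an IllegalArgumentException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a registry keyed by collector name where duplicate
// registrations are silently ignored rather than throwing
// "Collector already registered that provides name: ...".
public class DedupRegistry {
    private final Map<String, Object> collectors = new ConcurrentHashMap<>();

    // Returns true if the collector was newly registered, false if a
    // collector with the same name was already present (duplicate ignored).
    public boolean register(String name, Object collector) {
        return collectors.putIfAbsent(name, collector) == null;
    }

    public int size() {
        return collectors.size();
    }
}
```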

On a side note, when running sbt clean +test the tests do not compile on Scala 2.11 (only on Scala 2.12):

[error] /spark-metrics/src/test/scala/com/banzaicloud/spark/metrics/sink/PrometheusSinkSuite.scala:106: Symbol 'term org.eclipse' is missing from the classpath.
[error] This symbol is required by 'method org.apache.spark.metrics.MetricsSystem.getServletHandlers'.
[error] Make sure that term eclipse is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'MetricsSystem.class' was compiled against an incompatible version of org.
[error]     case class Request(registry: CollectorRegistry, job: String, groupingKey: util.Map[String, String], method: String)
[error]                ^
[error] one error found
[error] (test:compileIncremental) Compilation failed

Do you get this compile error as well?

onzo-mateuszzakarczemny commented 4 years ago

@stoader I'll investigate that. Thanks

onzo-mateuszzakarczemny commented 4 years ago

@stoader I fixed the test compile error with Scala 2.11. It seems to be a known issue.

Drewster727 commented 4 years ago

Is this change not going to be implemented in any 2.3 versions of Spark? We're stuck on 2.3 for a while...

Drewster727 commented 4 years ago

@stoader Looks like 2.3-2.1.2 is the latest, from November. If you could generate a new 2.3-x version that incorporates this change, that'd help! This feature is needed. https://mvnrepository.com/artifact/com.banzaicloud/spark-metrics

stoader commented 4 years ago

@Drewster727 I cherry-picked those commits and released them as 2.3-3.0.0 for Spark 2.3.

Note that the Sink class was moved to a different package, so please update your metrics properties file accordingly: *.sink.prometheus.class=org.apache.spark.banzaicloud.metrics.sink.PrometheusSink
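For reference, a minimal metrics.properties after the upgrade might look like this. Only the sink class line above is confirmed by this thread; the pushgateway address property and value are assumed for illustration:

```properties
# Hypothetical metrics.properties for spark-metrics 2.3-3.0.0: note the
# new org.apache.spark.banzaicloud package for the sink class.
*.sink.prometheus.class=org.apache.spark.banzaicloud.metrics.sink.PrometheusSink
*.sink.prometheus.pushgateway-address=prometheus-pushgateway:9091
```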

Drewster727 commented 4 years ago

@stoader thank you! Any way we can get 2.3-3.0.0 into the Maven repo? Much appreciated! Update: never mind, it pulls down correctly. I just didn't see that version on the Maven repo site.

Drewster727 commented 4 years ago

@stoader did the response code check commit get pushed into 2.3-3.x? https://github.com/banzaicloud/spark-metrics/pull/41

Seeing this in 2.3-3.0.0:

java.io.IOException: Response code from http://<redacted>:9091/metrics/job/spark/role/driver/worker/<redacted>/environment/test/type/hadoop/cluster/abc was 200
    at io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297)
    at io.prometheus.client.exporter.PushGateway.pushAdd(PushGateway.java:171)
    at com.banzaicloud.spark.metrics.sink.PrometheusSink$Reporter.report(PrometheusSink.scala:98)
    at com.codahale.metrics.ScheduledReporter.report(ScheduledReporter.java:162)
    at com.codahale.metrics.ScheduledReporter$1.run(ScheduledReporter.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
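The IOException above is odd because 200 is a success status: older Pushgateway servers answer 202 Accepted while newer ones answer 200 OK, so a check pinned to a single status code rejects valid responses. A hedged sketch of the kind of fix #41 describes (the method name is illustrative, not the actual library API):

```java
// Hypothetical sketch of a relaxed response-code check: treat the whole
// 2xx range as success instead of only one specific status code, so both
// 200 (newer Pushgateway) and 202 (older Pushgateway) are accepted.
public class ResponseCheck {
    public static boolean isSuccess(int statusCode) {
        return statusCode / 100 == 2;
    }
}
```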

Thanks again!

stoader commented 4 years ago

@Drewster727 sorry about that. Can you check with version 2.3-3.0.1?