Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,71 +59,116 @@ private[spark] class PrometheusServlet(
def getMetricsSnapshot(): String = {
import scala.jdk.CollectionConverters._

val gaugesLabel = """{type="gauges"}"""
val countersLabel = """{type="counters"}"""
val metersLabel = countersLabel
val histogramslabels = """{type="histograms"}"""
val timersLabels = """{type="timers"}"""
// Values written into the `quantile="..."` label of the summary lines emitted below
// for histograms and timers.
val PERCENTILE_P50 = "0.5"
val PERCENTILE_P75 = "0.75"
val PERCENTILE_P95 = "0.95"
val PERCENTILE_P98 = "0.98"
val PERCENTILE_P99 = "0.99"
val PERCENTILE_P999 = "0.999"

val sb = new StringBuilder()
registry.getGauges.asScala.foreach { case (k, v) =>
if (!v.getValue.isInstanceOf[String]) {
sb.append(s"${normalizeKey(k)}Number$gaugesLabel ${v.getValue}\n")
sb.append(s"${normalizeKey(k)}Value$gaugesLabel ${v.getValue}\n")
v.getValue match {
case n: Number =>
sb.append(s"# HELP ${normalizeKey(k)} Gauge metric\n")
sb.append(s"# TYPE ${normalizeKey(k)} gauge\n")
sb.append(s"${normalizeKey(k)} ${n.doubleValue()}\n")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for pointing this out!

My concern was that even if this is a Number, it may not be formatted correctly with a plain getValue when we are dealing with something more complex, such as BigDecimal or AtomicLong. By calling doubleValue we avoid the possibility that toString gives us a string instead of a number.

spark-kubernetes-operator is relatively new and we are sure it has no such gauges - but IMO we need to take this into consideration as well. I'll fix that.

case _ => // non-numeric gauges
}
}
registry.getCounters.asScala.foreach { case (k, v) =>
sb.append(s"${normalizeKey(k)}Count$countersLabel ${v.getCount}\n")
// Each counter is exported under a `_total`-suffixed name, preceded by its
// HELP and TYPE metadata lines.
val counterName = normalizeKey(k) + "_total"
sb.append(s"# HELP $counterName Counter metric\n")
  .append(s"# TYPE $counterName counter\n")
  .append(s"$counterName ${v.getCount}\n")
}
registry.getHistograms.asScala.foreach { case (k, h) =>
val snapshot = h.getSnapshot
val values = snapshot.getValues.map(_.toDouble)
val prefix = normalizeKey(k)
sb.append(s"${prefix}Count$histogramslabels ${h.getCount}\n")
sb.append(s"${prefix}Max$histogramslabels ${snapshot.getMax}\n")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it cause any issues if we omit max, min and mean values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the catch!

Though OpenMetrics considers these optional, we should not drop them, since they are present in the current output. I'll add them.

sb.append(s"${prefix}Mean$histogramslabels ${snapshot.getMean}\n")
sb.append(s"${prefix}Min$histogramslabels ${snapshot.getMin}\n")
sb.append(s"${prefix}50thPercentile$histogramslabels ${snapshot.getMedian}\n")
sb.append(s"${prefix}75thPercentile$histogramslabels ${snapshot.get75thPercentile}\n")
sb.append(s"${prefix}95thPercentile$histogramslabels ${snapshot.get95thPercentile}\n")
sb.append(s"${prefix}98thPercentile$histogramslabels ${snapshot.get98thPercentile}\n")
sb.append(s"${prefix}99thPercentile$histogramslabels ${snapshot.get99thPercentile}\n")
sb.append(s"${prefix}999thPercentile$histogramslabels ${snapshot.get999thPercentile}\n")
sb.append(s"${prefix}StdDev$histogramslabels ${snapshot.getStdDev}\n")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

// The histogram's quantiles are exported as a `summary`, followed by its count and sum.
sb.append(s"# HELP ${prefix} Histogram metric\n")
sb.append(s"# TYPE ${prefix} summary\n")
sb.append(s"${prefix}{quantile=\"${PERCENTILE_P50}\"} ${snapshot.getMedian}\n")
sb.append(s"${prefix}{quantile=\"${PERCENTILE_P75}\"} ${snapshot.get75thPercentile}\n")
sb.append(s"${prefix}{quantile=\"${PERCENTILE_P95}\"} ${snapshot.get95thPercentile}\n")
sb.append(s"${prefix}{quantile=\"${PERCENTILE_P98}\"} ${snapshot.get98thPercentile}\n")
sb.append(s"${prefix}{quantile=\"${PERCENTILE_P99}\"} ${snapshot.get99thPercentile}\n")
sb.append(s"${prefix}{quantile=\"${PERCENTILE_P999}\"} ${snapshot.get999thPercentile}\n")
// NOTE(review): `_count` comes from the histogram itself while `_sum` is computed from the
// snapshot's values; presumably these can diverge if the snapshot holds only a sample of
// the recorded values - confirm against the reservoir in use.
sb.append(s"${prefix}_count ${h.getCount}\n")
sb.append(s"${prefix}_sum ${values.sum}\n")
// min / max / mean / stddev are exported as separate gauges next to the summary.
sb.append(s"# HELP ${prefix}_min Minimum value\n")
sb.append(s"# TYPE ${prefix}_min gauge\n")
sb.append(s"${prefix}_min ${snapshot.getMin}\n")
sb.append(s"# HELP ${prefix}_max Maximal value\n")
sb.append(s"# TYPE ${prefix}_max gauge\n")
sb.append(s"${prefix}_max ${snapshot.getMax}\n")
sb.append(s"# HELP ${prefix}_mean Mean value\n")
sb.append(s"# TYPE ${prefix}_mean gauge\n")
// The mean gauge must report the snapshot mean; the median is already exported above
// as the 0.5 quantile.
sb.append(s"${prefix}_mean ${snapshot.getMean}\n")
sb.append(s"# HELP ${prefix}_stddev Standard deviation value\n")
sb.append(s"# TYPE ${prefix}_stddev gauge\n")
sb.append(s"${prefix}_stddev ${snapshot.getStdDev}\n")
}
registry.getMeters.entrySet.iterator.asScala.foreach { kv =>
val prefix = normalizeKey(kv.getKey)
val meter = kv.getValue
sb.append(s"${prefix}Count$metersLabel ${meter.getCount}\n")
sb.append(s"${prefix}MeanRate$metersLabel ${meter.getMeanRate}\n")
sb.append(s"${prefix}OneMinuteRate$metersLabel ${meter.getOneMinuteRate}\n")
sb.append(s"${prefix}FiveMinuteRate$metersLabel ${meter.getFiveMinuteRate}\n")
sb.append(s"${prefix}FifteenMinuteRate$metersLabel ${meter.getFifteenMinuteRate}\n")
// A meter is exported as its cumulative count plus its mean, 1-, 5- and 15-minute
// rates, each as a gauge with its own HELP/TYPE metadata.
sb.append(s"# HELP ${prefix}_count_cumulative Meter counts metric\n")
sb.append(s"# TYPE ${prefix}_count_cumulative gauge\n")
sb.append(s"${prefix}_count_cumulative ${meter.getCount}\n")
// HELP text describes the mean rate that this series actually carries.
sb.append(s"# HELP ${prefix}_mean_rate Meter mean rate metric\n")
sb.append(s"# TYPE ${prefix}_mean_rate gauge\n")
sb.append(s"${prefix}_mean_rate ${meter.getMeanRate}\n")
sb.append(s"# HELP ${prefix}_m1_rate 1-min moving avg metric\n")
sb.append(s"# TYPE ${prefix}_m1_rate gauge\n")
sb.append(s"${prefix}_m1_rate ${meter.getOneMinuteRate}\n")
sb.append(s"# HELP ${prefix}_m5_rate 5-min moving avg metric\n")
sb.append(s"# TYPE ${prefix}_m5_rate gauge\n")
sb.append(s"${prefix}_m5_rate ${meter.getFiveMinuteRate}\n")
sb.append(s"# HELP ${prefix}_m15_rate 15-min moving avg metric\n")
sb.append(s"# TYPE ${prefix}_m15_rate gauge\n")
sb.append(s"${prefix}_m15_rate ${meter.getFifteenMinuteRate}\n")
}

registry.getTimers.entrySet.iterator.asScala.foreach { kv =>
val prefix = normalizeKey(kv.getKey)
val timer = kv.getValue
val snapshot = timer.getSnapshot
sb.append(s"${prefix}Count$timersLabels ${timer.getCount}\n")
sb.append(s"${prefix}Max$timersLabels ${snapshot.getMax}\n")
sb.append(s"${prefix}Mean$timersLabels ${snapshot.getMean}\n")
sb.append(s"${prefix}Min$timersLabels ${snapshot.getMin}\n")
sb.append(s"${prefix}50thPercentile$timersLabels ${snapshot.getMedian}\n")
sb.append(s"${prefix}75thPercentile$timersLabels ${snapshot.get75thPercentile}\n")
sb.append(s"${prefix}95thPercentile$timersLabels ${snapshot.get95thPercentile}\n")
sb.append(s"${prefix}98thPercentile$timersLabels ${snapshot.get98thPercentile}\n")
sb.append(s"${prefix}99thPercentile$timersLabels ${snapshot.get99thPercentile}\n")
sb.append(s"${prefix}999thPercentile$timersLabels ${snapshot.get999thPercentile}\n")
sb.append(s"${prefix}StdDev$timersLabels ${snapshot.getStdDev}\n")
sb.append(s"${prefix}FifteenMinuteRate$timersLabels ${timer.getFifteenMinuteRate}\n")
sb.append(s"${prefix}FiveMinuteRate$timersLabels ${timer.getFiveMinuteRate}\n")
sb.append(s"${prefix}OneMinuteRate$timersLabels ${timer.getOneMinuteRate}\n")
sb.append(s"${prefix}MeanRate$timersLabels ${timer.getMeanRate}\n")
// Snapshot values are treated as nanoseconds here and divided by 1e9 so that the
// `_duration_seconds` series is reported in seconds.
val nanosPerSecond = 1e9
def toSeconds(nanos: Double): Double = nanos / nanosPerSecond

val summaryName = s"${prefix}_duration_seconds"
val quantilesInNanos = Seq(
  PERCENTILE_P50 -> snapshot.getMedian,
  PERCENTILE_P75 -> snapshot.get75thPercentile,
  PERCENTILE_P95 -> snapshot.get95thPercentile,
  PERCENTILE_P98 -> snapshot.get98thPercentile,
  PERCENTILE_P99 -> snapshot.get99thPercentile,
  PERCENTILE_P999 -> snapshot.get999thPercentile
)

// Summary: one line per quantile, then the count and the sum of the snapshot values.
sb.append(s"# HELP $summaryName Timer summary metric\n")
sb.append(s"# TYPE $summaryName summary\n")
quantilesInNanos.foreach { case (quantile, nanos) =>
  sb.append(s"${summaryName}{quantile=\"${quantile}\"} ${toSeconds(nanos)}\n")
}
sb.append(s"${summaryName}_count ${timer.getCount}\n")
val totalSeconds = snapshot.getValues.map(v => toSeconds(v.toDouble)).sum
sb.append(s"${summaryName}_sum ${totalSeconds}\n")

// Rates: each moving-average window is emitted as its own gauge. `rate` is by-name so
// the timer is read at the point where the value line is written.
def appendRate(window: String, label: String, rate: => Double): Unit = {
  val rateName = s"${prefix}_${window}_rate"
  sb.append(s"# HELP $rateName Timer rate $label moving avg metric\n")
  sb.append(s"# TYPE $rateName gauge\n")
  sb.append(s"$rateName $rate\n")
}
appendRate("m1", "1-min", timer.getOneMinuteRate)
appendRate("m5", "5-min", timer.getFiveMinuteRate)
appendRate("m15", "15-min", timer.getFifteenMinuteRate)
}
sb.toString()
}

/**
 * Builds the exported metric name for a registry key.
 *
 * Every character outside `[a-zA-Z0-9]` is replaced with `_` and the result is prefixed
 * with `metrics_`. No trailing underscore is appended; callers add their own suffixes
 * (for example `_total` or `_count`).
 *
 * @param key metric name as registered in the metric registry
 * @return the normalized metric name
 */
private def normalizeKey(key: String): String = {
  s"metrics_${key.replaceAll("[^a-zA-Z0-9]", "_")}"
}

override def start(): Unit = { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package org.apache.spark.metrics.sink

import java.util.Properties

import scala.concurrent.duration.MILLISECONDS
import scala.jdk.CollectionConverters._

import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
import org.scalatest.PrivateMethodTester

import org.apache.spark.SparkFunSuite
Expand Down Expand Up @@ -65,9 +66,95 @@ class PrometheusServletSuite extends SparkFunSuite with PrivateMethodTester {
val sink = createPrometheusServlet()
val suffix = sink invokePrivate PrivateMethod[String](Symbol("normalizeKey"))(key)
assert(suffix == "metrics_local_1592132938718_driver_LiveListenerBus_" +
"listenerProcessingTime_org_apache_spark_HeartbeatReceiver_")
"listenerProcessingTime_org_apache_spark_HeartbeatReceiver")
}

// Registers a single counter, increments it to 42 and checks that the snapshot exposes
// it as a `_total` series with a `counter` TYPE line.
test("Counter should emit Prometheus counter") {
  val sink = createPrometheusServlet()
  val counter = new Counter
  sink.registry.register("counter1", counter)
  counter.inc(42)

  val snapshot = sink.getMetricsSnapshot()

  assert(snapshot.contains("metrics_counter1_total 42"))
  assert(snapshot.contains("# TYPE metrics_counter1_total counter"))
  // Apply the same value-format check that the gauge, timer, histogram and meter tests use.
  validateNumericLinesFormat(snapshot)
}

// A constant Double gauge must show up as a plain `gauge` series carrying its value.
test("Gauge should emit Prometheus gauge") {
  val servlet = createPrometheusServlet()
  servlet.registry.register("gauge1", new Gauge[Double] {
    override def getValue: Double = 5.123
  })

  val output = servlet.getMetricsSnapshot()

  Seq("metrics_gauge1 5.123", "# TYPE metrics_gauge1 gauge").foreach { expected =>
    assert(output.contains(expected))
  }
  validateNumericLinesFormat(output)
}

// A timer with two recorded durations must produce the `_duration_seconds` summary
// (count, sum, quantiles) and the three moving-average rate gauges.
test("Timer should emit summary and rates") {
  val servlet = createPrometheusServlet()
  val recorded = new Timer
  Seq(500L, 1500L).foreach(millis => recorded.update(millis, MILLISECONDS))
  servlet.registry.register("test.timer", recorded)

  val output = servlet.getMetricsSnapshot()

  val expectedSummaryParts = Seq(
    "metrics_test_timer_duration_seconds_count 2",
    "metrics_test_timer_duration_seconds_sum",
    """metrics_test_timer_duration_seconds{quantile="0.5"}"""
  )
  val expectedRateParts = Seq(
    "metrics_test_timer_m1_rate",
    "metrics_test_timer_m5_rate",
    "metrics_test_timer_m15_rate"
  )
  (expectedSummaryParts ++ expectedRateParts).foreach { expected =>
    assert(output.contains(expected))
  }

  validateNumericLinesFormat(output)
}

// A histogram with three samples must be exported as a summary with count, sum
// and quantile lines.
test("Histogram should emit summary") {
  val servlet = createPrometheusServlet()
  val hist = servlet.registry.histogram("test.hist")
  Seq(25, 75, 150).foreach(sample => hist.update(sample))

  val output = servlet.getMetricsSnapshot()

  Seq(
    "metrics_test_hist_count 3",
    "metrics_test_hist_sum",
    """metrics_test_hist{quantile="0.5"}"""
  ).foreach { expected =>
    assert(output.contains(expected))
  }
  validateNumericLinesFormat(output)
}

// A meter marked five times must expose its cumulative count and the three
// moving-average rate series.
test("Meter should emit count and rates") {
  val servlet = createPrometheusServlet()
  servlet.registry.meter("test.meter").mark(5)

  val output = servlet.getMetricsSnapshot()

  Seq(
    "metrics_test_meter_count_cumulative 5",
    "metrics_test_meter_m1_rate",
    "metrics_test_meter_m5_rate",
    "metrics_test_meter_m15_rate"
  ).foreach { expected =>
    assert(output.contains(expected))
  }
  validateNumericLinesFormat(output)
}

/** Creates a servlet under test, backed by empty properties and a fresh, empty registry. */
private def createPrometheusServlet(): PrometheusServlet =
new PrometheusServlet(new Properties, new MetricRegistry)

/**
 * Fails the test on the first non-comment line of `formattedOutput` that is not of the
 * form `metrics_<anything><whitespace><number>`, where the number is a decimal with an
 * optional sign, fraction and exponent. Lines starting with `#` are ignored.
 */
private def validateNumericLinesFormat(formattedOutput: String): Unit = {
  val numericLinePattern = """^metrics_.*\s+([-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?)$""".r
  val sampleLines = formattedOutput.linesIterator.filterNot(_.startsWith("#")).toList
  sampleLines
    .find(line => !numericLinePattern.pattern.matcher(line).matches())
    .foreach(badLine => fail(s"Invalid metric value format: $badLine"))
}
}