diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala
index 2ab49eae8cd8..e48eefda6ed4 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala
@@ -59,71 +59,119 @@ private[spark] class PrometheusServlet(
   def getMetricsSnapshot(): String = {
     import scala.jdk.CollectionConverters._
 
-    val gaugesLabel = """{type="gauges"}"""
-    val countersLabel = """{type="counters"}"""
-    val metersLabel = countersLabel
-    val histogramslabels = """{type="histograms"}"""
-    val timersLabels = """{type="timers"}"""
+    val PERCENTILE_P50 = "0.5"
+    val PERCENTILE_P75 = "0.75"
+    val PERCENTILE_P95 = "0.95"
+    val PERCENTILE_P98 = "0.98"
+    val PERCENTILE_P99 = "0.99"
+    val PERCENTILE_P999 = "0.999"
 
     val sb = new StringBuilder()
     registry.getGauges.asScala.foreach { case (k, v) =>
-      if (!v.getValue.isInstanceOf[String]) {
-        sb.append(s"${normalizeKey(k)}Number$gaugesLabel ${v.getValue}\n")
-        sb.append(s"${normalizeKey(k)}Value$gaugesLabel ${v.getValue}\n")
+      v.getValue match {
+        case n: Number =>
+          sb.append(s"# HELP ${normalizeKey(k)} Gauge metric\n")
+          sb.append(s"# TYPE ${normalizeKey(k)} gauge\n")
+          sb.append(s"${normalizeKey(k)} ${n.doubleValue()}\n")
+        case _ => // non-numeric gauges
       }
     }
     registry.getCounters.asScala.foreach { case (k, v) =>
-      sb.append(s"${normalizeKey(k)}Count$countersLabel ${v.getCount}\n")
+      val name = s"${normalizeKey(k)}_total"
+      sb.append(s"# HELP ${name} Counter metric\n")
+      sb.append(s"# TYPE ${name} counter\n")
+      sb.append(s"${name} ${v.getCount}\n")
     }
     registry.getHistograms.asScala.foreach { case (k, h) =>
       val snapshot = h.getSnapshot
+      val values = snapshot.getValues.map(_.toDouble)
       val prefix = normalizeKey(k)
-      sb.append(s"${prefix}Count$histogramslabels ${h.getCount}\n")
-      sb.append(s"${prefix}Max$histogramslabels ${snapshot.getMax}\n")
-      sb.append(s"${prefix}Mean$histogramslabels ${snapshot.getMean}\n")
-      sb.append(s"${prefix}Min$histogramslabels ${snapshot.getMin}\n")
-      sb.append(s"${prefix}50thPercentile$histogramslabels ${snapshot.getMedian}\n")
-      sb.append(s"${prefix}75thPercentile$histogramslabels ${snapshot.get75thPercentile}\n")
-      sb.append(s"${prefix}95thPercentile$histogramslabels ${snapshot.get95thPercentile}\n")
-      sb.append(s"${prefix}98thPercentile$histogramslabels ${snapshot.get98thPercentile}\n")
-      sb.append(s"${prefix}99thPercentile$histogramslabels ${snapshot.get99thPercentile}\n")
-      sb.append(s"${prefix}999thPercentile$histogramslabels ${snapshot.get999thPercentile}\n")
-      sb.append(s"${prefix}StdDev$histogramslabels ${snapshot.getStdDev}\n")
+      sb.append(s"# HELP ${prefix} Histogram metric\n")
+      sb.append(s"# TYPE ${prefix} summary\n")
+      sb.append(s"${prefix}{quantile=\"${PERCENTILE_P50}\"} ${snapshot.getMedian}\n")
+      sb.append(s"${prefix}{quantile=\"${PERCENTILE_P75}\"} ${snapshot.get75thPercentile}\n")
+      sb.append(s"${prefix}{quantile=\"${PERCENTILE_P95}\"} ${snapshot.get95thPercentile}\n")
+      sb.append(s"${prefix}{quantile=\"${PERCENTILE_P98}\"} ${snapshot.get98thPercentile}\n")
+      sb.append(s"${prefix}{quantile=\"${PERCENTILE_P99}\"} ${snapshot.get99thPercentile}\n")
+      sb.append(s"${prefix}{quantile=\"${PERCENTILE_P999}\"} ${snapshot.get999thPercentile}\n")
+      // `_sum` adds up the values held by the snapshot, whereas `_count` comes from the
+      // histogram itself. NOTE(review): confirm both cover the same set of observations.
+      sb.append(s"${prefix}_count ${h.getCount}\n")
+      sb.append(s"${prefix}_sum ${values.sum}\n")
+      sb.append(s"# HELP ${prefix}_min Minimum value\n")
+      sb.append(s"# TYPE ${prefix}_min gauge\n")
+      sb.append(s"${prefix}_min ${snapshot.getMin}\n")
+      sb.append(s"# HELP ${prefix}_max Maximal value\n")
+      sb.append(s"# TYPE ${prefix}_max gauge\n")
+      sb.append(s"${prefix}_max ${snapshot.getMax}\n")
+      // The 0.5 quantile above already carries the median; this gauge is the mean.
+      sb.append(s"# HELP ${prefix}_mean Mean value\n")
+      sb.append(s"# TYPE ${prefix}_mean gauge\n")
+      sb.append(s"${prefix}_mean ${snapshot.getMean}\n")
+      sb.append(s"# HELP ${prefix}_stddev Standard deviation value\n")
+      sb.append(s"# TYPE ${prefix}_stddev gauge\n")
+      sb.append(s"${prefix}_stddev ${snapshot.getStdDev}\n")
     }
     registry.getMeters.entrySet.iterator.asScala.foreach { kv =>
       val prefix = normalizeKey(kv.getKey)
       val meter = kv.getValue
-      sb.append(s"${prefix}Count$metersLabel ${meter.getCount}\n")
-      sb.append(s"${prefix}MeanRate$metersLabel ${meter.getMeanRate}\n")
-      sb.append(s"${prefix}OneMinuteRate$metersLabel ${meter.getOneMinuteRate}\n")
-      sb.append(s"${prefix}FiveMinuteRate$metersLabel ${meter.getFiveMinuteRate}\n")
-      sb.append(s"${prefix}FifteenMinuteRate$metersLabel ${meter.getFifteenMinuteRate}\n")
+      sb.append(s"# HELP ${prefix}_count_cumulative Meter counts metric\n")
+      sb.append(s"# TYPE ${prefix}_count_cumulative gauge\n")
+      sb.append(s"${prefix}_count_cumulative ${meter.getCount}\n")
+      sb.append(s"# HELP ${prefix}_mean_rate mean rate metric\n")
+      sb.append(s"# TYPE ${prefix}_mean_rate gauge\n")
+      sb.append(s"${prefix}_mean_rate ${meter.getMeanRate}\n")
+      sb.append(s"# HELP ${prefix}_m1_rate 1-min moving avg metric\n")
+      sb.append(s"# TYPE ${prefix}_m1_rate gauge\n")
+      sb.append(s"${prefix}_m1_rate ${meter.getOneMinuteRate}\n")
+      sb.append(s"# HELP ${prefix}_m5_rate 5-min moving avg metric\n")
+      sb.append(s"# TYPE ${prefix}_m5_rate gauge\n")
+      sb.append(s"${prefix}_m5_rate ${meter.getFiveMinuteRate}\n")
+      sb.append(s"# HELP ${prefix}_m15_rate 15-min moving avg metric\n")
+      sb.append(s"# TYPE ${prefix}_m15_rate gauge\n")
+      sb.append(s"${prefix}_m15_rate ${meter.getFifteenMinuteRate}\n")
     }
+
     registry.getTimers.entrySet.iterator.asScala.foreach { kv =>
       val prefix = normalizeKey(kv.getKey)
       val timer = kv.getValue
       val snapshot = timer.getSnapshot
-      sb.append(s"${prefix}Count$timersLabels ${timer.getCount}\n")
-      sb.append(s"${prefix}Max$timersLabels ${snapshot.getMax}\n")
-      sb.append(s"${prefix}Mean$timersLabels ${snapshot.getMean}\n")
-      sb.append(s"${prefix}Min$timersLabels ${snapshot.getMin}\n")
-      sb.append(s"${prefix}50thPercentile$timersLabels ${snapshot.getMedian}\n")
-      sb.append(s"${prefix}75thPercentile$timersLabels ${snapshot.get75thPercentile}\n")
-      sb.append(s"${prefix}95thPercentile$timersLabels ${snapshot.get95thPercentile}\n")
-      sb.append(s"${prefix}98thPercentile$timersLabels ${snapshot.get98thPercentile}\n")
-      sb.append(s"${prefix}99thPercentile$timersLabels ${snapshot.get99thPercentile}\n")
-      sb.append(s"${prefix}999thPercentile$timersLabels ${snapshot.get999thPercentile}\n")
-      sb.append(s"${prefix}StdDev$timersLabels ${snapshot.getStdDev}\n")
-      sb.append(s"${prefix}FifteenMinuteRate$timersLabels ${timer.getFifteenMinuteRate}\n")
-      sb.append(s"${prefix}FiveMinuteRate$timersLabels ${timer.getFiveMinuteRate}\n")
-      sb.append(s"${prefix}OneMinuteRate$timersLabels ${timer.getOneMinuteRate}\n")
-      sb.append(s"${prefix}MeanRate$timersLabels ${timer.getMeanRate}\n")
+      val NANOS_TO_SECONDS_UNIT = 1e9
+      def nanosToSeconds(n: Double): Double = n / NANOS_TO_SECONDS_UNIT
+      val medianValue = nanosToSeconds(snapshot.getMedian)
+      val p75Value = nanosToSeconds(snapshot.get75thPercentile)
+      val p95Value = nanosToSeconds(snapshot.get95thPercentile)
+      val p98Value = nanosToSeconds(snapshot.get98thPercentile)
+      val p99Value = nanosToSeconds(snapshot.get99thPercentile)
+      val p999Value = nanosToSeconds(snapshot.get999thPercentile)
+
+      val durationSecondsName = s"${prefix}_duration_seconds"
+      sb.append(s"# HELP $durationSecondsName Timer summary metric\n")
+      sb.append(s"# TYPE $durationSecondsName summary\n")
+      sb.append(s"${durationSecondsName}{quantile=\"${PERCENTILE_P50}\"} ${medianValue}\n")
+      sb.append(s"${durationSecondsName}{quantile=\"${PERCENTILE_P75}\"} ${p75Value}\n")
+      sb.append(s"${durationSecondsName}{quantile=\"${PERCENTILE_P95}\"} ${p95Value}\n")
+      sb.append(s"${durationSecondsName}{quantile=\"${PERCENTILE_P98}\"} ${p98Value}\n")
+      sb.append(s"${durationSecondsName}{quantile=\"${PERCENTILE_P99}\"} ${p99Value}\n")
+      sb.append(s"${durationSecondsName}{quantile=\"${PERCENTILE_P999}\"} ${p999Value}\n")
+      sb.append(s"${durationSecondsName}_count ${timer.getCount}\n")
+      sb.append(s"${durationSecondsName}_sum " +
+        s"${snapshot.getValues.map(_.toDouble / NANOS_TO_SECONDS_UNIT).sum}\n")
+      sb.append(s"# HELP ${prefix}_m1_rate Timer rate 1-min moving avg metric\n")
+      sb.append(s"# TYPE ${prefix}_m1_rate gauge\n")
+      sb.append(s"${prefix}_m1_rate ${timer.getOneMinuteRate}\n")
+      sb.append(s"# HELP ${prefix}_m5_rate Timer rate 5-min moving avg metric\n")
+      sb.append(s"# TYPE ${prefix}_m5_rate gauge\n")
+      sb.append(s"${prefix}_m5_rate ${timer.getFiveMinuteRate}\n")
+      sb.append(s"# HELP ${prefix}_m15_rate Timer rate 15-min moving avg metric\n")
+      sb.append(s"# TYPE ${prefix}_m15_rate gauge\n")
+      sb.append(s"${prefix}_m15_rate ${timer.getFifteenMinuteRate}\n")
     }
     sb.toString()
   }
 
   private def normalizeKey(key: String): String = {
-    s"metrics_${key.replaceAll("[^a-zA-Z0-9]", "_")}_"
+    s"metrics_${key.replaceAll("[^a-zA-Z0-9]", "_")}"
   }
 
   override def start(): Unit = { }
diff --git a/core/src/test/scala/org/apache/spark/metrics/sink/PrometheusServletSuite.scala b/core/src/test/scala/org/apache/spark/metrics/sink/PrometheusServletSuite.scala
index c794eccee425..6c94a80f89ae 100644
--- a/core/src/test/scala/org/apache/spark/metrics/sink/PrometheusServletSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/sink/PrometheusServletSuite.scala
@@ -19,9 +19,10 @@ package org.apache.spark.metrics.sink
 
 import java.util.Properties
 
+import scala.concurrent.duration.MILLISECONDS
 import scala.jdk.CollectionConverters._
 
-import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
 import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkFunSuite
@@ -65,9 +66,102 @@ class PrometheusServletSuite extends SparkFunSuite with PrivateMethodTester {
     val sink = createPrometheusServlet()
     val suffix = sink invokePrivate PrivateMethod[String](Symbol("normalizeKey"))(key)
     assert(suffix == "metrics_local_1592132938718_driver_LiveListenerBus_" +
-      "listenerProcessingTime_org_apache_spark_HeartbeatReceiver_")
+      "listenerProcessingTime_org_apache_spark_HeartbeatReceiver")
+  }
+
+  test("Counter should emit Prometheus counter") {
+    val sink = createPrometheusServlet()
+    val counter = new Counter
+    sink.registry.register("counter1", counter)
+    counter.inc(42)
+
+    val snapshot = sink.getMetricsSnapshot()
+
+    assert(snapshot.contains("metrics_counter1_total 42"))
+    assert(snapshot.contains("# TYPE metrics_counter1_total counter"))
+  }
+
+  test("Gauge should emit Prometheus gauge") {
+    val sink = createPrometheusServlet()
+    val gauge = new Gauge[Double] {
+      override def getValue: Double = 5.123
+    }
+    sink.registry.register("gauge1", gauge)
+
+    val snapshot = sink.getMetricsSnapshot()
+    assert(snapshot.contains("metrics_gauge1 5.123"))
+    assert(snapshot.contains("# TYPE metrics_gauge1 gauge"))
+    validateNumericLinesFormat(snapshot)
+  }
+
+  test("Timer should emit summary and rates") {
+    val sink = createPrometheusServlet()
+    val timer = new Timer
+    timer.update(500, MILLISECONDS)
+    timer.update(1500, MILLISECONDS)
+    sink.registry.register("test.timer", timer)
+
+    val snapshot = sink.getMetricsSnapshot()
+
+    // Summary
+    assert(snapshot.contains("metrics_test_timer_duration_seconds_count 2"))
+    assert(snapshot.contains("metrics_test_timer_duration_seconds_sum"))
+    assert(snapshot.contains("""metrics_test_timer_duration_seconds{quantile="0.5"}"""))
+
+    // Rate
+    assert(snapshot.contains("metrics_test_timer_m1_rate"))
+    assert(snapshot.contains("metrics_test_timer_m5_rate"))
+    assert(snapshot.contains("metrics_test_timer_m15_rate"))
+
+    validateNumericLinesFormat(snapshot)
+  }
+
+  test("Histogram should emit summary") {
+    val sink = createPrometheusServlet()
+    val histogram = sink.registry.histogram("test.hist")
+
+    histogram.update(25)
+    histogram.update(75)
+    histogram.update(150)
+
+    val snapshot = sink.getMetricsSnapshot()
+
+    assert(snapshot.contains("metrics_test_hist_count 3"))
+    assert(snapshot.contains("metrics_test_hist_sum"))
+    assert(snapshot.contains("""metrics_test_hist{quantile="0.5"}"""))
+    // The mean gauge must carry the mean (~83.3 for 25, 75 and 150), not the median (75).
+    val meanPrefix = "metrics_test_hist_mean "
+    val reportedMean = snapshot.linesIterator.collectFirst {
+      case line if line.startsWith(meanPrefix) => line.stripPrefix(meanPrefix).toDouble
+    }
+    assert(reportedMean.exists(mean => math.abs(mean - 250.0 / 3) < 5.0))
+    validateNumericLinesFormat(snapshot)
+  }
+
+  test("Meter should emit count and rates") {
+    val sink = createPrometheusServlet()
+    val meter = sink.registry.meter("test.meter")
+    meter.mark(5)
+
+    val snapshot = sink.getMetricsSnapshot()
+
+    assert(snapshot.contains("metrics_test_meter_count_cumulative 5"))
+    assert(snapshot.contains("metrics_test_meter_m1_rate"))
+    assert(snapshot.contains("metrics_test_meter_m5_rate"))
+    assert(snapshot.contains("metrics_test_meter_m15_rate"))
+    validateNumericLinesFormat(snapshot)
   }
 
   private def createPrometheusServlet(): PrometheusServlet =
     new PrometheusServlet(new Properties, new MetricRegistry)
+
+  // Fails the test if any non-comment line is not `metrics_<name> <number>`.
+  private def validateNumericLinesFormat(formattedOutput: String): Unit = {
+    val numericLinePattern = """^metrics_.*\s+([-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?)$""".r
+    val lines = formattedOutput.linesIterator.filterNot(_.startsWith("#")).toList
+    lines.foreach {
+      case numericLinePattern(_, _) => // valid
+      case badLine => fail(s"Invalid metric value format: $badLine")
+    }
+  }
 }