diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/GcNotificationMetricCollector.java b/processing/src/main/java/org/apache/druid/java/util/metrics/GcNotificationMetricCollector.java new file mode 100644 index 000000000000..26c6a80d34bd --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/GcNotificationMetricCollector.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.sun.management.GarbageCollectionNotificationInfo; +import com.sun.management.GcInfo; +import org.apache.druid.java.util.common.logger.Logger; + +import javax.management.ListenerNotFoundException; +import javax.management.Notification; +import javax.management.NotificationEmitter; +import javax.management.NotificationListener; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryUsage; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Collects Spectator-style GC metrics via JMX GC notifications. Captures per-event + * pause/concurrent phase durations, allocation rate, promotion rate, and live/max data sizes. + */ +public class GcNotificationMetricCollector implements NotificationListener +{ + private static final Logger log = new Logger(GcNotificationMetricCollector.class); + + private final AtomicLong allocationRateBytes = new AtomicLong(0); + private final AtomicLong promotionRateBytes = new AtomicLong(0); + private final AtomicLong liveDataSizeBytes = new AtomicLong(0); + private final AtomicLong maxDataSizeBytes = new AtomicLong(0); + private final ConcurrentLinkedQueue pauseEvents = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue concurrentPhaseEvents = new ConcurrentLinkedQueue<>(); + + // Tracks young gen size after previous GC for allocation rate calculation + private volatile long youngGenSizeAfterPreviousGc = -1; + + private final List gcBeans; + + public GcNotificationMetricCollector() + { + this(ManagementFactory.getGarbageCollectorMXBeans()); + } + + @VisibleForTesting + GcNotificationMetricCollector(List gcBeans) + { + this.gcBeans = gcBeans; + } + + public void start() + { + for (GarbageCollectorMXBean bean : gcBeans) { + if (bean instanceof NotificationEmitter) { + ((NotificationEmitter) bean).addNotificationListener(this, null, null); + } + } + } + + public void stop() + { + for (GarbageCollectorMXBean bean : gcBeans) { + if (bean instanceof NotificationEmitter) { + try { + ((NotificationEmitter) bean).removeNotificationListener(this); + } + catch (ListenerNotFoundException e) { + log.warn(e, "Failed to remove GC notification listener from [%s]", bean.getName()); + } + } + } + } + + @Override + public void handleNotification(Notification notification, Object handback) + { + if (!GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION.equals(notification.getType())) { + return; + } + + GarbageCollectionNotificationInfo info = GarbageCollectionNotificationInfo.from( + (javax.management.openmbean.CompositeData) notification.getUserData() + ); + + processGcEvent(info); + } + + @VisibleForTesting + void processGcEvent(GarbageCollectionNotificationInfo info) + { + GcInfo gcInfo = info.getGcInfo(); + Map beforeUsages = gcInfo.getMemoryUsageBeforeGc(); + Map afterUsages = gcInfo.getMemoryUsageAfterGc(); + + long youngGenBefore = 0; + long youngGenAfter = 0; + long oldGenBefore = 0; + long oldGenAfter = 0; + long oldGenMax = 0; + + for (Map.Entry entry : afterUsages.entrySet()) { + String poolName = entry.getKey(); + MemoryUsage after = entry.getValue(); + MemoryUsage before = beforeUsages.get(poolName); + + if (isYoungGenPool(poolName)) { + youngGenBefore += before != null ? before.getUsed() : 0; + youngGenAfter += after.getUsed(); + } else if (isOldGenPool(poolName)) { + oldGenBefore += before != null ? before.getUsed() : 0; + oldGenAfter += after.getUsed(); + oldGenMax += after.getMax() > 0 ? after.getMax() : 0; + } + } + + // Allocation rate: youngGenBefore (this GC) - youngGenAfter (previous GC) + long prevYoungGenAfter = youngGenSizeAfterPreviousGc; + if (prevYoungGenAfter >= 0) { + long allocated = youngGenBefore - prevYoungGenAfter; + if (allocated > 0) { + allocationRateBytes.addAndGet(allocated); + } + } + youngGenSizeAfterPreviousGc = youngGenAfter; + + // Promotion rate: old gen growth during this GC + long promoted = oldGenAfter - oldGenBefore; + if (promoted > 0) { + promotionRateBytes.addAndGet(promoted); + } + + // Live data size and max data size: updated after major (old gen) GC + if (!isConcurrentPhase(info)) { + boolean isOldGc = isOldGenGc(info); + if (isOldGc) { + liveDataSizeBytes.set(oldGenAfter); + maxDataSizeBytes.set(oldGenMax); + } + } + + // Record pause or concurrent phase event + double durationSeconds = gcInfo.getDuration() / 1000.0; + String gcAction = info.getGcAction(); + String gcCause = info.getGcCause(); + + if (isConcurrentPhase(info)) { + concurrentPhaseEvents.add(new GcPauseEvent(durationSeconds, gcAction, gcCause)); + } else { + pauseEvents.add(new GcPauseEvent(durationSeconds, gcAction, gcCause)); + } + } + + /** + * Atomically drains accumulated metrics. Counters (allocationRate, promotionRate) are + * reset to zero. Gauges (liveDataSize, maxDataSize) retain their values. + */ + public DrainResult drain() + { + long allocRate = allocationRateBytes.getAndSet(0); + long promoRate = promotionRateBytes.getAndSet(0); + long liveData = liveDataSizeBytes.get(); + long maxData = maxDataSizeBytes.get(); + + List pauses = new ArrayList<>(); + GcPauseEvent event; + while ((event = pauseEvents.poll()) != null) { + pauses.add(event); + } + + List concurrentPhases = new ArrayList<>(); + while ((event = concurrentPhaseEvents.poll()) != null) { + concurrentPhases.add(event); + } + + return new DrainResult( + allocRate, + promoRate, + liveData, + maxData, + Collections.unmodifiableList(pauses), + Collections.unmodifiableList(concurrentPhases) + ); + } + + @VisibleForTesting + static boolean isYoungGenPool(String poolName) + { + String lower = poolName.toLowerCase(Locale.ROOT); + return lower.contains("eden") + || lower.endsWith("young gen") + || "shenandoah".equals(lower) + || "zheap".equals(lower); + } + + @VisibleForTesting + static boolean isOldGenPool(String poolName) + { + String lower = poolName.toLowerCase(Locale.ROOT); + return lower.contains("old gen") + || lower.contains("tenured") + || "shenandoah".equals(lower) + || "zheap".equals(lower); + } + + @VisibleForTesting + static boolean isConcurrentPhase(GarbageCollectionNotificationInfo info) + { + String gcCause = info.getGcCause(); + String gcName = info.getGcName(); + return "No GC".equals(gcCause) + || "G1 Concurrent GC".equals(gcName) + || gcName.endsWith("Cycles"); + } + + /** + * Heuristic: old gen GC if cause is NOT "No GC" and the GC action contains "major" + * or the GC name suggests an old generation collector. + */ + private static boolean isOldGenGc(GarbageCollectionNotificationInfo info) + { + String action = info.getGcAction(); + String name = info.getGcName(); + return action.contains("major") + || name.contains("Old") + || name.contains("Tenured") + || name.contains("MarkSweep") + || "ZGC".equals(name) + || "Shenandoah Cycles".equals(name); + } + + /** + * Holds the result of a single drain() call. + */ + public static class DrainResult + { + private final long allocationRateBytes; + private final long promotionRateBytes; + private final long liveDataSizeBytes; + private final long maxDataSizeBytes; + private final List pauseEvents; + private final List concurrentPhaseEvents; + + DrainResult( + long allocationRateBytes, + long promotionRateBytes, + long liveDataSizeBytes, + long maxDataSizeBytes, + List pauseEvents, + List concurrentPhaseEvents + ) + { + this.allocationRateBytes = allocationRateBytes; + this.promotionRateBytes = promotionRateBytes; + this.liveDataSizeBytes = liveDataSizeBytes; + this.maxDataSizeBytes = maxDataSizeBytes; + this.pauseEvents = pauseEvents; + this.concurrentPhaseEvents = concurrentPhaseEvents; + } + + public long getAllocationRateBytes() + { + return allocationRateBytes; + } + + public long getPromotionRateBytes() + { + return promotionRateBytes; + } + + public long getLiveDataSizeBytes() + { + return liveDataSizeBytes; + } + + public long getMaxDataSizeBytes() + { + return maxDataSizeBytes; + } + + public List getPauseEvents() + { + return pauseEvents; + } + + public List getConcurrentPhaseEvents() + { + return concurrentPhaseEvents; + } + } + + /** + * A single GC pause or concurrent phase event with duration and identifying dimensions. + */ + public static class GcPauseEvent + { + private final double durationSeconds; + private final String gcAction; + private final String gcCause; + + public GcPauseEvent(double durationSeconds, String gcAction, String gcCause) + { + this.durationSeconds = durationSeconds; + this.gcAction = gcAction; + this.gcCause = gcCause; + } + + public double getDurationSeconds() + { + return durationSeconds; + } + + public String getGcAction() + { + return gcAction; + } + + public String getGcCause() + { + return gcCause; + } + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmMonitor.java index 08a9ab5bfc8e..da17566e5c18 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmMonitor.java @@ -24,6 +24,8 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.GcNotificationMetricCollector.DrainResult; +import org.apache.druid.java.util.metrics.GcNotificationMetricCollector.GcPauseEvent; import javax.annotation.Nullable; import java.lang.management.BufferPoolMXBean; @@ -46,6 +48,7 @@ public class JvmMonitor extends FeedDefiningMonitor final GcCollectors gcCollectors; @Nullable private final AllocationMetricCollector collector; + private final GcNotificationMetricCollector gcNotificationCollector; public JvmMonitor() { @@ -57,6 +60,30 @@ public JvmMonitor(String feed) super(feed); this.collector = AllocationMetricCollectors.getAllocationMetricCollector(); this.gcCollectors = new GcCollectors(); + this.gcNotificationCollector = new GcNotificationMetricCollector(); + } + + @VisibleForTesting + JvmMonitor(String feed, GcNotificationMetricCollector gcNotificationCollector) + { + super(feed); + this.collector = AllocationMetricCollectors.getAllocationMetricCollector(); + this.gcCollectors = new GcCollectors(); + this.gcNotificationCollector = gcNotificationCollector; + } + + @Override + public void start() + { + super.start(); + gcNotificationCollector.start(); + } + + @Override + public void stop() + { + gcNotificationCollector.stop(); + super.stop(); } @Override @@ -65,6 +92,7 @@ public boolean doMonitor(ServiceEmitter emitter) emitJvmMemMetrics(emitter); emitDirectMemMetrics(emitter); emitGcMetrics(emitter); + emitGcNotificationMetrics(emitter); emitThreadAllocationMetrics(emitter); return true; @@ -139,6 +167,36 @@ private void emitGcMetrics(ServiceEmitter emitter) gcCollectors.emit(emitter); } + private void emitGcNotificationMetrics(ServiceEmitter emitter) + { + DrainResult result = gcNotificationCollector.drain(); + + final ServiceMetricEvent.Builder rateBuilder = builder(); + rateBuilder.setDimension(JVM_VERSION, JAVA_VERSION); + emitter.emit(rateBuilder.setMetric("jvm/gc/allocationRate/bytes", result.getAllocationRateBytes())); + emitter.emit(rateBuilder.setMetric("jvm/gc/promotionRate/bytes", result.getPromotionRateBytes())); + emitter.emit(rateBuilder.setMetric("jvm/gc/liveDataSize/bytes", result.getLiveDataSizeBytes())); + emitter.emit(rateBuilder.setMetric("jvm/gc/maxDataSize/bytes", result.getMaxDataSizeBytes())); + + for (GcPauseEvent event : result.getPauseEvents()) { + final ServiceMetricEvent.Builder pauseBuilder = builder(); + pauseBuilder + .setDimension(JVM_VERSION, JAVA_VERSION) + .setDimension("gcAction", event.getGcAction()) + .setDimension("gcCause", event.getGcCause()); + emitter.emit(pauseBuilder.setMetric("jvm/gc/pause", event.getDurationSeconds())); + } + + for (GcPauseEvent event : result.getConcurrentPhaseEvents()) { + final ServiceMetricEvent.Builder concurrentBuilder = builder(); + concurrentBuilder + .setDimension(JVM_VERSION, JAVA_VERSION) + .setDimension("gcAction", event.getGcAction()) + .setDimension("gcCause", event.getGcCause()); + emitter.emit(concurrentBuilder.setMetric("jvm/gc/concurrentPhaseTime", event.getDurationSeconds())); + } + } + private class GcCollectors { private final List generationCollectors = new ArrayList<>(); @@ -184,8 +242,11 @@ private class GcGenerationCollector private static final String SERIAL_COLLECTOR_NAME = "serial"; private static final String ZGC_COLLECTOR_NAME = "zgc"; private static final String SHENANDOAN_COLLECTOR_NAME = "shenandoah"; + private static final String GC_PAUSE_TYPE_STW = "stw"; + private static final String GC_PAUSE_TYPE_CONCURRENT = "concurrent"; private final String generation; private final String collectorName; + private final String pauseType; private final GarbageCollectorMXBean gcBean; private long lastInvocations = 0; private long lastCpuMillis = 0; @@ -195,9 +256,34 @@ private class GcGenerationCollector Pair gcNamePair = getReadableName(gcBean.getName()); this.generation = gcNamePair.lhs; this.collectorName = gcNamePair.rhs; + this.pauseType = getPauseType(gcBean.getName()); this.gcBean = gcBean; } + private String getPauseType(String name) + { + switch (name) { + case "ConcurrentMarkSweep": + case "ZGC": + case "ZGC Cycles": + case "G1 Concurrent GC": + case "Shenandoah Cycles": + return GC_PAUSE_TYPE_CONCURRENT; + case "ParNew": + case "G1 Young Generation": + case "G1 Old Generation": + case "PS Scavenge": + case "PS MarkSweep": + case "Copy": + case "MarkSweepCompact": + case "ZGC Pauses": + case "Shenandoah Pauses": + return GC_PAUSE_TYPE_STW; + default: + return name; + } + } + private Pair getReadableName(String name) { switch (name) { @@ -212,6 +298,8 @@ private Pair getReadableName(String name) return new Pair<>(GC_YOUNG_GENERATION_NAME, G1_COLLECTOR_NAME); case "G1 Old Generation": return new Pair<>(GC_OLD_GENERATION_NAME, G1_COLLECTOR_NAME); + case "G1 Concurrent GC": + return new Pair<>(GC_OLD_GENERATION_NAME, G1_COLLECTOR_NAME); // Parallel case "PS Scavenge": @@ -227,6 +315,8 @@ private Pair getReadableName(String name) //zgc case "ZGC": + case "ZGC Cycles": + case "ZGC Pauses": return new Pair<>(GC_ZGC_GENERATION_NAME, ZGC_COLLECTOR_NAME); //Shenandoah @@ -247,6 +337,7 @@ void emit(ServiceEmitter emitter) .put("gcGen", new String[]{generation}); dimensionsCopyBuilder.put("gcName", new String[]{collectorName}); + dimensionsCopyBuilder.put("gcPauseType", new String[]{pauseType}); Map dimensionsCopy = dimensionsCopyBuilder.build(); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/GcNotificationMetricCollectorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/GcNotificationMetricCollectorTest.java new file mode 100644 index 000000000000..a0e36ef71ab9 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/GcNotificationMetricCollectorTest.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import com.sun.management.GarbageCollectionNotificationInfo; +import com.sun.management.GcInfo; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.metrics.GcNotificationMetricCollector.DrainResult; +import org.apache.druid.java.util.metrics.GcNotificationMetricCollector.GcPauseEvent; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.management.MemoryUsage; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public class GcNotificationMetricCollectorTest +{ + private GcNotificationMetricCollector collector; + + @Before + public void setUp() + { + collector = new GcNotificationMetricCollector(Collections.emptyList()); + } + + // Pool classification tests + + @Test + public void testIsYoungGenPool() + { + Assert.assertTrue(GcNotificationMetricCollector.isYoungGenPool("PS Eden Space")); + Assert.assertTrue(GcNotificationMetricCollector.isYoungGenPool("G1 Eden Space")); + Assert.assertTrue(GcNotificationMetricCollector.isYoungGenPool("Par Eden Space")); + Assert.assertTrue(GcNotificationMetricCollector.isYoungGenPool("Copy Young Gen")); + Assert.assertTrue(GcNotificationMetricCollector.isYoungGenPool("Shenandoah")); + Assert.assertTrue(GcNotificationMetricCollector.isYoungGenPool("ZHeap")); + + Assert.assertFalse(GcNotificationMetricCollector.isYoungGenPool("PS Old Gen")); + Assert.assertFalse(GcNotificationMetricCollector.isYoungGenPool("G1 Survivor Space")); + Assert.assertFalse(GcNotificationMetricCollector.isYoungGenPool("Metaspace")); + } + + @Test + public void testIsOldGenPool() + { + Assert.assertTrue(GcNotificationMetricCollector.isOldGenPool("PS Old Gen")); + Assert.assertTrue(GcNotificationMetricCollector.isOldGenPool("G1 Old Gen")); + Assert.assertTrue(GcNotificationMetricCollector.isOldGenPool("Tenured Gen")); + Assert.assertTrue(GcNotificationMetricCollector.isOldGenPool("Shenandoah")); + Assert.assertTrue(GcNotificationMetricCollector.isOldGenPool("ZHeap")); + + Assert.assertFalse(GcNotificationMetricCollector.isOldGenPool("PS Eden Space")); + Assert.assertFalse(GcNotificationMetricCollector.isOldGenPool("G1 Survivor Space")); + Assert.assertFalse(GcNotificationMetricCollector.isOldGenPool("Metaspace")); + } + + // Concurrent phase detection tests + + @Test + public void testIsConcurrentPhase() + { + Assert.assertTrue(GcNotificationMetricCollector.isConcurrentPhase( + mockNotificationInfo("G1 Concurrent GC", "No GC", "end of minor GC") + )); + Assert.assertTrue(GcNotificationMetricCollector.isConcurrentPhase( + mockNotificationInfo("ZGC Cycles", "Proactive", "end of major GC") + )); + Assert.assertTrue(GcNotificationMetricCollector.isConcurrentPhase( + mockNotificationInfo("Shenandoah Cycles", "Allocation Failure", "end of minor GC") + )); + Assert.assertTrue(GcNotificationMetricCollector.isConcurrentPhase( + mockNotificationInfo("G1 Young Generation", "No GC", "end of minor GC") + )); + + Assert.assertFalse(GcNotificationMetricCollector.isConcurrentPhase( + mockNotificationInfo("G1 Young Generation", "Allocation Failure", "end of minor GC") + )); + Assert.assertFalse(GcNotificationMetricCollector.isConcurrentPhase( + mockNotificationInfo("PS MarkSweep", "Ergonomics", "end of major GC") + )); + } + + // Simulated notification handling tests + + @Test + public void testAllocationRateCalculation() + { + // First GC: eden 100MB before, 0 after + GarbageCollectionNotificationInfo info1 = createYoungGcInfo( + 100 * 1024 * 1024L, // eden before + 0, // eden after + 50 * 1024 * 1024L, // old before + 50 * 1024 * 1024L, // old after + 200 * 1024 * 1024L, // old max + 50 // duration ms + ); + collector.processGcEvent(info1); + + // No allocation rate on first GC (no previous young gen size) + DrainResult result1 = collector.drain(); + Assert.assertEquals(0, result1.getAllocationRateBytes()); + + // Second GC: eden 80MB before (i.e. 80MB allocated since last GC left eden at 0), 0 after + GarbageCollectionNotificationInfo info2 = createYoungGcInfo( + 80 * 1024 * 1024L, + 0, + 50 * 1024 * 1024L, + 52 * 1024 * 1024L, + 200 * 1024 * 1024L, + 30 + ); + collector.processGcEvent(info2); + + DrainResult result2 = collector.drain(); + Assert.assertEquals(80 * 1024 * 1024L, result2.getAllocationRateBytes()); + } + + @Test + public void testPromotionRate() + { + GarbageCollectionNotificationInfo info = createYoungGcInfo( + 100 * 1024 * 1024L, + 0, + 50 * 1024 * 1024L, // old before + 60 * 1024 * 1024L, // old after (10MB promoted) + 200 * 1024 * 1024L, + 50 + ); + collector.processGcEvent(info); + + DrainResult result = collector.drain(); + Assert.assertEquals(10 * 1024 * 1024L, result.getPromotionRateBytes()); + } + + @Test + public void testNoPromotionWhenOldGenShrinks() + { + GarbageCollectionNotificationInfo info = createYoungGcInfo( + 100 * 1024 * 1024L, + 0, + 60 * 1024 * 1024L, // old before + 50 * 1024 * 1024L, // old after (shrunk, no promotion) + 200 * 1024 * 1024L, + 50 + ); + collector.processGcEvent(info); + + DrainResult result = collector.drain(); + Assert.assertEquals(0, result.getPromotionRateBytes()); + } + + @Test + public void testLiveAndMaxDataSizeUpdatedAfterMajorGc() + { + GarbageCollectionNotificationInfo info = createOldGcInfo( + 0, + 0, + 100 * 1024 * 1024L, + 40 * 1024 * 1024L, // old after = live data size + 512 * 1024 * 1024L, // old max = max data size + 200 + ); + collector.processGcEvent(info); + + DrainResult result = collector.drain(); + Assert.assertEquals(40 * 1024 * 1024L, result.getLiveDataSizeBytes()); + Assert.assertEquals(512 * 1024 * 1024L, result.getMaxDataSizeBytes()); + } + + @Test + public void testLiveDataSizeNotUpdatedAfterYoungGc() + { + GarbageCollectionNotificationInfo info = createYoungGcInfo( + 100 * 1024 * 1024L, + 0, + 50 * 1024 * 1024L, + 55 * 1024 * 1024L, + 200 * 1024 * 1024L, + 30 + ); + collector.processGcEvent(info); + + DrainResult result = collector.drain(); + // live data size not set by young GC + Assert.assertEquals(0, result.getLiveDataSizeBytes()); + } + + @Test + public void testPauseEventsCollected() + { + GarbageCollectionNotificationInfo info = createYoungGcInfo( + 100 * 1024 * 1024L, + 0, + 50 * 1024 * 1024L, + 50 * 1024 * 1024L, + 200 * 1024 * 1024L, + 42 + ); + collector.processGcEvent(info); + + DrainResult result = collector.drain(); + Assert.assertEquals(1, result.getPauseEvents().size()); + Assert.assertEquals(0, result.getConcurrentPhaseEvents().size()); + + GcPauseEvent event = result.getPauseEvents().get(0); + Assert.assertEquals(0.042, event.getDurationSeconds(), 0.0001); + Assert.assertEquals("end of minor GC", event.getGcAction()); + Assert.assertEquals("Allocation Failure", event.getGcCause()); + } + + @Test + public void testConcurrentPhaseEventsCollected() + { + GarbageCollectionNotificationInfo info = createConcurrentGcInfo(150); + collector.processGcEvent(info); + + DrainResult result = collector.drain(); + Assert.assertEquals(0, result.getPauseEvents().size()); + Assert.assertEquals(1, result.getConcurrentPhaseEvents().size()); + + GcPauseEvent event = result.getConcurrentPhaseEvents().get(0); + Assert.assertEquals(0.15, event.getDurationSeconds(), 0.0001); + Assert.assertEquals("end of minor GC", event.getGcAction()); + Assert.assertEquals("No GC", event.getGcCause()); + } + + // Drain behavior tests + + @Test + public void testDrainResetsCountersButNotGauges() + { + // Major GC to set both counters and gauges + GarbageCollectionNotificationInfo info1 = createOldGcInfo( + 50 * 1024 * 1024L, + 0, + 80 * 1024 * 1024L, + 30 * 1024 * 1024L, + 512 * 1024 * 1024L, + 100 + ); + collector.processGcEvent(info1); + + // First drain: should have promotion and live data + DrainResult result1 = collector.drain(); + Assert.assertTrue(result1.getLiveDataSizeBytes() > 0); + Assert.assertTrue(result1.getMaxDataSizeBytes() > 0); + + // Second drain: counters should be zero, gauges should persist + DrainResult result2 = collector.drain(); + Assert.assertEquals(0, result2.getAllocationRateBytes()); + Assert.assertEquals(0, result2.getPromotionRateBytes()); + Assert.assertEquals(result1.getLiveDataSizeBytes(), result2.getLiveDataSizeBytes()); + Assert.assertEquals(result1.getMaxDataSizeBytes(), result2.getMaxDataSizeBytes()); + Assert.assertTrue(result2.getPauseEvents().isEmpty()); + Assert.assertTrue(result2.getConcurrentPhaseEvents().isEmpty()); + } + + // Thread safety test + + @Test(timeout = 30_000L) + public void testConcurrentNotificationsAndDrain() throws InterruptedException + { + int numThreads = 4; + int eventsPerThread = 1000; + ExecutorService executor = Execs.multiThreaded(numThreads, "gc-test-%d"); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numThreads); + + for (int t = 0; t < numThreads; t++) { + executor.submit(() -> { + try { + startLatch.await(); + for (int i = 0; i < eventsPerThread; i++) { + GarbageCollectionNotificationInfo info = createYoungGcInfo( + 1024L, + 0, + 100L, + 110L, + 1024L, + 1 + ); + collector.processGcEvent(info); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + doneLatch.await(10, TimeUnit.SECONDS); + executor.shutdown(); + + // Drain all accumulated events + DrainResult result = collector.drain(); + + // All events should be accounted for (each event promotes 10 bytes) + Assert.assertEquals((long) numThreads * eventsPerThread * 10L, result.getPromotionRateBytes()); + Assert.assertEquals(numThreads * eventsPerThread, result.getPauseEvents().size()); + } + + // Helper methods to create mock GC notification info + + private static GarbageCollectionNotificationInfo mockNotificationInfo( + String gcName, + String gcCause, + String gcAction + ) + { + GarbageCollectionNotificationInfo info = Mockito.mock(GarbageCollectionNotificationInfo.class); + Mockito.when(info.getGcName()).thenReturn(gcName); + Mockito.when(info.getGcCause()).thenReturn(gcCause); + Mockito.when(info.getGcAction()).thenReturn(gcAction); + return info; + } + + private static GarbageCollectionNotificationInfo createYoungGcInfo( + long edenBefore, + long edenAfter, + long oldBefore, + long oldAfter, + long oldMax, + long durationMs + ) + { + return createGcInfo( + "G1 Young Generation", + "Allocation Failure", + "end of minor GC", + edenBefore, + edenAfter, + oldBefore, + oldAfter, + oldMax, + durationMs + ); + } + + private static GarbageCollectionNotificationInfo createOldGcInfo( + long edenBefore, + long edenAfter, + long oldBefore, + long oldAfter, + long oldMax, + long durationMs + ) + { + return createGcInfo( + "G1 Old Generation", + "Ergonomics", + "end of major GC", + edenBefore, + edenAfter, + oldBefore, + oldAfter, + oldMax, + durationMs + ); + } + + private static GarbageCollectionNotificationInfo createConcurrentGcInfo(long durationMs) + { + return createGcInfo( + "G1 Concurrent GC", + "No GC", + "end of minor GC", + 0, 0, + 50 * 1024 * 1024L, + 50 * 1024 * 1024L, + 200 * 1024 * 1024L, + durationMs + ); + } + + private static GarbageCollectionNotificationInfo createGcInfo( + String gcName, + String gcCause, + String gcAction, + long edenBefore, + long edenAfter, + long oldBefore, + long oldAfter, + long oldMax, + long durationMs + ) + { + Map beforeMap = new HashMap<>(); + beforeMap.put("G1 Eden Space", new MemoryUsage(0, edenBefore, edenBefore, edenBefore)); + beforeMap.put("G1 Old Gen", new MemoryUsage(0, oldBefore, oldBefore, oldMax)); + + Map afterMap = new HashMap<>(); + afterMap.put("G1 Eden Space", new MemoryUsage(0, edenAfter, edenAfter, edenAfter)); + afterMap.put("G1 Old Gen", new MemoryUsage(0, oldAfter, oldAfter, oldMax)); + + GcInfo gcInfo = Mockito.mock(GcInfo.class); + Mockito.when(gcInfo.getMemoryUsageBeforeGc()).thenReturn(beforeMap); + Mockito.when(gcInfo.getMemoryUsageAfterGc()).thenReturn(afterMap); + Mockito.when(gcInfo.getDuration()).thenReturn(durationMs); + + GarbageCollectionNotificationInfo info = Mockito.mock(GarbageCollectionNotificationInfo.class); + Mockito.when(info.getGcName()).thenReturn(gcName); + Mockito.when(info.getGcCause()).thenReturn(gcCause); + Mockito.when(info.getGcAction()).thenReturn(gcAction); + Mockito.when(info.getGcInfo()).thenReturn(gcInfo); + + return info; + } +} diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/JvmMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/JvmMonitorTest.java index 24af5db4c56f..f8d2ed9a14b2 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/JvmMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/JvmMonitorTest.java @@ -27,11 +27,17 @@ import org.junit.Assume; import org.junit.Test; +import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public class JvmMonitorTest { + private static final Set VALID_PAUSE_TYPES = Set.of("stw", "concurrent"); + @Test(timeout = 60_000L) public void testGcCounts() throws InterruptedException { @@ -56,6 +62,109 @@ public void testGcCounts() throws InterruptedException } } + @Test + public void testGcNotificationMetricsEmitted() + { + EventCollectingEmitter emitter = new EventCollectingEmitter(); + final ServiceEmitter serviceEmitter = new ServiceEmitter("test", "localhost", emitter); + serviceEmitter.start(); + + final JvmMonitor jvmMonitor = new JvmMonitor(); + jvmMonitor.start(); + jvmMonitor.doMonitor(serviceEmitter); + + Set metricNames = emitter.events.stream() + .map(e -> ((ServiceMetricEvent) e).getMetric()) + .collect(Collectors.toSet()); + + // Rate/gauge metrics should always be emitted (even if zero) + Assert.assertTrue("Expected jvm/gc/allocationRate/bytes", metricNames.contains("jvm/gc/allocationRate/bytes")); + Assert.assertTrue("Expected jvm/gc/promotionRate/bytes", metricNames.contains("jvm/gc/promotionRate/bytes")); + Assert.assertTrue("Expected jvm/gc/liveDataSize/bytes", metricNames.contains("jvm/gc/liveDataSize/bytes")); + Assert.assertTrue("Expected jvm/gc/maxDataSize/bytes", metricNames.contains("jvm/gc/maxDataSize/bytes")); + + // Verify jvmVersion dimension on the new metrics + for (Event e : emitter.events) { + ServiceMetricEvent event = (ServiceMetricEvent) e; + String metric = event.getMetric(); + if (metric.startsWith("jvm/gc/allocationRate") + || metric.startsWith("jvm/gc/promotionRate") + || metric.startsWith("jvm/gc/liveDataSize") + || metric.startsWith("jvm/gc/maxDataSize")) { + Map map = event.toMap(); + Assert.assertNotNull("jvmVersion dimension should be set on " + metric, map.get("jvmVersion")); + } + } + + jvmMonitor.stop(); + } + + @Test(timeout = 60_000L) + public void testPauseMetricsHaveDimensions() throws InterruptedException + { + EventCollectingEmitter emitter = new EventCollectingEmitter(); + final ServiceEmitter serviceEmitter = new ServiceEmitter("test", "localhost", emitter); + serviceEmitter.start(); + + final JvmMonitor jvmMonitor = new JvmMonitor(); + jvmMonitor.start(); + + // Generate garbage to trigger GC events + while (true) { + @SuppressWarnings("unused") + byte[] b = new byte[1024 * 1024 * 50]; + emitter.events.clear(); + jvmMonitor.doMonitor(serviceEmitter); + + boolean hasPauseOrConcurrent = emitter.events.stream() + .map(e -> ((ServiceMetricEvent) e).getMetric()) + .anyMatch(m -> "jvm/gc/pause".equals(m) || "jvm/gc/concurrentPhaseTime".equals(m)); + + if (hasPauseOrConcurrent) { + // Verify dimensions on pause/concurrentPhaseTime events + for (Event e : emitter.events) { + ServiceMetricEvent event = (ServiceMetricEvent) e; + String metric = event.getMetric(); + if ("jvm/gc/pause".equals(metric) || "jvm/gc/concurrentPhaseTime".equals(metric)) { + Map map = event.toMap(); + Assert.assertNotNull("gcAction should be set on " + metric, map.get("gcAction")); + Assert.assertNotNull("gcCause should be set on " + metric, map.get("gcCause")); + Assert.assertNotNull("jvmVersion should be set on " + metric, map.get("jvmVersion")); + } + } + jvmMonitor.stop(); + return; + } + Thread.sleep(10); + } + } + + private static class EventCollectingEmitter implements Emitter + { + final List events = new ArrayList<>(); + + @Override + public void start() + { + } + + @Override + public void emit(Event event) + { + events.add(event); + } + + @Override + public void flush() + { + } + + @Override + public void close() + { + } + } + private static class GcTrackingEmitter implements Emitter { private Number oldGcCount; @@ -86,6 +195,14 @@ public void emit(Event e) gcGen = ((List) event.toMap().get("gcGen")).get(0).toString(); } + if (event.toMap().get("gcPauseType") != null) { + String pauseType = ((List) event.toMap().get("gcPauseType")).get(0).toString(); + Assert.assertTrue( + "expected gcPauseType to be 'stw' or 'concurrent', got: " + pauseType, + VALID_PAUSE_TYPES.contains(pauseType) + ); + } + switch (event.getMetric() + "/" + gcGen) { case "jvm/gc/count/old": oldGcCount = event.getValue();