From 874ceec475d520d3955614f56768fa661c1fb562 Mon Sep 17 00:00:00 2001 From: Andrew Prudhomme Date: Thu, 2 Nov 2023 14:30:52 -0700 Subject: [PATCH] Add additional merge metrics --- .../nrtsearch/server/grpc/LuceneServer.java | 1 + .../server/luceneserver/NRTPrimaryNode.java | 3 + .../server/luceneserver/ShardState.java | 9 ++ .../monitoring/MergeSchedulerCollector.java | 87 ++++++++++++++ .../server/monitoring/NrtMetrics.java | 16 +++ .../MergeSchedulerCollectorTest.java | 109 ++++++++++++++++++ 6 files changed, 225 insertions(+) create mode 100644 src/main/java/com/yelp/nrtsearch/server/monitoring/MergeSchedulerCollector.java create mode 100644 src/test/java/com/yelp/nrtsearch/server/monitoring/MergeSchedulerCollectorTest.java diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java index 1ef910608..84d559c1e 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java @@ -234,6 +234,7 @@ private void registerMetrics(GlobalState globalState) { // register directory size metrics new DirSizeCollector(globalState).register(collectorRegistry); new ProcStatCollector().register(collectorRegistry); + new MergeSchedulerCollector(globalState).register(collectorRegistry); } /** Main launches the server from the command line. */ diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/NRTPrimaryNode.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/NRTPrimaryNode.java index e880f5f0f..e0ed5ab3e 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/NRTPrimaryNode.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/NRTPrimaryNode.java @@ -268,6 +268,8 @@ protected void preCopyMergedSegmentFiles( return; } + NrtMetrics.nrtMergeCopyStartCount.labels(indexName).inc(); + long maxMergePreCopyDurationSec = getCurrentMaxMergePreCopyDurationSec(); Deadline deadline; if (maxMergePreCopyDurationSec > 0) { @@ -354,6 +356,7 @@ protected void preCopyMergedSegmentFiles( NrtMetrics.nrtPrimaryMergeTime .labels(indexName) .observe((System.nanoTime() - mergeStartNS) / 1000000.0); + NrtMetrics.nrtMergeCopyEndCount.labels(indexName).inc(); } } diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/ShardState.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/ShardState.java index e67ed70ab..114890ed5 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/ShardState.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/ShardState.java @@ -278,6 +278,15 @@ public void waitForGeneration(long gen) throws InterruptedException { } } + /** + * Get shard index writer. + * + * @return Index writer, or null if replica + */ + public IndexWriter getWriter() { + return writer; + } + /** * Constructor. * diff --git a/src/main/java/com/yelp/nrtsearch/server/monitoring/MergeSchedulerCollector.java b/src/main/java/com/yelp/nrtsearch/server/monitoring/MergeSchedulerCollector.java new file mode 100644 index 000000000..ca317f488 --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/monitoring/MergeSchedulerCollector.java @@ -0,0 +1,87 @@ +/* + * Copyright 2022 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.monitoring; + +import com.yelp.nrtsearch.server.luceneserver.GlobalState; +import io.prometheus.client.Collector; +import io.prometheus.client.GaugeMetricFamily; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import org.apache.lucene.index.ConcurrentMergeScheduler; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.MergeScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Collector to for metrics from the {@link IndexWriter}'s {@link MergeScheduler}. */ +public class MergeSchedulerCollector extends Collector { + private static final Logger logger = LoggerFactory.getLogger(MergeSchedulerCollector.class); + private final GlobalState globalState; + + public MergeSchedulerCollector(GlobalState globalState) { + this.globalState = globalState; + } + + @Override + public List collect() { + List mfs = new ArrayList<>(); + + GaugeMetricFamily indexPendingMergeCount = + new GaugeMetricFamily( + "nrt_pending_merge_count", + "Current number of pending merges", + Collections.singletonList("index")); + mfs.add(indexPendingMergeCount); + + GaugeMetricFamily indexMaxMergeThreadCount = + new GaugeMetricFamily( + "nrt_max_merge_thread_count", + "Max running merge threads", + Collections.singletonList("index")); + mfs.add(indexMaxMergeThreadCount); + + GaugeMetricFamily indexMaxMergeCount = + new GaugeMetricFamily( + "nrt_max_merge_count", + "Max existing merge threads", + Collections.singletonList("index")); + mfs.add(indexMaxMergeCount); + + try { + Set indexNames = globalState.getIndexNames(); + for (String indexName : indexNames) { + IndexWriter writer = globalState.getIndex(indexName).getShard(0).getWriter(); + List labels = Collections.singletonList(indexName); + if (writer != null) { + MergeScheduler mergeScheduler = writer.getConfig().getMergeScheduler(); + if (mergeScheduler instanceof ConcurrentMergeScheduler) { + ConcurrentMergeScheduler concurrentMergeScheduler = + (ConcurrentMergeScheduler) mergeScheduler; + indexPendingMergeCount.addMetric(labels, concurrentMergeScheduler.mergeThreadCount()); + indexMaxMergeThreadCount.addMetric( + labels, concurrentMergeScheduler.getMaxThreadCount()); + indexMaxMergeCount.addMetric(labels, concurrentMergeScheduler.getMaxMergeCount()); + } + } + } + } catch (Exception e) { + logger.warn("Error getting merge scheduler metrics: ", e); + } + return mfs; + } +} diff --git a/src/main/java/com/yelp/nrtsearch/server/monitoring/NrtMetrics.java b/src/main/java/com/yelp/nrtsearch/server/monitoring/NrtMetrics.java index f2ff3822f..9067ab3c1 100644 --- a/src/main/java/com/yelp/nrtsearch/server/monitoring/NrtMetrics.java +++ b/src/main/java/com/yelp/nrtsearch/server/monitoring/NrtMetrics.java @@ -98,6 +98,20 @@ public class NrtMetrics { .labelNames("index") .create(); + public static final Counter nrtMergeCopyStartCount = + Counter.build() + .name("nrt_merge_copy_start_count") + .help("Number of merge copies started") + .labelNames("index") + .create(); + + public static final Counter nrtMergeCopyEndCount = + Counter.build() + .name("nrt_merge_copy_end_count") + .help("Number of merge copies ended") + .labelNames("index") + .create(); + /** * Add all nrt metrics to the collector registry. * @@ -113,5 +127,7 @@ public static void register(CollectorRegistry registry) { registry.register(nrtMergeFailure); registry.register(nrtMergeSize); registry.register(nrtMergeTime); + registry.register(nrtMergeCopyStartCount); + registry.register(nrtMergeCopyEndCount); } } diff --git a/src/test/java/com/yelp/nrtsearch/server/monitoring/MergeSchedulerCollectorTest.java b/src/test/java/com/yelp/nrtsearch/server/monitoring/MergeSchedulerCollectorTest.java new file mode 100644 index 000000000..a1ad5e881 --- /dev/null +++ b/src/test/java/com/yelp/nrtsearch/server/monitoring/MergeSchedulerCollectorTest.java @@ -0,0 +1,109 @@ +/* + * Copyright 2023 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.monitoring; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.yelp.nrtsearch.server.luceneserver.GlobalState; +import com.yelp.nrtsearch.server.luceneserver.IndexState; +import com.yelp.nrtsearch.server.luceneserver.ShardState; +import io.prometheus.client.Collector.MetricFamilySamples; +import io.prometheus.client.Collector.MetricFamilySamples.Sample; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.lucene.index.ConcurrentMergeScheduler; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.LiveIndexWriterConfig; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class MergeSchedulerCollectorTest { + @Rule public final TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void testCollectMetrics() throws IOException { + GlobalState mockGlobalState = mock(GlobalState.class); + IndexState mockIndexState = mock(IndexState.class); + ShardState mockShardState = mock(ShardState.class); + IndexWriter mockIndexWriter = mock(IndexWriter.class); + LiveIndexWriterConfig mockLiveConfig = mock(LiveIndexWriterConfig.class); + ConcurrentMergeScheduler mockMergeScheduler = mock(ConcurrentMergeScheduler.class); + + when(mockMergeScheduler.mergeThreadCount()).thenReturn(3); + when(mockMergeScheduler.getMaxThreadCount()).thenReturn(4); + when(mockMergeScheduler.getMaxMergeCount()).thenReturn(6); + when(mockLiveConfig.getMergeScheduler()).thenReturn(mockMergeScheduler); + when(mockIndexWriter.getConfig()).thenReturn(mockLiveConfig); + when(mockShardState.getWriter()).thenReturn(mockIndexWriter); + when(mockIndexState.getShard(0)).thenReturn(mockShardState); + when(mockGlobalState.getIndexNames()).thenReturn(Collections.singleton("test_index")); + when(mockGlobalState.getIndex("test_index")).thenReturn(mockIndexState); + + MergeSchedulerCollector collector = new MergeSchedulerCollector(mockGlobalState); + List metrics = collector.collect(); + + assertEquals(3, metrics.size()); + Map metricsMap = new HashMap<>(); + for (MetricFamilySamples samples : metrics) { + metricsMap.put(samples.name, samples); + } + + MetricFamilySamples samples = metricsMap.get("nrt_pending_merge_count"); + assertNotNull(samples); + assertEquals(1, samples.samples.size()); + Sample sample = samples.samples.get(0); + assertEquals(Collections.singletonList("index"), sample.labelNames); + assertEquals(Collections.singletonList("test_index"), sample.labelValues); + assertEquals(3, sample.value, 0); + + samples = metricsMap.get("nrt_max_merge_thread_count"); + assertNotNull(samples); + assertEquals(1, samples.samples.size()); + sample = samples.samples.get(0); + assertEquals(Collections.singletonList("index"), sample.labelNames); + assertEquals(Collections.singletonList("test_index"), sample.labelValues); + assertEquals(4, sample.value, 0); + + samples = metricsMap.get("nrt_max_merge_count"); + assertNotNull(samples); + assertEquals(1, samples.samples.size()); + sample = samples.samples.get(0); + assertEquals(Collections.singletonList("index"), sample.labelNames); + assertEquals(Collections.singletonList("test_index"), sample.labelValues); + assertEquals(6, sample.value, 0); + } + + @Test + public void testNoIndex() { + GlobalState mockGlobalState = mock(GlobalState.class); + when(mockGlobalState.getIndexNames()).thenReturn(Collections.emptySet()); + MergeSchedulerCollector collector = new MergeSchedulerCollector(mockGlobalState); + List metrics = collector.collect(); + + assertEquals(3, metrics.size()); + for (MetricFamilySamples samples : metrics) { + assertTrue(samples.samples.isEmpty()); + } + } +}