Skip to content

Commit

Permalink
Add additional merge metrics (#607)
Browse files Browse the repository at this point in the history
  • Loading branch information
aprudhomme authored Nov 2, 2023
1 parent a71635a commit b43244c
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ private void registerMetrics(GlobalState globalState) {
// register directory size metrics
new DirSizeCollector(globalState).register(collectorRegistry);
new ProcStatCollector().register(collectorRegistry);
new MergeSchedulerCollector(globalState).register(collectorRegistry);
}

/** Main launches the server from the command line. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ protected void preCopyMergedSegmentFiles(
return;
}

NrtMetrics.nrtMergeCopyStartCount.labels(indexName).inc();

long maxMergePreCopyDurationSec = getCurrentMaxMergePreCopyDurationSec();
Deadline deadline;
if (maxMergePreCopyDurationSec > 0) {
Expand Down Expand Up @@ -354,6 +356,7 @@ protected void preCopyMergedSegmentFiles(
NrtMetrics.nrtPrimaryMergeTime
.labels(indexName)
.observe((System.nanoTime() - mergeStartNS) / 1000000.0);
NrtMetrics.nrtMergeCopyEndCount.labels(indexName).inc();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,15 @@ public void waitForGeneration(long gen) throws InterruptedException {
}
}

/**
 * Expose the {@link IndexWriter} held by this shard.
 *
 * @return the shard's index writer; null when this shard is a replica
 */
public IndexWriter getWriter() {
  return this.writer;
}

/**
* Constructor.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2022 Yelp Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yelp.nrtsearch.server.monitoring;

import com.yelp.nrtsearch.server.luceneserver.GlobalState;
import io.prometheus.client.Collector;
import io.prometheus.client.GaugeMetricFamily;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergeScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collector for metrics from the {@link IndexWriter}'s {@link MergeScheduler}.
 *
 * <p>On every call to {@link #collect()}, shard 0 of each index known to the {@link GlobalState}
 * is inspected. Indices whose shard has no writer, or whose merge scheduler is not a {@link
 * ConcurrentMergeScheduler}, contribute no samples. All three metric families are always returned,
 * even when they carry no samples.
 */
public class MergeSchedulerCollector extends Collector {
  private static final Logger logger = LoggerFactory.getLogger(MergeSchedulerCollector.class);
  private static final List<String> LABEL_NAMES = Collections.singletonList("index");

  private final GlobalState globalState;

  /**
   * Constructor.
   *
   * @param globalState global state used to look up the index names and their shard writers
   */
  public MergeSchedulerCollector(GlobalState globalState) {
    this.globalState = globalState;
  }

  /**
   * Build the merge scheduler gauge families, with one sample per index that has a writer backed
   * by a {@link ConcurrentMergeScheduler}.
   *
   * <p>Failures are logged and never propagated. A failure while reading one index only skips that
   * index; the remaining indices are still reported.
   *
   * @return list containing the three gauge families, in a fixed order
   */
  @Override
  public List<MetricFamilySamples> collect() {
    List<MetricFamilySamples> mfs = new ArrayList<>();

    // Value reported is ConcurrentMergeScheduler#mergeThreadCount()
    GaugeMetricFamily indexPendingMergeCount =
        new GaugeMetricFamily(
            "nrt_pending_merge_count", "Current number of pending merges", LABEL_NAMES);
    mfs.add(indexPendingMergeCount);

    // Value reported is ConcurrentMergeScheduler#getMaxThreadCount()
    GaugeMetricFamily indexMaxMergeThreadCount =
        new GaugeMetricFamily(
            "nrt_max_merge_thread_count", "Max running merge threads", LABEL_NAMES);
    mfs.add(indexMaxMergeThreadCount);

    // Value reported is ConcurrentMergeScheduler#getMaxMergeCount()
    GaugeMetricFamily indexMaxMergeCount =
        new GaugeMetricFamily("nrt_max_merge_count", "Max existing merge threads", LABEL_NAMES);
    mfs.add(indexMaxMergeCount);

    try {
      Set<String> indexNames = globalState.getIndexNames();
      for (String indexName : indexNames) {
        try {
          addIndexMetrics(
              indexName, indexPendingMergeCount, indexMaxMergeThreadCount, indexMaxMergeCount);
        } catch (Exception e) {
          // Isolate the failure so one bad index does not hide metrics for the others
          logger.warn("Error getting merge scheduler metrics for index {}: ", indexName, e);
        }
      }
    } catch (Exception e) {
      logger.warn("Error getting merge scheduler metrics: ", e);
    }
    return mfs;
  }

  /**
   * Add the merge scheduler samples for a single index, if it has a writer that uses a {@link
   * ConcurrentMergeScheduler}.
   *
   * @param indexName name of the index, also used as the label value
   * @param pendingMergeCount family receiving the merge thread count
   * @param maxMergeThreadCount family receiving the max thread count
   * @param maxMergeCount family receiving the max merge count
   * @throws Exception if the index, shard or scheduler cannot be read
   */
  private void addIndexMetrics(
      String indexName,
      GaugeMetricFamily pendingMergeCount,
      GaugeMetricFamily maxMergeThreadCount,
      GaugeMetricFamily maxMergeCount)
      throws Exception {
    IndexWriter writer = globalState.getIndex(indexName).getShard(0).getWriter();
    if (writer == null) {
      return;
    }
    MergeScheduler mergeScheduler = writer.getConfig().getMergeScheduler();
    if (!(mergeScheduler instanceof ConcurrentMergeScheduler)) {
      return;
    }
    ConcurrentMergeScheduler concurrentMergeScheduler = (ConcurrentMergeScheduler) mergeScheduler;

    // Read every value before adding any sample, so a failure cannot leave this index with
    // samples in only some of the families
    int mergeThreadCount = concurrentMergeScheduler.mergeThreadCount();
    int maxThreadCount = concurrentMergeScheduler.getMaxThreadCount();
    int maxMerges = concurrentMergeScheduler.getMaxMergeCount();

    List<String> labels = Collections.singletonList(indexName);
    pendingMergeCount.addMetric(labels, mergeThreadCount);
    maxMergeThreadCount.addMetric(labels, maxThreadCount);
    maxMergeCount.addMetric(labels, maxMerges);
  }
}
16 changes: 16 additions & 0 deletions src/main/java/com/yelp/nrtsearch/server/monitoring/NrtMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ public class NrtMetrics {
.labelNames("index")
.create();

/**
 * Counter of merge copies started, labeled by index name. Exposed as {@code
 * nrt_merge_copy_start_count}.
 *
 * <p>NOTE(review): presumably paired with {@link #nrtMergeCopyEndCount} so that the difference
 * approximates merge copies still in flight — confirm against the call sites.
 */
public static final Counter nrtMergeCopyStartCount =
Counter.build()
.name("nrt_merge_copy_start_count")
.help("Number of merge copies started")
.labelNames("index")
.create();

/**
 * Counter of merge copies ended, labeled by index name. Exposed as {@code
 * nrt_merge_copy_end_count}.
 */
public static final Counter nrtMergeCopyEndCount =
Counter.build()
.name("nrt_merge_copy_end_count")
.help("Number of merge copies ended")
.labelNames("index")
.create();

/**
* Add all nrt metrics to the collector registry.
*
Expand All @@ -113,5 +127,7 @@ public static void register(CollectorRegistry registry) {
registry.register(nrtMergeFailure);
registry.register(nrtMergeSize);
registry.register(nrtMergeTime);
registry.register(nrtMergeCopyStartCount);
registry.register(nrtMergeCopyEndCount);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2023 Yelp Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yelp.nrtsearch.server.monitoring;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.yelp.nrtsearch.server.luceneserver.GlobalState;
import com.yelp.nrtsearch.server.luceneserver.IndexState;
import com.yelp.nrtsearch.server.luceneserver.ShardState;
import io.prometheus.client.Collector.MetricFamilySamples;
import io.prometheus.client.Collector.MetricFamilySamples.Sample;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/** Unit tests for {@link MergeSchedulerCollector}, driven entirely through Mockito mocks. */
public class MergeSchedulerCollectorTest {
  // NOTE(review): no test in this class references this rule
  @Rule public final TemporaryFolder folder = new TemporaryFolder();

  /**
   * A single index backed by a {@link ConcurrentMergeScheduler} must yield exactly one sample in
   * each of the three gauge families, carrying the scheduler's values.
   */
  @Test
  public void testCollectMetrics() throws IOException {
    ConcurrentMergeScheduler scheduler = mock(ConcurrentMergeScheduler.class);
    when(scheduler.mergeThreadCount()).thenReturn(3);
    when(scheduler.getMaxThreadCount()).thenReturn(4);
    when(scheduler.getMaxMergeCount()).thenReturn(6);

    LiveIndexWriterConfig liveConfig = mock(LiveIndexWriterConfig.class);
    when(liveConfig.getMergeScheduler()).thenReturn(scheduler);

    IndexWriter indexWriter = mock(IndexWriter.class);
    when(indexWriter.getConfig()).thenReturn(liveConfig);

    ShardState shardState = mock(ShardState.class);
    when(shardState.getWriter()).thenReturn(indexWriter);

    IndexState indexState = mock(IndexState.class);
    when(indexState.getShard(0)).thenReturn(shardState);

    GlobalState globalState = mock(GlobalState.class);
    when(globalState.getIndexNames()).thenReturn(Collections.singleton("test_index"));
    when(globalState.getIndex("test_index")).thenReturn(indexState);

    List<MetricFamilySamples> families = new MergeSchedulerCollector(globalState).collect();
    assertEquals(3, families.size());

    Map<String, MetricFamilySamples> familiesByName = new HashMap<>();
    for (MetricFamilySamples family : families) {
      familiesByName.put(family.name, family);
    }

    assertSingleIndexSample(familiesByName.get("nrt_pending_merge_count"), 3);
    assertSingleIndexSample(familiesByName.get("nrt_max_merge_thread_count"), 4);
    assertSingleIndexSample(familiesByName.get("nrt_max_merge_count"), 6);
  }

  /** With no indices, the three families are still returned but hold no samples. */
  @Test
  public void testNoIndex() {
    GlobalState globalState = mock(GlobalState.class);
    when(globalState.getIndexNames()).thenReturn(Collections.emptySet());

    List<MetricFamilySamples> families = new MergeSchedulerCollector(globalState).collect();

    assertEquals(3, families.size());
    families.forEach(family -> assertTrue(family.samples.isEmpty()));
  }

  /**
   * Verify that a family exists and holds exactly one sample labeled {@code index=test_index} with
   * the expected value.
   */
  private static void assertSingleIndexSample(MetricFamilySamples family, double expectedValue) {
    assertNotNull(family);
    assertEquals(1, family.samples.size());
    Sample onlySample = family.samples.get(0);
    assertEquals(Collections.singletonList("index"), onlySample.labelNames);
    assertEquals(Collections.singletonList("test_index"), onlySample.labelValues);
    assertEquals(expectedValue, onlySample.value, 0);
  }
}

0 comments on commit b43244c

Please sign in to comment.