Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,9 @@ public void setStorageExecutionQueueLen(int storageExecutionQueueLen) {
public void incrementMultiChunkLargeValueCount() {

}

@Override
public void incrementKeyNotFoundCount() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,6 @@ public interface ReadResponseStats {
void setStorageExecutionQueueLen(int storageExecutionQueueLen);

void incrementMultiChunkLargeValueCount();

void incrementKeyNotFoundCount();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package com.linkedin.venice.storagenode;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.tehuti.MetricsUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import java.util.AbstractMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class KeyNotFoundMetricIntegrationTest {
private VeniceClusterWrapper veniceCluster;
private String storeName;
private AvroGenericStoreClient<String, GenericRecord> client;

@BeforeClass
public void setUp() {
veniceCluster = ServiceFactory.getVeniceCluster(new VeniceClusterCreateOptions.Builder().build());
storeName = Utils.getUniqueString("test-store");
String valueSchemaStr = "{" + " \"type\": \"record\"," + " \"name\": \"User\"," + " \"fields\": ["
+ " {\"name\": \"name\", \"type\": \"string\"}" + " ]" + "}";
veniceCluster.useControllerClient(controllerClient -> {
controllerClient.createNewStore(storeName, "owner", "\"string\"", valueSchemaStr);
controllerClient.updateStore(storeName, new UpdateStoreQueryParams().setReadComputationEnabled(true));
});

Schema valueSchema = Schema.parse(valueSchemaStr);
GenericRecord value1 = new GenericData.Record(valueSchema);
value1.put("name", "value1");

veniceCluster.createVersion(
storeName,
"\"string\"",
valueSchemaStr,
Stream.of(new AbstractMap.SimpleEntry<>("key1", value1)));

client = ClientFactory.getAndStartGenericAvroClient(
ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(veniceCluster.getRandomRouterURL()));
}

@AfterClass
public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(client);
Utils.closeQuietlyWithErrorLogged(veniceCluster);
}

@Test(singleThreaded = true)
public void testKeyNotFoundMetric() {
// Single Get - Key Found
try {
client.get("key1").get();
} catch (Exception e) {
Assert.fail("Get failed", e);
}

// Single Get - Key Not Found
try {
client.get("key2").get();
} catch (Exception e) {
Assert.fail("Get failed", e);
}

// Multi Get - 1 Key Found, 1 Key Not Found
Set<String> keys = new HashSet<>();
keys.add("key1");
keys.add("key3");
try {
client.batchGet(keys).get();
} catch (Exception e) {
Assert.fail("Batch Get failed", e);
}

// Compute - 1 Key Found, 1 Key Not Found
try {
client.compute().project("name").execute(keys).get();
} catch (Exception e) {
Assert.fail("Compute failed", e);
}

String singleGetMetricName = "." + storeName + "--key_not_found.Rate";
String multiGetMetricName = "." + storeName + "--multiget_key_not_found.Rate";
String computeMetricName = "." + storeName + "--compute_key_not_found.Rate";
String totalSingleGetMetricName = ".total--key_not_found.Rate";
String totalMultiGetMetricName = ".total--multiget_key_not_found.Rate";
String totalComputeMetricName = ".total--compute_key_not_found.Rate";

TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> {
double singleGetNotFoundRate = MetricsUtils.getMax(singleGetMetricName, veniceCluster.getVeniceServers());
double multiGetNotFoundRate = MetricsUtils.getMax(multiGetMetricName, veniceCluster.getVeniceServers());
double computeNotFoundRate = MetricsUtils.getMax(computeMetricName, veniceCluster.getVeniceServers());
double totalSingleGetNotFoundRate =
MetricsUtils.getMax(totalSingleGetMetricName, veniceCluster.getVeniceServers());
double totalMultiGetNotFoundRate = MetricsUtils.getMax(totalMultiGetMetricName, veniceCluster.getVeniceServers());
double totalComputeNotFoundRate = MetricsUtils.getMax(totalComputeMetricName, veniceCluster.getVeniceServers());

Assert.assertTrue(
singleGetNotFoundRate > 0,
"Single Get key_not_found metric should be positive. Metric: " + singleGetMetricName);
Assert.assertTrue(
multiGetNotFoundRate > 0,
"Multi Get key_not_found metric should be positive. Metric: " + multiGetMetricName);
Assert.assertTrue(
computeNotFoundRate > 0,
"Compute key_not_found metric should be positive. Metric: " + computeMetricName);
Assert.assertTrue(
totalSingleGetNotFoundRate > 0,
"Total Single Get key_not_found metric should be positive. Metric: " + totalSingleGetMetricName);
Assert.assertTrue(
totalMultiGetNotFoundRate > 0,
"Total Multi Get key_not_found metric should be positive. Metric: " + totalMultiGetMetricName);
Assert.assertTrue(
totalComputeNotFoundRate > 0,
"Total Compute key_not_found metric should be positive. Metric: " + totalComputeMetricName);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,10 @@ public CompletableFuture<ReadResponse> handleSingleGetRequest(GetRouterRequest r
SingleGetChunkingAdapter.get(storageEngine, request.getPartition(), key, isChunked, response.getStats());
response.setValueRecord(valueRecord);

if (valueRecord == null) {
response.getStats().incrementKeyNotFoundCount();
}

response.getStats().addKeySize(key.length);
response.getStats().setStorageExecutionSubmissionWaitTime(submissionWaitTime);
response.getStats().setStorageExecutionQueueLen(queueLen);
Expand Down Expand Up @@ -587,6 +591,7 @@ record = BatchGetChunkingAdapter.get(
requestContext.isChunked,
response.getStats());
if (record == null) {
response.getStats().incrementKeyNotFoundCount();
if (requestContext.isStreaming) {
// For streaming, we would like to send back non-existing keys since the end-user won't know the status of
// non-existing keys in the response if the response is partial.
Expand Down Expand Up @@ -783,13 +788,16 @@ record = new ComputeResponseRecordV1();

response.addRecord(record);
hits++;
} else if (requestContext.isStreaming) {
// For streaming, we need to send back non-existing keys
record = new ComputeResponseRecordV1();
// Negative key index to indicate non-existing key
record.keyIndex = Math.negateExact(key.getKeyIndex());
record.value = StreamingUtils.EMPTY_BYTE_BUFFER;
response.addRecord(record);
} else {
response.getStats().incrementKeyNotFoundCount();
if (requestContext.isStreaming) {
// For streaming, we need to send back non-existing keys
record = new ComputeResponseRecordV1();
// Negative key index to indicate non-existing key
record.keyIndex = Math.negateExact(key.getKeyIndex());
record.value = StreamingUtils.EMPTY_BYTE_BUFFER;
response.addRecord(record);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public abstract class AbstractReadResponseStats implements ReadResponseStats, Re
private double storageExecutionSubmissionWaitTime = UNINITIALIZED;
private int storageExecutionQueueLen = UNINITIALIZED;
protected int multiChunkLargeValueCount = 0;
private int keyNotFoundCount = 0;

protected abstract int getRecordCount();

Expand Down Expand Up @@ -57,6 +58,15 @@ public void incrementMultiChunkLargeValueCount() {
this.multiChunkLargeValueCount++;
}

@Override
public void incrementKeyNotFoundCount() {
this.keyNotFoundCount++;
}

public int getKeyNotFoundCount() {
return this.keyNotFoundCount;
}

@Override
public void recordMetrics(ServerHttpRequestStats stats) {
consumeDoubleAndBooleanIfAbove(
Expand All @@ -66,6 +76,7 @@ public void recordMetrics(ServerHttpRequestStats stats) {
0);
consumeIntIfAbove(stats::recordMultiChunkLargeValueCount, this.multiChunkLargeValueCount, 0);
consumeIntIfAbove(stats::recordStorageExecutionQueueLen, this.storageExecutionQueueLen, UNINITIALIZED);
consumeIntIfAbove(stats::recordKeyNotFoundCount, this.keyNotFoundCount, 0);

recordUnmergedMetrics(stats);

Expand All @@ -86,6 +97,7 @@ public void merge(ReadResponseStatsRecorder other) {
AbstractReadResponseStats otherStats = (AbstractReadResponseStats) other;
this.databaseLookupLatency += otherStats.databaseLookupLatency;
this.multiChunkLargeValueCount += otherStats.multiChunkLargeValueCount;
this.keyNotFoundCount += otherStats.keyNotFoundCount;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,8 @@ public void recordErrorRequestLatency(double latency) {
public void recordMisroutedStoreVersionRequest() {
totalStats.recordMisroutedStoreVersionRequest();
}

public void recordKeyNotFoundCount(int count) {
totalStats.recordKeyNotFoundCount(count);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class ServerHttpRequestStats extends AbstractVeniceHttpStats {
private final Sensor databaseLookupLatencyForLargeValueSensor;
private final Sensor multiChunkLargeValueCountSensor;
private final Sensor requestKeyCountSensor;
private final Sensor keyNotFoundSensor;
private final Sensor requestSizeInBytesSensor;
private final Sensor storageExecutionHandlerSubmissionWaitTime;
private final Sensor storageExecutionQueueLenSensor;
Expand Down Expand Up @@ -183,6 +184,8 @@ public ServerHttpRequestStats(
} else {
requestKeyCountSensor = null;
}
keyNotFoundSensor =
registerPerStoreAndTotal("key_not_found", totalStats, () -> totalStats.keyNotFoundSensor, new Rate());
requestSizeInBytesSensor = registerPerStoreAndTotal(
"request_size_in_bytes",
totalStats,
Expand Down Expand Up @@ -357,6 +360,10 @@ public void recordRequestKeyCount(int keyCount) {
}
}

public void recordKeyNotFoundCount(int count) {
keyNotFoundSensor.record(count);
}

public void recordRequestSizeInBytes(int requestSizeInBytes) {
requestSizeInBytesSensor.record(requestSizeInBytes);
}
Expand Down
Loading
Loading