Skip to content

Commit

Permalink
Metrics missing tags fixed (#163)
Browse files Browse the repository at this point in the history
* Micrometer missing/optional tags as NONE by default if missing to avoid metric collision when reporting by micrometer, empty string as prometheus contract for null value
  • Loading branch information
GoodforGod authored Nov 5, 2024
1 parent 9518131 commit e21ddc9
Show file tree
Hide file tree
Showing 24 changed files with 188 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,48 @@

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import ru.tinkoff.kora.cache.telemetry.CacheMetrics;
import ru.tinkoff.kora.cache.telemetry.CacheTelemetryOperation;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public final class MicrometerCacheMetrics implements CacheMetrics {

record Key(String cacheName, String origin, String operationName, String status) {}

record RatioKey(String cacheName, String origin, String type) {}

record OpKey(String cacheName, String origin) {}

private static final String METRIC_CACHE_DURATION = "cache.duration";
private static final String METRIC_CACHE_RATIO = "cache.ratio";
private static final String METRIC_CACHE_HIT = "cache.hit";
private static final String METRIC_CACHE_MISS = "cache.miss";

private static final String TAG_OPERATION = "operation";
private static final String TAG_CACHE_NAME = "cache";
private static final String TAG_ORIGIN = "origin";
private static final String TAG_STATUS = "status";
private static final String TAG_TYPE = "type";

private static final String STATUS_SUCCESS = "success";
private static final String STATUS_FAILED = "failed";

private static final String TYPE_HIT = "hit";
private static final String TYPE_MISS = "miss";

private final ConcurrentHashMap<Key, Timer> durations = new ConcurrentHashMap<>();
private final ConcurrentHashMap<RatioKey, Counter> counters = new ConcurrentHashMap<>();
@Deprecated(forRemoval = true)
private final ConcurrentHashMap<OpKey, Counter> missCounters = new ConcurrentHashMap<>();
@Deprecated(forRemoval = true)
private final ConcurrentHashMap<OpKey, Counter> hitCounters = new ConcurrentHashMap<>();

private final MeterRegistry meterRegistry;

public MicrometerCacheMetrics(MeterRegistry meterRegistry) {
Expand All @@ -34,35 +52,75 @@ public MicrometerCacheMetrics(MeterRegistry meterRegistry) {

@Override
public void recordSuccess(@Nonnull CacheTelemetryOperation operation, long durationInNanos, @Nullable Object valueFromCache) {
final Timer timer = meterRegistry.timer(METRIC_CACHE_DURATION, Tags.of(
TAG_CACHE_NAME, operation.cacheName(),
TAG_OPERATION, operation.name(),
TAG_ORIGIN, operation.origin(),
TAG_STATUS, STATUS_SUCCESS
));
final Key key = new Key(operation.cacheName(), operation.origin(), operation.name(), STATUS_SUCCESS);
final Timer timer = durations.computeIfAbsent(key, k -> {
var builder = Timer.builder(METRIC_CACHE_DURATION)
.tag(TAG_CACHE_NAME, k.cacheName())
.tag(TAG_OPERATION, k.operationName())
.tag(TAG_ORIGIN, k.origin())
.tag(TAG_STATUS, k.status());

return builder.register(meterRegistry);
});

timer.record(durationInNanos, TimeUnit.NANOSECONDS);

if ("GET".startsWith(operation.name())) {
final String metricName = (valueFromCache == null || valueFromCache instanceof Collection<?> vc && !vc.isEmpty())
? METRIC_CACHE_MISS
: METRIC_CACHE_HIT;

final Counter counter = meterRegistry.counter(metricName, Tags.of(
TAG_CACHE_NAME, operation.cacheName(),
TAG_ORIGIN, operation.origin()
));
final String ratioType;
var operationKey = new OpKey(operation.cacheName(), operation.origin());
if (valueFromCache == null || valueFromCache instanceof Collection<?> vc && !vc.isEmpty()) {
ratioType = TYPE_MISS;

var counter = missCounters.computeIfAbsent(operationKey, k -> {
var builder = Counter.builder(METRIC_CACHE_MISS)
.description("!!! DEPRECATED !!! Please use cache.ratio metric")
.tag(TAG_CACHE_NAME, k.cacheName())
.tag(TAG_ORIGIN, k.origin());

return builder.register(meterRegistry);
});
counter.increment();
} else {
ratioType = TYPE_HIT;

var counter = hitCounters.computeIfAbsent(operationKey, k -> {
var builder = Counter.builder(METRIC_CACHE_HIT)
.description("!!! DEPRECATED !!! Please use cache.ratio metric")
.tag(TAG_CACHE_NAME, k.cacheName())
.tag(TAG_ORIGIN, k.origin());

return builder.register(meterRegistry);
});
counter.increment();
}

final RatioKey ratioKey = new RatioKey(operation.cacheName(), operation.origin(), ratioType);
var counter = counters.computeIfAbsent(ratioKey, k -> {
var builder = Counter.builder(METRIC_CACHE_RATIO)
.tag(TAG_CACHE_NAME, k.cacheName())
.tag(TAG_ORIGIN, k.origin())
.tag(TAG_TYPE, ratioType);

return builder.register(meterRegistry);
});

counter.increment();
}
}

@Override
public void recordFailure(@Nonnull CacheTelemetryOperation operation, long durationInNanos, @Nullable Throwable throwable) {
final Timer timer = meterRegistry.timer(METRIC_CACHE_DURATION, Tags.of(
TAG_CACHE_NAME, operation.cacheName(),
TAG_OPERATION, operation.name(),
TAG_ORIGIN, operation.origin(),
TAG_STATUS, STATUS_FAILED
));
final Key key = new Key(operation.cacheName(), operation.origin(), operation.name(), STATUS_FAILED);
final Timer timer = durations.computeIfAbsent(key, k -> {
var builder = Timer.builder(METRIC_CACHE_DURATION)
.tag(TAG_CACHE_NAME, k.cacheName())
.tag(TAG_OPERATION, k.operationName())
.tag(TAG_ORIGIN, k.origin())
.tag(TAG_STATUS, k.status());

return builder.register(meterRegistry);
});

timer.record(durationInNanos, TimeUnit.NANOSECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ private DistributionSummary requestDuration(Key key) {
var list = new ArrayList<Tag>(3);
if (key.errorType() != null) {
list.add(Tag.of(SemanticAttributes.ERROR_TYPE.getKey(), key.errorType().getCanonicalName()));
} else {
list.add(Tag.of(SemanticAttributes.ERROR_TYPE.getKey(), ""));
}

list.add(Tag.of("delegate", key.javaDelegateName()));
list.add(Tag.of("business.key", key.businessKey()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ private DistributionSummary requestDuration(Key key) {
var list = new ArrayList<Tag>(3);
if (key.errorType() != null) {
list.add(Tag.of(SemanticAttributes.ERROR_TYPE.getKey(), key.errorType().getCanonicalName()));
} else {
list.add(Tag.of(SemanticAttributes.ERROR_TYPE.getKey(), ""));
}

list.add(Tag.of("delegate", key.javaDelegateName()));
list.add(Tag.of("business.key", key.businessKey()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ private DistributionSummary requestDuration(DurationKey key) {
var list = new ArrayList<Tag>(3);
if (key.errorType() != null) {
list.add(Tag.of(SemanticAttributes.ERROR_TYPE.getKey(), key.errorType().getCanonicalName()));
} else {
list.add(Tag.of(SemanticAttributes.ERROR_TYPE.getKey(), ""));
}
list.add(Tag.of(SemanticAttributes.HTTP_REQUEST_METHOD.getKey(), key.method()));
list.add(Tag.of(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE.getKey(), Integer.toString(key.statusCode())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.opentelemetry.semconv.SemanticAttributes;
import jakarta.annotation.Nullable;
import ru.tinkoff.kora.database.common.QueryContext;
import ru.tinkoff.kora.database.common.telemetry.DataBaseMetricWriter;
Expand Down Expand Up @@ -46,9 +47,13 @@ private DbMetrics metrics(DbKey key) {
.tag("pool", this.poolName)
.tag("query.id", key.queryId())
.tag("query.operation", key.operation());

if (key.error != null) {
builder.tag("error", key.error.getCanonicalName());
} else {
builder.tag("error", "");
}

return new DbMetrics(builder.register(this.meterRegistry));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.opentelemetry.semconv.SemanticAttributes;
import jakarta.annotation.Nullable;
import ru.tinkoff.kora.database.common.QueryContext;
Expand Down Expand Up @@ -47,9 +48,13 @@ private DbMetrics metrics(DbKey key) {
.tag(SemanticAttributes.POOL_NAME.getKey(), this.poolName)
.tag(SemanticAttributes.DB_STATEMENT.getKey(), key.queryId())
.tag(SemanticAttributes.DB_OPERATION.getKey(), key.operation());

if (key.error != null) {
builder.tag(SemanticAttributes.ERROR_TYPE.getKey(), key.error.getCanonicalName());
} else {
builder.tag(SemanticAttributes.ERROR_TYPE.getKey(), "");
}

return new DbMetrics(builder.register(this.meterRegistry));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,16 @@ public <RespT, ReqT> void recordEnd(MethodDescriptor<ReqT, RespT> method, long s
private Metrics buildMetrics(MetricsKey method) {
var tags = tags(method);


var duration = DistributionSummary.builder("rpc.client.duration")
.tags(tags)
.serviceLevelObjectives(this.config.slo(TelemetryConfig.MetricsConfig.OpentelemetrySpec.V120))
.register(this.registry);

var requestsByRpc = DistributionSummary.builder("rpc.client.requests_per_rpc")
.tags(tags)
.serviceLevelObjectives(this.config.slo(TelemetryConfig.MetricsConfig.OpentelemetrySpec.V120))
.register(this.registry);

var responsesByRpc = DistributionSummary.builder("rpc.client.responses_per_rpc")
.tags(tags)
.serviceLevelObjectives(this.config.slo(TelemetryConfig.MetricsConfig.OpentelemetrySpec.V120))
Expand All @@ -81,18 +82,26 @@ private List<Tag> tags(MetricsKey key) {
if (serverPort == -1) {
serverPort = 80;
}

var list = new ArrayList<Tag>(7);
list.add(Tag.of(SemanticAttributes.RPC_METHOD.getKey(), key.bareMethodName()));
list.add(Tag.of(SemanticAttributes.RPC_SERVICE.getKey(), rpcService));
list.add(Tag.of(SemanticAttributes.RPC_SYSTEM.getKey(), SemanticAttributes.RpcSystemValues.GRPC));
list.add(Tag.of(SemanticAttributes.SERVER_ADDRESS.getKey(), serverAddress));
list.add(Tag.of(SemanticAttributes.SERVER_PORT.getKey(), String.valueOf(serverPort)));

if (key.code != null) {
list.add(Tag.of(SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey(), String.valueOf(key.code)));
} else {
list.add(Tag.of(SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey(), ""));
}

if (key.errorType != null) {
list.add(Tag.of(SemanticAttributes.ERROR_TYPE.getKey(), key.errorType.getCanonicalName()));
} else {
list.add(Tag.of(SemanticAttributes.ERROR_TYPE.getKey(), ""));
}

return list;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,17 @@ public <RespT, ReqT> void recordEnd(MethodDescriptor<ReqT, RespT> method, long s
private Metrics buildMetrics(MetricsKey method) {
var tags = tags(method);


var duration = DistributionSummary.builder("rpc.client.duration")
.tags(tags)
.baseUnit("s")
.serviceLevelObjectives(this.config.slo(TelemetryConfig.MetricsConfig.OpentelemetrySpec.V123))
.register(this.registry);

var requestsByRpc = DistributionSummary.builder("rpc.client.requests_per_rpc")
.tags(tags)
.serviceLevelObjectives(this.config.slo(TelemetryConfig.MetricsConfig.OpentelemetrySpec.V123))
.register(this.registry);

var responsesByRpc = DistributionSummary.builder("rpc.client.responses_per_rpc")
.tags(tags)
.serviceLevelObjectives(this.config.slo(TelemetryConfig.MetricsConfig.OpentelemetrySpec.V123))
Expand All @@ -88,12 +89,19 @@ private List<Tag> tags(MetricsKey key) {
list.add(Tag.of(SemanticAttributes.RPC_SYSTEM.getKey(), SemanticAttributes.RpcSystemValues.GRPC));
list.add(Tag.of(SemanticAttributes.SERVER_ADDRESS.getKey(), serverAddress));
list.add(Tag.of(SemanticAttributes.SERVER_PORT.getKey(), String.valueOf(serverPort)));

if (key.code != null) {
list.add(Tag.of(SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey(), String.valueOf(key.code)));
} else {
list.add(Tag.of(SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey(), ""));
}

if (key.errorType != null) {
list.add(Tag.of(SemanticAttributes.ERROR_TYPE.getKey(), key.errorType.getCanonicalName()));
} else {
list.add(Tag.of(SemanticAttributes.ERROR_TYPE.getKey(), ""));
}

return list;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,21 @@ private GrpcServerMetrics buildMetrics(TelemetryConfig.MetricsConfig config, Met
.tag(SemanticAttributes.RPC_METHOD.getKey(), metricsKey.methodName)
.tag(SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey(), Integer.toString(code))
.register(this.meterRegistry);

var requestsPerRpc = Counter.builder("rpc.server.requests_per_rpc")
.baseUnit("messages")
.tag(SemanticAttributes.RPC_SYSTEM.getKey(), SemanticAttributes.RpcSystemValues.GRPC)
.tag(SemanticAttributes.RPC_SERVICE.getKey(), metricsKey.serviceName)
.tag(SemanticAttributes.RPC_METHOD.getKey(), metricsKey.methodName)
.register(this.meterRegistry);

var responsesPerRpc = Counter.builder("rpc.server.responses_per_rpc")
.baseUnit("messages")
.tag(SemanticAttributes.RPC_SYSTEM.getKey(), SemanticAttributes.RpcSystemValues.GRPC)
.tag(SemanticAttributes.RPC_SERVICE.getKey(), metricsKey.serviceName)
.tag(SemanticAttributes.RPC_METHOD.getKey(), metricsKey.methodName)
.register(this.meterRegistry);

return switch (metricsConfig.opentelemetrySpec()) {
case V120 -> new Opentelemetry120GrpcServerMetrics(duration, requestsPerRpc, responsesPerRpc);
case V123 -> new Opentelemetry123GrpcServerMetrics(duration, requestsPerRpc, responsesPerRpc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ private DistributionSummary duration(DurationKey key) {
.tag(SemanticAttributes.HTTP_TARGET.getKey(), key.target)
.tag(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE.getKey(), Integer.toString(key.statusCode()))
.tag("http.status_code", Integer.toString(key.statusCode()));

return builder.register(meterRegistry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ private DistributionSummary duration(DurationKey key) {
.tag(SemanticAttributes.URL_SCHEME.getKey(), key.scheme)
.tag(SemanticAttributes.HTTP_ROUTE.getKey(), key.target)
.tag("http.status_code", Integer.toString(key.statusCode()));

return builder.register(meterRegistry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public Iterable<Tag> getDurationTags(DurationKey key) {
var list = new ArrayList<Tag>(6);
if (key.errorType() != null) {
list.add(Tag.of(SemanticAttributes.ERROR_TYPE.getKey(), key.errorType().getCanonicalName()));
} else {
list.add(Tag.of(SemanticAttributes.ERROR_TYPE.getKey(), ""));
}
list.add(Tag.of(SemanticAttributes.HTTP_REQUEST_METHOD.getKey(), key.method()));
list.add(Tag.of(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE.getKey(), Integer.toString(key.statusCode())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public JmsConsumerMetrics get(TelemetryConfig.MetricsConfig config, String queue
.tag(SemanticAttributes.MESSAGING_SYSTEM.getKey(), "jms")
.tag(SemanticAttributes.MESSAGING_DESTINATION_NAME.getKey(), queueName)
.tag(SemanticAttributes.MESSAGING_DESTINATION_KIND.getKey(), "queue");

var distributionSummary = builder.register(this.meterRegistry);
return new MicrometerJmsConsumerMetrics(distributionSummary);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,17 @@ public Opentelemetry120KafkaConsumerMetrics(MeterRegistry meterRegistry, Propert
private record DurationKey(String topic, int partition) {}

private DistributionSummary metrics(DurationKey key) {
var clientId = driverProperties.get(ProducerConfig.CLIENT_ID_CONFIG);
var groupId = driverProperties.get(ConsumerConfig.GROUP_ID_CONFIG);

var builder = DistributionSummary.builder("messaging.receive.duration")
.serviceLevelObjectives(this.config.slo(TelemetryConfig.MetricsConfig.OpentelemetrySpec.V120))
.baseUnit("milliseconds")
.tag(SemanticAttributes.MESSAGING_SYSTEM.getKey(), "kafka")
.tag(SemanticAttributes.MESSAGING_DESTINATION_NAME.getKey(), key.topic());
.tag(SemanticAttributes.MESSAGING_DESTINATION_NAME.getKey(), key.topic())
.tag(SemanticAttributes.MESSAGING_CLIENT_ID.getKey(), Objects.requireNonNullElse(clientId, "").toString())
.tag(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP.getKey(), Objects.requireNonNullElse(groupId, "").toString());

var clientId = driverProperties.get(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId != null) {
builder.tag(SemanticAttributes.MESSAGING_CLIENT_ID.getKey(), clientId.toString());
}
var groupId = driverProperties.get(ConsumerConfig.GROUP_ID_CONFIG);
if (groupId != null) {
builder.tag(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP.getKey(), groupId.toString());
}
return builder.register(this.meterRegistry);
}

Expand Down
Loading

0 comments on commit e21ddc9

Please sign in to comment.