diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java index 75124b6070b..757b3f4a956 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java @@ -7,6 +7,7 @@ import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.stats.dimensions.VersionRole; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.util.ArrayList; import java.util.Collections; @@ -321,7 +322,7 @@ public CurrentVersionConsumerPoolStrategy() { @Override public KafkaConsumerService delegateKafkaConsumerServiceFor( PartitionReplicaIngestionContext topicPartitionReplicaRole) { - PartitionReplicaIngestionContext.VersionRole versionRole = topicPartitionReplicaRole.getVersionRole(); + VersionRole versionRole = topicPartitionReplicaRole.getVersionRole(); PartitionReplicaIngestionContext.WorkloadType workloadType = topicPartitionReplicaRole.getWorkloadType(); PubSubTopicPartition pubSubTopicPartition = topicPartitionReplicaRole.getPubSubTopicPartition(); PubSubTopic pubSubTopic = pubSubTopicPartition.getPubSubTopic(); @@ -331,8 +332,7 @@ public KafkaConsumerService delegateKafkaConsumerServiceFor( * 2. If the workload type is active-active leader and write-computer leader. * 3. If the replica is ready to serve. 
*/ - if (versionRole.equals(PartitionReplicaIngestionContext.VersionRole.CURRENT) - && topicPartitionReplicaRole.isReadyToServe()) { + if (versionRole.equals(VersionRole.CURRENT) && topicPartitionReplicaRole.isReadyToServe()) { if (workloadType.equals(PartitionReplicaIngestionContext.WorkloadType.AA_OR_WRITE_COMPUTE) && pubSubTopic.isRealTime()) { return pubSubTopic.isSeparateRealTimeTopic() diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionReplicaIngestionContext.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionReplicaIngestionContext.java index eb275185c07..25c35210292 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionReplicaIngestionContext.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionReplicaIngestionContext.java @@ -2,6 +2,7 @@ import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.stats.dimensions.VersionRole; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -22,10 +23,6 @@ public class PartitionReplicaIngestionContext { private final WorkloadType workloadType; private final boolean isReadyToServe; - public enum VersionRole { - CURRENT, BACKUP, FUTURE - } - // TODO: Add more workload types if needed, here we only care about active active or write compute workload. 
public enum WorkloadType { AA_OR_WRITE_COMPUTE, NON_AA_OR_WRITE_COMPUTE diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 15a910d3254..6c7102aa0a2 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -119,6 +119,7 @@ import com.linkedin.venice.serializer.AvroGenericDeserializer; import com.linkedin.venice.serializer.FastSerializerDeserializerFactory; import com.linkedin.venice.serializer.RecordDeserializer; +import com.linkedin.venice.stats.dimensions.VersionRole; import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.system.store.MetaStoreWriter; import com.linkedin.venice.utils.ByteUtils; @@ -404,7 +405,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { private final String[] msgForLagMeasurement; protected final AtomicBoolean recordLevelMetricEnabled; protected final boolean isGlobalRtDivEnabled; - protected volatile PartitionReplicaIngestionContext.VersionRole versionRole; + protected volatile VersionRole versionRole; protected volatile PartitionReplicaIngestionContext.WorkloadType workloadType; protected final boolean batchReportIncPushStatusEnabled; @@ -1679,7 +1680,7 @@ protected void refreshIngestionContextIfChanged(Store store) throws InterruptedE } boolean isWriteComputeEnabled = store.isWriteComputationEnabled(); - PartitionReplicaIngestionContext.VersionRole newVersionRole = + VersionRole newVersionRole = PartitionReplicaIngestionContext.determineStoreVersionRole(versionNumber, currentVersionNumber); PartitionReplicaIngestionContext.WorkloadType newWorkloadType = PartitionReplicaIngestionContext.determineWorkloadType(isActiveActiveReplicationEnabled, 
isWriteComputeEnabled); @@ -5013,7 +5014,7 @@ Lazy getGracefulShutdownLatch() { } // For unit test purpose. - void setVersionRole(PartitionReplicaIngestionContext.VersionRole versionRole) { + void setVersionRole(VersionRole versionRole) { this.versionRole = versionRole; } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AbstractVeniceAggVersionedStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AbstractVeniceAggVersionedStats.java index 07d450784c7..9c0c9851349 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AbstractVeniceAggVersionedStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AbstractVeniceAggVersionedStats.java @@ -33,6 +33,10 @@ public abstract class AbstractVeniceAggVersionedStats> aggStats; private final boolean unregisterMetricForDeletedStoreEnabled; + protected MetricsRepository getMetricsRepository() { + return metricsRepository; + } + public AbstractVeniceAggVersionedStats( MetricsRepository metricsRepository, ReadOnlyStoreRepository metadataRepository, @@ -135,6 +139,9 @@ protected void updateStatsVersionInfo(String storeName, List existingVe versionedStats.setFutureVersion(futureVersion); } + // Notify subclasses that version info has changed + onVersionInfoUpdated(storeName, versionedStats.getCurrentVersion(), versionedStats.getFutureVersion()); + /** * Since versions are changed, update the total stats accordingly. */ @@ -186,4 +193,16 @@ protected int getCurrentVersion(String storeName) { protected void updateTotalStats(String storeName) { // no-op } + + /** + * Hook method called when version info is updated for a store. + * Subclasses can override this to react to version changes. 
+ * + * @param storeName The store whose version info changed + * @param currentVersion The new current version + * @param futureVersion The new future version + */ + protected void onVersionInfoUpdated(String storeName, int currentVersion, int futureVersion) { + // no-op by default + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ServerMetricEntity.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ServerMetricEntity.java new file mode 100644 index 00000000000..ee6ac9005e5 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ServerMetricEntity.java @@ -0,0 +1,53 @@ +package com.linkedin.davinci.stats; + +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_STATE; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_TYPE; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_VERSION_ROLE; +import static com.linkedin.venice.utils.Utils.setOf; + +import com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions; +import com.linkedin.venice.stats.metrics.MetricEntity; +import com.linkedin.venice.stats.metrics.MetricType; +import com.linkedin.venice.stats.metrics.MetricUnit; +import com.linkedin.venice.stats.metrics.ModuleMetricEntityInterface; +import java.util.Set; + + +/** + * List all metric entities for Venice server (storage node). + */ +public enum ServerMetricEntity implements ModuleMetricEntityInterface { + /** + * Heartbeat replication delay: Tracks nearline replication lag in milliseconds. 
+ */ + INGESTION_HEARTBEAT_DELAY( + "ingestion.replication.heartbeat.delay", MetricType.HISTOGRAM, MetricUnit.MILLISECOND, + "Nearline ingestion replication lag", + setOf( + VENICE_STORE_NAME, + VENICE_CLUSTER_NAME, + VENICE_REGION_NAME, + VENICE_VERSION_ROLE, + VENICE_REPLICA_TYPE, + VENICE_REPLICA_STATE) + ); + + private final MetricEntity metricEntity; + + ServerMetricEntity( + String name, + MetricType metricType, + MetricUnit unit, + String description, + Set dimensionsList) { + this.metricEntity = new MetricEntity(name, metricType, unit, description, dimensionsList); + } + + @Override + public MetricEntity getMetricEntity() { + return metricEntity; + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java index a691c091291..3e5816c0329 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java @@ -103,7 +103,8 @@ public HeartbeatMonitoringService( storeName, regionNames), leaderHeartbeatTimeStamps, - followerHeartbeatTimeStamps); + followerHeartbeatTimeStamps, + serverConfig.getClusterName()); this.heartbeatMonitoringServiceStats = heartbeatMonitoringServiceStats; this.customizedViewRepositoryFuture = customizedViewRepositoryFuture; this.nodeId = Utils.getHelixNodeIdentifier(serverConfig.getListenerHostname(), serverConfig.getListenerPort()); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatOtelStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatOtelStats.java new file mode 100644 index 00000000000..37ce7461d19 --- /dev/null +++ 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatOtelStats.java @@ -0,0 +1,147 @@ +package com.linkedin.davinci.stats.ingestion.heartbeat; + +import static com.linkedin.davinci.stats.ServerMetricEntity.INGESTION_HEARTBEAT_DELAY; +import static com.linkedin.venice.meta.Store.NON_EXISTING_VERSION; +import static com.linkedin.venice.stats.metrics.ModuleMetricEntityInterface.getUniqueMetricEntities; + +import com.google.common.annotations.VisibleForTesting; +import com.linkedin.davinci.stats.ServerMetricEntity; +import com.linkedin.venice.stats.OpenTelemetryMetricsSetup; +import com.linkedin.venice.stats.VeniceOpenTelemetryMetricsRepository; +import com.linkedin.venice.stats.dimensions.ReplicaState; +import com.linkedin.venice.stats.dimensions.ReplicaType; +import com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions; +import com.linkedin.venice.stats.dimensions.VersionRole; +import com.linkedin.venice.stats.metrics.MetricEntity; +import com.linkedin.venice.stats.metrics.MetricEntityStateThreeEnums; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import io.tehuti.metrics.MetricsRepository; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + + +/** + * OpenTelemetry metrics for heartbeat monitoring. + * Note: Tehuti metrics are managed separately in {@link HeartbeatStatReporter}. 
+ */ +public class HeartbeatOtelStats { + public static final Collection SERVER_METRIC_ENTITIES = + getUniqueMetricEntities(ServerMetricEntity.class); + private final boolean emitOtelMetrics; + private final VeniceOpenTelemetryMetricsRepository otelRepository; + private final Map baseDimensionsMap; + + // Per-region metric entity states + private final Map> metricsByRegion; + + private static class VersionInfo { + private final int currentVersion; + private final int futureVersion; + + VersionInfo(int currentVersion, int futureVersion) { + this.currentVersion = currentVersion; + this.futureVersion = futureVersion; + } + } + + private volatile VersionInfo versionInfo = new VersionInfo(NON_EXISTING_VERSION, NON_EXISTING_VERSION); + + public HeartbeatOtelStats(MetricsRepository metricsRepository, String storeName, String clusterName) { + this.metricsByRegion = new VeniceConcurrentHashMap<>(); + + OpenTelemetryMetricsSetup.OpenTelemetryMetricsSetupInfo otelSetup = + OpenTelemetryMetricsSetup.builder(metricsRepository) + .setStoreName(storeName) + .setClusterName(clusterName) + .build(); + + this.emitOtelMetrics = otelSetup.emitOpenTelemetryMetrics(); + this.otelRepository = otelSetup.getOtelRepository(); + this.baseDimensionsMap = otelSetup.getBaseDimensionsMap(); + } + + /** + * Returns true if OTel metrics are emitted. + */ + public boolean emitOtelMetrics() { + return emitOtelMetrics; + } + + /** + * Updates the current and future version for this store. + * + * @param currentVersion The current serving version + * @param futureVersion The future/upcoming version + */ + public void updateVersionInfo(int currentVersion, int futureVersion) { + this.versionInfo = new VersionInfo(currentVersion, futureVersion); + } + + /** + * Records a heartbeat delay with all dimensional attributes to OTel metrics. + * Returns early if OTel metrics are disabled; versions that are neither current nor future are recorded as BACKUP.
+ * + * @param version The version number + * @param region The region name + * @param replicaType The replica type {@link ReplicaType} + * @param replicaState The replica state {@link ReplicaState} + * @param delayMs The delay in milliseconds + */ + public void recordHeartbeatDelayOtelMetrics( + int version, + String region, + ReplicaType replicaType, + ReplicaState replicaState, + long delayMs) { + if (!emitOtelMetrics()) { + return; + } + VersionRole versionRole = classifyVersion(version, this.versionInfo); + + MetricEntityStateThreeEnums metricState = getOrCreateMetricState(region); + + // Records to OTel metrics only + metricState.record(delayMs, versionRole, replicaType, replicaState); + } + + /** + * Gets or creates a metric entity state for a specific region. + */ + private MetricEntityStateThreeEnums getOrCreateMetricState(String region) { + return metricsByRegion.computeIfAbsent(region, r -> { + // Add region to base dimensions + Map regionBaseDimensions = new HashMap<>(baseDimensionsMap); + regionBaseDimensions.put(VeniceMetricsDimensions.VENICE_REGION_NAME, r); + + return MetricEntityStateThreeEnums.create( + INGESTION_HEARTBEAT_DELAY.getMetricEntity(), + otelRepository, + regionBaseDimensions, + VersionRole.class, + ReplicaType.class, + ReplicaState.class); + }); + } + + /** + * Classifies a version as CURRENT, FUTURE, or (when it matches neither) BACKUP. + * + * @param version The version number to classify + * @param versionInfo The cached current and future version numbers to compare against + * @return {@link VersionRole} + */ + static VersionRole classifyVersion(int version, VersionInfo versionInfo) { + if (version == versionInfo.currentVersion) { + return VersionRole.CURRENT; + } else if (version == versionInfo.futureVersion) { + return VersionRole.FUTURE; + } + return VersionRole.BACKUP; + } + + @VisibleForTesting + public VersionInfo getVersionInfo() { + return versionInfo; + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStat.java
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStat.java index fd2a628e3e0..81cb62b3b69 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStat.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStat.java @@ -34,19 +34,37 @@ public HeartbeatStat(MetricConfig metricConfig, Set regions) { defaultSensor = new WritePathLatencySensor(localRepository, metricConfig, "default-"); } - public void recordReadyToServeLeaderLag(String region, long startTime) { - long endTime = System.currentTimeMillis(); - readyToServeLeaderSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime); + /** + * Records the heartbeat lag for a ready-to-serve leader replica. + * + * @param region The region name + * @param delay The pre-calculated delay in milliseconds + * @param endTime The pre-calculated end time + */ + public void recordReadyToServeLeaderLag(String region, long delay, long endTime) { + readyToServeLeaderSensors.computeIfAbsent(region, k -> defaultSensor).record(delay, endTime); } - public void recordReadyToServeFollowerLag(String region, long startTime) { - long endTime = System.currentTimeMillis(); - readyToServeFollowerSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime); + /** + * Records the heartbeat lag for a ready-to-serve follower replica. 
+ * + * @param region The region name + * @param delay The pre-calculated delay in milliseconds + * @param endTime The pre-calculated end time + */ + public void recordReadyToServeFollowerLag(String region, long delay, long endTime) { + readyToServeFollowerSensors.computeIfAbsent(region, k -> defaultSensor).record(delay, endTime); } - public void recordCatchingUpFollowerLag(String region, long startTime) { - long endTime = System.currentTimeMillis(); - catchingUpFollowerSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime); + /** + * Records the heartbeat lag for a catching-up follower replica. + * + * @param region The region name + * @param delay The pre-calculated delay in milliseconds (0 for squelching) + * @param endTime The pre-calculated end time + */ + public void recordCatchingUpFollowerLag(String region, long delay, long endTime) { + catchingUpFollowerSensors.computeIfAbsent(region, k -> defaultSensor).record(delay, endTime); } public WritePathLatencySensor getReadyToServeLeaderLag(String region) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java index a1f0c9994cc..10d46285ad2 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java @@ -1,9 +1,13 @@ package com.linkedin.davinci.stats.ingestion.heartbeat; +import com.google.common.annotations.VisibleForTesting; import com.linkedin.davinci.stats.AbstractVeniceAggVersionedStats; import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.meta.Store; import com.linkedin.venice.stats.StatsSupplier; +import com.linkedin.venice.stats.dimensions.ReplicaState; +import 
com.linkedin.venice.stats.dimensions.ReplicaType; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import io.tehuti.metrics.MetricsRepository; import java.util.Map; import java.util.function.Supplier; @@ -13,20 +17,43 @@ public class HeartbeatVersionedStats extends AbstractVeniceAggVersionedStats>>> leaderMonitors; private final Map>>> followerMonitors; + // OpenTelemetry metrics per store + private final Map otelStatsMap; + private final String clusterName; + + // Time supplier for testability: defaults to System.currentTimeMillis() + private Supplier currentTimeSupplier = System::currentTimeMillis; + public HeartbeatVersionedStats( MetricsRepository metricsRepository, ReadOnlyStoreRepository metadataRepository, Supplier statsInitiator, StatsSupplier reporterSupplier, Map>>> leaderMonitors, - Map>>> followerMonitors) { + Map>>> followerMonitors, + String clusterName) { super(metricsRepository, metadataRepository, statsInitiator, reporterSupplier, true); this.leaderMonitors = leaderMonitors; this.followerMonitors = followerMonitors; + this.clusterName = clusterName; + this.otelStatsMap = new VeniceConcurrentHashMap<>(); } public void recordLeaderLag(String storeName, int version, String region, long heartbeatTs) { - getStats(storeName, version).recordReadyToServeLeaderLag(region, heartbeatTs); + // Calculate current time and delay once for both Tehuti and OTel metrics + long currentTime = currentTimeSupplier.get(); + long delay = currentTime - heartbeatTs; + + // Tehuti metrics + getStats(storeName, version).recordReadyToServeLeaderLag(region, delay, currentTime); + + // OTel metrics + getOrCreateOtelStats(storeName).recordHeartbeatDelayOtelMetrics( + version, + region, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, // Leaders are always ready to serve + delay); } public void recordFollowerLag( @@ -35,17 +62,34 @@ public void recordFollowerLag( String region, long heartbeatTs, boolean isReadyToServe) { + // Calculate current time and 
delay once for all metrics + long currentTime = currentTimeSupplier.get(); + long delay = currentTime - heartbeatTs; + // If the partition is ready to serve, report it's lag to the main lag metric. Otherwise, report it // to the catch up metric. - // The metric which isn't updated is squelched by reporting the currentTime (so as to appear caught up and mute - // alerts) - if (isReadyToServe) { - getStats(storeName, version).recordReadyToServeFollowerLag(region, heartbeatTs); - getStats(storeName, version).recordCatchingUpFollowerLag(region, System.currentTimeMillis()); - } else { - getStats(storeName, version).recordReadyToServeFollowerLag(region, System.currentTimeMillis()); - getStats(storeName, version).recordCatchingUpFollowerLag(region, heartbeatTs); - } + // The metric which isn't updated is squelched by reporting delay=0 (to appear caught up and mute alerts) + long readyToServeDelay = isReadyToServe ? delay : 0; + long catchingUpDelay = isReadyToServe ? 0 : delay; + + // Record to both Tehuti sensors (one gets actual delay, other gets 0 for squelching) + getStats(storeName, version).recordReadyToServeFollowerLag(region, readyToServeDelay, currentTime); + getStats(storeName, version).recordCatchingUpFollowerLag(region, catchingUpDelay, currentTime); + + // Record to both OTel dimensions (one gets actual delay, other gets 0 for squelching) + HeartbeatOtelStats otelStats = getOrCreateOtelStats(storeName); + otelStats.recordHeartbeatDelayOtelMetrics( + version, + region, + ReplicaType.FOLLOWER, + ReplicaState.READY_TO_SERVE, + readyToServeDelay); + otelStats.recordHeartbeatDelayOtelMetrics( + version, + region, + ReplicaType.FOLLOWER, + ReplicaState.CATCHING_UP, + catchingUpDelay); } @Override @@ -65,6 +109,15 @@ public void handleStoreChanged(Store store) { } } + @Override + protected void onVersionInfoUpdated(String storeName, int currentVersion, int futureVersion) { + // Update OTel stats version cache when versions change + 
otelStatsMap.computeIfPresent(storeName, (store, stats) -> { + stats.updateVersionInfo(currentVersion, futureVersion); + return stats; + }); + } + boolean isStoreAssignedToThisNode(String store) { if (leaderMonitors == null || followerMonitors == null) { // TODO: We have to do this because theres a self call in the constructor @@ -73,4 +126,27 @@ boolean isStoreAssignedToThisNode(String store) { } return leaderMonitors.containsKey(store) || followerMonitors.containsKey(store); } + + /** + * Gets or creates OTel stats for a store. + * Version info is seeded at creation time and refreshed afterwards by the onVersionInfoUpdated() callback. + */ + private HeartbeatOtelStats getOrCreateOtelStats(String storeName) { + return otelStatsMap.computeIfAbsent(storeName, key -> { + HeartbeatOtelStats stats = new HeartbeatOtelStats(getMetricsRepository(), storeName, clusterName); + // Initialize version cache with current values + stats.updateVersionInfo(getCurrentVersion(storeName), getFutureVersion(storeName)); + return stats; + }); + } + + @VisibleForTesting + HeartbeatStat getStatsForTesting(String storeName, int version) { + return getStats(storeName, version); + } + + @VisibleForTesting + void setCurrentTimeSupplier(Supplier timeSupplier) { + this.currentTimeSupplier = timeSupplier; + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java index e892c7e83d1..295df67308f 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java @@ -30,6 +30,7 @@ import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.pubsub.manager.TopicManager; import com.linkedin.venice.pubsub.manager.TopicManagerContext.PubSubPropertiesSupplier; +import
com.linkedin.venice.stats.dimensions.VersionRole; import com.linkedin.venice.utils.SystemTime; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; @@ -139,7 +140,7 @@ public void testSubscribeConsumerFor() { PartitionReplicaIngestionContext partitionReplicaIngestionContext = new PartitionReplicaIngestionContext( topic, topicPartition, - PartitionReplicaIngestionContext.VersionRole.CURRENT, + VersionRole.CURRENT, PartitionReplicaIngestionContext.WorkloadType.NON_AA_OR_WRITE_COMPUTE); StorePartitionDataReceiver dataReceiver = (StorePartitionDataReceiver) aggKafkaConsumerServiceSpy.subscribeConsumerFor( diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java index 2248d06ec26..dff5d49ee75 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java @@ -31,6 +31,7 @@ import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.stats.dimensions.VersionRole; import com.linkedin.venice.utils.SystemTime; import com.linkedin.venice.utils.Utils; import io.tehuti.metrics.MetricsRepository; @@ -98,13 +99,13 @@ public void batchUnsubscribe_start_stop_getMaxElapsedTimeMSSinceLastPollInConsum PartitionReplicaIngestionContext topicPartitionIngestionContextForVT = new PartitionReplicaIngestionContext( versionTopic, topicPartitionForVT, - PartitionReplicaIngestionContext.VersionRole.CURRENT, + VersionRole.CURRENT, PartitionReplicaIngestionContext.WorkloadType.NON_AA_OR_WRITE_COMPUTE); delegator.startConsumptionIntoDataReceiver(topicPartitionIngestionContextForVT, 
position0, dataReceiver, false); PartitionReplicaIngestionContext topicPartitionIngestionContextForRT = new PartitionReplicaIngestionContext( versionTopic, topicPartitionForRT, - PartitionReplicaIngestionContext.VersionRole.CURRENT, + VersionRole.CURRENT, PartitionReplicaIngestionContext.WorkloadType.NON_AA_OR_WRITE_COMPUTE); delegator.startConsumptionIntoDataReceiver(topicPartitionIngestionContextForRT, position0, dataReceiver, false); @@ -161,24 +162,24 @@ public void startConsumptionIntoDataReceiverTest() { PartitionReplicaIngestionContext tpForCurrentAAWCLeader = new PartitionReplicaIngestionContext( versionTopic, new PubSubTopicPartitionImpl(rtTopic, PARTITION_ID), - PartitionReplicaIngestionContext.VersionRole.CURRENT, + VersionRole.CURRENT, PartitionReplicaIngestionContext.WorkloadType.AA_OR_WRITE_COMPUTE); PartitionReplicaIngestionContext tpForCurrentAAWCFollower = new PartitionReplicaIngestionContext( versionTopic, new PubSubTopicPartitionImpl(versionTopic, PARTITION_ID), - PartitionReplicaIngestionContext.VersionRole.CURRENT, + VersionRole.CURRENT, PartitionReplicaIngestionContext.WorkloadType.AA_OR_WRITE_COMPUTE); PubSubTopic futureVersionTopic = TOPIC_REPOSITORY.getTopic("test_store_v2"); PartitionReplicaIngestionContext tpForNonCurrentAAWCLeader = new PartitionReplicaIngestionContext( versionTopic, new PubSubTopicPartitionImpl(rtTopic, PARTITION_ID), - PartitionReplicaIngestionContext.VersionRole.FUTURE, + VersionRole.FUTURE, PartitionReplicaIngestionContext.WorkloadType.AA_OR_WRITE_COMPUTE); PartitionReplicaIngestionContext tpForNonCurrentAAWCFollower = new PartitionReplicaIngestionContext( versionTopic, new PubSubTopicPartitionImpl(futureVersionTopic, PARTITION_ID), - PartitionReplicaIngestionContext.VersionRole.BACKUP, + VersionRole.BACKUP, PartitionReplicaIngestionContext.WorkloadType.AA_OR_WRITE_COMPUTE); doReturn(true).when(mockConfig).isResubscriptionTriggeredByVersionIngestionContextChangeEnabled(); @@ -334,7 +335,7 @@ public void 
testKafkaConsumerServiceResubscriptionConcurrency() throws Exception PartitionReplicaIngestionContext partitionReplicaIngestionContext = new PartitionReplicaIngestionContext( versionTopicForStoreName3, realTimeTopicPartition, - PartitionReplicaIngestionContext.VersionRole.CURRENT, + VersionRole.CURRENT, PartitionReplicaIngestionContext.WorkloadType.AA_OR_WRITE_COMPUTE); ConsumedDataReceiver consumedDataReceiver = mock(ConsumedDataReceiver.class); when(consumedDataReceiver.destinationIdentifier()).thenReturn(versionTopicForStoreName3); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.java index 30987ebc219..3d0bdeb1294 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.java @@ -34,6 +34,7 @@ import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.stats.dimensions.VersionRole; import com.linkedin.venice.utils.RandomAccessDaemonThreadFactory; import com.linkedin.venice.utils.SystemTime; import com.linkedin.venice.utils.TestUtils; @@ -99,7 +100,7 @@ public void testGetTopicPartitionIngestionInformation() throws Exception { PartitionReplicaIngestionContext partitionReplicaIngestionContext = new PartitionReplicaIngestionContext( versionTopic, topicPartition, - PartitionReplicaIngestionContext.VersionRole.CURRENT, + VersionRole.CURRENT, PartitionReplicaIngestionContext.WorkloadType.NON_AA_OR_WRITE_COMPUTE); ConsumedDataReceiver consumedDataReceiver = mock(ConsumedDataReceiver.class); diff --git 
a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 09b556c27f8..b63b9389a96 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -203,6 +203,7 @@ import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.serializer.FastSerializerDeserializerFactory; import com.linkedin.venice.serializer.RecordSerializer; +import com.linkedin.venice.stats.dimensions.VersionRole; import com.linkedin.venice.throttle.EventThrottler; import com.linkedin.venice.unit.matchers.ExceptionClassMatcher; import com.linkedin.venice.unit.matchers.NonEmptyStringMatcher; @@ -3440,7 +3441,7 @@ public void testRecordsCanBeThrottledPerRegion() throws ExecutionException, Inte PartitionReplicaIngestionContext fooRtPartitionReplicaIngestionContext = new PartitionReplicaIngestionContext( pubSubTopic, fooRtPartition, - PartitionReplicaIngestionContext.VersionRole.CURRENT, + VersionRole.CURRENT, PartitionReplicaIngestionContext.WorkloadType.NON_AA_OR_WRITE_COMPUTE); inMemoryLocalKafkaBroker.createTopic(rtTopic, partitionCount); inMemoryRemoteKafkaBroker.createTopic(rtTopic, partitionCount); @@ -4454,14 +4455,14 @@ private Consumer getObserver( topicPartitionOffset.getPubSubTopicPartition(), topicPartitionOffset.getPubSubPosition()); if (pubSubTopicPartition.getPubSubTopic().isVersionTopic() && resubscriptionOffsetForVT.contains(position)) { - storeIngestionTaskUnderTest.setVersionRole(PartitionReplicaIngestionContext.VersionRole.BACKUP); + storeIngestionTaskUnderTest.setVersionRole(VersionRole.BACKUP); resubscriptionLatch.countDown(); LOGGER.info( "Trigger re-subscription after consuming message for {} at position {} ", 
pubSubTopicPartition, position); } else if (pubSubTopicPartition.getPubSubTopic().isRealTime() && resubscriptionOffsetForRT.contains(position)) { - storeIngestionTaskUnderTest.setVersionRole(PartitionReplicaIngestionContext.VersionRole.BACKUP); + storeIngestionTaskUnderTest.setVersionRole(VersionRole.BACKUP); resubscriptionLatch.countDown(); LOGGER.info( "Trigger re-subscription after consuming message for {} at position {}.", @@ -4683,7 +4684,7 @@ public void testResubscribeForStaleVersion() throws Exception { null)); // Simulate the version has been deleted. - ingestionTask.setVersionRole(PartitionReplicaIngestionContext.VersionRole.BACKUP); + ingestionTask.setVersionRole(VersionRole.BACKUP); doReturn(null).when(store).getVersion(eq(1)); ingestionTask.refreshIngestionContextIfChanged(store); verify(ingestionTask, never()).resubscribeForAllPartitions(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java new file mode 100644 index 00000000000..f66e017babb --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java @@ -0,0 +1,66 @@ +package com.linkedin.davinci.stats; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +import com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions; +import com.linkedin.venice.stats.metrics.MetricEntity; +import com.linkedin.venice.stats.metrics.MetricType; +import com.linkedin.venice.stats.metrics.MetricUnit; +import com.linkedin.venice.utils.Utils; +import java.util.HashMap; +import java.util.Map; +import org.testng.annotations.Test; + + +public class ServerMetricEntityTest { + @Test + public void testServerMetricEntities() { + Map expectedMetrics = new HashMap<>(); + expectedMetrics.put( + ServerMetricEntity.INGESTION_HEARTBEAT_DELAY, + new MetricEntity( + 
"ingestion.replication.heartbeat.delay", + MetricType.HISTOGRAM, + MetricUnit.MILLISECOND, + "Nearline ingestion replication lag", + Utils.setOf( + VeniceMetricsDimensions.VENICE_STORE_NAME, + VeniceMetricsDimensions.VENICE_CLUSTER_NAME, + VeniceMetricsDimensions.VENICE_REGION_NAME, + VeniceMetricsDimensions.VENICE_VERSION_ROLE, + VeniceMetricsDimensions.VENICE_REPLICA_TYPE, + VeniceMetricsDimensions.VENICE_REPLICA_STATE))); + + for (ServerMetricEntity metric: ServerMetricEntity.values()) { + MetricEntity actual = metric.getMetricEntity(); + MetricEntity expected = expectedMetrics.get(metric); + + assertNotNull(expected, "No expected definition for " + metric.name()); + assertNotNull(actual.getMetricName(), "Metric name should not be null for " + metric.name()); + assertEquals(actual.getMetricName(), expected.getMetricName(), "Unexpected metric name for " + metric.name()); + assertNotNull(actual.getMetricType(), "Metric type should not be null for " + metric.name()); + assertEquals(actual.getMetricType(), expected.getMetricType(), "Unexpected metric type for " + metric.name()); + assertNotNull(actual.getUnit(), "Metric unit should not be null for " + metric.name()); + assertEquals(actual.getUnit(), expected.getUnit(), "Unexpected metric unit for " + metric.name()); + assertNotNull(actual.getDescription(), "Metric description should not be null for " + metric.name()); + assertEquals( + actual.getDescription(), + expected.getDescription(), + "Unexpected metric description for " + metric.name()); + assertNotNull(actual.getDimensionsList(), "Metric dimensions should not be null for " + metric.name()); + assertEquals( + actual.getDimensionsList(), + expected.getDimensionsList(), + "Unexpected metric dimensions for " + metric.name()); + } + } + + @Test + public void testGetMetricEntityNotNull() { + // Verify that getMetricEntity() never returns null for any enum value + for (ServerMetricEntity entity: ServerMetricEntity.values()) { + 
assertNotNull(entity.getMetricEntity(), "getMetricEntity() should not return null for " + entity.name()); + } + } +} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatOtelStatsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatOtelStatsTest.java new file mode 100644 index 00000000000..e2b7ba5faf7 --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatOtelStatsTest.java @@ -0,0 +1,583 @@ +package com.linkedin.davinci.stats.ingestion.heartbeat; + +import static com.linkedin.davinci.stats.ServerMetricEntity.INGESTION_HEARTBEAT_DELAY; +import static com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatOtelStats.SERVER_METRIC_ENTITIES; +import static com.linkedin.venice.meta.Store.NON_EXISTING_VERSION; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_STATE; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_TYPE; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_VERSION_ROLE; +import static com.linkedin.venice.utils.OpenTelemetryDataTestUtils.validateExponentialHistogramPointData; +import static org.testng.Assert.*; + +import com.linkedin.venice.stats.VeniceMetricsConfig; +import com.linkedin.venice.stats.VeniceMetricsRepository; +import com.linkedin.venice.stats.dimensions.ReplicaState; +import com.linkedin.venice.stats.dimensions.ReplicaType; +import com.linkedin.venice.stats.dimensions.VersionRole; +import io.opentelemetry.api.common.Attributes; +import 
io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.tehuti.metrics.MetricsRepository; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + + +public class HeartbeatOtelStatsTest { + private static final String STORE_NAME = "test_store"; + private static final String CLUSTER_NAME = "test_cluster"; + private static final String REGION_US_WEST = "us-west"; + private static final String REGION_US_EAST = "us-east"; + private static final int BACKUP_VERSION = 1; + private static final int CURRENT_VERSION = 2; + private static final int FUTURE_VERSION = 3; + private static final String TEST_PREFIX = "test_prefix"; + + private InMemoryMetricReader inMemoryMetricReader; + private HeartbeatOtelStats heartbeatOtelStats; + + @BeforeMethod + public void setUp() { + inMemoryMetricReader = InMemoryMetricReader.create(); + VeniceMetricsRepository metricsRepository = new VeniceMetricsRepository( + new VeniceMetricsConfig.Builder().setMetricEntities(SERVER_METRIC_ENTITIES) + .setMetricPrefix(TEST_PREFIX) + .setEmitOtelMetrics(true) + .setOtelAdditionalMetricsReader(inMemoryMetricReader) + .build()); + heartbeatOtelStats = new HeartbeatOtelStats(metricsRepository, STORE_NAME, CLUSTER_NAME); + } + + @Test + public void testConstructorWithOtelEnabled() { + // Verify OTel metrics are enabled + assertTrue(heartbeatOtelStats.emitOtelMetrics(), "OTel metrics should be enabled"); + } + + @Test + public void testConstructorWithOtelDisabled() { + // Create with OTel disabled + VeniceMetricsRepository disabledMetricsRepository = new VeniceMetricsRepository( + new VeniceMetricsConfig.Builder().setMetricEntities(SERVER_METRIC_ENTITIES) + .setEmitOtelMetrics(false) + .setOtelAdditionalMetricsReader(inMemoryMetricReader) + .build()); + + HeartbeatOtelStats stats = new HeartbeatOtelStats(disabledMetricsRepository, STORE_NAME, CLUSTER_NAME); + + // Verify OTel metrics are disabled + 
assertFalse(stats.emitOtelMetrics(), "OTel metrics should be disabled"); + } + + @Test + public void testConstructorWithNonVeniceMetricsRepository() { + // Create with regular MetricsRepository (not VeniceOpenTelemetryMetricsRepository) + MetricsRepository regularRepository = new MetricsRepository(); + HeartbeatOtelStats stats = new HeartbeatOtelStats(regularRepository, STORE_NAME, CLUSTER_NAME); + + // Verify OTel metrics are disabled (default for non-Venice repository) + assertFalse(stats.emitOtelMetrics(), "OTel metrics should be disabled for non-Venice repository"); + } + + @Test + public void testUpdateVersionInfo() { + // Update version info + heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + // Record metrics - should work with updated version info + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100L); + + // Verify metric was recorded with CURRENT version type + validateHeartbeatMetric( + REGION_US_WEST, + VersionRole.CURRENT, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100.0, + 1); + } + + @Test + public void testRecordHeartbeatDelayOtelMetricsForCurrentVersion() { + heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + long delay = 150L; + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + delay); + + validateHeartbeatMetric( + REGION_US_WEST, + VersionRole.CURRENT, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + (double) delay, + 1); + } + + @Test + public void testRecordHeartbeatDelayOtelMetricsForFutureVersion() { + heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + long delay = 200L; + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + FUTURE_VERSION, + REGION_US_WEST, + ReplicaType.FOLLOWER, + ReplicaState.CATCHING_UP, + delay); + + validateHeartbeatMetric( + 
REGION_US_WEST, + VersionRole.FUTURE, + ReplicaType.FOLLOWER, + ReplicaState.CATCHING_UP, + (double) delay, + 1); + } + + @Test + public void testRecordHeartbeatDelayOtelMetricsForBackupVersion() { + heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + long delay = 300L; + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + BACKUP_VERSION, + REGION_US_WEST, + ReplicaType.FOLLOWER, + ReplicaState.READY_TO_SERVE, + delay); + + validateHeartbeatMetric( + REGION_US_WEST, + VersionRole.BACKUP, + ReplicaType.FOLLOWER, + ReplicaState.READY_TO_SERVE, + (double) delay, + 1); + } + + @Test + public void testRecordHeartbeatDelayOtelMetricsForNonExistingVersion() { + heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + // Record with NON_EXISTING_VERSION - should be classified as BACKUP + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + NON_EXISTING_VERSION, + REGION_US_WEST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100L); + + // Verify metric was recorded with BACKUP version type + validateHeartbeatMetric( + REGION_US_WEST, + VersionRole.BACKUP, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100.0, + 1); + } + + @Test + public void testRecordHeartbeatDelayOtelMetricsWithMultipleRegions() { + heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + // Record for US-WEST + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100L); + + // Record for US-EAST + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_EAST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 200L); + + // Verify both regions recorded metrics + validateHeartbeatMetric( + REGION_US_WEST, + VersionRole.CURRENT, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100.0, + 1); + + validateHeartbeatMetric( + REGION_US_EAST, + VersionRole.CURRENT, + ReplicaType.LEADER, + 
ReplicaState.READY_TO_SERVE, + 200.0, + 1); + } + + @Test + public void testRecordMultipleHeartbeatsForSameRegion() { + heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + // Record multiple heartbeats + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100L); + + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 150L); + + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 200L); + + // Verify aggregate metrics (min=100, max=200, sum=450, count=3) + validateHeartbeatMetric( + REGION_US_WEST, + VersionRole.CURRENT, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100.0, // min + 200.0, // max + 450.0, // sum + 3); // count + } + + @DataProvider(name = "versionRoleProvider") + public Object[][] versionRoleProvider() { + return new Object[][] { { VersionRole.CURRENT, CURRENT_VERSION }, { VersionRole.FUTURE, FUTURE_VERSION }, + { VersionRole.BACKUP, BACKUP_VERSION } }; + } + + @Test(dataProvider = "versionRoleProvider") + public void testAllVersionRoles(VersionRole expectedVersionRole, int version) { + heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + version, + REGION_US_WEST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100L); + + validateHeartbeatMetric( + REGION_US_WEST, + expectedVersionRole, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100.0, + 1); + } + + @DataProvider(name = "replicaTypeProvider") + public Object[][] replicaTypeProvider() { + return new Object[][] { { ReplicaType.LEADER }, { ReplicaType.FOLLOWER } }; + } + + @Test(dataProvider = "replicaTypeProvider") + public void testAllReplicaTypes(ReplicaType replicaType) { + 
heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + replicaType, + ReplicaState.READY_TO_SERVE, + 100L); + + validateHeartbeatMetric(REGION_US_WEST, VersionRole.CURRENT, replicaType, ReplicaState.READY_TO_SERVE, 100.0, 1); + } + + @DataProvider(name = "replicaStateProvider") + public Object[][] replicaStateProvider() { + return new Object[][] { { ReplicaState.READY_TO_SERVE }, { ReplicaState.CATCHING_UP } }; + } + + @Test(dataProvider = "replicaStateProvider") + public void testAllReplicaStates(ReplicaState replicaState) { + heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + heartbeatOtelStats + .recordHeartbeatDelayOtelMetrics(CURRENT_VERSION, REGION_US_WEST, ReplicaType.FOLLOWER, replicaState, 100L); + + validateHeartbeatMetric(REGION_US_WEST, VersionRole.CURRENT, ReplicaType.FOLLOWER, replicaState, 100.0, 1); + } + + @Test + public void testVersionInfoUpdateChangesClassification() { + // Initially CURRENT_VERSION is the current version and FUTURE_VERSION is the future version + heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + // Record for CURRENT_VERSION - should be CURRENT + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100L); + + validateHeartbeatMetric( + REGION_US_WEST, + VersionRole.CURRENT, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100.0, + 1); + + // Update version info - CURRENT_VERSION is no longer current or future, so it is now backup + heartbeatOtelStats.updateVersionInfo(FUTURE_VERSION, BACKUP_VERSION); + + // Record for CURRENT_VERSION again - should now be BACKUP + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 150L); + + // Verify CURRENT_VERSION is now classified as BACKUP + validateHeartbeatMetric( + REGION_US_WEST, + VersionRole.BACKUP, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, +
150.0, + 1); + } + + @Test + public void testNoMetricsRecordedWhenOtelDisabled() { + // Create stats with OTel disabled + VeniceMetricsRepository disabledMetricsRepository = new VeniceMetricsRepository( + new VeniceMetricsConfig.Builder().setMetricEntities(SERVER_METRIC_ENTITIES) + .setEmitOtelMetrics(false) + .setOtelAdditionalMetricsReader(inMemoryMetricReader) + .build()); + HeartbeatOtelStats stats = new HeartbeatOtelStats(disabledMetricsRepository, STORE_NAME, CLUSTER_NAME); + + stats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + // Try to record - should be no-op + stats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100L); + + // Verify no metrics were recorded + assertEquals( + inMemoryMetricReader.collectAllMetrics().size(), + 0, + "No metrics should be recorded when OTel disabled"); + } + + @Test + public void testVersionInfoWithNonExistingVersions() { + // Set both current and future as NON_EXISTING_VERSION + heartbeatOtelStats.updateVersionInfo(NON_EXISTING_VERSION, NON_EXISTING_VERSION); + + // Try to record for a valid version - should be classified as BACKUP + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100L); + + validateHeartbeatMetric( + REGION_US_WEST, + VersionRole.BACKUP, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100.0, + 1); + } + + @Test + public void testRecordZeroDelay() { + heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + // Record with 0 delay (caught up) + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + ReplicaType.FOLLOWER, + ReplicaState.READY_TO_SERVE, + 0L); + + validateHeartbeatMetric( + REGION_US_WEST, + VersionRole.CURRENT, + ReplicaType.FOLLOWER, + ReplicaState.READY_TO_SERVE, + 0.0, + 1); + } + + @Test + public void testRecordLargeDelay() { + 
heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + // Record with large delay + long largeDelay = 10000L; + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + ReplicaType.FOLLOWER, + ReplicaState.CATCHING_UP, + largeDelay); + + validateHeartbeatMetric( + REGION_US_WEST, + VersionRole.CURRENT, + ReplicaType.FOLLOWER, + ReplicaState.CATCHING_UP, + (double) largeDelay, + 1); + } + + @Test + public void testClassifyVersionWithNonExistingVersionInputReturnsBackup() { + heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + assertSame( + HeartbeatOtelStats.classifyVersion(NON_EXISTING_VERSION, heartbeatOtelStats.getVersionInfo()), + VersionRole.BACKUP); + assertSame( + HeartbeatOtelStats.classifyVersion(CURRENT_VERSION, heartbeatOtelStats.getVersionInfo()), + VersionRole.CURRENT); + assertSame( + HeartbeatOtelStats.classifyVersion(FUTURE_VERSION, heartbeatOtelStats.getVersionInfo()), + VersionRole.FUTURE); + assertSame(HeartbeatOtelStats.classifyVersion(10, heartbeatOtelStats.getVersionInfo()), VersionRole.BACKUP); + } + + @Test + public void testDifferentDimensionCombinations() { + heartbeatOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + // Record different combinations + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + CURRENT_VERSION, + REGION_US_WEST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100L); + + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + FUTURE_VERSION, + REGION_US_WEST, + ReplicaType.FOLLOWER, + ReplicaState.CATCHING_UP, + 200L); + + heartbeatOtelStats.recordHeartbeatDelayOtelMetrics( + BACKUP_VERSION, + REGION_US_EAST, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 300L); + + // Verify each combination has its own metric + validateHeartbeatMetric( + REGION_US_WEST, + VersionRole.CURRENT, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100.0, + 1); + + validateHeartbeatMetric( + REGION_US_WEST, + VersionRole.FUTURE, 
+ ReplicaType.FOLLOWER, + ReplicaState.CATCHING_UP, + 200.0, + 1); + + validateHeartbeatMetric( + REGION_US_EAST, + VersionRole.BACKUP, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 300.0, + 1); + } + + /** + * Helper method to validate heartbeat histogram metrics + * For single value recordings, min and max equal the value + */ + private void validateHeartbeatMetric( + String region, + VersionRole versionRole, + ReplicaType replicaType, + ReplicaState replicaState, + double expectedValue, + long expectedCount) { + // For single recordings, min/max/sum are all the same value + validateHeartbeatMetric( + region, + versionRole, + replicaType, + replicaState, + expectedValue, + expectedValue, + expectedValue, + expectedCount); + } + + /** + * Helper method to validate heartbeat histogram metrics with explicit min/max/sum + */ + private void validateHeartbeatMetric( + String region, + VersionRole versionRole, + ReplicaType replicaType, + ReplicaState replicaState, + double expectedMin, + double expectedMax, + double expectedSum, + long expectedCount) { + Attributes expectedAttributes = Attributes.builder() + .put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), STORE_NAME) + .put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME) + .put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), region) + .put(VENICE_VERSION_ROLE.getDimensionNameInDefaultFormat(), versionRole.getDimensionValue()) + .put(VENICE_REPLICA_TYPE.getDimensionNameInDefaultFormat(), replicaType.getDimensionValue()) + .put(VENICE_REPLICA_STATE.getDimensionNameInDefaultFormat(), replicaState.getDimensionValue()) + .build(); + + validateExponentialHistogramPointData( + inMemoryMetricReader, + expectedMin, + expectedMax, + expectedCount, + expectedSum, + expectedAttributes, + INGESTION_HEARTBEAT_DELAY.getMetricEntity().getMetricName(), + TEST_PREFIX); + } +} diff --git 
a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStatsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStatsTest.java new file mode 100644 index 00000000000..01be2b54830 --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStatsTest.java @@ -0,0 +1,198 @@ +package com.linkedin.davinci.stats.ingestion.heartbeat; + +import static com.linkedin.davinci.stats.ServerMetricEntity.INGESTION_HEARTBEAT_DELAY; +import static com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatOtelStats.SERVER_METRIC_ENTITIES; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_STATE; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_TYPE; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_VERSION_ROLE; +import static com.linkedin.venice.utils.OpenTelemetryDataTestUtils.validateExponentialHistogramPointData; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +import com.linkedin.venice.meta.ReadOnlyStoreRepository; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.VersionImpl; +import com.linkedin.venice.meta.VersionStatus; +import com.linkedin.venice.stats.StatsSupplier; +import com.linkedin.venice.stats.VeniceMetricsConfig; +import com.linkedin.venice.stats.VeniceMetricsRepository; +import com.linkedin.venice.stats.dimensions.ReplicaState; +import 
com.linkedin.venice.stats.dimensions.ReplicaType; +import com.linkedin.venice.stats.dimensions.VersionRole; +import com.linkedin.venice.utils.DataProviderUtils; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.tehuti.metrics.MetricConfig; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +/** + * Unit tests for HeartbeatVersionedStats that verify both Tehuti and OTel metrics + * receive consistent data when recording heartbeat delays. + */ +public class HeartbeatVersionedStatsTest { + private static final String STORE_NAME = "test_store"; + private static final String CLUSTER_NAME = "test_cluster"; + private static final String REGION = "us-west"; + private static final int CURRENT_VERSION = 2; + private static final int FUTURE_VERSION = 3; + private static final String TEST_PREFIX = "test_prefix"; + private static final long FIXED_CURRENT_TIME = 1000000L; + + private InMemoryMetricReader inMemoryMetricReader; + private VeniceMetricsRepository metricsRepository; + private ReadOnlyStoreRepository mockMetadataRepository; + private HeartbeatVersionedStats heartbeatVersionedStats; + private Map>>> leaderMonitors; + private Map>>> followerMonitors; + private Set regions; + + @BeforeMethod + public void setUp() { + inMemoryMetricReader = InMemoryMetricReader.create(); + metricsRepository = new VeniceMetricsRepository( + new VeniceMetricsConfig.Builder().setMetricEntities(SERVER_METRIC_ENTITIES) + .setMetricPrefix(TEST_PREFIX) + .setEmitOtelMetrics(true) + .setOtelAdditionalMetricsReader(inMemoryMetricReader) + .build()); + + mockMetadataRepository = mock(ReadOnlyStoreRepository.class); + + // Setup 
store and versions + Store mockStore = mock(Store.class); + when(mockStore.getName()).thenReturn(STORE_NAME); + when(mockStore.getCurrentVersion()).thenReturn(CURRENT_VERSION); + + List versions = new ArrayList<>(); + Version currentVersion = new VersionImpl(STORE_NAME, CURRENT_VERSION, "push1"); + currentVersion.setStatus(VersionStatus.ONLINE); + Version futureVersion = new VersionImpl(STORE_NAME, FUTURE_VERSION, "push2"); + futureVersion.setStatus(VersionStatus.STARTED); + versions.add(currentVersion); + versions.add(futureVersion); + when(mockStore.getVersions()).thenReturn(versions); + + when(mockMetadataRepository.getStoreOrThrow(STORE_NAME)).thenReturn(mockStore); + when(mockMetadataRepository.getAllStores()).thenReturn(Collections.singletonList(mockStore)); + + leaderMonitors = new VeniceConcurrentHashMap<>(); + followerMonitors = new VeniceConcurrentHashMap<>(); + leaderMonitors.put(STORE_NAME, new VeniceConcurrentHashMap<>()); + + regions = new HashSet<>(); + regions.add(REGION); + + MetricConfig metricConfig = new MetricConfig(); + Supplier statsInitiator = () -> new HeartbeatStat(metricConfig, regions); + StatsSupplier reporterSupplier = + (repo, storeName, clusterName) -> new HeartbeatStatReporter(repo, storeName, regions); + + heartbeatVersionedStats = new HeartbeatVersionedStats( + metricsRepository, + mockMetadataRepository, + statsInitiator, + reporterSupplier, + leaderMonitors, + followerMonitors, + CLUSTER_NAME); + } + + @Test + public void testRecordLeaderLag() { + heartbeatVersionedStats.setCurrentTimeSupplier(() -> FIXED_CURRENT_TIME); + + // Record multiple leader lags (delays: 100ms, 200ms, 150ms) + heartbeatVersionedStats.recordLeaderLag(STORE_NAME, CURRENT_VERSION, REGION, FIXED_CURRENT_TIME - 100); + heartbeatVersionedStats.recordLeaderLag(STORE_NAME, CURRENT_VERSION, REGION, FIXED_CURRENT_TIME - 200); + heartbeatVersionedStats.recordLeaderLag(STORE_NAME, CURRENT_VERSION, REGION, FIXED_CURRENT_TIME - 150); + + // Verify Tehuti accumulated 
correctly + HeartbeatStat tehutiStats = heartbeatVersionedStats.getStatsForTesting(STORE_NAME, CURRENT_VERSION); + assertEquals(tehutiStats.getReadyToServeLeaderLag(REGION).getMax(), 200.0, "Tehuti max should be 200ms"); + + // Verify OTel accumulated correctly (min=100, max=200, count=3, sum=450) + validateOtelHistogram(ReplicaType.LEADER, ReplicaState.READY_TO_SERVE, 100.0, 200.0, 3, 450.0); + } + + @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class) + public void testRecordFollowerLag(boolean isReadyToServe) { + heartbeatVersionedStats.setCurrentTimeSupplier(() -> FIXED_CURRENT_TIME); + + // Record multiple follower lags (delays: 100ms, 200ms, 150ms) + heartbeatVersionedStats + .recordFollowerLag(STORE_NAME, CURRENT_VERSION, REGION, FIXED_CURRENT_TIME - 100, isReadyToServe); + heartbeatVersionedStats + .recordFollowerLag(STORE_NAME, CURRENT_VERSION, REGION, FIXED_CURRENT_TIME - 200, isReadyToServe); + heartbeatVersionedStats + .recordFollowerLag(STORE_NAME, CURRENT_VERSION, REGION, FIXED_CURRENT_TIME - 150, isReadyToServe); + + // Verify Tehuti metrics + HeartbeatStat tehutiStats = heartbeatVersionedStats.getStatsForTesting(STORE_NAME, CURRENT_VERSION); + double readyToServeMax = tehutiStats.getReadyToServeFollowerLag(REGION).getMax(); + double catchingUpMax = tehutiStats.getCatchingUpFollowerLag(REGION).getMax(); + + // Active metric should have max=200ms, squelched metric should be 0 + assertEquals(readyToServeMax, isReadyToServe ? 200.0 : 0.0); + assertEquals(catchingUpMax, isReadyToServe ? 0.0 : 200.0); + + // Verify OTel metrics: active has min=100, max=200, count=3, sum=450; squelched has all 0s + validateOtelHistogram( + ReplicaType.FOLLOWER, + ReplicaState.READY_TO_SERVE, + isReadyToServe ? 100.0 : 0.0, + isReadyToServe ? 200.0 : 0.0, + 3, + isReadyToServe ? 450.0 : 0.0); + validateOtelHistogram( + ReplicaType.FOLLOWER, + ReplicaState.CATCHING_UP, + isReadyToServe ? 0.0 : 100.0, + isReadyToServe ? 
0.0 : 200.0, + 3, + isReadyToServe ? 0.0 : 450.0); + } + + private Attributes buildAttributes(ReplicaType replicaType, ReplicaState replicaState) { + return Attributes.builder() + .put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), STORE_NAME) + .put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME) + .put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), REGION) + .put(VENICE_VERSION_ROLE.getDimensionNameInDefaultFormat(), VersionRole.CURRENT.getDimensionValue()) + .put(VENICE_REPLICA_TYPE.getDimensionNameInDefaultFormat(), replicaType.getDimensionValue()) + .put(VENICE_REPLICA_STATE.getDimensionNameInDefaultFormat(), replicaState.getDimensionValue()) + .build(); + } + + private void validateOtelHistogram( + ReplicaType replicaType, + ReplicaState replicaState, + double expectedMin, + double expectedMax, + int expectedCount, + double expectedSum) { + validateExponentialHistogramPointData( + inMemoryMetricReader, + expectedMin, + expectedMax, + expectedCount, + expectedSum, + buildAttributes(replicaType, replicaState), + INGESTION_HEARTBEAT_DELAY.getMetricEntity().getMetricName(), + TEST_PREFIX); + } +} diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceMetricsRepository.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceMetricsRepository.java index 15198286c10..11e623a4bc2 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceMetricsRepository.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceMetricsRepository.java @@ -1,6 +1,7 @@ package com.linkedin.venice.stats; import com.linkedin.venice.stats.metrics.MetricEntity; +import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils; import io.opentelemetry.sdk.metrics.export.MetricReader; import io.tehuti.metrics.JmxReporter; import io.tehuti.metrics.MetricsRepository; @@ -62,12 +63,23 @@ public static VeniceMetricsRepository 
getVeniceMetricsRepository( String metricPrefix, Collection metricEntities, Map configs) { - VeniceMetricsRepository metricsRepository = new VeniceMetricsRepository( - new VeniceMetricsConfig.Builder().setServiceName(serviceName) - .setMetricPrefix(metricPrefix) - .setMetricEntities(metricEntities) - .extractAndSetOtelConfigs(configs) - .build()); + return getVeniceMetricsRepository(serviceName, metricPrefix, metricEntities, configs, false); + } + + public static VeniceMetricsRepository getVeniceMetricsRepository( + String serviceName, + String metricPrefix, + Collection metricEntities, + Map configs, + boolean useSingleThreadedMetricsRepository) { + VeniceMetricsConfig.Builder configBuilder = new VeniceMetricsConfig.Builder().setServiceName(serviceName) + .setMetricPrefix(metricPrefix) + .setMetricEntities(metricEntities) + .extractAndSetOtelConfigs(configs); + if (useSingleThreadedMetricsRepository) { + configBuilder.setTehutiMetricConfig(MetricsRepositoryUtils.createDefaultSingleThreadedMetricConfig()); + } + VeniceMetricsRepository metricsRepository = new VeniceMetricsRepository(configBuilder.build()); metricsRepository.addReporter(new JmxReporter(serviceName)); return metricsRepository; } diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/ReplicaState.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/ReplicaState.java new file mode 100644 index 00000000000..7eba864aeb1 --- /dev/null +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/ReplicaState.java @@ -0,0 +1,28 @@ +package com.linkedin.venice.stats.dimensions; + +/** + * Dimension to represent the replica state of a Venice storage node. 
+ */ +public enum ReplicaState implements VeniceDimensionInterface { + READY_TO_SERVE, CATCHING_UP; + + private final String replicaState; + + ReplicaState() { + this.replicaState = name().toLowerCase(); + } + + /** + * All the instances of this Enum should have the same dimension name. + * Refer {@link VeniceDimensionInterface#getDimensionName()} for more details. + */ + @Override + public VeniceMetricsDimensions getDimensionName() { + return VeniceMetricsDimensions.VENICE_REPLICA_STATE; + } + + @Override + public String getDimensionValue() { + return this.replicaState; + } +} diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/ReplicaType.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/ReplicaType.java new file mode 100644 index 00000000000..5245bcf941e --- /dev/null +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/ReplicaType.java @@ -0,0 +1,28 @@ +package com.linkedin.venice.stats.dimensions; + +/** + * Dimension to represent the replica type of Venice storage node. + */ +public enum ReplicaType implements VeniceDimensionInterface { + LEADER, FOLLOWER; + + private final String replicaType; + + ReplicaType() { + this.replicaType = name().toLowerCase(); + } + + /** + * All the instances of this Enum should have the same dimension name. + * Refer {@link VeniceDimensionInterface#getDimensionName()} for more details. 
+ */ + @Override + public VeniceMetricsDimensions getDimensionName() { + return VeniceMetricsDimensions.VENICE_REPLICA_TYPE; + } + + @Override + public String getDimensionValue() { + return this.replicaType; + } +} diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensions.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensions.java index 5a684d7c994..7a51cf3f068 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensions.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensions.java @@ -59,7 +59,19 @@ public enum VeniceMetricsDimensions { VENICE_INSTANCE_ERROR_TYPE("venice.instance.error_type"), /** Helix group id number */ - VENICE_HELIX_GROUP_ID("venice.helix_group.id"); + VENICE_HELIX_GROUP_ID("venice.helix_group.id"), + + /** Region/datacenter name */ + VENICE_REGION_NAME("venice.region.name"), + + /** {@link VersionRole} */ + VENICE_VERSION_ROLE("venice.version.role"), + + /** {@link ReplicaType} */ + VENICE_REPLICA_TYPE("venice.replica.type"), + + /** {@link ReplicaState} */ + VENICE_REPLICA_STATE("venice.replica.state"); private final String[] dimensionName = new String[VeniceOpenTelemetryMetricNamingFormat.SIZE]; diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VersionRole.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VersionRole.java new file mode 100644 index 00000000000..aeed972a517 --- /dev/null +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VersionRole.java @@ -0,0 +1,28 @@ +package com.linkedin.venice.stats.dimensions; + +/** + * Role of a store's version: Backup/Current/Future. 
+ */ +public enum VersionRole implements VeniceDimensionInterface { + BACKUP, CURRENT, FUTURE; + + private final String versionRole; + + VersionRole() { + this.versionRole = name().toLowerCase(); + } + + /** + * All the instances of this Enum should have the same dimension name. + * Refer {@link VeniceDimensionInterface#getDimensionName()} for more details. + */ + @Override + public VeniceMetricsDimensions getDimensionName() { + return VeniceMetricsDimensions.VENICE_VERSION_ROLE; + } + + @Override + public String getDimensionValue() { + return this.versionRole; + } +} diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/metrics/MetricsRepositoryUtils.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/metrics/MetricsRepositoryUtils.java index 6c1d8112ab7..2d8d6850565 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/metrics/MetricsRepositoryUtils.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/metrics/MetricsRepositoryUtils.java @@ -67,6 +67,10 @@ public static MetricsRepository createSingleThreadedVeniceMetricsRepository( .build()); } + public static MetricConfig createDefaultSingleThreadedMetricConfig() { + return getMetricConfig(TimeUnit.MINUTES.toMillis(1), 100); + } + public static MetricConfig getMetricConfig( long maxMetricsMeasurementTimeoutMs, long initialMetricsMeasurementTimeoutMs) { diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/VeniceMetricsRepositoryTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/VeniceMetricsRepositoryTest.java index 60cbf8cd44d..144025065bc 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/VeniceMetricsRepositoryTest.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/VeniceMetricsRepositoryTest.java @@ -1,5 +1,6 @@ package com.linkedin.venice.stats; +import static 
io.tehuti.metrics.stats.AsyncGauge.DEFAULT_ASYNC_GAUGE_EXECUTOR; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -10,9 +11,11 @@ import com.linkedin.venice.stats.metrics.MetricEntity; import com.linkedin.venice.stats.metrics.MetricType; import com.linkedin.venice.stats.metrics.MetricUnit; +import com.linkedin.venice.utils.DataProviderUtils; import io.tehuti.metrics.MetricConfig; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import org.mockito.Mockito; @@ -105,4 +108,25 @@ public void testCloseMethod() { // Verify that close methods are called Mockito.verify(mockOpenTelemetryRepository).close(); } + + @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class) + public void testGetVeniceMetricsRepositoryWithSingleThreadedConfig(boolean useSingleThreadedMetricsRepository) { + VeniceMetricsRepository repository = VeniceMetricsRepository.getVeniceMetricsRepository( + "test_service", + "test_prefix", + Collections.emptyList(), + Collections.emptyMap(), + useSingleThreadedMetricsRepository); + + assertNotNull(repository, "Repository should not be null"); + assertNotNull(repository.getVeniceMetricsConfig(), "VeniceMetricsConfig should not be null"); + + MetricConfig tehutiConfig = repository.getVeniceMetricsConfig().getTehutiMetricConfig(); + assertNotNull(tehutiConfig, "TehutiMetricConfig should not be null"); + + assertEquals( + tehutiConfig.getAsyncGaugeExecutor().equals(DEFAULT_ASYNC_GAUGE_EXECUTOR), + !useSingleThreadedMetricsRepository); + repository.close(); + } } diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/ReplicaStateTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/ReplicaStateTest.java new file mode 100644 index 00000000000..6ff5f2174fa --- /dev/null +++ 
b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/ReplicaStateTest.java @@ -0,0 +1,24 @@ +package com.linkedin.venice.stats.dimensions; + +import com.linkedin.venice.utils.CollectionUtils; +import java.util.Map; + + +public class ReplicaStateTest extends VeniceDimensionInterfaceTest { + protected ReplicaStateTest() { + super(ReplicaState.class); + } + + @Override + protected VeniceMetricsDimensions expectedDimensionName() { + return VeniceMetricsDimensions.VENICE_REPLICA_STATE; + } + + @Override + protected Map expectedDimensionValueMapping() { + return CollectionUtils.mapBuilder() + .put(ReplicaState.READY_TO_SERVE, "ready_to_serve") + .put(ReplicaState.CATCHING_UP, "catching_up") + .build(); + } +} diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/ReplicaTypeTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/ReplicaTypeTest.java new file mode 100644 index 00000000000..404480fb546 --- /dev/null +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/ReplicaTypeTest.java @@ -0,0 +1,24 @@ +package com.linkedin.venice.stats.dimensions; + +import com.linkedin.venice.utils.CollectionUtils; +import java.util.Map; + + +public class ReplicaTypeTest extends VeniceDimensionInterfaceTest { + protected ReplicaTypeTest() { + super(ReplicaType.class); + } + + @Override + protected VeniceMetricsDimensions expectedDimensionName() { + return VeniceMetricsDimensions.VENICE_REPLICA_TYPE; + } + + @Override + protected Map expectedDimensionValueMapping() { + return CollectionUtils.mapBuilder() + .put(ReplicaType.LEADER, "leader") + .put(ReplicaType.FOLLOWER, "follower") + .build(); + } +} diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensionsTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensionsTest.java index 
025c9c65017..9992e548002 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensionsTest.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensionsTest.java @@ -64,6 +64,18 @@ public void testGetDimensionNameInSnakeCase() { case VENICE_HELIX_GROUP_ID: assertEquals(dimension.getDimensionName(format), "venice.helix_group.id"); break; + case VENICE_REGION_NAME: + assertEquals(dimension.getDimensionName(format), "venice.region.name"); + break; + case VENICE_VERSION_ROLE: + assertEquals(dimension.getDimensionName(format), "venice.version.role"); + break; + case VENICE_REPLICA_TYPE: + assertEquals(dimension.getDimensionName(format), "venice.replica.type"); + break; + case VENICE_REPLICA_STATE: + assertEquals(dimension.getDimensionName(format), "venice.replica.state"); + break; default: throw new IllegalArgumentException("Unknown dimension: " + dimension); } @@ -126,6 +138,18 @@ public void testGetDimensionNameInCamelCase() { case VENICE_HELIX_GROUP_ID: assertEquals(dimension.getDimensionName(format), "venice.helixGroup.id"); break; + case VENICE_REGION_NAME: + assertEquals(dimension.getDimensionName(format), "venice.region.name"); + break; + case VENICE_VERSION_ROLE: + assertEquals(dimension.getDimensionName(format), "venice.version.role"); + break; + case VENICE_REPLICA_TYPE: + assertEquals(dimension.getDimensionName(format), "venice.replica.type"); + break; + case VENICE_REPLICA_STATE: + assertEquals(dimension.getDimensionName(format), "venice.replica.state"); + break; default: throw new IllegalArgumentException("Unknown dimension: " + dimension); } @@ -188,6 +212,18 @@ public void testGetDimensionNameInPascalCase() { case VENICE_HELIX_GROUP_ID: assertEquals(dimension.getDimensionName(format), "Venice.HelixGroup.Id"); break; + case VENICE_REGION_NAME: + assertEquals(dimension.getDimensionName(format), "Venice.Region.Name"); + break; + case 
VENICE_VERSION_ROLE: + assertEquals(dimension.getDimensionName(format), "Venice.Version.Role"); + break; + case VENICE_REPLICA_TYPE: + assertEquals(dimension.getDimensionName(format), "Venice.Replica.Type"); + break; + case VENICE_REPLICA_STATE: + assertEquals(dimension.getDimensionName(format), "Venice.Replica.State"); + break; default: throw new IllegalArgumentException("Unknown dimension: " + dimension); } diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VersionRoleTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VersionRoleTest.java new file mode 100644 index 00000000000..fd45312dab5 --- /dev/null +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VersionRoleTest.java @@ -0,0 +1,25 @@ +package com.linkedin.venice.stats.dimensions; + +import com.linkedin.venice.utils.CollectionUtils; +import java.util.Map; + + +public class VersionRoleTest extends VeniceDimensionInterfaceTest { + protected VersionRoleTest() { + super(VersionRole.class); + } + + @Override + protected VeniceMetricsDimensions expectedDimensionName() { + return VeniceMetricsDimensions.VENICE_VERSION_ROLE; + } + + @Override + protected Map expectedDimensionValueMapping() { + return CollectionUtils.mapBuilder() + .put(VersionRole.CURRENT, "current") + .put(VersionRole.FUTURE, "future") + .put(VersionRole.BACKUP, "backup") + .build(); + } +} diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/metrics/ModuleMetricEntityInterfaceTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/metrics/ModuleMetricEntityInterfaceTest.java index 46c1693b30c..fb9a093add4 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/metrics/ModuleMetricEntityInterfaceTest.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/metrics/ModuleMetricEntityInterfaceTest.java @@ -32,7 +32,7 @@ 
public void testNoMetricsFound() { @Test public void testSingleEnum() { - Collection metrics = ModuleMetricEntityInterface.getUniqueMetricEntities(SingleEnum.class); + Collection metrics = ModuleMetricEntityInterface.getUniqueMetricEntities(SingleEnumForTest.class); assertEquals(metrics.size(), 1); MetricEntity m = metrics.iterator().next(); @@ -81,12 +81,12 @@ public MetricEntity getMetricEntity() { } } - private enum SingleEnum implements ModuleMetricEntityInterface { + public enum SingleEnumForTest implements ModuleMetricEntityInterface { ENUM_ONE(new MetricEntity("enum_one", MetricType.COUNTER, MetricUnit.NUMBER, TEST_DESC, TEST_DIMENSIONS)); - private final MetricEntity metric; + private final transient MetricEntity metric; - SingleEnum(MetricEntity m) { + SingleEnumForTest(MetricEntity m) { this.metric = m; } @@ -100,7 +100,7 @@ private enum TwoDistinctEnum implements ModuleMetricEntityInterface { ENUM_ONE(new MetricEntity("enum_one", MetricType.COUNTER, MetricUnit.NUMBER, TEST_DESC, TEST_DIMENSIONS)), ENUM_TWO(new MetricEntity("enum_two", MetricType.HISTOGRAM, MetricUnit.MILLISECOND, TEST_DESC, TEST_DIMENSIONS)); - private final MetricEntity metric; + private final transient MetricEntity metric; TwoDistinctEnum(MetricEntity m) { this.metric = m; @@ -116,7 +116,7 @@ private enum DuplicateEnum implements ModuleMetricEntityInterface { ENUM_ONE(new MetricEntity("enum_dup", MetricType.COUNTER, MetricUnit.NUMBER, TEST_DESC, TEST_DIMENSIONS)), ENUM_TWO(new MetricEntity("enum_dup", MetricType.COUNTER, MetricUnit.NUMBER, TEST_DESC, TEST_DIMENSIONS)); - private final MetricEntity metric; + private final transient MetricEntity metric; DuplicateEnum(MetricEntity m) { this.metric = m; @@ -136,7 +136,7 @@ private enum ConflictEnum implements ModuleMetricEntityInterface { new MetricEntity("enum_conflict", MetricType.HISTOGRAM, MetricUnit.NUMBER, TEST_DESC, TEST_DIMENSIONS) ); - private final MetricEntity metric; + private final transient MetricEntity metric; 
ConflictEnum(MetricEntity m) { this.metric = m; diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/utils/metrics/MetricsRepositoryUtilsTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/utils/metrics/MetricsRepositoryUtilsTest.java new file mode 100644 index 00000000000..7d36c141f45 --- /dev/null +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/utils/metrics/MetricsRepositoryUtilsTest.java @@ -0,0 +1,80 @@ +package com.linkedin.venice.utils.metrics; + +import static com.linkedin.venice.stats.metrics.ModuleMetricEntityInterface.getUniqueMetricEntities; +import static org.testng.Assert.*; + +import com.linkedin.venice.stats.VeniceMetricsRepository; +import com.linkedin.venice.stats.VeniceOpenTelemetryMetricNamingFormat; +import com.linkedin.venice.stats.metrics.ModuleMetricEntityInterfaceTest; +import com.linkedin.venice.utils.DataProviderUtils; +import io.tehuti.metrics.MetricConfig; +import io.tehuti.metrics.MetricsRepository; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.testng.annotations.Test; + + +public class MetricsRepositoryUtilsTest { + @Test + public void testCreateSingleThreadedMetricsRepositoryWithCustomTimeouts() { + long maxTimeout = TimeUnit.MINUTES.toMillis(2); + long initialTimeout = 200; + assertNotNull(MetricsRepositoryUtils.createSingleThreadedMetricsRepository(maxTimeout, initialTimeout)); + } + + @Test + public void testCreateSingleThreadedVeniceMetricsRepository() { + MetricsRepository repository = MetricsRepositoryUtils.createSingleThreadedVeniceMetricsRepository(); + + assertNotNull(repository, "Repository should not be null"); + assertTrue(repository instanceof VeniceMetricsRepository, "Should return a VeniceMetricsRepository instance"); + + VeniceMetricsRepository veniceRepo = (VeniceMetricsRepository) repository; + assertFalse(veniceRepo.getVeniceMetricsConfig().emitOtelMetrics(), "OTel should be disabled by default"); + } + + 
@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class) + public void testCreateSingleThreadedVeniceMetricsRepositoryWithAllParameters(boolean otelEnabled) { + long maxTimeout = TimeUnit.MINUTES.toMillis(5); + long initialTimeout = 500; + + try { + MetricsRepositoryUtils.createSingleThreadedVeniceMetricsRepository( + maxTimeout, + initialTimeout, + otelEnabled, + VeniceOpenTelemetryMetricNamingFormat.SNAKE_CASE, + Collections.emptyList()); + if (otelEnabled) { + fail(); + } + } catch (Exception e) { + if (!otelEnabled) { + fail(); + } + assertTrue(e.getCause() instanceof IllegalArgumentException); + assertTrue(e.getCause().getMessage().contains("metricEntities cannot be empty")); + } + + // with valid MetricEntities + MetricsRepository repository = MetricsRepositoryUtils.createSingleThreadedVeniceMetricsRepository( + maxTimeout, + initialTimeout, + otelEnabled, + VeniceOpenTelemetryMetricNamingFormat.SNAKE_CASE, + getUniqueMetricEntities(ModuleMetricEntityInterfaceTest.SingleEnumForTest.class)); + + assertNotNull(repository, "Repository should not be null"); + assertTrue(repository instanceof VeniceMetricsRepository, "Should return a VeniceMetricsRepository instance"); + + VeniceMetricsRepository veniceRepo = (VeniceMetricsRepository) repository; + assertEquals(veniceRepo.getVeniceMetricsConfig().emitOtelMetrics(), otelEnabled); + } + + @Test + public void testCreateDefaultSingleThreadedMetricConfig() { + MetricConfig config = MetricsRepositoryUtils.createDefaultSingleThreadedMetricConfig(); + assertNotNull(config, "Config should not be null"); + assertNotNull(config.getAsyncGaugeExecutor(), "AsyncGaugeExecutor should not be null"); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 3be643cb821..9c8302acc56 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ 
b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -3,6 +3,7 @@ import com.linkedin.venice.pubsub.PubSubConstants; import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerConfig; import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig; +import com.linkedin.venice.stats.dimensions.VersionRole; public class ConfigKeys { @@ -2676,7 +2677,7 @@ private ConfigKeys() { /** * Server configs to enable the topic partition re-subscription during ingestion to let bottom ingestion service aware * of store version's ingestion context changed (workload type {#@link PartitionReplicaIngestionContext.WorkloadType} or - * {#@link VersionRole.WorkloadType} version role changed). + * {@link VersionRole} version role changed). */ public static final String SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED = "server.resubscription.triggered.by.version.ingestion.context.change.enabled"; diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java index ddc5291bff7..9de4a5d2754 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java @@ -1,5 +1,6 @@ package com.linkedin.venice.integration.utils; +import static com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatOtelStats.SERVER_METRIC_ENTITIES; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_BLOCK_CACHE_SIZE_IN_BYTES; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_OPTIONS_USE_DIRECT_READS; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; @@ -46,6 +47,7 
@@ import static com.linkedin.venice.ConfigKeys.SYSTEM_SCHEMA_INITIALIZATION_AT_START_TIME_ENABLED; import static com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper.addKafkaClusterIDMappingToServerConfigs; import static com.linkedin.venice.meta.PersistenceType.ROCKS_DB; +import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_VENICE_METRICS_ENABLED; import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.ingestion.utils.IngestionTaskReusableObjects; @@ -59,6 +61,7 @@ import com.linkedin.venice.server.VeniceServer; import com.linkedin.venice.server.VeniceServerContext; import com.linkedin.venice.servicediscovery.ServiceDiscoveryAnnouncer; +import com.linkedin.venice.stats.VeniceMetricsRepository; import com.linkedin.venice.tehuti.MetricsAware; import com.linkedin.venice.tehuti.MockTehutiReporter; import com.linkedin.venice.utils.ForkedJavaProcess; @@ -69,6 +72,7 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.tehuti.Metric; import io.tehuti.metrics.MetricsRepository; import java.io.File; @@ -99,6 +103,7 @@ public class VeniceServerWrapper extends ProcessWrapper implements MetricsAware { private static final Logger LOGGER = LogManager.getLogger(VeniceServerWrapper.class); public static final String SERVICE_NAME = VeniceComponent.SERVER.getName(); + private static final String SERVICE_METRIC_PREFIX = "server"; /** * Possible config options which are not included in {@link com.linkedin.venice.ConfigKeys}. 
@@ -288,7 +293,8 @@ static StatefulServiceProvider generateService( SERVER_INGESTION_TASK_REUSABLE_OBJECTS_STRATEGY, IngestionTaskReusableObjects.Strategy.SINGLETON_THREAD_LOCAL) .put(SERVER_RESUBSCRIPTION_CHECK_INTERVAL_IN_SECONDS, 1) - .put(SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_STARTUP, serverDeleteUnassignedPartitionsOnStartup); + .put(SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_STARTUP, serverDeleteUnassignedPartitionsOnStartup) + .put(OTEL_VENICE_METRICS_ENABLED, Boolean.TRUE.toString()); if (sslToKafka) { serverPropsBuilder.put(PUBSUB_SECURITY_PROTOCOL_LEGACY, PubSubSecurityProtocol.SSL.name()); serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration())); @@ -361,7 +367,7 @@ static StatefulServiceProvider generateService( VeniceServerContext.Builder serverContextBuilder = new VeniceServerContext.Builder().setVeniceConfigLoader(veniceConfigLoader) - .setMetricsRepository(MetricsRepositoryUtils.createSingleThreadedMetricsRepository()) + .setMetricsRepository(getVeniceMetricRepositoryForServer(serverProps)) .setSslFactory(sslFactory) .setClientConfigForConsumer(consumerClientConfig) .setServiceDiscoveryAnnouncers(d2Servers) @@ -525,7 +531,7 @@ protected void newProcess() throws Exception { this.veniceServer = new TestVeniceServer( new VeniceServerContext.Builder().setVeniceConfigLoader(config) - .setMetricsRepository(MetricsRepositoryUtils.createSingleThreadedMetricsRepository()) + .setMetricsRepository(getVeniceMetricRepositoryForServer(serverProps)) .setD2Client(D2TestUtils.getAndStartD2Client(zkAddress)) .setSslFactory(sslFactory) .setClientConfigForConsumer(consumerClientConfig) @@ -559,6 +565,18 @@ public LogContext getComponentTagForLogging() { .build(); } + private static VeniceMetricsRepository getVeniceMetricRepositoryForServer(VeniceProperties serverProps) { + InMemoryMetricReader inMemoryMetricReader = InMemoryMetricReader.create(); + VeniceMetricsRepository veniceMetricsRepository = 
VeniceMetricsRepository.getVeniceMetricsRepository( + SERVICE_NAME, + SERVICE_METRIC_PREFIX, + SERVER_METRIC_ENTITIES, + serverProps.getAsMap(), + true); + veniceMetricsRepository.getVeniceMetricsConfig().setOtelAdditionalMetricsReader(inMemoryMetricReader); + return veniceMetricsRepository; + } + public static void main(String args[]) throws Exception { // parse the inputs LOGGER.info("VeniceServer args: {}", Arrays.toString(args)); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/KafkaConsumptionTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/KafkaConsumptionTest.java index 1a4a1597008..5cfc81dc8c8 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/KafkaConsumptionTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/KafkaConsumptionTest.java @@ -42,6 +42,7 @@ import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.pubsub.manager.TopicManager; import com.linkedin.venice.stats.TehutiUtils; +import com.linkedin.venice.stats.dimensions.VersionRole; import com.linkedin.venice.throttle.EventThrottler; import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.IntegrationTestPushUtils; @@ -212,7 +213,7 @@ public void testLocalAndRemoteConsumption(KafkaConsumerService.ConsumerAssignmen PartitionReplicaIngestionContext partitionReplicaIngestionContext = new PartitionReplicaIngestionContext( versionTopic, pubSubTopicPartition, - PartitionReplicaIngestionContext.VersionRole.CURRENT, + VersionRole.CURRENT, PartitionReplicaIngestionContext.WorkloadType.AA_OR_WRITE_COMPUTE); StorePartitionDataReceiver localDataReceiver = (StorePartitionDataReceiver) aggKafkaConsumerService.subscribeConsumerFor( diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/OpenTelemetryDataTestUtils.java 
b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/OpenTelemetryDataTestUtils.java index c0394d26d79..429973408e7 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/OpenTelemetryDataTestUtils.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/OpenTelemetryDataTestUtils.java @@ -267,10 +267,11 @@ public static void validateExponentialHistogramPointData( assertEquals(histogramPointData.getMax(), expectedMax, "Histogram max value should be " + expectedMax); assertEquals(histogramPointData.getCount(), expectedCount, "Histogram count should be " + expectedCount); assertEquals(histogramPointData.getSum(), expectedSum, "Histogram sum should be " + expectedSum); + long expectedNumPositiveBuckets = (expectedMin > 0 || expectedMax > 0) ? expectedCount : 0; assertEquals( histogramPointData.getPositiveBuckets().getTotalCount(), - expectedCount, - "Histogram positive buckets total count should be " + expectedCount); + expectedNumPositiveBuckets, + "Histogram positive buckets total count should be " + expectedNumPositiveBuckets); assertEquals(histogramPointData.getAttributes(), expectedAttributes, "Histogram attributes should match"); } diff --git a/services/venice-server/build.gradle b/services/venice-server/build.gradle index 9b8c4d50ec3..b3ba5535fef 100644 --- a/services/venice-server/build.gradle +++ b/services/venice-server/build.gradle @@ -59,10 +59,12 @@ dependencies { implementation libraries.tehuti // It's necessary to pull in the most recent version of zkclient explicitly, otherwise Helix won't have it... implementation libraries.zkclient + implementation libraries.opentelemetryApi testImplementation libraries.fastUtil testImplementation project(':internal:alpini:router:alpini-router-api') testImplementation project(':services:venice-router') + testImplementation libraries.openTelemetryTestSdk } jar {