Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.venice.pubsub.api.PubSubPosition;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.stats.dimensions.VersionRole;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -321,7 +322,7 @@ public CurrentVersionConsumerPoolStrategy() {
@Override
public KafkaConsumerService delegateKafkaConsumerServiceFor(
PartitionReplicaIngestionContext topicPartitionReplicaRole) {
PartitionReplicaIngestionContext.VersionRole versionRole = topicPartitionReplicaRole.getVersionRole();
VersionRole versionRole = topicPartitionReplicaRole.getVersionRole();
PartitionReplicaIngestionContext.WorkloadType workloadType = topicPartitionReplicaRole.getWorkloadType();
PubSubTopicPartition pubSubTopicPartition = topicPartitionReplicaRole.getPubSubTopicPartition();
PubSubTopic pubSubTopic = pubSubTopicPartition.getPubSubTopic();
Expand All @@ -331,8 +332,7 @@ public KafkaConsumerService delegateKafkaConsumerServiceFor(
* 2. If the workload type is active-active leader and write-computer leader.
* 3. If the replica is ready to serve.
*/
if (versionRole.equals(PartitionReplicaIngestionContext.VersionRole.CURRENT)
&& topicPartitionReplicaRole.isReadyToServe()) {
if (versionRole.equals(VersionRole.CURRENT) && topicPartitionReplicaRole.isReadyToServe()) {
if (workloadType.equals(PartitionReplicaIngestionContext.WorkloadType.AA_OR_WRITE_COMPUTE)
&& pubSubTopic.isRealTime()) {
return pubSubTopic.isSeparateRealTimeTopic()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.stats.dimensions.VersionRole;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -22,10 +23,6 @@ public class PartitionReplicaIngestionContext {
private final WorkloadType workloadType;
private final boolean isReadyToServe;

public enum VersionRole {
CURRENT, BACKUP, FUTURE
}

// TODO: Add more workload types if needed, here we only care about active active or write compute workload.
public enum WorkloadType {
AA_OR_WRITE_COMPUTE, NON_AA_OR_WRITE_COMPUTE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
import com.linkedin.venice.serializer.AvroGenericDeserializer;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.stats.dimensions.VersionRole;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.system.store.MetaStoreWriter;
import com.linkedin.venice.utils.ByteUtils;
Expand Down Expand Up @@ -404,7 +405,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
private final String[] msgForLagMeasurement;
protected final AtomicBoolean recordLevelMetricEnabled;
protected final boolean isGlobalRtDivEnabled;
protected volatile PartitionReplicaIngestionContext.VersionRole versionRole;
protected volatile VersionRole versionRole;
protected volatile PartitionReplicaIngestionContext.WorkloadType workloadType;
protected final boolean batchReportIncPushStatusEnabled;

Expand Down Expand Up @@ -1679,7 +1680,7 @@ protected void refreshIngestionContextIfChanged(Store store) throws InterruptedE
}

boolean isWriteComputeEnabled = store.isWriteComputationEnabled();
PartitionReplicaIngestionContext.VersionRole newVersionRole =
VersionRole newVersionRole =
PartitionReplicaIngestionContext.determineStoreVersionRole(versionNumber, currentVersionNumber);
PartitionReplicaIngestionContext.WorkloadType newWorkloadType =
PartitionReplicaIngestionContext.determineWorkloadType(isActiveActiveReplicationEnabled, isWriteComputeEnabled);
Expand Down Expand Up @@ -5013,7 +5014,7 @@ Lazy<CountDownLatch> getGracefulShutdownLatch() {
}

// For unit test purpose.
void setVersionRole(PartitionReplicaIngestionContext.VersionRole versionRole) {
void setVersionRole(VersionRole versionRole) {
this.versionRole = versionRole;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public abstract class AbstractVeniceAggVersionedStats<STATS, STATS_REPORTER exte
private final Map<String, VeniceVersionedStats<STATS, STATS_REPORTER>> aggStats;
private final boolean unregisterMetricForDeletedStoreEnabled;

protected MetricsRepository getMetricsRepository() {
return metricsRepository;
}

public AbstractVeniceAggVersionedStats(
MetricsRepository metricsRepository,
ReadOnlyStoreRepository metadataRepository,
Expand Down Expand Up @@ -135,6 +139,9 @@ protected void updateStatsVersionInfo(String storeName, List<Version> existingVe
versionedStats.setFutureVersion(futureVersion);
}

// Notify subclasses that version info has changed
onVersionInfoUpdated(storeName, versionedStats.getCurrentVersion(), versionedStats.getFutureVersion());

/**
* Since versions are changed, update the total stats accordingly.
*/
Expand Down Expand Up @@ -186,4 +193,16 @@ protected int getCurrentVersion(String storeName) {
protected void updateTotalStats(String storeName) {
// no-op
}

/**
* Hook method called when version info is updated for a store.
* Subclasses can override this to react to version changes.
*
* @param storeName The store whose version info changed
* @param currentVersion The new current version
* @param futureVersion The new future version
*/
protected void onVersionInfoUpdated(String storeName, int currentVersion, int futureVersion) {
// no-op by default
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.linkedin.davinci.stats;

import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_STATE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_TYPE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_VERSION_ROLE;
import static com.linkedin.venice.utils.Utils.setOf;

import com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions;
import com.linkedin.venice.stats.metrics.MetricEntity;
import com.linkedin.venice.stats.metrics.MetricType;
import com.linkedin.venice.stats.metrics.MetricUnit;
import com.linkedin.venice.stats.metrics.ModuleMetricEntityInterface;
import java.util.Set;


/**
* List all metric entities for Venice server (storage node).
*/
public enum ServerMetricEntity implements ModuleMetricEntityInterface {
/**
* Heartbeat replication delay: Tracks nearline replication lag in milliseconds.
*/
INGESTION_HEARTBEAT_DELAY(
"ingestion.replication.heartbeat.delay", MetricType.HISTOGRAM, MetricUnit.MILLISECOND,
"Nearline ingestion replication lag",
setOf(
VENICE_STORE_NAME,
VENICE_CLUSTER_NAME,
VENICE_REGION_NAME,
VENICE_VERSION_ROLE,
VENICE_REPLICA_TYPE,
VENICE_REPLICA_STATE)
);

private final MetricEntity metricEntity;

ServerMetricEntity(
String name,
MetricType metricType,
MetricUnit unit,
String description,
Set<VeniceMetricsDimensions> dimensionsList) {
this.metricEntity = new MetricEntity(name, metricType, unit, description, dimensionsList);
}

@Override
public MetricEntity getMetricEntity() {
return metricEntity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public HeartbeatMonitoringService(
storeName,
regionNames),
leaderHeartbeatTimeStamps,
followerHeartbeatTimeStamps);
followerHeartbeatTimeStamps,
serverConfig.getClusterName());
this.heartbeatMonitoringServiceStats = heartbeatMonitoringServiceStats;
this.customizedViewRepositoryFuture = customizedViewRepositoryFuture;
this.nodeId = Utils.getHelixNodeIdentifier(serverConfig.getListenerHostname(), serverConfig.getListenerPort());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package com.linkedin.davinci.stats.ingestion.heartbeat;

import static com.linkedin.davinci.stats.ServerMetricEntity.INGESTION_HEARTBEAT_DELAY;
import static com.linkedin.venice.meta.Store.NON_EXISTING_VERSION;
import static com.linkedin.venice.stats.metrics.ModuleMetricEntityInterface.getUniqueMetricEntities;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.davinci.stats.ServerMetricEntity;
import com.linkedin.venice.stats.OpenTelemetryMetricsSetup;
import com.linkedin.venice.stats.VeniceOpenTelemetryMetricsRepository;
import com.linkedin.venice.stats.dimensions.ReplicaState;
import com.linkedin.venice.stats.dimensions.ReplicaType;
import com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions;
import com.linkedin.venice.stats.dimensions.VersionRole;
import com.linkedin.venice.stats.metrics.MetricEntity;
import com.linkedin.venice.stats.metrics.MetricEntityStateThreeEnums;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;


/**
* OpenTelemetry metrics for heartbeat monitoring.
* Note: Tehuti metrics are managed separately in {@link HeartbeatStatReporter}.
*/
public class HeartbeatOtelStats {
public static final Collection<MetricEntity> SERVER_METRIC_ENTITIES =
getUniqueMetricEntities(ServerMetricEntity.class);
private final boolean emitOtelMetrics;
private final VeniceOpenTelemetryMetricsRepository otelRepository;
private final Map<VeniceMetricsDimensions, String> baseDimensionsMap;

// Per-region metric entity states
private final Map<String, MetricEntityStateThreeEnums<VersionRole, ReplicaType, ReplicaState>> metricsByRegion;

private static class VersionInfo {
private final int currentVersion;
private final int futureVersion;

VersionInfo(int currentVersion, int futureVersion) {
this.currentVersion = currentVersion;
this.futureVersion = futureVersion;
}
}

private volatile VersionInfo versionInfo = new VersionInfo(NON_EXISTING_VERSION, NON_EXISTING_VERSION);

public HeartbeatOtelStats(MetricsRepository metricsRepository, String storeName, String clusterName) {
this.metricsByRegion = new VeniceConcurrentHashMap<>();

OpenTelemetryMetricsSetup.OpenTelemetryMetricsSetupInfo otelSetup =
OpenTelemetryMetricsSetup.builder(metricsRepository)
.setStoreName(storeName)
.setClusterName(clusterName)
.build();

this.emitOtelMetrics = otelSetup.emitOpenTelemetryMetrics();
this.otelRepository = otelSetup.getOtelRepository();
this.baseDimensionsMap = otelSetup.getBaseDimensionsMap();
}

/**
* Returns true if OTel metrics are emitted.
*/
public boolean emitOtelMetrics() {
return emitOtelMetrics;
}

/**
* Updates the current and future version for this store.
*
* @param currentVersion The current serving version
* @param futureVersion The future/upcoming version
*/
public void updateVersionInfo(int currentVersion, int futureVersion) {
this.versionInfo = new VersionInfo(currentVersion, futureVersion);
}

/**
* Records a heartbeat delay with all dimensional attributes to OTel metrics.
* Returns early if OTel metrics are disabled or version is invalid.
*
* @param version The version number
* @param region The region name
* @param replicaType The replica type {@link ReplicaType}
* @param replicaState The replica state {@link ReplicaState}
* @param delayMs The delay in milliseconds
*/
public void recordHeartbeatDelayOtelMetrics(
int version,
String region,
ReplicaType replicaType,
ReplicaState replicaState,
long delayMs) {
if (!emitOtelMetrics()) {
return;
}
VersionRole versionRole = classifyVersion(version, this.versionInfo);

MetricEntityStateThreeEnums<VersionRole, ReplicaType, ReplicaState> metricState = getOrCreateMetricState(region);

// Records to OTel metrics only
metricState.record(delayMs, versionRole, replicaType, replicaState);
}

/**
* Gets or creates a metric entity state for a specific region.
*/
private MetricEntityStateThreeEnums<VersionRole, ReplicaType, ReplicaState> getOrCreateMetricState(String region) {
return metricsByRegion.computeIfAbsent(region, r -> {
// Add region to base dimensions
Map<VeniceMetricsDimensions, String> regionBaseDimensions = new HashMap<>(baseDimensionsMap);
regionBaseDimensions.put(VeniceMetricsDimensions.VENICE_REGION_NAME, r);

return MetricEntityStateThreeEnums.create(
INGESTION_HEARTBEAT_DELAY.getMetricEntity(),
otelRepository,
regionBaseDimensions,
VersionRole.class,
ReplicaType.class,
ReplicaState.class);
});
}

/**
* Classifies a version as CURRENT or FUTURE or BACKUP
*
* @param version The version number to classify
* @param versionInfo The current/future version (cached)
* @return {@link VersionRole}
*/
static VersionRole classifyVersion(int version, VersionInfo versionInfo) {
if (version == versionInfo.currentVersion) {
return VersionRole.CURRENT;
} else if (version == versionInfo.futureVersion) {
return VersionRole.FUTURE;
}
return VersionRole.BACKUP;
}

@VisibleForTesting
public VersionInfo getVersionInfo() {
return versionInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,37 @@ public HeartbeatStat(MetricConfig metricConfig, Set<String> regions) {
defaultSensor = new WritePathLatencySensor(localRepository, metricConfig, "default-");
}

public void recordReadyToServeLeaderLag(String region, long startTime) {
long endTime = System.currentTimeMillis();
readyToServeLeaderSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime);
/**
* Records the heartbeat lag for a ready-to-serve leader replica.
*
* @param region The region name
* @param delay The pre-calculated delay in milliseconds
* @param endTime The pre-calculated end time
*/
public void recordReadyToServeLeaderLag(String region, long delay, long endTime) {
readyToServeLeaderSensors.computeIfAbsent(region, k -> defaultSensor).record(delay, endTime);
}

public void recordReadyToServeFollowerLag(String region, long startTime) {
long endTime = System.currentTimeMillis();
readyToServeFollowerSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime);
/**
* Records the heartbeat lag for a ready-to-serve follower replica.
*
* @param region The region name
* @param delay The pre-calculated delay in milliseconds
* @param endTime The pre-calculated end time
*/
public void recordReadyToServeFollowerLag(String region, long delay, long endTime) {
readyToServeFollowerSensors.computeIfAbsent(region, k -> defaultSensor).record(delay, endTime);
}

public void recordCatchingUpFollowerLag(String region, long startTime) {
long endTime = System.currentTimeMillis();
catchingUpFollowerSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime);
/**
* Records the heartbeat lag for a catching-up follower replica.
*
* @param region The region name
* @param delay The pre-calculated delay in milliseconds (0 for squelching)
* @param endTime The pre-calculated end time
*/
public void recordCatchingUpFollowerLag(String region, long delay, long endTime) {
catchingUpFollowerSensors.computeIfAbsent(region, k -> defaultSensor).record(delay, endTime);
}

public WritePathLatencySensor getReadyToServeLeaderLag(String region) {
Expand Down
Loading
Loading