diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 1e896333157..8b6b80c3357 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -24,9 +24,9 @@ import com.linkedin.davinci.ingestion.main.MainIngestionStorageMetadataService; import com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils; import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; -import com.linkedin.davinci.kafka.consumer.StoreIngestionService; import com.linkedin.davinci.notifier.VeniceNotifier; import com.linkedin.davinci.repository.VeniceMetadataRepositoryBuilder; +import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.stats.AggVersionedStorageEngineStats; import com.linkedin.davinci.stats.HeartbeatMonitoringServiceStats; @@ -139,6 +139,7 @@ public enum ClientType { private final boolean writeBatchingPushStatus; private final HeartbeatMonitoringService heartbeatMonitoringService; private AggVersionedBlobTransferStats aggVersionedBlobTransferStats; + private AggBlobTransferStats aggBlobTransferStats; public DaVinciBackend( ClientConfig clientConfig, @@ -327,7 +328,8 @@ public DaVinciBackend( if (BlobTransferUtils.isBlobTransferManagerEnabled(backendConfig, isIsolatedIngestion())) { aggVersionedBlobTransferStats = new AggVersionedBlobTransferStats(metricsRepository, storeRepository, configLoader.getVeniceServerConfig()); - + aggBlobTransferStats = + new AggBlobTransferStats(aggVersionedBlobTransferStats, ingestionService.getHostLevelIngestionStats()); P2PBlobTransferConfig p2PBlobTransferConfig = new P2PBlobTransferConfig( configLoader.getVeniceServerConfig().getDvcP2pBlobTransferServerPort(), configLoader.getVeniceServerConfig().getDvcP2pBlobTransferClientPort(), @@ -351,13 +353,14 @@ public DaVinciBackend( .setStorageMetadataService(storageMetadataService) .setReadOnlyStoreRepository(readOnlyStoreRepository) .setStorageEngineRepository(storageService.getStorageEngineRepository()) - .setAggVersionedBlobTransferStats(aggVersionedBlobTransferStats) + .setAggBlobTransferStats(aggBlobTransferStats) .setBlobTransferSSLFactory(BlobTransferUtils.createSSLFactoryForBlobTransferInDVC(configLoader)) .setBlobTransferAclHandler(BlobTransferUtils.createAclHandler(configLoader)) .setPushStatusNotifierSupplier(() -> ingestionListener) .build(); } else { aggVersionedBlobTransferStats = null; + aggBlobTransferStats = null; blobTransferManager = null; } @@ -573,7 +576,7 @@ final StorageService getStorageService() { return storageService; } - final StoreIngestionService getIngestionService() { + final KafkaStoreIngestionService getIngestionService() { return ingestionService; } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java index db795ffed82..8abf878a1f9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java @@ -5,7 +5,7 @@ import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient; import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService; import com.linkedin.davinci.notifier.VeniceNotifier; -import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; +import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.venice.blobtransfer.BlobFinder; @@ -36,7 +36,7 @@ public class BlobTransferManagerBuilder { private StorageMetadataService storageMetadataService; private ReadOnlyStoreRepository readOnlyStoreRepository; private StorageEngineRepository storageEngineRepository; - private AggVersionedBlobTransferStats aggVersionedBlobTransferStats; + private AggBlobTransferStats aggBlobTransferStats; private Optional sslFactory; private Optional aclHandler; private VeniceAdaptiveBlobTransferTrafficThrottler adaptiveBlobTransferWriteTrafficThrottler; @@ -77,9 +77,8 @@ public BlobTransferManagerBuilder setStorageEngineRepository(StorageEngineReposi return this; } - public BlobTransferManagerBuilder setAggVersionedBlobTransferStats( - AggVersionedBlobTransferStats aggVersionedBlobTransferStats) { - this.aggVersionedBlobTransferStats = aggVersionedBlobTransferStats; + public BlobTransferManagerBuilder setAggBlobTransferStats(AggBlobTransferStats aggBlobTransferStats) { + this.aggBlobTransferStats = aggBlobTransferStats; return this; } @@ -112,6 +111,10 @@ public BlobTransferManagerBuilder setPushStatusNotifierSupplier(Supplier build() { try { validateFields(); @@ -150,6 +153,7 @@ public BlobTransferManager build() { blobTransferConfig.getBlobTransferMaxTimeoutInMin(), blobSnapshotManager, globalTrafficHandler, + getAggBlobTransferStats(), sslFactory, aclHandler, blobTransferConfig.getMaxConcurrentSnapshotUser()), @@ -161,11 +165,12 @@ public BlobTransferManager build() { blobTransferConfig.getBlobReceiveTimeoutInMin(), blobTransferConfig.getBlobReceiveReaderIdleTimeInSeconds(), globalTrafficHandler, + getAggBlobTransferStats(), sslFactory, veniceNotifier), blobFinder, blobTransferConfig.getBaseDir(), - aggVersionedBlobTransferStats, + getAggBlobTransferStats().getAggVersionedBlobTransferStats(), blobTransferConfig.getMaxConcurrentBlobReceiveReplicas()); // start the P2P blob transfer manager @@ -191,7 +196,7 @@ private void validateFields() { } if (blobTransferConfig == null || storageMetadataService == null || readOnlyStoreRepository == null - || storageEngineRepository == null || aggVersionedBlobTransferStats == null) { + || storageEngineRepository == null || aggBlobTransferStats == null) { throw new IllegalArgumentException( "The blob transfer config, storage metadata service, read only store repository, storage engine repository, " + "and agg versioned blob transfer stats must not be null"); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java index f1c69fa1f26..c8c4256d64e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java @@ -5,6 +5,7 @@ import com.linkedin.davinci.blobtransfer.BlobTransferUtils; import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferTableFormat; import com.linkedin.davinci.notifier.VeniceNotifier; +import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.venice.exceptions.VenicePeersConnectionException; import com.linkedin.venice.listener.VerifySslHandler; @@ -78,6 +79,7 @@ public class NettyFileTransferClient { private final VeniceConcurrentHashMap unconnectableHostsToTimestamp = new VeniceConcurrentHashMap<>(); private final VeniceConcurrentHashMap connectedHostsToTimestamp = new VeniceConcurrentHashMap<>(); private final Supplier notifierSupplier; + private final AggBlobTransferStats aggBlobTransferStats; private final VerifySslHandler verifySsl = new VerifySslHandler(); @@ -90,6 +92,7 @@ public NettyFileTransferClient( int blobReceiveTimeoutInMin, int blobReceiveReaderIdleTimeInSeconds, GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler, + AggBlobTransferStats aggBlobTransferStats, Optional sslFactory, Supplier notifierSupplier) { this.baseDir = baseDir; @@ -99,6 +102,7 @@ public NettyFileTransferClient( this.peersConnectivityFreshnessInSeconds = peersConnectivityFreshnessInSeconds; this.blobReceiveTimeoutInMin = blobReceiveTimeoutInMin; this.blobReceiveReaderIdleTimeInSeconds = blobReceiveReaderIdleTimeInSeconds; + this.aggBlobTransferStats = aggBlobTransferStats; clientBootstrap = new Bootstrap(); workerGroup = new NioEventLoopGroup(); @@ -308,6 +312,7 @@ public CompletionStage get( version, partition, requestedTableFormat, + aggBlobTransferStats, checksumValidationExecutorService)) .addLast( new P2PMetadataTransferHandler( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java index b018be08a8f..023f48aade2 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java @@ -5,6 +5,7 @@ import com.linkedin.davinci.blobtransfer.BlobTransferPayload; import com.linkedin.davinci.blobtransfer.BlobTransferUtils; +import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.venice.exceptions.VeniceBlobTransferFileNotFoundException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.store.rocksdb.RocksDBUtils; @@ -55,7 +56,10 @@ public class P2PFileTransferClientHandler extends SimpleChannelInboundHandler checksumExceptionHolder = new AtomicReference<>(null); private final BlobTransferPayload payload; private final ExecutorService checksumValidationExecutorService; + private final AggBlobTransferStats aggBlobTransferStats; private final List> checksumValidationFutureList = new ArrayList<>(); + private final String storeName; + private final int version; // mutable states for a single file transfer. It will be updated for each file transfer. private FileChannel outputFileChannel; private String fileName; @@ -73,11 +77,15 @@ public P2PFileTransferClientHandler( int version, int partition, BlobTransferUtils.BlobTransferTableFormat tableFormat, + AggBlobTransferStats aggBlobTransferStats, ExecutorService checksumValidationExecutorService) { this.inputStreamFuture = inputStreamFuture; this.payload = new BlobTransferPayload(baseDir, storeName, version, partition, tableFormat); + this.storeName = storeName; + this.version = version; this.replicaId = Utils.getReplicaId(payload.getTopicName(), payload.getPartition()); this.checksumValidationExecutorService = checksumValidationExecutorService; + this.aggBlobTransferStats = aggBlobTransferStats; this.replicaTransferStartTime = System.currentTimeMillis(); } @@ -171,6 +179,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex count += transferred; } } + aggBlobTransferStats.recordBlobTransferBytesReceived(storeName, version, totalBytesToTransfer); if (content instanceof DefaultLastHttpContent) { // End of a single file transfer diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java index 45608759aa4..395b1ebe355 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java @@ -3,6 +3,7 @@ import com.linkedin.alpini.netty4.ssl.SslInitializer; import com.linkedin.davinci.blobtransfer.BlobSnapshotManager; import com.linkedin.davinci.blobtransfer.BlobTransferAclHandler; +import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.venice.listener.VerifySslHandler; import com.linkedin.venice.security.SSLFactory; import com.linkedin.venice.utils.SslUtils; @@ -30,6 +31,7 @@ public BlobTransferNettyChannelInitializer( int blobTransferMaxTimeoutInMin, BlobSnapshotManager blobSnapshotManager, GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler, + AggBlobTransferStats aggBlobTransferStats, Optional sslFactory, Optional aclHandler, int maxAllowedConcurrentSnapshotUsers) { @@ -40,6 +42,7 @@ public BlobTransferNettyChannelInitializer( baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager, + aggBlobTransferStats, maxAllowedConcurrentSnapshotUsers); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java index 05b38505b7c..d07d726329e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java @@ -2,6 +2,7 @@ import com.linkedin.davinci.blobtransfer.BlobSnapshotManager; import com.linkedin.davinci.blobtransfer.BlobTransferAclHandler; +import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.venice.security.SSLFactory; import com.linkedin.venice.service.AbstractVeniceService; import io.netty.bootstrap.ServerBootstrap; @@ -40,6 +41,7 @@ public P2PBlobTransferService( int blobTransferMaxTimeoutInMin, BlobSnapshotManager blobSnapshotManager, GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler, + AggBlobTransferStats aggBlobTransferStats, Optional sslFactory, Optional aclHandler, int maxAllowedConcurrentSnapshotUsers) { @@ -66,6 +68,7 @@ public P2PBlobTransferService( blobTransferMaxTimeoutInMin, blobSnapshotManager, globalChannelTrafficShapingHandler, + aggBlobTransferStats, sslFactory, aclHandler, maxAllowedConcurrentSnapshotUsers)) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java index 97128afb54a..caa29a68ce0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java @@ -15,6 +15,8 @@ import com.linkedin.davinci.blobtransfer.BlobTransferPartitionMetadata; import com.linkedin.davinci.blobtransfer.BlobTransferPayload; import com.linkedin.davinci.blobtransfer.BlobTransferUtils; +import com.linkedin.davinci.stats.AggBlobTransferStats; +import com.linkedin.venice.meta.Version; import com.linkedin.venice.request.RequestHelper; import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.Utils; @@ -64,6 +66,7 @@ public class P2PFileTransferServerHandler extends SimpleChannelInboundHandler BLOB_TRANSFER_REQUEST = AttributeKey.valueOf("blobTransferRequest"); private static final AttributeKey SUCCESS_COUNTED = @@ -73,10 +76,12 @@ public P2PFileTransferServerHandler( String baseDir, int blobTransferMaxTimeoutInMin, BlobSnapshotManager blobSnapshotManager, + AggBlobTransferStats aggBlobTransferStats, int maxAllowedConcurrentSnapshotUsers) { this.baseDir = baseDir; this.blobTransferMaxTimeoutInMin = blobTransferMaxTimeoutInMin; this.blobSnapshotManager = blobSnapshotManager; + this.aggBlobTransferStats = aggBlobTransferStats; this.maxAllowedConcurrentSnapshotUsers = maxAllowedConcurrentSnapshotUsers; } @@ -194,7 +199,7 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpReque return; } // send file - sendFile(file, ctx, replicaInfo); + sendFile(file, ctx, blobTransferRequest, replicaInfo); } sendMetadata(ctx, transferPartitionMetadata); @@ -262,7 +267,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E ctx.close(); } - private void sendFile(File file, ChannelHandlerContext ctx, String replicaInfo) throws IOException { + private void sendFile( + File file, + ChannelHandlerContext ctx, + BlobTransferPayload blobTransferPayload, + String replicaInfo) throws IOException { LOGGER.info( "Sending file: {} for replica {} to host {}.", file.getName(), @@ -289,6 +298,14 @@ private void sendFile(File file, ChannelHandlerContext ctx, String replicaInfo) sendFileFuture.addListener(future -> { if (future.isSuccess()) { + /** + * Note: This does not record the real-time byte rate of files sent. If we want to pursue more accurate read metric, + * we will need to overwrite the {@link HttpChunkedInput} above to intercept the traffic and record the byte rate. + */ + aggBlobTransferStats.recordBlobTransferBytesSent( + blobTransferPayload.getStoreName(), + Version.parseVersionFromKafkaTopicName(blobTransferPayload.getTopicName()), + length); LOGGER.info( "Sent file: {} successfully for replica: {} to host: {}", file.getName(), diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index bf7460f4101..54d6c611ca1 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -1544,4 +1544,7 @@ public void maybeAddResubscribeRequest(String storeName, int version, int partit LOGGER.info("Added replica: {} to pending resubscribe queue.", Utils.getReplicaId(versionTopic, partition)); } + public AggHostLevelIngestionStats getHostLevelIngestionStats() { + return hostLevelIngestionStats; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggBlobTransferStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggBlobTransferStats.java new file mode 100644 index 00000000000..09bc674f846 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggBlobTransferStats.java @@ -0,0 +1,33 @@ +package com.linkedin.davinci.stats; + +public class AggBlobTransferStats { + private final AggVersionedBlobTransferStats aggVersionedBlobTransferStats; + private final AggHostLevelIngestionStats aggHostLevelIngestionStats; + + public AggBlobTransferStats( + AggVersionedBlobTransferStats aggVersionedBlobTransferStats, + AggHostLevelIngestionStats aggHostLevelIngestionStats) { + this.aggHostLevelIngestionStats = aggHostLevelIngestionStats; + this.aggVersionedBlobTransferStats = aggVersionedBlobTransferStats; + } + + public void recordBlobTransferBytesSent(String storeName, int version, long value) { + aggVersionedBlobTransferStats.recordBlobTransferBytesSent(storeName, version, value); + HostLevelIngestionStats totalStats = aggHostLevelIngestionStats.getTotalStats(); + if (totalStats != null) { + totalStats.recordTotalBlobTransferBytesSend(value); + } + } + + public void recordBlobTransferBytesReceived(String storeName, int version, long value) { + aggVersionedBlobTransferStats.recordBlobTransferBytesReceived(storeName, version, value); + HostLevelIngestionStats totalStats = aggHostLevelIngestionStats.getTotalStats(); + if (totalStats != null) { + totalStats.recordTotalBlobTransferBytesReceived(value); + } + } + + public AggVersionedBlobTransferStats getAggVersionedBlobTransferStats() { + return aggVersionedBlobTransferStats; + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java index 25dbabfd7eb..6e29c30a859 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java @@ -2,6 +2,7 @@ import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.venice.meta.ReadOnlyStoreRepository; +import com.linkedin.venice.utils.Time; import io.tehuti.metrics.MetricsRepository; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -21,7 +22,23 @@ public AggVersionedBlobTransferStats( super( metricsRepository, metadataRepository, - () -> new BlobTransferStats(), + BlobTransferStats::new, + BlobTransferStatsReporter::new, + serverConfig.isUnregisterMetricForDeletedStoreEnabled()); + } + + /** + * Constructor for testing that allows injecting a Time instance. + */ + public AggVersionedBlobTransferStats( + MetricsRepository metricsRepository, + ReadOnlyStoreRepository metadataRepository, + VeniceServerConfig serverConfig, + Time time) { + super( + metricsRepository, + metadataRepository, + () -> new BlobTransferStats(time), BlobTransferStatsReporter::new, serverConfig.isUnregisterMetricForDeletedStoreEnabled()); } @@ -70,4 +87,13 @@ public void recordBlobTransferFileReceiveThroughput(String storeName, int versio public void recordBlobTransferTimeInSec(String storeName, int version, double timeInSec) { recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferTimeInSec(timeInSec)); } + + public void recordBlobTransferBytesReceived(String storeName, int version, long value) { + recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferBytesReceived(value)); + } + + public void recordBlobTransferBytesSent(String storeName, int version, long value) { + recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferBytesSent(value)); + } + } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java index 9c5c0561ac0..c01d30bad86 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java @@ -1,5 +1,8 @@ package com.linkedin.davinci.stats; +import com.linkedin.venice.stats.LongAdderRateGauge; +import com.linkedin.venice.utils.SystemTime; +import com.linkedin.venice.utils.Time; import io.tehuti.metrics.MetricConfig; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; @@ -25,6 +28,8 @@ public class BlobTransferStats { // The blob file receiving throughput (in MB/sec) and time (in sec) protected static final String BLOB_TRANSFER_THROUGHPUT = "blob_transfer_file_receive_throughput"; protected static final String BLOB_TRANSFER_TIME = "blob_transfer_time"; + protected static final String BLOB_TRANSFER_BYTES_RECEIVED = "blob_transfer_bytes_received"; + protected static final String BLOB_TRANSFER_BYTES_SENT = "blob_transfer_bytes_sent"; private static final MetricConfig METRIC_CONFIG = new MetricConfig(); private final MetricsRepository localMetricRepository; @@ -38,9 +43,17 @@ public class BlobTransferStats { private Sensor blobTransferFileReceiveThroughputSensor; private Gauge blobTransferTimeGauge = new Gauge(); private Sensor blobTransferTimeSensor; + private LongAdderRateGauge blobTransferBytesReceivedSensor; + private LongAdderRateGauge blobTransferBytesSentSensor; public BlobTransferStats() { + this(new SystemTime()); + } + + public BlobTransferStats(Time time) { localMetricRepository = new MetricsRepository(METRIC_CONFIG); + blobTransferBytesReceivedSensor = new LongAdderRateGauge(time); + blobTransferBytesSentSensor = new LongAdderRateGauge(time); blobTransferTotalNumResponsesSensor = localMetricRepository.sensor(BLOB_TRANSFER_TOTAL_NUM_RESPONSES); blobTransferTotalNumResponsesSensor.add(BLOB_TRANSFER_TOTAL_NUM_RESPONSES, blobTransferTotalNumResponsesCount); @@ -57,6 +70,9 @@ public BlobTransferStats() { blobTransferTimeSensor = localMetricRepository.sensor(BLOB_TRANSFER_TIME); blobTransferTimeSensor.add(BLOB_TRANSFER_TIME, blobTransferTimeGauge); + + registerSensor(localMetricRepository, BLOB_TRANSFER_BYTES_RECEIVED, blobTransferBytesReceivedSensor); + registerSensor(localMetricRepository, BLOB_TRANSFER_BYTES_SENT, blobTransferBytesSentSensor); } /** @@ -138,4 +154,25 @@ public double getBlobTransferTime() { return blobTransferTimeGauge.measure(METRIC_CONFIG, System.currentTimeMillis()); } } + + public double getBlobTransferBytesReceived() { + return blobTransferBytesReceivedSensor.getRate(); + } + + public void recordBlobTransferBytesReceived(long value) { + blobTransferBytesReceivedSensor.record(value); + } + + public double getBlobTransferBytesSent() { + return blobTransferBytesSentSensor.getRate(); + } + + public void recordBlobTransferBytesSent(long value) { + blobTransferBytesSentSensor.record(value); + } + + void registerSensor(MetricsRepository localMetricRepository, String sensorName, LongAdderRateGauge gauge) { + Sensor sensor = localMetricRepository.sensor(sensorName); + sensor.add(sensorName + "_rate", gauge); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java index 835e7426a28..a61e6c53f8b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java @@ -45,6 +45,16 @@ protected void registerStats() { BlobTransferStats.BLOB_TRANSFER_THROUGHPUT)); registerSensor( new IngestionStatsGauge(this, () -> getStats().getBlobTransferTime(), BlobTransferStats.BLOB_TRANSFER_TIME)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferBytesReceived(), + BlobTransferStats.BLOB_TRANSFER_BYTES_RECEIVED)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferBytesSent(), + BlobTransferStats.BLOB_TRANSFER_BYTES_SENT)); } protected static class IngestionStatsGauge extends AsyncGauge { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java index 1c30564f9ab..f885d0cd0e0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java @@ -46,6 +46,11 @@ public class HostLevelIngestionStats extends AbstractVeniceStats { // The aggregated records ingested rate for the entire host private final LongAdderRateGauge totalRecordsConsumedRate; + // The aggregated blob transfer sent byte rate for the entire host + private final LongAdderRateGauge totalBlobTransferBytesSentRate; + // The aggregated blob transfer received byte rate for the entire host + private final LongAdderRateGauge totalBlobTransferBytesReceivedRate; + /* * Bytes read from Kafka by store ingestion task as a total. This metric includes bytes read for all store versions * allocated in a storage node reported with its uncompressed data size. @@ -172,6 +177,17 @@ public HostLevelIngestionStats( this.totalRecordsConsumedRate = registerOnlyTotalRate("records_consumed", totalStats, () -> totalStats.totalRecordsConsumedRate, time); + this.totalBlobTransferBytesSentRate = registerOnlyTotalRate( + "blob_transfer_bytes_sent", + totalStats, + () -> totalStats.totalBlobTransferBytesSentRate, + time); + this.totalBlobTransferBytesReceivedRate = registerOnlyTotalRate( + "blob_transfer_bytes_received", + totalStats, + () -> totalStats.totalBlobTransferBytesReceivedRate, + time); + this.totalBytesReadFromKafkaAsUncompressedSizeRate = registerOnlyTotalRate( "bytes_read_from_kafka_as_uncompressed_size", totalStats, @@ -512,6 +528,14 @@ public void recordTotalRecordsConsumed() { totalRecordsConsumedRate.record(); } + public void recordTotalBlobTransferBytesSend(long bytes) { + totalBlobTransferBytesSentRate.record(bytes); + } + + public void recordTotalBlobTransferBytesReceived(long bytes) { + totalBlobTransferBytesReceivedRate.record(bytes); + } + public void recordTotalBytesReadFromKafkaAsUncompressedSize(long bytes) { totalBytesReadFromKafkaAsUncompressedSizeRate.record(bytes); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java index 3c7134d3813..eb1862d24bd 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java @@ -22,7 +22,7 @@ import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory; import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.ingestion.IngestionBackend; -import com.linkedin.davinci.kafka.consumer.StoreIngestionService; +import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatLagMonitorAction; import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; import com.linkedin.davinci.storage.StorageService; @@ -100,7 +100,7 @@ void setUp() { when(backend.getMetricsRepository()).thenReturn(metricsRepository); when(backend.getStoreRepository()).thenReturn(mock(SubscriptionBasedReadOnlyStoreRepository.class)); when(backend.getStorageService()).thenReturn(storageService); - when(backend.getIngestionService()).thenReturn(mock(StoreIngestionService.class)); + when(backend.getIngestionService()).thenReturn(mock(KafkaStoreIngestionService.class)); when(backend.getVersionByTopicMap()).thenReturn(versionMap); when(backend.getVeniceLatestNonFaultyVersion(anyString(), anySet())).thenCallRealMethod(); when(backend.getVeniceCurrentVersion(anyString())).thenCallRealMethod(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java index aace0e0a4e7..e5c4fc57887 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java @@ -18,7 +18,7 @@ import com.linkedin.davinci.client.InternalDaVinciRecordTransformerConfig; import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.ingestion.IngestionBackend; -import com.linkedin.davinci.kafka.consumer.StoreIngestionService; +import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; import com.linkedin.davinci.stats.AggVersionedDaVinciRecordTransformerStats; import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; import com.linkedin.davinci.storage.StorageService; @@ -200,7 +200,7 @@ public void testRecordTransformerSubscribe() { recordTransformerConfig, mock(AggVersionedDaVinciRecordTransformerStats.class))); when(mockDaVinciBackend.getInternalRecordTransformerConfig(storeName)).thenReturn(internalRecordTransformerConfig); - when(mockDaVinciBackend.getIngestionService()).thenReturn(mock(StoreIngestionService.class)); + when(mockDaVinciBackend.getIngestionService()).thenReturn(mock(KafkaStoreIngestionService.class)); VersionBackend versionBackend = new VersionBackend(mockDaVinciBackend, version, mockStoreBackendStats); Collection partitionList = Arrays.asList(0, 1, 2); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestBlobTransferManagerBuilder.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestBlobTransferManagerBuilder.java index bd871b2c007..2359bb2e7d3 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestBlobTransferManagerBuilder.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestBlobTransferManagerBuilder.java @@ -3,7 +3,7 @@ import static org.mockito.Mockito.mock; import com.linkedin.davinci.notifier.VeniceNotifier; -import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; +import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.venice.client.store.ClientConfig; @@ -27,7 +27,7 @@ public void testInitBlobTransferManager() throws IOException { int port = TestUtils.getFreePort(); Path tmpPartitionDir = Files.createTempDirectory("tmpPartitionDir"); StorageMetadataService storageMetadataService = mock(StorageMetadataService.class); - AggVersionedBlobTransferStats blobTransferStats = mock(AggVersionedBlobTransferStats.class); + AggBlobTransferStats blobTransferStats = mock(AggBlobTransferStats.class); ReadOnlyStoreRepository readOnlyStoreRepository = mock(ReadOnlyStoreRepository.class); StorageEngineRepository storageEngineRepository = mock(StorageEngineRepository.class); ClientConfig clientConfig = mock(ClientConfig.class); @@ -58,7 +58,7 @@ public void testInitBlobTransferManager() throws IOException { .setStorageMetadataService(storageMetadataService) .setReadOnlyStoreRepository(readOnlyStoreRepository) .setStorageEngineRepository(storageEngineRepository) - .setAggVersionedBlobTransferStats(blobTransferStats) + .setAggBlobTransferStats(blobTransferStats) .setBlobTransferSSLFactory(Optional.of(sslFactory)) .setBlobTransferAclHandler(Optional.of(blobTransferAclHandler)) .setPushStatusNotifierSupplier(() -> notifier) @@ -72,7 +72,7 @@ public void testFailToCreateBlobTransferManager() throws IOException { int port = TestUtils.getFreePort(); Path tmpPartitionDir = Files.createTempDirectory("tmpPartitionDir"); StorageMetadataService storageMetadataService = mock(StorageMetadataService.class); - AggVersionedBlobTransferStats blobTransferStats = mock(AggVersionedBlobTransferStats.class); + AggBlobTransferStats blobTransferStats = mock(AggBlobTransferStats.class); ReadOnlyStoreRepository readOnlyStoreRepository = mock(ReadOnlyStoreRepository.class); StorageEngineRepository storageEngineRepository = mock(StorageEngineRepository.class); ClientConfig clientConfig = mock(ClientConfig.class); @@ -104,7 +104,7 @@ public void testFailToCreateBlobTransferManager() throws IOException { .setStorageMetadataService(storageMetadataService) .setReadOnlyStoreRepository(readOnlyStoreRepository) .setStorageEngineRepository(storageEngineRepository) - .setAggVersionedBlobTransferStats(blobTransferStats) + .setAggBlobTransferStats(blobTransferStats) .setPushStatusNotifierSupplier(() -> null) .build(); Assert.assertNull(blobTransferManager); @@ -122,7 +122,7 @@ public void testFailToCreateBlobTransferManager() throws IOException { .setStorageMetadataService(storageMetadataService) .setReadOnlyStoreRepository(readOnlyStoreRepository) .setStorageEngineRepository(storageEngineRepository) - .setAggVersionedBlobTransferStats(blobTransferStats) + .setAggBlobTransferStats(blobTransferStats) .setPushStatusNotifierSupplier(() -> null) .build(); Assert.assertNull(blobTransferManager1); @@ -139,7 +139,7 @@ public void testFailToCreateBlobTransferManager() throws IOException { .setStorageMetadataService(storageMetadataService) .setReadOnlyStoreRepository(readOnlyStoreRepository) .setStorageEngineRepository(storageEngineRepository) - .setAggVersionedBlobTransferStats(blobTransferStats) + .setAggBlobTransferStats(blobTransferStats) .setPushStatusNotifierSupplier(() -> null) .build(); Assert.assertNull(blobTransferManager2); @@ -159,7 +159,7 @@ public void testFailToCreateBlobTransferManager() throws IOException { .setStorageMetadataService(storageMetadataService) .setReadOnlyStoreRepository(readOnlyStoreRepository) .setStorageEngineRepository(storageEngineRepository) - .setAggVersionedBlobTransferStats(blobTransferStats) + .setAggBlobTransferStats(blobTransferStats) .setBlobTransferAclHandler(null) .setBlobTransferSSLFactory(Optional.ofNullable(sslFactory)) .setPushStatusNotifierSupplier(() -> null) @@ -179,7 +179,7 @@ public void testFailToCreateBlobTransferManager() throws IOException { .setStorageMetadataService(storageMetadataService) .setReadOnlyStoreRepository(readOnlyStoreRepository) .setStorageEngineRepository(storageEngineRepository) - .setAggVersionedBlobTransferStats(blobTransferStats) + .setAggBlobTransferStats(blobTransferStats) .setBlobTransferAclHandler(Optional.empty()) .setBlobTransferSSLFactory(Optional.of(sslFactory)) .setPushStatusNotifierSupplier(() -> null) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java index 3417d161cb1..6e1a5b63144 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java @@ -15,6 +15,7 @@ import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService; import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.notifier.VeniceNotifier; +import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; @@ -72,7 +73,8 @@ public class TestNettyP2PBlobTransferManager { NettyP2PBlobTransferManager manager; StorageMetadataService storageMetadataService; BlobSnapshotManager blobSnapshotManager; - AggVersionedBlobTransferStats blobTransferStats; + AggBlobTransferStats blobTransferStats; + AggVersionedBlobTransferStats versionedBlobTransferStats; Path tmpSnapshotDir; Path tmpPartitionDir; String TEST_STORE = "test_store"; @@ -100,7 +102,8 @@ public void setUp() throws Exception { // intentionally use different directories for snapshot and partition so that we can verify the file transfer storageMetadataService = mock(StorageMetadataService.class); - blobTransferStats = mock(AggVersionedBlobTransferStats.class); + blobTransferStats = mock(AggBlobTransferStats.class); + versionedBlobTransferStats = mock(AggVersionedBlobTransferStats.class); StorageEngineRepository storageEngineRepository = mock(StorageEngineRepository.class); GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler = getGlobalChannelTrafficShapingHandlerInstance(2000000, 2000000); @@ -132,6 +135,7 @@ public void setUp() throws Exception { blobTransferMaxTimeoutInMin, blobSnapshotManager, globalChannelTrafficShapingHandler, + blobTransferStats, sslFactory, aclHandler, 20); @@ -144,11 +148,17 @@ public void setUp() throws Exception { 60, blobTransferMaxTimeoutInMin, globalChannelTrafficShapingHandler, + blobTransferStats, sslFactory, () -> notifier)); finder = mock(BlobFinder.class); - - manager = new NettyP2PBlobTransferManager(server, client, finder, tmpPartitionDir.toString(), blobTransferStats, 5); + manager = new NettyP2PBlobTransferManager( + server, + client, + finder, + tmpPartitionDir.toString(), + versionedBlobTransferStats, + 5); manager.start(); } @@ -507,6 +517,7 @@ public void testGetCompletesWithTimeoutExceptionAndClosesChannel() throws Except 0, // general transfer timeout immediately 10, newGlobalChannelTrafficShapingHandler, + blobTransferStats, sslFactory, null)); @@ -516,12 +527,18 @@ public void testGetCompletesWithTimeoutExceptionAndClosesChannel() throws Except 20, blobSnapshotManager, newGlobalChannelTrafficShapingHandler, + blobTransferStats, sslFactory, aclHandler, 20); - NettyP2PBlobTransferManager newManager = - new NettyP2PBlobTransferManager(newServer, newClient, finder, tmpPartitionDir.toString(), blobTransferStats, 5); + NettyP2PBlobTransferManager newManager = new NettyP2PBlobTransferManager( + newServer, + newClient, + finder, + tmpPartitionDir.toString(), + versionedBlobTransferStats, + 5); newManager.start(); // Action diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java index eba95a45858..b2c2afb4f91 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java @@ -12,6 +12,7 @@ import com.linkedin.davinci.blobtransfer.client.P2PFileTransferClientHandler; import com.linkedin.davinci.blobtransfer.client.P2PMetadataTransferHandler; import com.linkedin.davinci.notifier.VeniceNotifier; +import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.state.IncrementalPushReplicaStatus; @@ -66,6 +67,7 @@ public class TestP2PFileTransferClientHandler { CompletionStage inputStreamFuture; StorageMetadataService storageMetadataService; ExecutorService checksumValidationExecutorService; + AggBlobTransferStats blobTransferStats; P2PFileTransferClientHandler clientFileHandler; P2PMetadataTransferHandler clientMetadataHandler; @@ -76,6 +78,7 @@ public void setUp() throws IOException { baseDir = Files.createTempDirectory("tmp"); inputStreamFuture = new CompletableFuture<>(); storageMetadataService = Mockito.mock(StorageMetadataService.class); + blobTransferStats = Mockito.mock(AggBlobTransferStats.class); checksumValidationExecutorService = Executors.newSingleThreadExecutor(); clientFileHandler = Mockito.spy( @@ -86,6 +89,7 @@ public void setUp() throws IOException { TEST_VERSION, TEST_PARTITION, BlobTransferUtils.BlobTransferTableFormat.BLOCK_BASED_TABLE, + blobTransferStats, checksumValidationExecutorService)); veniceNotifier = Mockito.mock(VeniceNotifier.class); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java index bdf0bab2a3c..fae6e66fc23 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.davinci.blobtransfer.server.P2PFileTransferServerHandler; +import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.davinci.store.StorageEngine; @@ -55,6 +56,7 @@ public class TestP2PFileTransferServerHandler { P2PFileTransferServerHandler serverHandler; BlobSnapshotManager blobSnapshotManager; StorageEngineRepository storageEngineRepository; + AggBlobTransferStats blobTransferStats; int maxAllowedConcurrentSnapshotUsers = 20; @BeforeMethod @@ -63,12 +65,13 @@ public void setUp() throws IOException { blobTransferMaxTimeoutInMin = 30; storageMetadataService = Mockito.mock(StorageMetadataService.class); storageEngineRepository = Mockito.mock(StorageEngineRepository.class); - + blobTransferStats = Mockito.mock(AggBlobTransferStats.class); blobSnapshotManager = Mockito.spy(new BlobSnapshotManager(storageEngineRepository, storageMetadataService)); serverHandler = new P2PFileTransferServerHandler( baseDir.toString(), blobTransferMaxTimeoutInMin, blobSnapshotManager, + blobTransferStats, maxAllowedConcurrentSnapshotUsers); ch = new EmbeddedChannel(serverHandler); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggBlobTransferStatsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggBlobTransferStatsTest.java new file mode 100644 index 00000000000..77d504aa68a --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggBlobTransferStatsTest.java @@ -0,0 +1,169 @@ +package com.linkedin.davinci.stats; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import com.linkedin.davinci.config.VeniceServerConfig; +import com.linkedin.venice.meta.OfflinePushStrategy; +import com.linkedin.venice.meta.PersistenceType; +import com.linkedin.venice.meta.ReadOnlyStoreRepository; +import com.linkedin.venice.meta.ReadStrategy; +import com.linkedin.venice.meta.RoutingStrategy; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.ZKStore; +import com.linkedin.venice.stats.LongAdderRateGauge; +import com.linkedin.venice.tehuti.MockTehutiReporter; +import com.linkedin.venice.utils.TestMockTime; +import com.linkedin.venice.utils.Time; +import com.linkedin.venice.utils.Utils; +import io.tehuti.metrics.MetricsRepository; +import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class AggBlobTransferStatsTest { + @Test + public void testHostLevelBlobTransferMetrics() { + TestMockTime mockTime = new TestMockTime(); + MetricsRepository metricsRepo = new MetricsRepository(mockTime); + MockTehutiReporter reporter = new MockTehutiReporter(); + VeniceServerConfig mockVeniceServerConfig = Mockito.mock(VeniceServerConfig.class); + + String storeName = Utils.getUniqueString("store_foo"); + + metricsRepo.addReporter(reporter); + ReadOnlyStoreRepository mockMetaRepository = mock(ReadOnlyStoreRepository.class); + doReturn(Int2ObjectMaps.emptyMap()).when(mockVeniceServerConfig).getKafkaClusterIdToAliasMap(); + doReturn(true).when(mockVeniceServerConfig).isUnregisterMetricForDeletedStoreEnabled(); + doReturn("test-cluster").when(mockVeniceServerConfig).getClusterName(); + + // Create AggVersionedBlobTransferStats + AggVersionedBlobTransferStats aggVersionedStats = + new AggVersionedBlobTransferStats(metricsRepo, mockMetaRepository, mockVeniceServerConfig, mockTime); + + // Create AggHostLevelIngestionStats + AggHostLevelIngestionStats aggHostLevelStats = new AggHostLevelIngestionStats( + metricsRepo, + mockVeniceServerConfig, + new HashMap<>(), + mockMetaRepository, + true, + mockTime); + + // Create AggBlobTransferStats that wraps both + AggBlobTransferStats aggBlobTransferStats = new AggBlobTransferStats(aggVersionedStats, aggHostLevelStats); + + Store mockStore = createStore(storeName); + List storeList = new ArrayList<>(); + storeList.add(mockStore); + + doReturn(mockStore).when(mockMetaRepository).getStoreOrThrow(any()); + doReturn(storeList).when(mockMetaRepository).getAllStores(); + + aggVersionedStats.loadAllStats(); + storeName = mockStore.getName(); + + // Record host-level blob transfer bytes received and sent + aggBlobTransferStats.recordBlobTransferBytesReceived(storeName, 1, 2048); + aggBlobTransferStats.recordBlobTransferBytesSent(storeName, 1, 8192); + + // Advance time past the 30-second cache duration to get the rate calculation + mockTime.addMilliseconds(Time.MS_PER_SECOND * LongAdderRateGauge.RATE_GAUGE_CACHE_DURATION_IN_SECONDS); + + // Expected rates + double expectedReceivedRate = 2048.0 / LongAdderRateGauge.RATE_GAUGE_CACHE_DURATION_IN_SECONDS; + double expectedSentRate = 8192.0 / LongAdderRateGauge.RATE_GAUGE_CACHE_DURATION_IN_SECONDS; + + // Verify version-level metrics + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_bytes_received.IngestionStatsGauge").value(), + expectedReceivedRate); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_bytes_sent.IngestionStatsGauge").value(), + expectedSentRate); + + // Verify host-level metrics + Assert.assertEquals(reporter.query(".total--blob_transfer_bytes_received.Rate").value(), expectedReceivedRate); + Assert.assertEquals(reporter.query(".total--blob_transfer_bytes_sent.Rate").value(), expectedSentRate); + } + + @Test + public void testHostLevelBlobTransferMetricsAcrossMultipleVersions() { + TestMockTime mockTime = new TestMockTime(); + MetricsRepository metricsRepo = new MetricsRepository(mockTime); + MockTehutiReporter reporter = new MockTehutiReporter(); + VeniceServerConfig mockVeniceServerConfig = Mockito.mock(VeniceServerConfig.class); + + String storeName = Utils.getUniqueString("store_bar"); + + metricsRepo.addReporter(reporter); + ReadOnlyStoreRepository mockMetaRepository = mock(ReadOnlyStoreRepository.class); + doReturn(Int2ObjectMaps.emptyMap()).when(mockVeniceServerConfig).getKafkaClusterIdToAliasMap(); + doReturn(true).when(mockVeniceServerConfig).isUnregisterMetricForDeletedStoreEnabled(); + doReturn("test-cluster").when(mockVeniceServerConfig).getClusterName(); + + // Create AggVersionedBlobTransferStats + AggVersionedBlobTransferStats aggVersionedStats = + new AggVersionedBlobTransferStats(metricsRepo, mockMetaRepository, mockVeniceServerConfig, mockTime); + + // Create AggHostLevelIngestionStats + AggHostLevelIngestionStats aggHostLevelStats = new AggHostLevelIngestionStats( + metricsRepo, + mockVeniceServerConfig, + new HashMap<>(), + mockMetaRepository, + true, + mockTime); + + // Create AggBlobTransferStats that wraps both + AggBlobTransferStats aggBlobTransferStats = new AggBlobTransferStats(aggVersionedStats, aggHostLevelStats); + + Store mockStore = createStore(storeName); + List storeList = new ArrayList<>(); + storeList.add(mockStore); + + doReturn(mockStore).when(mockMetaRepository).getStoreOrThrow(any()); + doReturn(storeList).when(mockMetaRepository).getAllStores(); + + aggVersionedStats.loadAllStats(); + storeName = mockStore.getName(); + + // Record metrics for version 1 + aggBlobTransferStats.recordBlobTransferBytesReceived(storeName, 1, 1000); + aggBlobTransferStats.recordBlobTransferBytesSent(storeName, 1, 2000); + + // Record metrics for version 2 + aggBlobTransferStats.recordBlobTransferBytesReceived(storeName, 2, 3000); + aggBlobTransferStats.recordBlobTransferBytesSent(storeName, 2, 4000); + + // Advance time past the 30-second cache duration + mockTime.addMilliseconds(Time.MS_PER_SECOND * LongAdderRateGauge.RATE_GAUGE_CACHE_DURATION_IN_SECONDS); + + // Host-level metrics should aggregate across all versions + // Total bytes received: 1000 + 3000 = 4000 + // Total bytes sent: 2000 + 4000 = 6000 + double expectedReceivedRate = 4000.0 / LongAdderRateGauge.RATE_GAUGE_CACHE_DURATION_IN_SECONDS; + double expectedSentRate = 6000.0 / LongAdderRateGauge.RATE_GAUGE_CACHE_DURATION_IN_SECONDS; + + Assert.assertEquals(reporter.query(".total--blob_transfer_bytes_received.Rate").value(), expectedReceivedRate); + Assert.assertEquals(reporter.query(".total--blob_transfer_bytes_sent.Rate").value(), expectedSentRate); + } + + private Store createStore(String storeName) { + return new ZKStore( + storeName, + "", + 10, + PersistenceType.ROCKS_DB, + RoutingStrategy.CONSISTENT_HASH, + ReadStrategy.ANY_OF_ONLINE, + OfflinePushStrategy.WAIT_ALL_REPLICAS, + 1); + } +} diff --git a/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStatsTest.java similarity index 78% rename from services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java rename to clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStatsTest.java index d49b2f10b5c..114c805db1c 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStatsTest.java @@ -1,4 +1,4 @@ -package com.linkedin.venice.stats; +package com.linkedin.davinci.stats; import static java.lang.Double.NaN; import static org.mockito.Mockito.any; @@ -6,7 +6,6 @@ import static org.mockito.Mockito.mock; import com.linkedin.davinci.config.VeniceServerConfig; -import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.venice.meta.OfflinePushStrategy; import com.linkedin.venice.meta.PersistenceType; import com.linkedin.venice.meta.ReadOnlyStoreRepository; @@ -14,7 +13,10 @@ import com.linkedin.venice.meta.RoutingStrategy; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.ZKStore; +import com.linkedin.venice.stats.LongAdderRateGauge; import com.linkedin.venice.tehuti.MockTehutiReporter; +import com.linkedin.venice.utils.TestMockTime; +import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils; import io.tehuti.metrics.MetricsRepository; @@ -32,6 +34,7 @@ public void testRecordBlobTransferMetrics() { MetricsRepository metricsRepo = MetricsRepositoryUtils.createSingleThreadedMetricsRepository(); MockTehutiReporter reporter = new MockTehutiReporter(); VeniceServerConfig mockVeniceServerConfig = Mockito.mock(VeniceServerConfig.class); + TestMockTime mockTime = new TestMockTime(); String storeName = Utils.getUniqueString("store_foo"); @@ -41,7 +44,7 @@ public void testRecordBlobTransferMetrics() { doReturn(true).when(mockVeniceServerConfig).isUnregisterMetricForDeletedStoreEnabled(); AggVersionedBlobTransferStats stats = - new AggVersionedBlobTransferStats(metricsRepo, mockMetaRepository, mockVeniceServerConfig); + new AggVersionedBlobTransferStats(metricsRepo, mockMetaRepository, mockVeniceServerConfig, mockTime); Store mockStore = createStore(storeName); List storeList = new ArrayList<>(); @@ -52,6 +55,7 @@ public void testRecordBlobTransferMetrics() { stats.loadAllStats(); storeName = mockStore.getName(); + // initial stats // Gauge default value is NaN Assert @@ -96,6 +100,22 @@ public void testRecordBlobTransferMetrics() { stats.recordBlobTransferTimeInSec(storeName, 1, 20.0); Assert .assertEquals(reporter.query("." + storeName + "_total--blob_transfer_time.IngestionStatsGauge").value(), 20.0); + // Record blob transfer bytes received + stats.recordBlobTransferBytesReceived(storeName, 1, 1024); + // Advance time past the 30-second cache duration to get the rate calculation + mockTime.addMilliseconds(Time.MS_PER_SECOND * LongAdderRateGauge.RATE_GAUGE_CACHE_DURATION_IN_SECONDS); + // Expected rate: 1024 bytes / 30 seconds = 34.13 bytes/sec + double expectedRate = 1024.0 / LongAdderRateGauge.RATE_GAUGE_CACHE_DURATION_IN_SECONDS; + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_bytes_received.IngestionStatsGauge").value(), + expectedRate); + // Record blob transfer bytes sent + stats.recordBlobTransferBytesSent(storeName, 1, 4096); + expectedRate = 4096.0 / LongAdderRateGauge.RATE_GAUGE_CACHE_DURATION_IN_SECONDS; + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_bytes_sent.IngestionStatsGauge").value(), + expectedRate); + } private Store createStore(String storeName) { diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index 84edddd8ef2..7adce6a5ba4 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -17,6 +17,7 @@ import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; import com.linkedin.davinci.kafka.consumer.RemoteIngestionRepairService; import com.linkedin.davinci.repository.VeniceMetadataRepositoryBuilder; +import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.stats.AggVersionedStorageEngineStats; import com.linkedin.davinci.stats.HeartbeatMonitoringServiceStats; @@ -129,6 +130,7 @@ public class VeniceServer { private ServerReadMetadataRepository serverReadMetadataRepository; private BlobTransferManager blobTransferManager; private AggVersionedBlobTransferStats aggVersionedBlobTransferStats; + private AggBlobTransferStats aggBlobTransferStats; private Lazy zkHelixAdmin; private final Optional d2Client; @@ -485,7 +487,9 @@ private List createServices() { */ if (BlobTransferUtils.isBlobTransferManagerEnabled(serverConfig, false)) { aggVersionedBlobTransferStats = new AggVersionedBlobTransferStats(metricsRepository, metadataRepo, serverConfig); - + aggBlobTransferStats = new AggBlobTransferStats( + aggVersionedBlobTransferStats, + kafkaStoreIngestionService.getHostLevelIngestionStats()); P2PBlobTransferConfig p2PBlobTransferConfig = new P2PBlobTransferConfig( serverConfig.getDvcP2pBlobTransferServerPort(), serverConfig.getDvcP2pBlobTransferClientPort(), @@ -535,14 +539,13 @@ private List createServices() { .setStorageMetadataService(storageMetadataService) .setReadOnlyStoreRepository(metadataRepo) .setStorageEngineRepository(storageService.getStorageEngineRepository()) - .setAggVersionedBlobTransferStats(aggVersionedBlobTransferStats) + .setAggBlobTransferStats(aggBlobTransferStats) .setBlobTransferSSLFactory(sslFactory) .setBlobTransferAclHandler(BlobTransferUtils.createAclHandler(veniceConfigLoader)) .setAdaptiveBlobTransferWriteTrafficThrottler(writeThrottler) .setAdaptiveBlobTransferReadTrafficThrottler(readThrottler) - .setPushStatusNotifierSupplier(() -> { - return helixParticipationService != null ? helixParticipationService.getPushStatusNotifier() : null; - }) + .setPushStatusNotifierSupplier( + () -> helixParticipationService != null ? helixParticipationService.getPushStatusNotifier() : null) .build(); } else { aggVersionedBlobTransferStats = null;