Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import com.linkedin.davinci.ingestion.main.MainIngestionStorageMetadataService;
import com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.kafka.consumer.StoreIngestionService;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.repository.VeniceMetadataRepositoryBuilder;
import com.linkedin.davinci.stats.AggBlobTransferStats;
import com.linkedin.davinci.stats.AggVersionedBlobTransferStats;
import com.linkedin.davinci.stats.AggVersionedStorageEngineStats;
import com.linkedin.davinci.stats.HeartbeatMonitoringServiceStats;
Expand Down Expand Up @@ -139,6 +139,7 @@ public enum ClientType {
private final boolean writeBatchingPushStatus;
private final HeartbeatMonitoringService heartbeatMonitoringService;
private AggVersionedBlobTransferStats aggVersionedBlobTransferStats;
private AggBlobTransferStats aggBlobTransferStats;

public DaVinciBackend(
ClientConfig clientConfig,
Expand Down Expand Up @@ -327,7 +328,8 @@ public DaVinciBackend(
if (BlobTransferUtils.isBlobTransferManagerEnabled(backendConfig, isIsolatedIngestion())) {
aggVersionedBlobTransferStats =
new AggVersionedBlobTransferStats(metricsRepository, storeRepository, configLoader.getVeniceServerConfig());

aggBlobTransferStats =
new AggBlobTransferStats(aggVersionedBlobTransferStats, ingestionService.getHostLevelIngestionStats());
P2PBlobTransferConfig p2PBlobTransferConfig = new P2PBlobTransferConfig(
configLoader.getVeniceServerConfig().getDvcP2pBlobTransferServerPort(),
configLoader.getVeniceServerConfig().getDvcP2pBlobTransferClientPort(),
Expand All @@ -351,13 +353,14 @@ public DaVinciBackend(
.setStorageMetadataService(storageMetadataService)
.setReadOnlyStoreRepository(readOnlyStoreRepository)
.setStorageEngineRepository(storageService.getStorageEngineRepository())
.setAggVersionedBlobTransferStats(aggVersionedBlobTransferStats)
.setAggBlobTransferStats(aggBlobTransferStats)
.setBlobTransferSSLFactory(BlobTransferUtils.createSSLFactoryForBlobTransferInDVC(configLoader))
.setBlobTransferAclHandler(BlobTransferUtils.createAclHandler(configLoader))
.setPushStatusNotifierSupplier(() -> ingestionListener)
.build();
} else {
aggVersionedBlobTransferStats = null;
aggBlobTransferStats = null;
blobTransferManager = null;
}

Expand Down Expand Up @@ -573,7 +576,7 @@ final StorageService getStorageService() {
return storageService;
}

final StoreIngestionService getIngestionService() {
final KafkaStoreIngestionService getIngestionService() {
return ingestionService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient;
import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggVersionedBlobTransferStats;
import com.linkedin.davinci.stats.AggBlobTransferStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.blobtransfer.BlobFinder;
Expand Down Expand Up @@ -36,7 +36,7 @@ public class BlobTransferManagerBuilder {
private StorageMetadataService storageMetadataService;
private ReadOnlyStoreRepository readOnlyStoreRepository;
private StorageEngineRepository storageEngineRepository;
private AggVersionedBlobTransferStats aggVersionedBlobTransferStats;
private AggBlobTransferStats aggBlobTransferStats;
private Optional<SSLFactory> sslFactory;
private Optional<BlobTransferAclHandler> aclHandler;
private VeniceAdaptiveBlobTransferTrafficThrottler adaptiveBlobTransferWriteTrafficThrottler;
Expand Down Expand Up @@ -77,9 +77,8 @@ public BlobTransferManagerBuilder setStorageEngineRepository(StorageEngineReposi
return this;
}

public BlobTransferManagerBuilder setAggVersionedBlobTransferStats(
AggVersionedBlobTransferStats aggVersionedBlobTransferStats) {
this.aggVersionedBlobTransferStats = aggVersionedBlobTransferStats;
public BlobTransferManagerBuilder setAggBlobTransferStats(AggBlobTransferStats aggBlobTransferStats) {
this.aggBlobTransferStats = aggBlobTransferStats;
return this;
}

Expand Down Expand Up @@ -112,6 +111,10 @@ public BlobTransferManagerBuilder setPushStatusNotifierSupplier(Supplier<VeniceN
return this;
}

/**
 * Returns the {@link AggBlobTransferStats} instance currently held by this builder.
 *
 * @return the value of the {@code aggBlobTransferStats} field, as last assigned on this builder
 */
public AggBlobTransferStats getAggBlobTransferStats() {
  return this.aggBlobTransferStats;
}

public BlobTransferManager<Void> build() {
try {
validateFields();
Expand Down Expand Up @@ -150,6 +153,7 @@ public BlobTransferManager<Void> build() {
blobTransferConfig.getBlobTransferMaxTimeoutInMin(),
blobSnapshotManager,
globalTrafficHandler,
getAggBlobTransferStats(),
sslFactory,
aclHandler,
blobTransferConfig.getMaxConcurrentSnapshotUser()),
Expand All @@ -161,11 +165,12 @@ public BlobTransferManager<Void> build() {
blobTransferConfig.getBlobReceiveTimeoutInMin(),
blobTransferConfig.getBlobReceiveReaderIdleTimeInSeconds(),
globalTrafficHandler,
getAggBlobTransferStats(),
sslFactory,
veniceNotifier),
blobFinder,
blobTransferConfig.getBaseDir(),
aggVersionedBlobTransferStats,
getAggBlobTransferStats().getAggVersionedBlobTransferStats(),
blobTransferConfig.getMaxConcurrentBlobReceiveReplicas());

// start the P2P blob transfer manager
Expand All @@ -191,7 +196,7 @@ private void validateFields() {
}

if (blobTransferConfig == null || storageMetadataService == null || readOnlyStoreRepository == null
|| storageEngineRepository == null || aggVersionedBlobTransferStats == null) {
|| storageEngineRepository == null || aggBlobTransferStats == null) {
throw new IllegalArgumentException(
"The blob transfer config, storage metadata service, read only store repository, storage engine repository, "
+ "and agg versioned blob transfer stats must not be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferTableFormat;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggBlobTransferStats;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.exceptions.VenicePeersConnectionException;
import com.linkedin.venice.listener.VerifySslHandler;
Expand Down Expand Up @@ -78,6 +79,7 @@ public class NettyFileTransferClient {
private final VeniceConcurrentHashMap<String, Long> unconnectableHostsToTimestamp = new VeniceConcurrentHashMap<>();
private final VeniceConcurrentHashMap<String, Long> connectedHostsToTimestamp = new VeniceConcurrentHashMap<>();
private final Supplier<VeniceNotifier> notifierSupplier;
private final AggBlobTransferStats aggBlobTransferStats;

private final VerifySslHandler verifySsl = new VerifySslHandler();

Expand All @@ -90,6 +92,7 @@ public NettyFileTransferClient(
int blobReceiveTimeoutInMin,
int blobReceiveReaderIdleTimeInSeconds,
GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler,
AggBlobTransferStats aggBlobTransferStats,
Optional<SSLFactory> sslFactory,
Supplier<VeniceNotifier> notifierSupplier) {
this.baseDir = baseDir;
Expand All @@ -99,6 +102,7 @@ public NettyFileTransferClient(
this.peersConnectivityFreshnessInSeconds = peersConnectivityFreshnessInSeconds;
this.blobReceiveTimeoutInMin = blobReceiveTimeoutInMin;
this.blobReceiveReaderIdleTimeInSeconds = blobReceiveReaderIdleTimeInSeconds;
this.aggBlobTransferStats = aggBlobTransferStats;

clientBootstrap = new Bootstrap();
workerGroup = new NioEventLoopGroup();
Expand Down Expand Up @@ -308,6 +312,7 @@ public CompletionStage<InputStream> get(
version,
partition,
requestedTableFormat,
aggBlobTransferStats,
checksumValidationExecutorService))
.addLast(
new P2PMetadataTransferHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.linkedin.davinci.blobtransfer.BlobTransferPayload;
import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.stats.AggBlobTransferStats;
import com.linkedin.venice.exceptions.VeniceBlobTransferFileNotFoundException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
Expand Down Expand Up @@ -55,7 +56,10 @@ public class P2PFileTransferClientHandler extends SimpleChannelInboundHandler<Ht
private final AtomicReference<Throwable> checksumExceptionHolder = new AtomicReference<>(null);
private final BlobTransferPayload payload;
private final ExecutorService checksumValidationExecutorService;
private final AggBlobTransferStats aggBlobTransferStats;
private final List<CompletableFuture<Void>> checksumValidationFutureList = new ArrayList<>();
private final String storeName;
private final int version;
// mutable states for a single file transfer. It will be updated for each file transfer.
private FileChannel outputFileChannel;
private String fileName;
Expand All @@ -73,11 +77,15 @@ public P2PFileTransferClientHandler(
int version,
int partition,
BlobTransferUtils.BlobTransferTableFormat tableFormat,
AggBlobTransferStats aggBlobTransferStats,
ExecutorService checksumValidationExecutorService) {
this.inputStreamFuture = inputStreamFuture;
this.payload = new BlobTransferPayload(baseDir, storeName, version, partition, tableFormat);
this.storeName = storeName;
this.version = version;
this.replicaId = Utils.getReplicaId(payload.getTopicName(), payload.getPartition());
this.checksumValidationExecutorService = checksumValidationExecutorService;
this.aggBlobTransferStats = aggBlobTransferStats;
this.replicaTransferStartTime = System.currentTimeMillis();
}

Expand Down Expand Up @@ -171,6 +179,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex
count += transferred;
}
}
aggBlobTransferStats.recordBlobTransferBytesReceived(storeName, version, totalBytesToTransfer);

if (content instanceof DefaultLastHttpContent) {
// End of a single file transfer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.alpini.netty4.ssl.SslInitializer;
import com.linkedin.davinci.blobtransfer.BlobSnapshotManager;
import com.linkedin.davinci.blobtransfer.BlobTransferAclHandler;
import com.linkedin.davinci.stats.AggBlobTransferStats;
import com.linkedin.venice.listener.VerifySslHandler;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.utils.SslUtils;
Expand Down Expand Up @@ -30,6 +31,7 @@ public BlobTransferNettyChannelInitializer(
int blobTransferMaxTimeoutInMin,
BlobSnapshotManager blobSnapshotManager,
GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler,
AggBlobTransferStats aggBlobTransferStats,
Optional<SSLFactory> sslFactory,
Optional<BlobTransferAclHandler> aclHandler,
int maxAllowedConcurrentSnapshotUsers) {
Expand All @@ -40,6 +42,7 @@ public BlobTransferNettyChannelInitializer(
baseDir,
blobTransferMaxTimeoutInMin,
blobSnapshotManager,
aggBlobTransferStats,
maxAllowedConcurrentSnapshotUsers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.davinci.blobtransfer.BlobSnapshotManager;
import com.linkedin.davinci.blobtransfer.BlobTransferAclHandler;
import com.linkedin.davinci.stats.AggBlobTransferStats;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.service.AbstractVeniceService;
import io.netty.bootstrap.ServerBootstrap;
Expand Down Expand Up @@ -40,6 +41,7 @@ public P2PBlobTransferService(
int blobTransferMaxTimeoutInMin,
BlobSnapshotManager blobSnapshotManager,
GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler,
AggBlobTransferStats aggBlobTransferStats,
Optional<SSLFactory> sslFactory,
Optional<BlobTransferAclHandler> aclHandler,
int maxAllowedConcurrentSnapshotUsers) {
Expand All @@ -66,6 +68,7 @@ public P2PBlobTransferService(
blobTransferMaxTimeoutInMin,
blobSnapshotManager,
globalChannelTrafficShapingHandler,
aggBlobTransferStats,
sslFactory,
aclHandler,
maxAllowedConcurrentSnapshotUsers))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import com.linkedin.davinci.blobtransfer.BlobTransferPartitionMetadata;
import com.linkedin.davinci.blobtransfer.BlobTransferPayload;
import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.stats.AggBlobTransferStats;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.request.RequestHelper;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.Utils;
Expand Down Expand Up @@ -64,6 +66,7 @@ public class P2PFileTransferServerHandler extends SimpleChannelInboundHandler<Fu
private final BlobSnapshotManager blobSnapshotManager;
// Global counter for all active transfer requests across all topics and partitions
private final AtomicInteger globalConcurrentTransferRequests = new AtomicInteger(0);
private final AggBlobTransferStats aggBlobTransferStats;
private static final AttributeKey<BlobTransferPayload> BLOB_TRANSFER_REQUEST =
AttributeKey.valueOf("blobTransferRequest");
private static final AttributeKey<AtomicBoolean> SUCCESS_COUNTED =
Expand All @@ -73,10 +76,12 @@ public P2PFileTransferServerHandler(
String baseDir,
int blobTransferMaxTimeoutInMin,
BlobSnapshotManager blobSnapshotManager,
AggBlobTransferStats aggBlobTransferStats,
int maxAllowedConcurrentSnapshotUsers) {
this.baseDir = baseDir;
this.blobTransferMaxTimeoutInMin = blobTransferMaxTimeoutInMin;
this.blobSnapshotManager = blobSnapshotManager;
this.aggBlobTransferStats = aggBlobTransferStats;
this.maxAllowedConcurrentSnapshotUsers = maxAllowedConcurrentSnapshotUsers;
}

Expand Down Expand Up @@ -194,7 +199,7 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpReque
return;
}
// send file
sendFile(file, ctx, replicaInfo);
sendFile(file, ctx, blobTransferRequest, replicaInfo);
}

sendMetadata(ctx, transferPartitionMetadata);
Expand Down Expand Up @@ -262,7 +267,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.close();
}

private void sendFile(File file, ChannelHandlerContext ctx, String replicaInfo) throws IOException {
private void sendFile(
File file,
ChannelHandlerContext ctx,
BlobTransferPayload blobTransferPayload,
String replicaInfo) throws IOException {
LOGGER.info(
"Sending file: {} for replica {} to host {}.",
file.getName(),
Expand All @@ -289,6 +298,14 @@ private void sendFile(File file, ChannelHandlerContext ctx, String replicaInfo)

sendFileFuture.addListener(future -> {
if (future.isSuccess()) {
/**
* Note: This does not record the real-time byte rate of files sent. To get a more accurate read metric,
* we would need to override the {@link HttpChunkedInput} above to intercept the traffic and record the byte rate.
*/
aggBlobTransferStats.recordBlobTransferBytesSent(
blobTransferPayload.getStoreName(),
Version.parseVersionFromKafkaTopicName(blobTransferPayload.getTopicName()),
length);
LOGGER.info(
"Sent file: {} successfully for replica: {} to host: {}",
file.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1544,4 +1544,7 @@ public void maybeAddResubscribeRequest(String storeName, int version, int partit
LOGGER.info("Added replica: {} to pending resubscribe queue.", Utils.getReplicaId(versionTopic, partition));
}

/**
 * Exposes the {@link AggHostLevelIngestionStats} instance held by this service.
 *
 * @return the value of the {@code hostLevelIngestionStats} field
 */
public AggHostLevelIngestionStats getHostLevelIngestionStats() {
  return this.hostLevelIngestionStats;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.linkedin.davinci.stats;

/**
 * Thin wrapper that forwards each blob-transfer byte measurement to two sinks:
 * <ul>
 *   <li>the {@link AggVersionedBlobTransferStats}, keyed by store name and version, and</li>
 *   <li>the {@link HostLevelIngestionStats} object returned by
 *       {@link AggHostLevelIngestionStats#getTotalStats()}, when that object is non-null.</li>
 * </ul>
 */
public class AggBlobTransferStats {
  private final AggVersionedBlobTransferStats aggVersionedBlobTransferStats;
  private final AggHostLevelIngestionStats aggHostLevelIngestionStats;

  /**
   * @param aggVersionedBlobTransferStats receiver of the per-store/version measurements
   * @param aggHostLevelIngestionStats source of the total stats object that receives the same values
   */
  public AggBlobTransferStats(
      AggVersionedBlobTransferStats aggVersionedBlobTransferStats,
      AggHostLevelIngestionStats aggHostLevelIngestionStats) {
    this.aggVersionedBlobTransferStats = aggVersionedBlobTransferStats;
    this.aggHostLevelIngestionStats = aggHostLevelIngestionStats;
  }

  /**
   * Records {@code value} bytes sent for the given store version, then mirrors the same value onto the
   * total host-level stats if they are available.
   *
   * @param storeName store the bytes were sent for
   * @param version store version the bytes were sent for
   * @param value number of bytes to record
   */
  public void recordBlobTransferBytesSent(String storeName, int version, long value) {
    aggVersionedBlobTransferStats.recordBlobTransferBytesSent(storeName, version, value);

    HostLevelIngestionStats hostTotals = aggHostLevelIngestionStats.getTotalStats();
    if (hostTotals == null) {
      // No total stats object to update; the versioned stats above are the only sink.
      return;
    }
    hostTotals.recordTotalBlobTransferBytesSend(value);
  }

  /**
   * Records {@code value} bytes received for the given store version, then mirrors the same value onto the
   * total host-level stats if they are available.
   *
   * @param storeName store the bytes were received for
   * @param version store version the bytes were received for
   * @param value number of bytes to record
   */
  public void recordBlobTransferBytesReceived(String storeName, int version, long value) {
    aggVersionedBlobTransferStats.recordBlobTransferBytesReceived(storeName, version, value);

    HostLevelIngestionStats hostTotals = aggHostLevelIngestionStats.getTotalStats();
    if (hostTotals == null) {
      // No total stats object to update; the versioned stats above are the only sink.
      return;
    }
    hostTotals.recordTotalBlobTransferBytesReceived(value);
  }

  /**
   * @return the wrapped {@link AggVersionedBlobTransferStats} passed to the constructor
   */
  public AggVersionedBlobTransferStats getAggVersionedBlobTransferStats() {
    return this.aggVersionedBlobTransferStats;
  }
}
Loading
Loading