Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server][vpj][controller] Add support to ingest from separate RT topic in A/A SIT when feature is enabled #1311

Merged
merged 25 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
17f63d9
[server] Subscribe/Unsubscribe to separate RT topic in leader logic w…
sixpluszero Nov 12, 2024
78ab435
Remove some debug logging
sixpluszero Nov 15, 2024
49ce213
Some refactor and cleanup
sixpluszero Nov 15, 2024
c669257
Fix some (but not all) errors
sixpluszero Nov 15, 2024
df7841e
fix unit tests
sixpluszero Nov 16, 2024
4f317a4
debug
sixpluszero Nov 16, 2024
d6d2663
fix test
sixpluszero Nov 16, 2024
71fd163
remove logging; improvement
sixpluszero Nov 16, 2024
930ac8e
fix workload pool
sixpluszero Nov 17, 2024
4ffa100
avoid emitting metrics for non-sep SIT
sixpluszero Nov 18, 2024
5e9c313
further address rebase issue
sixpluszero Nov 18, 2024
ac01cf9
fix merge bug
sixpluszero Nov 18, 2024
37a7ffc
resolve comments; Add emtpy push to validate RTS check
sixpluszero Nov 21, 2024
770c908
fix unit test
sixpluszero Nov 21, 2024
7b731d0
Move sep RT in current version to a new pool
sixpluszero Nov 21, 2024
b666497
fix merge resolution error
sixpluszero Dec 3, 2024
4d4d077
remove extra logging
sixpluszero Dec 3, 2024
543743c
Add some more Java doc to newly added methods
sixpluszero Dec 4, 2024
c3efae2
Address review comments
sixpluszero Dec 4, 2024
927b04d
static
sixpluszero Dec 4, 2024
78ea988
make AA RT metrics measurement also include Sep RT
sixpluszero Dec 5, 2024
838d1a9
Add a new RMD vector test to validate
sixpluszero Dec 6, 2024
bea9bd1
static analysis
sixpluszero Dec 6, 2024
4c118cc
resolve merge conflict
sixpluszero Dec 6, 2024
ed93079
fix nit
sixpluszero Dec 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@
import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_ALLOCATION_STRATEGY;
import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_FOR_CURRENT_VERSION_AA_WC_LEADER;
import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_FOR_CURRENT_VERSION_NON_AA_WC_LEADER;
import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_FOR_CURRENT_VERSION_SEPARATE_RT_LEADER;
import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_FOR_NON_CURRENT_VERSION_AA_WC_LEADER;
import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_FOR_NON_CURRENT_VERSION_NON_AA_WC_LEADER;
import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER;
import static com.linkedin.venice.ConfigKeys.SERVER_CURRENT_VERSION_AA_WC_LEADER_QUOTA_RECORDS_PER_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_CURRENT_VERSION_NON_AA_WC_LEADER_QUOTA_RECORDS_PER_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_CURRENT_VERSION_SEPARATE_RT_LEADER_QUOTA_RECORDS_PER_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_LOOKUP_QUEUE_CAPACITY;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_MEMORY_STATS_ENABLED;
Expand Down Expand Up @@ -531,6 +533,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean dedicatedConsumerPoolForAAWCLeaderEnabled;
private final KafkaConsumerServiceDelegator.ConsumerPoolStrategyType consumerPoolStrategyType;
private final int consumerPoolSizeForCurrentVersionAAWCLeader;
private final int consumerPoolSizeForCurrentVersionSepRTLeader;
private final int consumerPoolSizeForNonCurrentVersionAAWCLeader;
private final int consumerPoolSizeForCurrentVersionNonAAWCLeader;
private final int consumerPoolSizeForNonCurrentVersionNonAAWCLeader;
Expand All @@ -554,6 +557,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final int defaultMaxRecordSizeBytes;
private final int aaWCLeaderQuotaRecordsPerSecond;
private final int currentVersionAAWCLeaderQuotaRecordsPerSecond;
private final int currentVersionSepRTLeaderQuotaRecordsPerSecond;
private final int currentVersionNonAAWCLeaderQuotaRecordsPerSecond;
private final int nonCurrentVersionAAWCLeaderQuotaRecordsPerSecond;
private final int nonCurrentVersionNonAAWCLeaderQuotaRecordsPerSecond;
Expand Down Expand Up @@ -888,6 +892,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
KafkaConsumerServiceDelegator.ConsumerPoolStrategyType.DEFAULT.name()));
consumerPoolSizeForCurrentVersionAAWCLeader =
serverProperties.getInt(SERVER_CONSUMER_POOL_SIZE_FOR_CURRENT_VERSION_AA_WC_LEADER, 10);
consumerPoolSizeForCurrentVersionSepRTLeader =
serverProperties.getInt(SERVER_CONSUMER_POOL_SIZE_FOR_CURRENT_VERSION_SEPARATE_RT_LEADER, 10);
consumerPoolSizeForNonCurrentVersionAAWCLeader =
serverProperties.getInt(SERVER_CONSUMER_POOL_SIZE_FOR_NON_CURRENT_VERSION_AA_WC_LEADER, 10);
consumerPoolSizeForCurrentVersionNonAAWCLeader =
Expand Down Expand Up @@ -920,6 +926,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
aaWCLeaderQuotaRecordsPerSecond = serverProperties.getInt(SERVER_AA_WC_LEADER_QUOTA_RECORDS_PER_SECOND, -1);
currentVersionAAWCLeaderQuotaRecordsPerSecond =
serverProperties.getInt(SERVER_CURRENT_VERSION_AA_WC_LEADER_QUOTA_RECORDS_PER_SECOND, -1);
currentVersionSepRTLeaderQuotaRecordsPerSecond =
serverProperties.getInt(SERVER_CURRENT_VERSION_SEPARATE_RT_LEADER_QUOTA_RECORDS_PER_SECOND, -1);
currentVersionNonAAWCLeaderQuotaRecordsPerSecond =
serverProperties.getInt(SERVER_CURRENT_VERSION_NON_AA_WC_LEADER_QUOTA_RECORDS_PER_SECOND, -1);
nonCurrentVersionAAWCLeaderQuotaRecordsPerSecond =
Expand Down Expand Up @@ -1605,6 +1613,10 @@ public int getConsumerPoolSizeForCurrentVersionAAWCLeader() {
return consumerPoolSizeForCurrentVersionAAWCLeader;
}

public int getConsumerPoolSizeForCurrentVersionSepRTLeader() {
return consumerPoolSizeForCurrentVersionSepRTLeader;
}

public int getConsumerPoolSizeForNonCurrentVersionAAWCLeader() {
return consumerPoolSizeForNonCurrentVersionAAWCLeader;
}
Expand Down Expand Up @@ -1673,6 +1685,10 @@ public int getCurrentVersionAAWCLeaderQuotaRecordsPerSecond() {
return currentVersionAAWCLeaderQuotaRecordsPerSecond;
}

public int getCurrentVersionSepRTLeaderQuotaRecordsPerSecond() {
return currentVersionSepRTLeaderQuotaRecordsPerSecond;
}

public int getCurrentVersionNonAAWCLeaderQuotaRecordsPerSecond() {
return currentVersionNonAAWCLeaderQuotaRecordsPerSecond;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,6 @@ protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> con
}
partitionToPutMessageCount.computeIfAbsent(message.getPartition(), x -> new AtomicLong(0)).incrementAndGet();
}

// Determine if the event should be filtered or not
if (filterRecordByVersionSwapHighWatermarks(replicationCheckpoint, pubSubTopicPartition)) {
return Optional.empty();
Expand Down Expand Up @@ -893,9 +892,10 @@ protected boolean handleVersionSwapControlMessage(
if (controlMessageType.equals(ControlMessageType.VERSION_SWAP)) {
VersionSwap versionSwap = (VersionSwap) controlMessage.controlMessageUnion;
LOGGER.info(
"Obtain version swap message: {} and versions swap high watermarks: {}",
"Obtain version swap message: {} and versions swap high watermarks: {} for: {}",
versionSwap,
versionSwap.getLocalHighWatermarks());
versionSwap.getLocalHighWatermarks(),
pubSubTopicPartition);
PubSubTopic newServingVersionTopic =
pubSubTopicRepository.getTopic(versionSwap.newServingVersionTopic.toString());

Expand Down
Loading
Loading