Skip to content

Commit d4ab149

Browse files
committed
fix test
1 parent dc40d3f commit d4ab149

File tree

3 files changed

+17
-6
lines changed

3 files changed

+17
-6
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -1916,10 +1916,8 @@ protected boolean shouldProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelo
19161916
}
19171917
}
19181918

1919-
if (!record.getTopicPartition()
1920-
.getPubSubTopic()
1921-
.getPubSubTopicType()
1922-
.equals(currentLeaderTopic.getPubSubTopicType())) {
1919+
if (!Utils.getLeaderTopicFromPubSubTopic(pubSubTopicRepository, record.getTopicPartition().getPubSubTopic())
1920+
.equals(currentLeaderTopic)) {
19231921
String errorMsg =
19241922
"Leader replica: {} received a pubsub message that doesn't belong to the leader topic. Leader topic: "
19251923
+ currentLeaderTopic + ", topic of incoming message: "
@@ -1978,7 +1976,8 @@ protected boolean shouldPersistRecord(
19781976
case LEADER:
19791977
PubSubTopic currentLeaderTopic =
19801978
partitionConsumptionState.getOffsetRecord().getLeaderTopic(pubSubTopicRepository);
1981-
if (!incomingMessageTopic.getPubSubTopicType().equals(currentLeaderTopic.getPubSubTopicType())) {
1979+
if (!Utils.getLeaderTopicFromPubSubTopic(pubSubTopicRepository, incomingMessageTopic)
1980+
.equals(currentLeaderTopic)) {
19821981
String errorMsg =
19831982
"Leader replica: {} received a pubsub message that doesn't belong to the leader topic. Leader topic: "
19841983
+ currentLeaderTopic + ", topic of incoming message: " + incomingMessageTopic.getName();

internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java

+12
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import com.linkedin.venice.meta.RoutingDataRepository;
2121
import com.linkedin.venice.meta.Store;
2222
import com.linkedin.venice.meta.Version;
23+
import com.linkedin.venice.pubsub.PubSubTopicRepository;
2324
import com.linkedin.venice.pubsub.api.PubSubTopic;
2425
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
26+
import com.linkedin.venice.pubsub.api.PubSubTopicType;
2527
import com.linkedin.venice.pushmonitor.ExecutionStatus;
2628
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
2729
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
@@ -941,4 +943,14 @@ public static boolean isSepTopicRegion(String region) {
941943
return region.endsWith(SEPARATE_TOPIC_SUFFIX);
942944
}
943945

946+
public static PubSubTopic getLeaderTopicFromPubSubTopic(
947+
PubSubTopicRepository pubSubTopicRepository,
948+
PubSubTopic pubSubTopic) {
949+
if (pubSubTopic.getPubSubTopicType().equals(PubSubTopicType.REALTIME_TOPIC)
950+
&& pubSubTopic.getName().endsWith(SEPARATE_TOPIC_SUFFIX)) {
951+
return pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(pubSubTopic.getStoreName()));
952+
}
953+
return pubSubTopic;
954+
}
955+
944956
}

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
256256
pubSubBrokerWrapper.getPubSubClientsFactory().getAdminAdapterFactory().getClass().getName())
257257
.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000)
258258
.put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 5000)
259-
.put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, false);
259+
.put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true);
260260
if (sslToKafka) {
261261
serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name());
262262
serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration()));

0 commit comments

Comments
 (0)