diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
index 7ffbc66bc84..93ecfedf2a8 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
@@ -17,13 +17,18 @@
 
 package org.apache.ignite.internal.table.distributed.disaster;
 
+import static java.util.Collections.emptySet;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
+import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
+import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
+import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
 import static org.apache.ignite.internal.table.TableTestUtils.getTableId;
 import static org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.RECOVERY_TRIGGER_KEY;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -38,8 +43,10 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
@@ -50,9 +57,12 @@
 import org.apache.ignite.internal.distributionzones.Node;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.versioned.VersionedSerialization;
@@ -132,10 +142,71 @@ private void waitAndAssertStableAssignmentsOfPartitionEqualTo(
                 .collect(Collectors.toUnmodifiableSet()));
     }
 
+    final void waitAndAssertEmptyRebalanceKeysAndStableAssignmentsOfPartitionEqualTo(
+            IgniteImpl gatewayNode, String tableName, Set<Integer> partitionIds, Set<String> nodes) {
+        partitionIds.forEach(p -> {
+            try {
+                waitAndAssertEmptyRebalanceKeysAndStableAssignmentsOfPartitionEqualTo(gatewayNode, tableName, p, nodes);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    private void waitAndAssertEmptyRebalanceKeysAndStableAssignmentsOfPartitionEqualTo(
+            IgniteImpl gatewayNode, String tableName, int partNum, Set<String> nodes
+    ) throws InterruptedException {
+        AtomicReference<Set<String>> stableNodes = new AtomicReference<>();
+        AtomicReference<Set<String>> pendingNodes = new AtomicReference<>();
+        AtomicReference<Set<String>> plannedNodes = new AtomicReference<>();
+
+        assertTrue(
+                waitForCondition(() -> {
+                    Integer tableId = getTableId(gatewayNode.catalogManager(), tableName, clock.nowLong());
+
+                    assert tableId != null;
+
+                    TablePartitionId tablePartitionId = new TablePartitionId(tableId, partNum);
+
+                    ByteArray stableKey = stablePartAssignmentsKey(tablePartitionId);
+                    ByteArray pendingKey = pendingPartAssignmentsKey(tablePartitionId);
+                    ByteArray plannedKey = plannedPartAssignmentsKey(tablePartitionId);
+
+                    Map<ByteArray, Entry> results = await(gatewayNode.metaStorageManager()
+                            .getAll(Set.of(stableKey, pendingKey, plannedKey)), 1, TimeUnit.SECONDS);
+
+                    boolean isStableAsExpected = nodes.equals(assignmentsFromEntry(results.get(stableKey)));
+                    boolean isPendingEmpty = results.get(pendingKey).value() == null;
+                    boolean isPlannedEmpty = results.get(plannedKey).value() == null;
+
+                    stableNodes.set(assignmentsFromEntry(results.get(stableKey)));
+                    pendingNodes.set(assignmentsFromEntry(results.get(pendingKey)));
+                    plannedNodes.set(assignmentsFromEntry(results.get(plannedKey)));
+
+                    return isStableAsExpected && isPendingEmpty && isPlannedEmpty;
+                },
+                500,
+                30_000
+                ), IgniteStringFormatter.format(
+                        "Expected: (stable: {}, pending: [], planned: []), but actual: (stable: {}, pending: {}, planned: {})",
+                        nodes, stableNodes, pendingNodes, plannedNodes
+                )
+        );
+    }
+
     static Entry getRecoveryTriggerKey(IgniteImpl node) {
         return node.metaStorageManager().getLocally(RECOVERY_TRIGGER_KEY, Long.MAX_VALUE);
     }
 
+    private static Set<String> assignmentsFromEntry(Entry entry) {
+        return (entry.value() != null)
+                ? Assignments.fromBytes(entry.value()).nodes()
+                        .stream()
+                        .map(Assignment::consistentId)
+                        .collect(Collectors.toUnmodifiableSet())
+                : emptySet();
+    }
+
     private Set<String> getPartitionClusterNodes(IgniteImpl node, String tableName, int partNum) {
         return Optional.ofNullable(getTableId(node.catalogManager(), tableName, clock.nowLong()))
                 .map(tableId -> partitionAssignments(node.metaStorageManager(), tableId, partNum).join())
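Note on the hunk above: the new helpers follow a poll-then-assert idiom, and the AtomicReference fields exist only so the timeout message can report the last observed stable/pending/planned assignments. As an illustrative sketch (not part of the patch; it assumes JUnit 5's Supplier-based assertTrue and the three-argument waitForCondition overload this class already imports), the same idiom extracted into a generic helper would look like this when dropped into the abstract test class:

    // requires: import java.util.function.BooleanSupplier; import java.util.function.Supplier;
    // Poll every 500 ms for at most 30 s; build the failure message only on timeout.
    private static void assertEventually(BooleanSupplier condition, Supplier<String> messageOnTimeout)
            throws InterruptedException {
        assertTrue(waitForCondition(condition, 500, 30_000), messageOnTimeout);
    }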
@@ -148,10 +219,16 @@ final int zoneIdByName(CatalogService catalogService, String zoneName) {
 
     private void createHaZoneWithTables(
             String zoneName, List<String> tableNames, String filter, Set<String> targetNodes) throws InterruptedException {
+        createHaZoneWithTables(zoneName, tableNames, filter, DEFAULT_STORAGE_PROFILE, targetNodes);
+    }
+
+    private void createHaZoneWithTables(
+            String zoneName, List<String> tableNames, String filter, String storageProfiles, Set<String> targetNodes
+    ) throws InterruptedException {
         executeSql(String.format(
                 "CREATE ZONE %s WITH REPLICAS=%s, PARTITIONS=%s, STORAGE_PROFILES='%s', "
                         + "CONSISTENCY_MODE='HIGH_AVAILABILITY', DATA_NODES_FILTER='%s'",
-                zoneName, targetNodes.size(), PARTITIONS_NUMBER, DEFAULT_STORAGE_PROFILE, filter
+                zoneName, targetNodes.size(), PARTITIONS_NUMBER, storageProfiles, filter
         ));
 
         Set<Integer> tableIds = new HashSet<>();
@@ -186,10 +263,23 @@ final void createHaZoneWithTable(String zoneName, String tableName) throws Inter
         createHaZoneWithTables(zoneName, List.of(tableName));
     }
 
+    final void createHaZoneWithTable(int replicas, String filter, Set<String> targetNodes) throws InterruptedException {
+        createHaZoneWithTables(
+                HA_ZONE_NAME, List.of(HA_TABLE_NAME), filter, targetNodes
+        );
+    }
+
     final void createHaZoneWithTable() throws InterruptedException {
         createHaZoneWithTable(HA_ZONE_NAME, HA_TABLE_NAME);
     }
 
+    final void createHaZoneWithTableWithStorageProfile(int replicas, String storageProfiles, Set<String> targetNodes)
+            throws InterruptedException {
+        createHaZoneWithTables(
+                HA_ZONE_NAME, List.of(HA_TABLE_NAME), DEFAULT_FILTER, storageProfiles, targetNodes
+        );
+    }
+
     final void createScZoneWithTable() {
         executeSql(String.format(
                 "CREATE ZONE %s WITH REPLICAS=%s, PARTITIONS=%s, STORAGE_PROFILES='%s', CONSISTENCY_MODE='STRONG_CONSISTENCY'",
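Note on the zone-creation overloads above: reading the format string (not output captured from a run), a call like createHaZoneWithTableWithStorageProfile(2, "segmented_aipersist", nodeNames(0, 1)) should issue SQL roughly of this shape, where the zone/table names and PARTITIONS_NUMBER are the existing test constants:

    CREATE ZONE <HA_ZONE_NAME> WITH REPLICAS=2, PARTITIONS=<PARTITIONS_NUMBER>,
        STORAGE_PROFILES='segmented_aipersist',
        CONSISTENCY_MODE='HIGH_AVAILABILITY', DATA_NODES_FILTER='<DEFAULT_FILTER>'

REPLICAS is derived from targetNodes.size(), so the int replicas argument of both new overloads is currently unused.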
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
index 7456704f5f9..c7d4d5bac22 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
@@ -20,15 +20,21 @@
 
 import java.util.Set;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 
 /** Test suite for the cases with a recovery of the group replication factor after reset by zone filter update. */
 public class ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends AbstractHighAvailablePartitionsRecoveryTest {
-    private static final String GLOBAL_EU_NODES_CONFIG = nodeConfig("{region = EU, zone = global}");
+    private static final String GLOBAL_EU_NODES_CONFIG =
+            nodeConfig("{region = EU, zone = global}", "{segmented_aipersist.engine = aipersist}");
 
-    private static final String EU_ONLY_NODES_CONFIG = nodeConfig("{region = EU}");
+    private static final String EU_ONLY_NODES_CONFIG = nodeConfig("{region = EU}", null);
 
-    private static final String GLOBAL_NODES_CONFIG = nodeConfig("{zone = global}");
+    private static final String GLOBAL_NODES_CONFIG = nodeConfig("{zone = global}", null);
+
+    private static final String ROCKS_NODES_CONFIG = nodeConfig(null, "{lru_rocks.engine = rocksdb}");
+
+    private static final String AIPERSIST_NODES_CONFIG = nodeConfig(null, "{segmented_aipersist.engine = aipersist}");
 
     @Override
     protected int initialNodes() {
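Note on the new config constants: my reading of the HOCON fragments (worth confirming against the storage engine configuration docs) is that each storage.profiles entry declares a named profile bound to a storage engine, and a zone's STORAGE_PROFILES list is matched against those profile names:

    storage.profiles: {lru_rocks.engine = rocksdb}              // profile "lru_rocks" backed by the RocksDB engine
    storage.profiles: {segmented_aipersist.engine = aipersist}  // profile "segmented_aipersist" backed by the persistent page-memory engine

Under that reading, only nodes started with AIPERSIST_NODES_CONFIG (or the default profile set of node 0) can host a zone created with STORAGE_PROFILES='segmented_aipersist'.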
@@ -72,13 +78,61 @@ void testScaleUpAfterZoneFilterUpdate() throws InterruptedException {
         waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, globalNodes);
     }
 
+    @Test
+    void testThatPartitionResetZoneFilterAware() throws InterruptedException {
+        startNode(1, EU_ONLY_NODES_CONFIG);
+        startNode(2, GLOBAL_NODES_CONFIG);
+
+        String euFilter = "$[?(@.region == \"EU\")]";
+
+        Set<String> euNodes = nodeNames(0, 1);
+
+        createHaZoneWithTable(2, euFilter, euNodes);
+
+        IgniteImpl node = igniteImpl(0);
+
+        waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, euNodes);
+
+        assertRecoveryKeyIsEmpty(node);
+
+        stopNodes(1);
+
+        // Node 0 is the only remaining node that matches the zone filter, so the reset moves all partitions to it:
+        waitAndAssertEmptyRebalanceKeysAndStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, nodeNames(0));
+    }
+
+    @Test
+    void testThatPartitionResetZoneStorageProfileFilterAware() throws InterruptedException {
+        startNode(1, AIPERSIST_NODES_CONFIG);
+        startNode(2, ROCKS_NODES_CONFIG);
+
+        Set<String> nodesWithAiProfile = nodeNames(0, 1);
+
+        createHaZoneWithTableWithStorageProfile(2, "segmented_aipersist", nodesWithAiProfile);
+
+        IgniteImpl node = igniteImpl(0);
+
+        waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, nodesWithAiProfile);
+
+        assertRecoveryKeyIsEmpty(node);
+
+        stopNodes(1);
+
+        // Node 0 is the only remaining node with the required storage profile, so the reset moves all partitions to it:
+        waitAndAssertEmptyRebalanceKeysAndStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, nodeNames(0));
+    }
+
     private void alterZoneSql(String filter, String zoneName) {
         executeSql(String.format("ALTER ZONE \"%s\" SET \"DATA_NODES_FILTER\" = '%s'", zoneName, filter));
     }
 
-    private static String nodeConfig(@Language("HOCON") String nodeAtrributes) {
+    private static String nodeConfig(
+            @Nullable @Language("HOCON") String nodeAttributes,
+            @Nullable @Language("HOCON") String storageProfiles
+    ) {
         return "ignite {\n"
-                + "  nodeAttributes.nodeAttributes: " + nodeAtrributes + ",\n"
+                + "  nodeAttributes.nodeAttributes: " + nodeAttributes + ",\n"
+                + "  storage.profiles: " + storageProfiles + ",\n"
                 + "  network: {\n"
                 + "    port: {},\n"
                 + "    nodeFinder: {\n"
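Note on the @Nullable parameters in nodeConfig: passing Java null splices the literal text null into the generated config, which HOCON parses as a null value rather than rejecting the document. For example, nodeConfig(null, "{lru_rocks.engine = rocksdb}") should render roughly the following (network section elided here, as in the truncated hunk above):

    ignite {
      nodeAttributes.nodeAttributes: null,
      storage.profiles: {lru_rocks.engine = rocksdb},
      network: { ... }
    }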