IGNITE-24209 Added tests for filters and storage profiles awareness on partition reset #5064

Open · wants to merge 3 commits into base: main
@@ -17,13 +17,18 @@

package org.apache.ignite.internal.table.distributed.disaster;

import static java.util.Collections.emptySet;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.table.TableTestUtils.getTableId;
import static org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.RECOVERY_TRIGGER_KEY;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -38,8 +43,10 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
@@ -50,9 +57,12 @@
import org.apache.ignite.internal.distributionzones.Node;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.versioned.VersionedSerialization;

@@ -132,10 +142,71 @@ private void waitAndAssertStableAssignmentsOfPartitionEqualTo(
.collect(Collectors.toUnmodifiableSet()));
}

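/**
 * Waits until, for every partition in {@code partitionIds} of {@code tableName}, the stable assignments equal
 * {@code nodes} while the pending and planned rebalance keys are empty.
 */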
final void waitAndAssertEmptyRebalanceKeysAndStableAssignmentsOfPartitionEqualTo(
IgniteImpl gatewayNode, String tableName, Set<Integer> partitionIds, Set<String> nodes) {
partitionIds.forEach(p -> {
try {
waitAndAssertEmptyRebalanceKeysAndStableAssignmentsOfPartitionEqualTo(gatewayNode, tableName, p, nodes);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
}

private void waitAndAssertEmptyRebalanceKeysAndStableAssignmentsOfPartitionEqualTo(
IgniteImpl gatewayNode, String tableName, int partNum, Set<String> nodes
) throws InterruptedException {
AtomicReference<Set<String>> stableNodes = new AtomicReference<>();
AtomicReference<Set<String>> pendingNodes = new AtomicReference<>();
AtomicReference<Set<String>> plannedNodes = new AtomicReference<>();

assertTrue(
waitForCondition(() -> {
Integer tableId = getTableId(gatewayNode.catalogManager(), tableName, clock.nowLong());

assert tableId != null;

TablePartitionId tablePartitionId = new TablePartitionId(tableId, partNum);

ByteArray stableKey = stablePartAssignmentsKey(tablePartitionId);
ByteArray pendingKey = pendingPartAssignmentsKey(tablePartitionId);
ByteArray plannedKey = plannedPartAssignmentsKey(tablePartitionId);

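// Read the stable, pending, and planned assignment entries in a single meta-storage batch call.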
Map<ByteArray, Entry> results = await(gatewayNode.metaStorageManager()
.getAll(Set.of(stableKey, pendingKey, plannedKey)), 1, TimeUnit.SECONDS);

boolean isStableAsExpected = nodes.equals(assignmentsFromEntry(results.get(stableKey)));
boolean isPendingEmpty = results.get(pendingKey).value() == null;
boolean isPlannedEmpty = results.get(plannedKey).value() == null;

stableNodes.set(assignmentsFromEntry(results.get(stableKey)));
pendingNodes.set(assignmentsFromEntry(results.get(pendingKey)));
plannedNodes.set(assignmentsFromEntry(results.get(plannedKey)));

return isStableAsExpected && isPendingEmpty && isPlannedEmpty;
},
500,
30_000
), IgniteStringFormatter.format(
"Expected: (stable: {}, pending: [], planned: []), but actual: (stable: {}, pending: {}, planned: {})",
nodes, stableNodes, pendingNodes, plannedNodes
)
);
}

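/** Returns the latest locally known value of the disaster recovery trigger key. */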
static Entry getRecoveryTriggerKey(IgniteImpl node) {
return node.metaStorageManager().getLocally(RECOVERY_TRIGGER_KEY, Long.MAX_VALUE);
}

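/** Extracts the consistent node IDs from a serialized assignments entry, or an empty set if the entry has no value. */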
private static Set<String> assignmentsFromEntry(Entry entry) {
return (entry.value() != null)
? Assignments.fromBytes(entry.value()).nodes()
.stream()
.map(Assignment::consistentId)
.collect(Collectors.toUnmodifiableSet())
: emptySet();
}

private Set<Assignment> getPartitionClusterNodes(IgniteImpl node, String tableName, int partNum) {
return Optional.ofNullable(getTableId(node.catalogManager(), tableName, clock.nowLong()))
.map(tableId -> partitionAssignments(node.metaStorageManager(), tableId, partNum).join())
@@ -148,10 +219,16 @@ final int zoneIdByName(CatalogService catalogService, String zoneName) {

private void createHaZoneWithTables(
String zoneName, List<String> tableNames, String filter, Set<String> targetNodes) throws InterruptedException {
createHaZoneWithTables(zoneName, tableNames, filter, DEFAULT_STORAGE_PROFILE, targetNodes);
}

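/** Creates an HA zone with the given data nodes filter and storage profiles, then creates the given tables in it. */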
private void createHaZoneWithTables(
String zoneName, List<String> tableNames, String filter, String storageProfiles, Set<String> targetNodes
) throws InterruptedException {
executeSql(String.format(
"CREATE ZONE %s WITH REPLICAS=%s, PARTITIONS=%s, STORAGE_PROFILES='%s', "
+ "CONSISTENCY_MODE='HIGH_AVAILABILITY', DATA_NODES_FILTER='%s'",
zoneName, targetNodes.size(), PARTITIONS_NUMBER, DEFAULT_STORAGE_PROFILE, filter
zoneName, targetNodes.size(), PARTITIONS_NUMBER, storageProfiles, filter
));

Set<Integer> tableIds = new HashSet<>();
@@ -186,10 +263,23 @@ final void createHaZoneWithTable(String zoneName, String tableName) throws Inter
createHaZoneWithTables(zoneName, List.of(tableName));
}

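/** Creates the default HA zone and table with the given filter; the zone's REPLICAS is taken from {@code targetNodes.size()}. */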
final void createHaZoneWithTable(int replicas, String filter, Set<String> targetNodes) throws InterruptedException {
createHaZoneWithTables(
HA_ZONE_NAME, List.of(HA_TABLE_NAME), filter, targetNodes
);
}

final void createHaZoneWithTable() throws InterruptedException {
createHaZoneWithTable(HA_ZONE_NAME, HA_TABLE_NAME);
}

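/** Creates the default HA zone and table with the default data nodes filter and the given storage profiles. */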
final void createHaZoneWithTableWithStorageProfile(int replicas, String storageProfiles, Set<String> targetNodes)
throws InterruptedException {
createHaZoneWithTables(
HA_ZONE_NAME, List.of(HA_TABLE_NAME), DEFAULT_FILTER, storageProfiles, targetNodes
);
}

final void createScZoneWithTable() {
executeSql(String.format(
"CREATE ZONE %s WITH REPLICAS=%s, PARTITIONS=%s, STORAGE_PROFILES='%s', CONSISTENCY_MODE='STRONG_CONSISTENCY'",
@@ -20,15 +20,21 @@
import java.util.Set;
import org.apache.ignite.internal.app.IgniteImpl;
import org.intellij.lang.annotations.Language;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;

/** Test suite for cases where the group replication factor is recovered after a partition reset, taking zone filters and storage profiles into account. */
public class ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends AbstractHighAvailablePartitionsRecoveryTest {
private static final String GLOBAL_EU_NODES_CONFIG = nodeConfig("{region = EU, zone = global}");
private static final String GLOBAL_EU_NODES_CONFIG =
nodeConfig("{region = EU, zone = global}", "{segmented_aipersist.engine = aipersist}");

private static final String EU_ONLY_NODES_CONFIG = nodeConfig("{region = EU}");
private static final String EU_ONLY_NODES_CONFIG = nodeConfig("{region = EU}", null);

private static final String GLOBAL_NODES_CONFIG = nodeConfig("{zone = global}");
private static final String GLOBAL_NODES_CONFIG = nodeConfig("{zone = global}", null);

private static final String ROCKS_NODES_CONFIG = nodeConfig(null, "{lru_rocks.engine = rocksdb}");

private static final String AIPERSIST_NODES_CONFIG = nodeConfig(null, "{segmented_aipersist.engine = aipersist}");

@Override
protected int initialNodes() {
@@ -72,13 +78,61 @@ void testScaleUpAfterZoneFilterUpdate() throws InterruptedException {
waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, globalNodes);
}

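/** Verifies that the automatic partition reset takes the zone's data nodes filter into account. */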
@Test
void testThatPartitionResetZoneFilterAware() throws InterruptedException {
startNode(1, EU_ONLY_NODES_CONFIG);
startNode(2, GLOBAL_NODES_CONFIG);

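// JSONPath filter over node attributes that matches only nodes whose region is EU.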
String euFilter = "$[?(@.region == \"EU\")]";

Set<String> euNodes = nodeNames(0, 1);

createHaZoneWithTable(2, euFilter, euNodes);

IgniteImpl node = igniteImpl(0);

waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, euNodes);

assertRecoveryKeyIsEmpty(node);

stopNodes(1);

// Only node 0 satisfies the zone filter, so the reset must converge the stable assignments to that single node:
waitAndAssertEmptyRebalanceKeysAndStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, nodeNames(0));
}

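/** Verifies that the automatic partition reset takes the zone's storage profiles into account. */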
@Test
void testThatPartitionResetZoneStorageProfileFilterAware() throws InterruptedException {
startNode(1, AIPERSIST_NODES_CONFIG);
startNode(2, ROCKS_NODES_CONFIG);

Set<String> nodesWithAiProfile = nodeNames(0, 1);

createHaZoneWithTableWithStorageProfile(2, "segmented_aipersist", nodesWithAiProfile);

IgniteImpl node = igniteImpl(0);

waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, nodesWithAiProfile);

assertRecoveryKeyIsEmpty(node);

stopNodes(1);

// Only node 0 still provides a storage profile matching the zone, so the reset must converge the stable assignments to that single node:
waitAndAssertEmptyRebalanceKeysAndStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, nodeNames(0));
}

private void alterZoneSql(String filter, String zoneName) {
executeSql(String.format("ALTER ZONE \"%s\" SET \"DATA_NODES_FILTER\" = '%s'", zoneName, filter));
}

private static String nodeConfig(@Language("HOCON") String nodeAttributes) {
private static String nodeConfig(
@Nullable @Language("HOCON") String nodeAttributes,
@Nullable @Language("HOCON") String storageProfiles
) {
return "ignite {\n"
+ " nodeAttributes.nodeAttributes: " + nodeAttributes + ",\n"
+ " storage.profiles: " + storageProfiles + ",\n"
+ " network: {\n"
+ " port: {},\n"
+ " nodeFinder: {\n"