Skip to content

Commit

Permalink
IGNITE-16499 Сonsistency check command should support IGNITE_TO_STRIN…
Browse files Browse the repository at this point in the history
…G_INCLUDE_SENSITIVE option (apache#9814)
  • Loading branch information
anton-vinogradov authored Mar 10, 2022
1 parent 95ad019 commit 6b0637e
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.ignite.util.GridCommandHandlerClusterByClassTest;
import org.apache.ignite.util.GridCommandHandlerClusterByClassWithSSLTest;
import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest;
import org.apache.ignite.util.GridCommandHandlerConsistencySensitiveTest;
import org.apache.ignite.util.GridCommandHandlerConsistencyTest;
import org.apache.ignite.util.GridCommandHandlerDefragmentationTest;
import org.apache.ignite.util.GridCommandHandlerIndexForceRebuildTest;
Expand Down Expand Up @@ -93,6 +94,7 @@

GridCommandHandlerConsistencyTest.class,
GridCommandHandlerConsistencyBinaryTest.class,
GridCommandHandlerConsistencySensitiveTest.class,

SystemViewCommandTest.class,
MetricCommandTest.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.util;

import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.testframework.junits.WithSystemProperty;

/**
*
*/
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_TO_STRING_INCLUDE_SENSITIVE, value = "false")
public class GridCommandHandlerConsistencySensitiveTest extends GridCommandHandlerConsistencyTest {
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.consistency.VisorConsistencyStatusTask;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand All @@ -48,6 +52,7 @@
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
import static org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask.CONSISTENCY_VIOLATIONS_FOUND;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.LogListener.matches;

/**
*
Expand All @@ -66,6 +71,9 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster
/** Partitions. */
private static final int PARTITIONS = 32;

/** */
protected final ListeningTestLogger listeningLog = new ListeningTestLogger(log);

/** */
@Parameterized.Parameters(name = "strategy={0}")
public static Iterable<Object[]> data() {
Expand Down Expand Up @@ -103,6 +111,8 @@ protected CacheConfiguration<Integer, Integer> cacheConfiguration(boolean tx) {

cfg.setDataStorageConfiguration(null);

cfg.setGridLogger(listeningLog);

return cfg;
}

Expand Down Expand Up @@ -138,6 +148,27 @@ private void testAtomicAndTx(boolean incVal) throws Exception {

injectTestSystemOut();

LogListener lsnrUnmaskedKey = matches("Key: 0 (cache: ").build(); // Unmasked key.
LogListener lsnrMaskedKey = matches("Key: [HIDDEN_KEY#").build(); // Masked key.
LogListener lsnrMaskedVal = matches("Value: [HIDDEN_VALUE#").build(); // Masked value.

listeningLog.registerListener(lsnrUnmaskedKey);
listeningLog.registerListener(lsnrMaskedKey);
listeningLog.registerListener(lsnrMaskedVal);

List<LogListener> listeners = new ArrayList<>();

// It's unable to check just "Key:" count while https://issues.apache.org/jira/browse/IGNITE-15316 not fixed
if (S.includeSensitive()) {
for (int i = 0; i < PARTITIONS; i++) {
LogListener keyListener = matches("Key: " + i + " (cache: ").build();

listeningLog.registerListener(keyListener);

listeners.add(keyListener);
}
}

assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
assertContains(log, testOut.toString(),
"conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts.get());
Expand All @@ -146,13 +177,30 @@ private void testAtomicAndTx(boolean incVal) throws Exception {

readRepair(brokenParts, txCacheName, fixesPerEntry);

if (S.includeSensitive()) {
for (LogListener listener : listeners) {
assertTrue(listener.check());

listener.reset();
}
}

if (fixesPerEntry != null && fixesPerEntry > 0)
assertEquals(PARTITIONS, brokenParts.get()); // Half fixed.

readRepair(brokenParts, atomicCacheName, fixesPerEntry != null ? 0 : null);

if (S.includeSensitive()) {
for (LogListener listener : listeners)
assertTrue(listener.check());
}

if (fixesPerEntry != null && fixesPerEntry > 0)
assertEquals(PARTITIONS, brokenParts.get()); // Atomics still broken.

assertEquals(S.includeSensitive(), lsnrUnmaskedKey.check());
assertEquals(S.includeSensitive(), !lsnrMaskedKey.check());
assertEquals(S.includeSensitive(), !lsnrMaskedVal.check());
}

/**
Expand Down Expand Up @@ -212,6 +260,8 @@ public void testRepairNonExistentCache() throws Exception {
ConsistencyCommand.PARTITION, String.valueOf(i),
ConsistencyCommand.STRATEGY, strategy.toString()));

assertTrue(VisorConsistencyStatusTask.MAP.isEmpty());

assertContains(log, testOut.toString(), "Cache not found");
}
}
Expand All @@ -225,6 +275,9 @@ private void readRepair(AtomicInteger brokenParts, String cacheName, Integer fix
ConsistencyCommand.CACHE, cacheName,
ConsistencyCommand.PARTITION, String.valueOf(i),
ConsistencyCommand.STRATEGY, strategy.toString()));

assertTrue(VisorConsistencyStatusTask.MAP.isEmpty());

assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND);
assertContains(log, testOut.toString(), "[found=1, fixed=" + (fixesPerEntry != null ? fixesPerEntry.toString() : ""));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class CacheConsistencyViolationEvent extends EventAdapter {
private static final long serialVersionUID = 0L;

/** Represents original values of entries.*/
private final Map<Object, Map<ClusterNode, EntryInfo>> entries;
private final Map<Object, EntriesInfo> entries;

/** Fixed entries. */
private final Map<Object, Object> fixed;
Expand All @@ -92,7 +92,7 @@ public CacheConsistencyViolationEvent(
String cacheName,
ClusterNode node,
String msg,
Map<Object, Map<ClusterNode, EntryInfo>> entries,
Map<Object, EntriesInfo> entries,
Map<Object, Object> fixed,
ReadRepairStrategy strategy) {
super(node, msg, EVT_CONSISTENCY_VIOLATION);
Expand All @@ -108,7 +108,7 @@ public CacheConsistencyViolationEvent(
*
* @return Collection of original entries.
*/
public Map<Object, Map<ClusterNode, EntryInfo>> getEntries() {
public Map<Object, EntriesInfo> getEntries() {
return entries;
}

Expand Down Expand Up @@ -139,6 +139,21 @@ public ReadRepairStrategy getStrategy() {
return strategy;
}

/**
* Inconsistent entries mapping.
*/
public interface EntriesInfo {
/**
* @return Entry's mapping.
*/
public Map<ClusterNode, EntryInfo> getMapping();

/**
* @return Entry's partition.
*/
public int partition();
}

/**
* Inconsistent entry info.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2500,7 +2500,7 @@ public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
keepCacheObjects,
deserializeBinary,
false,
getRes,
null,
getRes.version(),
0,
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cache.ReadRepairStrategy;
import org.apache.ignite.cluster.ClusterNode;
Expand All @@ -45,6 +47,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.S;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
Expand Down Expand Up @@ -335,7 +338,12 @@ protected final void recordConsistencyViolation(
if (!evtMgr.isRecordable(EVT_CONSISTENCY_VIOLATION))
return;

Map<Object, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>> entries = new HashMap<>();
boolean includeSensitive = S.includeSensitive();

Map<KeyCacheObject, Object> sensitiveKeyMap = new HashMap<>();
Map<ByteArrayWrapper, Object> sensitiveValMap = new HashMap<>();

Map<Object, CacheConsistencyViolationEvent.EntriesInfo> entries = new HashMap<>();

for (Map.Entry<ClusterNode, GridPartitionedGetFuture<KeyCacheObject, EntryGetResult>> pair : futs.entrySet()) {
ClusterNode node = pair.getKey();
Expand All @@ -344,21 +352,24 @@ protected final void recordConsistencyViolation(

for (KeyCacheObject key : fut.keys()) {
if (inconsistentKeys.contains(key)) {
Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> map =
entries.computeIfAbsent(
ctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false, null), k -> new HashMap<>());
sensitiveKeyMap.computeIfAbsent(key, k -> includeSensitive
? ctx.unwrapBinaryIfNeeded(k, !deserializeBinary, false, null)
: "[HIDDEN_KEY#" + UUID.randomUUID() + "]");

CacheConsistencyViolationEvent.EntriesInfo entriesInfo =
entries.computeIfAbsent(sensitiveKeyMap.get(key), k -> new EventEntriesInfo(key.partition()));

EntryGetResult res = fut.result().get(key);
CacheEntryVersion ver = res != null ? res.version() : null;

Object val = res != null ? ctx.unwrapBinaryIfNeeded(res.value(), !deserializeBinary, false, null) : null;
Object val = sensitiveValue(includeSensitive, res, sensitiveValMap);

boolean primary = primaries.get(key).equals(fut.affNode());
boolean correct = fixedEntries != null &&
((fixedEntries.get(key) != null && fixedEntries.get(key).equals(res)) ||
(fixedEntries.get(key) == null && res == null));

map.put(node, new EventEntryInfo(val, ver, primary, correct));
entriesInfo.getMapping().put(node, new EventEntryInfo(val, ver, primary, correct));
}
}
}
Expand All @@ -371,9 +382,8 @@ protected final void recordConsistencyViolation(
fixed = new HashMap<>();

for (Map.Entry<KeyCacheObject, EntryGetResult> entry : fixedEntries.entrySet()) {
Object key = ctx.unwrapBinaryIfNeeded(entry.getKey(), !deserializeBinary, false, null);
Object val = entry.getValue() != null ?
ctx.unwrapBinaryIfNeeded(entry.getValue().value(), !deserializeBinary, false, null) : null;
Object key = sensitiveKeyMap.get(entry.getKey());
Object val = sensitiveValue(includeSensitive, entry.getValue(), sensitiveValMap);

fixed.put(key, val);
}
Expand All @@ -388,6 +398,58 @@ protected final void recordConsistencyViolation(
strategy));
}

/**
*
*/
private Object sensitiveValue(boolean includeSensitive, EntryGetResult res,
Map<ByteArrayWrapper, Object> sensitiveValMap) {
if (res != null) {
CacheObject val = res.value();

try {
ByteArrayWrapper wrapped = new ByteArrayWrapper(val.valueBytes(ctx.cacheObjectContext()));

return sensitiveValMap.computeIfAbsent(wrapped, w ->
includeSensitive ?
ctx.unwrapBinaryIfNeeded(val, !deserializeBinary, false, null) :
"[HIDDEN_VALUE#" + UUID.randomUUID() + "]");
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to unmarshall object.", e);
}
}
else
return null;
}

/**
*
*/
private static final class EventEntriesInfo implements CacheConsistencyViolationEvent.EntriesInfo {
/** Mapping. */
final Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> mapping = new HashMap<>();

/** Partition. */
final int partition;

/**
* @param partition Partition.
*/
public EventEntriesInfo(int partition) {
this.partition = partition;
}

/** {@inheritDoc} */
@Override public Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> getMapping() {
return mapping;
}

/** {@inheritDoc} */
@Override public int partition() {
return partition;
}
}

/**
*
*/
Expand Down Expand Up @@ -440,4 +502,27 @@ public EventEntryInfo(Object val, CacheEntryVersion ver, boolean primary, boolea
return correct;
}
}

/**
*
*/
protected static final class ByteArrayWrapper {
/** Array. */
final byte[] arr;

/** */
public ByteArrayWrapper(byte[] arr) {
this.arr = arr;
}

/** */
@Override public boolean equals(Object o) {
return Arrays.equals(arr, ((ByteArrayWrapper)o).arr);
}

/** */
@Override public int hashCode() {
return Arrays.hashCode(arr);
}
}
}
Loading

0 comments on commit 6b0637e

Please sign in to comment.