From 8b3f6eb7f1727469cde1269bffb1cdc618f56f9d Mon Sep 17 00:00:00 2001 From: Nikolay Date: Tue, 15 Mar 2022 23:42:32 +0300 Subject: [PATCH] IGNITE-15117 CDC for in-memory caches (#9345) --- .../DataRegionConfiguration.java | 29 ++ .../DataStorageConfiguration.java | 28 -- .../apache/ignite/internal/cdc/CdcMain.java | 17 +- .../cache/CacheAffinitySharedManager.java | 26 +- .../processors/cache/CacheGroupContext.java | 29 +- .../processors/cache/ClusterCachesInfo.java | 9 +- .../processors/cache/GridCacheMapEntry.java | 48 ++-- .../processors/cache/GridCacheProcessor.java | 14 +- .../cache/GridCacheSharedContext.java | 22 +- .../processors/cache/GridCacheUtils.java | 49 ++++ .../processors/cache/WalStateManager.java | 4 +- .../cache/binary/BinaryMetadataFileStore.java | 30 +- .../GridDistributedTxRemoteAdapter.java | 8 +- .../preloader/GridDhtPartitionDemander.java | 12 +- .../dht/topology/GridDhtLocalPartition.java | 4 +- .../GridCacheDatabaseSharedManager.java | 32 +-- .../persistence/GridCacheOffheapManager.java | 4 +- .../IgniteCacheDatabaseSharedManager.java | 58 +++- .../filename/PdsFolderResolver.java | 2 +- .../wal/FileWriteAheadLogManager.java | 10 +- .../wal/reader/IgniteWalIteratorFactory.java | 2 +- ...aloneIgniteCacheDatabaseSharedManager.java | 6 + .../transactions/IgniteTxLocalAdapter.java | 7 +- .../apache/ignite/cdc/AbstractCdcTest.java | 27 +- .../ignite/cdc/CdcCacheVersionTest.java | 37 ++- .../org/apache/ignite/cdc/CdcSelfTest.java | 143 +++++++--- .../org/apache/ignite/cdc/WalForCdcTest.java | 257 ++++++++++++++++++ .../checkpoint/LightweightCheckpointTest.java | 2 +- .../IgniteWalIteratorSwitchSegmentTest.java | 23 +- .../pagemem/BPlusTreePageMemoryImplTest.java | 5 +- .../BPlusTreeReuseListPageMemoryImplTest.java | 5 +- .../IndexStoragePageMemoryImplTest.java | 5 +- .../pagemem/PageMemoryImplNoLoadTest.java | 5 +- .../pagemem/PageMemoryImplTest.java | 6 +- .../hashmap/GridCacheTestContext.java | 2 +- .../testsuites/IgnitePdsTestSuite2.java | 2 + .../ignite/internal/cdc/SqlCdcTest.java | 18 +- .../stat/StatisticsStorageRestartTest.java | 7 +- .../query/stat/StatisticsStorageUnitTest.java | 7 +- .../test/config/cdc/correct-cdc-config.xml | 2 +- 40 files changed, 769 insertions(+), 234 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataRegionConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataRegionConfiguration.java index 2865bab48ea1e..2279ba8ce1fbb 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DataRegionConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataRegionConfiguration.java @@ -20,6 +20,7 @@ import org.apache.ignite.DataRegionMetrics; import org.apache.ignite.internal.mem.IgniteOutOfMemoryException; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteExperimental; import org.apache.ignite.mem.MemoryAllocator; import org.apache.ignite.mxbean.DataRegionMetricsMXBean; import org.apache.ignite.mxbean.MetricsMxBean; @@ -156,6 +157,10 @@ public final class DataRegionConfiguration implements Serializable { /** Memory allocator. */ @Nullable private MemoryAllocator memoryAllocator = null; + /** Change Data Capture enabled flag. */ + @IgniteExperimental + private boolean cdcEnabled; + /** * Gets data region name. * @@ -556,6 +561,30 @@ public DataRegionConfiguration setWarmUpConfiguration(@Nullable WarmUpConfigurat return warmUpCfg; } + /** + * Sets flag indicating whether CDC enabled. + * + * @param cdcEnabled CDC enabled flag. + * @return {@code this} for chaining. + */ + @IgniteExperimental + public DataRegionConfiguration setCdcEnabled(boolean cdcEnabled) { + this.cdcEnabled = cdcEnabled; + + return this; + } + + /** + * Gets flag indicating whether CDC is enabled. + * Default value is {@code false}. + * + * @return CDC enabled flag. + */ + @IgniteExperimental + public boolean isCdcEnabled() { + return cdcEnabled; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DataRegionConfiguration.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java index 28d9d7ea34701..ad5b306db96ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java @@ -244,10 +244,6 @@ public class DataStorageConfiguration implements Serializable { @IgniteExperimental private String cdcWalPath = DFLT_WAL_CDC_PATH; - /** Change Data Capture enabled flag. */ - @IgniteExperimental - private boolean cdcEnabled; - /** Metrics enabled flag. */ private boolean metricsEnabled = DFLT_METRICS_ENABLED; @@ -800,30 +796,6 @@ public DataStorageConfiguration setCdcWalPath(String cdcWalPath) { return this; } - /** - * Sets flag indicating whether CDC enabled. - * - * @param cdcEnabled CDC enabled flag. - * @return {@code this} for chaining. - */ - @IgniteExperimental - public DataStorageConfiguration setCdcEnabled(boolean cdcEnabled) { - this.cdcEnabled = cdcEnabled; - - return this; - } - - /** - * Gets flag indicating whether CDC is enabled. - * Default value is {@code false}. - * - * @return Metrics enabled flag. - */ - @IgniteExperimental - public boolean isCdcEnabled() { - return cdcEnabled; - } - /** * Gets flag indicating whether persistence metrics collection is enabled. * Default value is {@link #DFLT_METRICS_ENABLED}. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index 84ba5cfe92cc7..c466834d4c0bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -35,6 +35,7 @@ import org.apache.ignite.cdc.CdcConfiguration; import org.apache.ignite.cdc.CdcConsumer; import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.GridComponent; @@ -74,7 +75,7 @@ * * Ignite node should be explicitly configured for using {@link CdcMain}. *
    - *
  1. Set {@link DataStorageConfiguration#setCdcEnabled(boolean)} to true.
  2. + *
  3. Set {@link DataRegionConfiguration#setCdcEnabled(boolean)} to true.
  4. *
  5. Optional: Set {@link DataStorageConfiguration#setCdcWalPath(String)} to path to the directory * to store WAL segments for CDC.
  6. *
  7. Optional: Set {@link DataStorageConfiguration#setWalForceArchiveTimeout(long)} to configure timeout for @@ -101,7 +102,7 @@ *
  8. Infinitely waits for new available segment and processes it.
  9. *
* - * @see DataStorageConfiguration#setCdcEnabled(boolean) + * @see DataRegionConfiguration#setCdcEnabled(boolean) * @see DataStorageConfiguration#setCdcWalPath(String) * @see DataStorageConfiguration#setWalForceArchiveTimeout(long) * @see CdcCommandLineStartup @@ -110,7 +111,7 @@ */ public class CdcMain implements Runnable { /** */ - public static final String ERR_MSG = "Persistence disabled. Capture Data Change can't run!"; + public static final String ERR_MSG = "Persistence and CDC disabled. Capture Data Change can't run!"; /** State dir. */ public static final String STATE_DIR = "state"; @@ -239,7 +240,7 @@ public CdcMain( public void runX() throws Exception { ackAsciiLogo(); - if (!CU.isPersistenceEnabled(igniteCfg)) { + if (!CU.isCdcEnabled(igniteCfg)) { log.error(ERR_MSG); throw new IllegalArgumentException(ERR_MSG); @@ -355,7 +356,7 @@ private CdcFileLockHolder lockPds() throws IgniteCheckedException { new PdsFolderResolver<>(igniteCfg, log, igniteCfg.getConsistentId(), this::tryLock).resolve(); if (settings == null) { - throw new IgniteException("Can't find folder to read WAL segments from based on provided configuration! " + + throw new IgniteException("Can't find the folder to read WAL segments from! " + "[workDir=" + igniteCfg.getWorkDirectory() + ", consistentId=" + igniteCfg.getConsistentId() + ']'); } @@ -524,12 +525,6 @@ private void consumeSegment(Path segment) { * @return Lock or null if lock failed. */ private CdcFileLockHolder tryLock(File dbStoreDirWithSubdirectory) { - if (!dbStoreDirWithSubdirectory.exists()) { - log.warning("DB store directory not exists [dir=" + dbStoreDirWithSubdirectory + ']'); - - return null; - } - File cdcRoot = new File(igniteCfg.getDataStorageConfiguration().getCdcWalPath()); if (!cdcRoot.isAbsolute()) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 68c7d1d7917b2..9f4d8514c0a3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -27,7 +26,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -2902,7 +2900,7 @@ void validateCacheGroup(CacheGroupDescriptor grpDesc) { if (partsNum == 0) return; - DataRegionConfiguration drCfg = findDataRegion(dsCfg, grpCfg.getDataRegionName()); + DataRegionConfiguration drCfg = CU.findDataRegion(dsCfg, grpCfg.getDataRegionName()); if (drCfg == null) return; @@ -2941,27 +2939,5 @@ private String buildWarningMessage(CacheGroupDescriptor grpDesc, U.sizeInMegabytes(drCfg.getMaxSize()) ); } - - /** - * Finds data region by name. - * - * @param dsCfg Data storage configuration. - * @param drName Data region name. - * - * @return Found data region. - */ - @Nullable private DataRegionConfiguration findDataRegion(DataStorageConfiguration dsCfg, String drName) { - if (dsCfg.getDataRegionConfigurations() == null || drName == null) - return dsCfg.getDefaultDataRegionConfiguration(); - - if (dsCfg.getDefaultDataRegionConfiguration().getName().equals(drName)) - return dsCfg.getDefaultDataRegionConfiguration(); - - Optional cfgOpt = Arrays.stream(dsCfg.getDataRegionConfigurations()) - .filter(drCfg -> drCfg.getName().equals(drName)) - .findFirst(); - - return cfgOpt.isPresent() ? cfgOpt.get() : null; - } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index ddb7931f90e62..9bd805100c27b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCluster; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; @@ -40,6 +41,8 @@ import org.apache.ignite.internal.metric.IoStatisticsHolderCache; import org.apache.ignite.internal.metric.IoStatisticsHolderIndex; import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; @@ -1095,6 +1098,19 @@ public boolean persistenceEnabled() { return persistenceEnabled; } + /** + * @return {@code True} if {@link DataRecord} should be loged in the WAL. + */ + public boolean logDataRecords() { + return walEnabled() && (persistenceEnabled || cdcEnabled()); + } + + /** @return {@code True} if CDC enabled. */ + public boolean cdcEnabled() { + // Data region is null for client and non affinity nodes. + return dataRegion != null && dataRegion.config().isCdcEnabled(); + } + /** * @param nodeId Node ID. * @param req Request. @@ -1204,7 +1220,11 @@ public void onReconnected() { } /** - * WAL enabled flag. + * Value returned by this method can be changed runtime by the user or during rebalance. + * + * @return WAL enabled flag. + * @see IgniteCluster#disableWal(String) + * @see IgniteCluster#enableWal(String) */ public boolean walEnabled() { return localWalEnabled && globalWalEnabled; @@ -1321,6 +1341,13 @@ public void removeIOStatistic(boolean destroy) { ctx.kernalContext().metric().remove(statHolderIdx.metricRegistryName(), destroy); } + /** + * @return Write ahead log manager. + */ + public IgniteWriteAheadLogManager wal() { + return ctx.wal(cdcEnabled()); + } + /** * @param ccfg Cache configuration. * @param plugins Ignite plugin processor. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 910a8e66b5484..15b488319fc28 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -43,6 +43,7 @@ import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.GridCachePluginContext; @@ -2306,13 +2307,19 @@ private CacheGroupDescriptor registerCacheGroup( Map caches = Collections.singletonMap(startedCacheCfg.getName(), cacheId); boolean persistent = resolvePersistentFlag(exchActions, startedCacheCfg); - boolean walGloballyEnabled = false; + boolean walGloballyEnabled; // client nodes cannot read wal enabled/disabled status so they should use default one if (ctx.clientNode()) walGloballyEnabled = persistent; else if (persistent) walGloballyEnabled = ctx.cache().context().database().walEnabled(grpId, false); + else { + DataRegionConfiguration drCfg = + CU.findDataRegion(ctx.config().getDataStorageConfiguration(), startedCacheCfg.getDataRegionName()); + + walGloballyEnabled = drCfg != null && drCfg.isCdcEnabled(); + } CacheGroupDescriptor grpDesc = new CacheGroupDescriptor( startedCacheCfg, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 951b41c3243f6..9131eb4e91547 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1213,8 +1213,8 @@ else if (res.resultType() == ResultType.REMOVED_NOT_NULL) { counters.accumulateSizeDelta(cctx.cacheId(), partition(), -1); } - if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) { - logPtr = cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry( + if (cctx.group().logDataRecords()) { + logPtr = cctx.group().wal().log(new MvccDataRecord(new MvccDataEntry( cctx.cacheId(), key, val, @@ -1342,7 +1342,7 @@ else if (res.resultType() == ResultType.LOCKED) { counters.accumulateSizeDelta(cctx.cacheId(), partition(), -1); } - if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) + if (cctx.group().logDataRecords()) logPtr = logMvccUpdate(tx, null, 0, 0L, mvccVer); update(null, 0, 0, newVer, true); @@ -1561,7 +1561,7 @@ else if (interceptorVal != val0) updateCntr0 = nextPartitionCounter(tx, updateCntr); - if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled()) + if (tx != null && cctx.group().logDataRecords()) logPtr = logTxUpdate(tx, val, addConflictVersion(tx.writeVersion(), newVer), expireTime, updateCntr0); update(val, expireTime, ttl, newVer, true); @@ -1786,7 +1786,7 @@ protected Object keyValue(boolean cpy) { updateCntr0 = nextPartitionCounter(tx, updateCntr); - if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled()) + if (tx != null && cctx.group().logDataRecords()) logPtr = logTxUpdate(tx, null, addConflictVersion(tx.writeVersion(), newVer), 0, updateCntr0); drReplicate(drType, null, newVer, topVer); @@ -3350,7 +3350,7 @@ private boolean skipInterceptor(@Nullable GridCacheVersion explicitVer) { try { checkObsolete(); - boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled() && cctx.group().walEnabled(); + boolean walEnabled = !cctx.isNear() && cctx.group().logDataRecords(); long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; @@ -3466,7 +3466,7 @@ else if (deletedUnlocked()) if (walEnabled) { if (cctx.mvccEnabled()) { - cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry( + cctx.group().wal().log(new MvccDataRecord(new MvccDataEntry( cctx.cacheId(), key, val, @@ -3479,7 +3479,7 @@ else if (deletedUnlocked()) mvccVer == null ? MvccUtils.INITIAL_VERSION : mvccVer ))); } else { - cctx.shared().wal().log(new DataRecord(new DataEntry( + cctx.group().wal().log(new DataRecord(new DataEntry( cctx.cacheId(), key, val, @@ -4330,8 +4330,8 @@ protected void logUpdate( assert cctx.atomic(); try { - if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) - cctx.shared().wal().log(new DataRecord(new DataEntry( + if (cctx.group().logDataRecords()) + cctx.group().wal().log(new DataRecord(new DataEntry( cctx.cacheId(), key, val, @@ -4373,7 +4373,7 @@ protected WALPointer logTxUpdate( else op = this.val == null ? GridCacheOperation.CREATE : UPDATE; - return cctx.shared().wal().log(new DataRecord(new DataEntry( + return cctx.group().wal().log(new DataRecord(new DataEntry( cctx.cacheId(), key, val, @@ -4410,7 +4410,7 @@ protected WALPointer logMvccUpdate(IgniteInternalTx tx, CacheObject val, long ex else op = this.val == null ? GridCacheOperation.CREATE : UPDATE; - return cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry( + return cctx.group().wal().log(new MvccDataRecord(new MvccDataEntry( cctx.cacheId(), key, val, @@ -5310,7 +5310,7 @@ else if (res.resultType() == ResultType.LOCKED) { counters.accumulateSizeDelta(cctx.cacheId(), entry.partition(), -1); } - if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) + if (cctx.group().logDataRecords()) entry.logMvccUpdate(tx, null, 0, 0, mvccVer); entry.update(null, 0, 0, newVer, true); @@ -5658,8 +5658,8 @@ else if (res.resultType() == ResultType.REMOVED_NOT_NULL) { counters.accumulateSizeDelta(cctx.cacheId(), entry.partition(), -1); } - if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) - logPtr = cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry( + if (cctx.group().logDataRecords()) + logPtr = cctx.group().wal().log(new MvccDataRecord(new MvccDataEntry( cctx.cacheId(), entry.key(), val, @@ -6855,9 +6855,9 @@ private IgniteBiTuple runEntryProcessor(CacheInvokeEntry walEntries = walEnabled ? new ArrayList<>(entries.size() + 1) : Collections.EMPTY_LIST; + List walEntries = logDataRecs ? new ArrayList<>(entries.size() + 1) : Collections.EMPTY_LIST; // Apply updates in reverse order (oldest is last) until any of previous versions found. // If we found prev version then it means history has been actualized either with previous update @@ -6880,7 +6880,7 @@ private IgniteBiTuple runEntryProcessor(CacheInvokeEntry runEntryProcessor(CacheInvokeEntry runEntryProcessor(CacheInvokeEntry runEntryProcessor(CacheInvokeEntry runEntryProcessor(CacheInvokeEntry { /** Write ahead log manager. {@code Null} if persistence is not enabled. */ @Nullable private IgniteWriteAheadLogManager walMgr; + /** Write ahead log manager for CDC. {@code Null} if persistence AND CDC is not enabled. */ + @Nullable private IgniteWriteAheadLogManager cdcWalMgr; + /** Write ahead log state manager. */ private WalStateManager walStateMgr; @@ -245,6 +248,7 @@ public GridCacheSharedContext( verMgr, mvccMgr, pageStoreMgr, + CU.isPersistenceEnabled(kernalCtx.config()) ? walMgr : null, walMgr, walStateMgr, dbMgr, @@ -427,6 +431,7 @@ void onReconnected(boolean active) throws IgniteCheckedException { mvccMgr, pageStoreMgr, walMgr, + cdcWalMgr, walStateMgr, dbMgr, snapshotMgr, @@ -477,6 +482,7 @@ private void setManagers( GridCacheMvccManager mvccMgr, @Nullable IgnitePageStoreManager pageStoreMgr, IgniteWriteAheadLogManager walMgr, + IgniteWriteAheadLogManager cdcWalMgr, WalStateManager walStateMgr, IgniteCacheDatabaseSharedManager dbMgr, IgniteSnapshotManager snapshotMgr, @@ -497,6 +503,10 @@ private void setManagers( this.txMgr = add(mgrs, txMgr); this.pageStoreMgr = add(mgrs, pageStoreMgr); this.walMgr = add(mgrs, walMgr); + + assert walMgr == null || walMgr == cdcWalMgr; + + this.cdcWalMgr = walMgr == null ? add(mgrs, cdcWalMgr) : cdcWalMgr; this.walStateMgr = add(mgrs, walStateMgr); this.dbMgr = add(mgrs, dbMgr); this.snapshotMgr = add(mgrs, snapshotMgr); @@ -761,7 +771,17 @@ public IgniteSnapshotManager snapshotMgr() { * @return Write ahead log manager. */ @Nullable public IgniteWriteAheadLogManager wal() { - return walMgr; + return wal(false); + } + + /** + * @return Write ahead log manager. + * @param forCdc If {@code true} then wal queried to log CDC related stuff. + */ + @Nullable public IgniteWriteAheadLogManager wal(boolean forCdc) { + assert !forCdc || cdcWalMgr != null; + + return walMgr != null ? walMgr : (forCdc ? cdcWalMgr : null); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index d0ebb8ed9a6fb..d0be19c626eb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1935,6 +1935,27 @@ public static boolean isPersistentCache(CacheConfiguration ccfg, DataStorageConf return false; } + /** + * Finds data region by name. + * + * @param dsCfg Data storage configuration. + * @param drName Data region name. + * + * @return Found data region. + */ + @Nullable public static DataRegionConfiguration findDataRegion(DataStorageConfiguration dsCfg, String drName) { + if (dsCfg.getDataRegionConfigurations() == null || drName == null) + return dsCfg.getDefaultDataRegionConfiguration(); + + if (dsCfg.getDefaultDataRegionConfiguration().getName().equals(drName)) + return dsCfg.getDefaultDataRegionConfiguration(); + + return Arrays.stream(dsCfg.getDataRegionConfigurations()) + .filter(drCfg -> drCfg.getName().equals(drName)) + .findFirst() + .orElse(null); + } + /** * @param nodes Nodes to check. * @param marshaller JdkMarshaller @@ -2069,6 +2090,34 @@ public static boolean isPersistenceEnabled(DataStorageConfiguration cfg) { return false; } + /** + * @param cfg Ignite configuration. + * @return {@code true} if CDC enabled. + */ + public static boolean isCdcEnabled(IgniteConfiguration cfg) { + DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); + + if (dsCfg == null) + return false; + + DataRegionConfiguration dfltReg = dsCfg.getDefaultDataRegionConfiguration(); + + if (dfltReg != null && dfltReg.isCdcEnabled()) + return true; + + DataRegionConfiguration[] regCfgs = dsCfg.getDataRegionConfigurations(); + + if (regCfgs == null) + return false; + + for (DataRegionConfiguration regCfg : regCfgs) { + if (regCfg.isCdcEnabled()) + return true; + } + + return false; + } + /** * @param pageSize Page size. * @param encSpi Encryption spi. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java index 6f7205dba8504..4ecd871dc8131 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java @@ -437,8 +437,8 @@ public void disableGroupDurabilityForPreloading(GridDhtPartitionsExchangeFuture Collection grpContexts = cctx.cache().cacheGroups(); for (CacheGroupContext grp : grpContexts) { - if (grp.isLocal() || !grp.affinityNode() || !grp.persistenceEnabled() || !grp.localWalEnabled() - || !grp.rebalanceEnabled() || !grp.shared().isRebalanceEnabled()) + if (grp.isLocal() || !grp.affinityNode() || !(grp.persistenceEnabled() || grp.cdcEnabled()) + || !grp.localWalEnabled() || !grp.rebalanceEnabled() || !grp.shared().isRebalanceEnabled()) continue; List locParts = grp.topology().localPartitions(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java index 1c9540c0743fe..18eb9adf33513 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java @@ -63,7 +63,7 @@ class BinaryMetadataFileStore { private final GridKernalContext ctx; /** */ - private final boolean isPersistenceEnabled; + private final boolean enabled; /** */ private FileIOFactory fileIOFactory; @@ -79,7 +79,7 @@ class BinaryMetadataFileStore { * @param ctx Context. * @param log Logger. * @param binaryMetadataFileStoreDir Path to binary metadata store configured by user, should include binary_meta - * and consistentId + * and consistentId. */ BinaryMetadataFileStore( final ConcurrentMap metadataLocCache, @@ -89,10 +89,12 @@ class BinaryMetadataFileStore { ) throws IgniteCheckedException { this.metadataLocCache = metadataLocCache; this.ctx = ctx; - this.isPersistenceEnabled = CU.isPersistenceEnabled(ctx.config()); + + enabled = CU.isPersistenceEnabled(ctx.config()) || CU.isCdcEnabled(ctx.config()); + this.log = log; - if (!isPersistenceEnabled) + if (!enabled) return; fileIOFactory = ctx.config().getDataStorageConfiguration().getFileIOFactory(); @@ -115,7 +117,7 @@ class BinaryMetadataFileStore { * Starts worker thread for async writing of binary metadata. */ void start() throws IgniteCheckedException { - if (!isPersistenceEnabled) + if (!enabled) return; U.ensureDirectory(metadataDir, "directory for serialized binary metadata", log); @@ -136,7 +138,7 @@ void stop() { * @param binMeta Binary metadata to be written to disk. */ void writeMetadata(BinaryMetadata binMeta) { - if (!isPersistenceEnabled) + if (!enabled) return; try { @@ -172,7 +174,7 @@ void writeMetadata(BinaryMetadata binMeta) { * @param typeId Type identifier. */ private void removeMeta(int typeId) { - if (!isPersistenceEnabled) + if (!enabled) return; File file = new File(metadataDir, typeId + ".bin"); @@ -196,7 +198,7 @@ private void removeMeta(int typeId) { * Restores metadata on startup of {@link CacheObjectBinaryProcessorImpl} but before starting discovery. */ void restoreMetadata() { - if (!isPersistenceEnabled) + if (!enabled) return; for (File file : metadataDir.listFiles()) { @@ -256,7 +258,7 @@ private BinaryMetadata readMetadata(int typeId) { * */ void prepareMetadataWriting(BinaryMetadata meta, int typeVer) { - if (!isPersistenceEnabled) + if (!enabled) return; writer.prepareWriteFuture(meta, typeVer); @@ -267,7 +269,7 @@ void prepareMetadataWriting(BinaryMetadata meta, int typeVer) { * @param typeVer Type version. */ void writeMetadataAsync(int typeId, int typeVer) { - if (!isPersistenceEnabled) + if (!enabled) return; writer.startTaskAsync(typeId, typeVer); @@ -277,7 +279,7 @@ void writeMetadataAsync(int typeId, int typeVer) { * @param typeId Type ID. */ public void removeMetadataAsync(int typeId) { - if (!isPersistenceEnabled) + if (!enabled) return; writer.startTaskAsync(typeId, BinaryMetadataTransport.REMOVED_VERSION); @@ -294,7 +296,7 @@ public void removeMetadataAsync(int typeId) { * @throws IgniteCheckedException */ void waitForWriteCompletion(int typeId, int typeVer) throws IgniteCheckedException { - if (!isPersistenceEnabled) + if (!enabled) return; writer.waitForWriteCompletion(typeId, typeVer); @@ -305,7 +307,7 @@ void waitForWriteCompletion(int typeId, int typeVer) throws IgniteCheckedExcepti * @param typeVer Type version. */ void finishWrite(int typeId, int typeVer) { - if (!isPersistenceEnabled) + if (!enabled) return; writer.finishWriteFuture(typeId, typeVer, null); @@ -354,7 +356,7 @@ private void fixLegacyFolder(String consistendId) throws IgniteCheckedException * @param typeId Type ID. */ void prepareMetadataRemove(int typeId) { - if (!isPersistenceEnabled) + if (!enabled) return; writer.cancelTasksForType(typeId); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 72be77907a8eb..949c44748a722 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -616,7 +616,7 @@ else if (conflictCtx.isMerge()) { GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null; - if (!near() && cacheCtx.group().persistenceEnabled() && cacheCtx.group().walEnabled() && + if (!near() && cacheCtx.group().logDataRecords() && op != NOOP && op != RELOAD && (op != READ || cctx.snapshot().needTxReadLogging())) { if (dataEntries == null) dataEntries = new ArrayList<>(entries.size()); @@ -801,17 +801,17 @@ else if (op == READ) { cctx.mvccCaching().onTxFinished(this, true); - if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) { + if (!near() && !F.isEmpty(dataEntries) && cctx.wal(true) != null) { // Set new update counters for data entries received from persisted tx entries. List entriesWithCounters = dataEntries.stream() .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter())) .collect(Collectors.toList()); - ptr = cctx.wal().log(new DataRecord(entriesWithCounters)); + ptr = cctx.wal(true).log(new DataRecord(entriesWithCounters)); } if (ptr != null) - cctx.wal().flush(ptr, false); + cctx.wal(true).flush(ptr, false); } catch (Throwable ex) { state(UNKNOWN); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index adf981684e5f3..6c66fab12c1a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -1692,7 +1692,14 @@ private void cleanupRemoteContexts(UUID nodeId) { * @param own {@code True} to own partition if possible. */ private synchronized void partitionDone(UUID nodeId, int p, boolean own) { - if (own && grp.localWalEnabled()) + // Partitions own one by one for in-memory caches. + // For persistent caches all partitions owns in batch by the end of rebalance + // (see `ctx.exchange().finishPreloading(topVer, grp.groupId(), rebalanceId);`) + // `localWalEnabled` always `true` for in-memory cache group (see `CacheGroupContext` constructor). + // If CDC enabled for cache group then `localWalEnabled` is `false` during rebalance + // to avoid unnecessary `DataRecord` logging. So we need additionally check for in-memory + // (persistenceEnabled=false) to decide should we own partition right now. + if (own && (grp.localWalEnabled() || !grp.persistenceEnabled())) grp.topology().own(grp.topology().localPartition(p)); if (isDone()) @@ -1784,6 +1791,9 @@ private void checkIsDone(boolean cancelled) { }); } else { + if (grp.cdcEnabled() && !grp.localWalEnabled() && !cancelled) + grp.localWalEnabled(true, false); + onDone(!cancelled); if (log.isDebugEnabled()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java index 3dd919ead0fdf..d6b6206aeb3d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java @@ -229,7 +229,7 @@ public GridDhtLocalPartition( store = grp.offheap().createCacheDataStore(id); // Log partition creation for further crash recovery purposes. - if (grp.walEnabled() && !recovery) + if (grp.persistenceEnabled() && grp.walEnabled() && !recovery) ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, state(), 0)); // Inject row cache cleaner on store creation. @@ -1001,7 +1001,7 @@ protected long clearAll(EvictionContext evictionCtx) throws NodeStoppingExceptio // and repeat clearing on applying updates from WAL if the record was read. // It's need for atomic cache only. Transactional cache start a rebalance due to outdated counter in this case, // because atomic and transactional caches use different partition counters implementation. - if (state() == MOVING && !recoveryMode && grp.walEnabled() && + if (state() == MOVING && !recoveryMode && grp.persistenceEnabled() && grp.walEnabled() && grp.config().getAtomicityMode() == ATOMIC) ctx.wal().log(new PartitionClearingStartRecord(id, grp.groupId(), order)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 6fbf521e7f139..cfe92e259b8eb 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -335,9 +335,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Snapshot manager. */ private IgniteCacheSnapshotManager snapshotMgr; - /** */ - private final DataStorageMetricsImpl persStoreMetrics; - /** * MetaStorage instance. Value {@code null} means storage not initialized yet. * Guarded by {@link GridCacheDatabaseSharedManager#checkpointReadLock()} @@ -384,6 +381,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @param ctx Kernal context. */ public GridCacheDatabaseSharedManager(GridKernalContext ctx) { + super(ctx); + this.ctx = ctx; IgniteConfiguration cfg = ctx.config(); @@ -393,13 +392,6 @@ public GridCacheDatabaseSharedManager(GridKernalContext ctx) { assert persistenceCfg != null; lockWaitTime = persistenceCfg.getLockWaitTime(); - - persStoreMetrics = new DataStorageMetricsImpl( - ctx.metric(), - persistenceCfg.isMetricsEnabled(), - persistenceCfg.getMetricsRateTimeInterval(), - persistenceCfg.getMetricsSubIntervalCount() - ); } /** @@ -493,7 +485,7 @@ public IgniteInternalFuture enableCheckpoints(boolean enable) { List regionMetrics = dataRegionMap.values().stream() .map(DataRegion::metrics) .collect(Collectors.toList()); - persStoreMetrics.regionMetrics(regionMetrics); + dsMetrics.regionMetrics(regionMetrics); } /** @@ -584,7 +576,7 @@ private DataRegionConfiguration createDefragmentationMappingRegionConfig(long re this::getPageMemoryForCacheGroup, resolveThrottlingPolicy(), snapshotMgr, - persistentStoreMetricsImpl(), + dataStorageMetricsImpl(), kernalCtx.longJvmPauseDetector(), kernalCtx.failure(), kernalCtx.cache(), @@ -600,7 +592,7 @@ private DataRegionConfiguration createDefragmentationMappingRegionConfig(long re cleanupTempCheckpointDirectory(); - persStoreMetrics.wal(cctx.wal()); + dsMetrics.wal(cctx.wal()); } } @@ -830,7 +822,7 @@ private void prepareCacheDefragmentation(List cacheNames) throws IgniteC this::getPageMemoryForCacheGroup, resolveThrottlingPolicy(), snapshotMgr, - persistentStoreMetricsImpl(), + dataStorageMetricsImpl(), kernalCtx.longJvmPauseDetector(), kernalCtx.failure(), kernalCtx.cache() @@ -965,7 +957,7 @@ private void readMetastore() throws IgniteCheckedException { cctx.kernalContext().config(), MBEAN_GROUP, MBEAN_NAME, - persStoreMetrics, + dsMetrics, DataStorageMetricsMXBean.class ); } @@ -3175,14 +3167,12 @@ public NodeFileLockHolder(String rootDir, @NotNull GridKernalContext ctx, Ignite /** {@inheritDoc} */ @Override public DataStorageMetrics persistentStoreMetrics() { - return new DataStorageMetricsSnapshot(persStoreMetrics); + return new DataStorageMetricsSnapshot(dsMetrics); } - /** - * - */ - public DataStorageMetricsImpl persistentStoreMetricsImpl() { - return persStoreMetrics; + /** {@inheritDoc} */ + @Override public DataStorageMetricsImpl dataStorageMetricsImpl() { + return dsMetrics; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 8af3810d3d514..eb6d0e082afaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -232,7 +232,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple ctx.diagnostic().pageLockTracker() ); - persStoreMetrics = databaseSharedManager.persistentStoreMetricsImpl(); + persStoreMetrics = databaseSharedManager.dataStorageMetricsImpl(); databaseSharedManager.addCheckpointListener(this, grp.dataRegion()); } @@ -1041,7 +1041,7 @@ public void destroyPartitionStore(int partId) throws IgniteCheckedException { int tag = pageMemory.invalidate(grp.groupId(), partId); - if (grp.walEnabled()) + if (grp.persistenceEnabled() && grp.walEnabled()) ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), partId)); ctx.pageStore().truncate(grp.groupId(), partId, tag); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index 0742afd064c23..7caaf200a6d41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl; +import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -99,7 +100,10 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE; import static org.apache.ignite.IgniteSystemProperties.getDouble; import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA_REG_DEFAULT_NAME; +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_METRICS_ENABLED; import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE; +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_RATE_TIME_INTERVAL_MILLIS; +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_SUB_INTERVALS; import static org.apache.ignite.configuration.DataStorageConfiguration.HALF_MAX_WAL_ARCHIVE_SIZE; import static org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE; import static org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog.TX_LOG_CACHE_NAME; @@ -163,6 +167,38 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** First eviction was warned flag. */ private volatile boolean firstEvictWarn; + /** Data storege metrics. */ + protected final DataStorageMetricsImpl dsMetrics; + + /** + * @param ctx Kernal context. + */ + public IgniteCacheDatabaseSharedManager(GridKernalContext ctx) { + if (!CU.isCdcEnabled(ctx.config()) && !CU.isPersistenceEnabled(ctx.config())) { + dsMetrics = null; + + return; + } + + DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration(); + + if (dsCfg != null) { + dsMetrics = new DataStorageMetricsImpl( + ctx.metric(), + dsCfg.isMetricsEnabled(), + dsCfg.getMetricsRateTimeInterval(), + dsCfg.getMetricsSubIntervalCount() + ); + } + else { + dsMetrics = new DataStorageMetricsImpl( + ctx.metric(), + DFLT_METRICS_ENABLED, + DFLT_RATE_TIME_INTERVAL_MILLIS, + DFLT_SUB_INTERVALS + ); + } + } /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { @@ -903,6 +939,13 @@ public DataStorageMetrics persistentStoreMetrics() { return null; } + /** + * @return Data storage metrics implementation. + */ + public DataStorageMetricsImpl dataStorageMetricsImpl() { + return dsMetrics; + } + /** * @param dataRegionName Name of {@link DataRegion} to obtain {@link DataRegionMetrics} for. * @return {@link DataRegionMetrics} snapshot for specified {@link DataRegion} or {@code null} if @@ -1093,7 +1136,20 @@ public void beforeExchange(GridDhtPartitionsExchangeFuture discoEvt) throws Igni * @throws IgniteCheckedException If fails. */ public void startMemoryRestore(GridKernalContext kctx, TimeBag startTimer) throws IgniteCheckedException { - // No-op. + if (!CU.isCdcEnabled(kctx.config()) || kctx.clientNode()) + return; + + WALIterator iter = cctx.wal(true).replay(null, (type, ptr) -> true); + + while (iter.hasNext()) + iter.next(); + + WALPointer ptr = iter.lastRead().orElse(null); + + if (ptr != null) + ptr = ptr.next(); + + cctx.wal(true).resumeLogging(ptr); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/PdsFolderResolver.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/PdsFolderResolver.java index 8d53701f812d5..6c2d229f5b774 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/PdsFolderResolver.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/PdsFolderResolver.java @@ -155,7 +155,7 @@ public PdsFolderSettings resolve() throws IgniteCheckedException { final File pstStoreBasePath = resolvePersistentStoreBasePath(clientMode); - if (!CU.isPersistenceEnabled(cfg)) + if (!CU.isPersistenceEnabled(cfg) && !CU.isCdcEnabled(cfg)) return compatibleResolve(pstStoreBasePath, consistentId); if (clientMode) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 33c013538ac0a..f64b11ac8ea06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -83,6 +83,7 @@ import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; @@ -117,6 +118,7 @@ import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiPredicate; @@ -475,7 +477,7 @@ public void setFileIOFactory(FileIOFactory ioFactory) { "write ahead log archive directory" ); - if (dsCfg.isCdcEnabled()) { + if (CU.isCdcEnabled(igCfg)) { walCdcDir = initDirectory( dsCfg.getCdcWalPath(), DataStorageConfiguration.DFLT_WAL_CDC_PATH, @@ -488,9 +490,9 @@ public void setFileIOFactory(FileIOFactory ioFactory) { serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVer); - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database(); + IgniteCacheDatabaseSharedManager dbMgr = cctx.database(); - metrics = dbMgr.persistentStoreMetricsImpl(); + metrics = dbMgr.dataStorageMetricsImpl(); if (metrics != null) { metrics.setWalSizeProvider(new CO() { @@ -2076,7 +2078,7 @@ public SegmentArchiveResult archiveSegment(long absIdx) throws StorageException Files.move(dstTmpFile.toPath(), dstFile.toPath()); - if (dsCfg.isCdcEnabled()) + if (walCdcDir != null) Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath()); if (mode != WALMode.NONE) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index 8e01bac67f639..191d3c9987cc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -397,7 +397,7 @@ private FileDescriptor readFileDescriptor(File file, FileIOFactory ioFactory) { iteratorParametersBuilder.marshallerMappingFileStoreDir ); - StandaloneIgniteCacheDatabaseSharedManager dbMgr = new StandaloneIgniteCacheDatabaseSharedManager(); + StandaloneIgniteCacheDatabaseSharedManager dbMgr = new StandaloneIgniteCacheDatabaseSharedManager(kernalCtx); dbMgr.setPageSize(iteratorParametersBuilder.pageSize); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java index 85a872453e61b..7c0ab2f273bcd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java @@ -17,12 +17,18 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.reader; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; /** * Fake implementation for publishing setter and for creation in standalone WAL reader tool */ class StandaloneIgniteCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedManager { + /** @param ctx Kernal context. */ + public StandaloneIgniteCacheDatabaseSharedManager(GridKernalContext ctx) { + super(ctx); + } + /** {@inheritDoc} */ @Override public void setPageSize(int pageSize) { super.setPageSize(pageSize); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 11e2d3def09d3..a0a1b03e366b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -847,9 +847,8 @@ else if (op == RELOAD) { else if (op == READ) { CacheGroupContext grp = cacheCtx.group(); - if (grp.persistenceEnabled() && grp.walEnabled() && - cctx.snapshot().needTxReadLogging()) { - ptr = cctx.wal().log(new DataRecord(new DataEntry( + if (grp.logDataRecords() && cctx.snapshot().needTxReadLogging()) { + ptr = grp.wal().log(new DataRecord(new DataEntry( cacheCtx.cacheId(), txEntry.key(), val, @@ -924,7 +923,7 @@ assert ownsLock(txEntry.cached()) : cctx.mvccCaching().onTxFinished(this, true); if (ptr != null) - cctx.wal().flush(ptr, false); + cctx.wal(true).flush(ptr, false); } catch (Throwable ex) { // We are about to initiate transaction rollback when tx has started to committing. diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java index 19bf6eeb8f590..3dbe4ff3a18b3 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java @@ -47,6 +47,7 @@ import org.apache.ignite.spi.metric.LongMetric; import org.apache.ignite.spi.metric.MetricExporterSpi; import org.apache.ignite.spi.metric.ObjectMetric; +import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -112,7 +113,7 @@ protected CdcMain createCdc( cdcCfg.setConsumer(cnsmr); cdcCfg.setKeepBinary(keepBinary()); - cdcCfg.setMetricExporterSpi(metricExporters()); + cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi()); return new CdcMain(cfg, null, cdcCfg) { @Override protected CdcConsumerState createState(Path stateDir) { @@ -217,22 +218,20 @@ protected GridAbsPredicate sizePredicate( /** */ protected void checkMetrics(CdcMain cdc, int expCnt) throws Exception { - if (metricExporters() != null) { - IgniteConfiguration cfg = getFieldValue(cdc, "igniteCfg"); + IgniteConfiguration cfg = getFieldValue(cdc, "igniteCfg"); - DynamicMBean jmxCdcReg = metricRegistry(cdcInstanceName(cfg.getIgniteInstanceName()), null, "cdc"); + DynamicMBean jmxCdcReg = metricRegistry(cdcInstanceName(cfg.getIgniteInstanceName()), null, "cdc"); - Function jmxVal = m -> { - try { - return jmxCdcReg.getAttribute(m); - } - catch (Exception e) { - throw new IgniteException(e); - } - }; + Function jmxVal = m -> { + try { + return jmxCdcReg.getAttribute(m); + } + catch (Exception e) { + throw new IgniteException(e); + } + }; - checkMetrics(expCnt, (Function)jmxVal, (Function)jmxVal); - } + checkMetrics(expCnt, (Function)jmxVal, (Function)jmxVal); MetricRegistry mreg = getFieldValue(cdc, "mreg"); diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java index 17d6ad1839776..aa33d3859b330 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java @@ -97,6 +97,12 @@ public class CdcCacheVersionTest extends AbstractCdcTest { /** */ public static final int KEY_TO_UPD = 42; + /** */ + public static final String NOT_CDC = "not-cdc"; + + /** */ + public static final String CDC = "cdc"; + /** */ @Parameterized.Parameter public CacheAtomicityMode atomicityMode; @@ -109,6 +115,10 @@ public class CdcCacheVersionTest extends AbstractCdcTest { @Parameterized.Parameter(2) public int gridCnt; + /** */ + @Parameterized.Parameter(3) + public boolean persistenceEnabled; + /** */ private final AtomicLong walRecCheckedCntr = new AtomicLong(); @@ -122,14 +132,15 @@ public class CdcCacheVersionTest extends AbstractCdcTest { private volatile Supplier conflictResolutionMgrSupplier; /** */ - @Parameterized.Parameters(name = "atomicity={0}, mode={1}, gridCnt={2}") + @Parameterized.Parameters(name = "atomicity={0}, mode={1}, gridCnt={2}, persistenceEnabled={3}") public static Collection parameters() { List params = new ArrayList<>(); for (CacheAtomicityMode atomicity : EnumSet.of(ATOMIC, TRANSACTIONAL)) for (CacheMode mode : EnumSet.of(PARTITIONED, REPLICATED)) for (int gridCnt : new int[] {1, 3}) - params.add(new Object[] {atomicity, mode, gridCnt}); + for (boolean persistenceEnabled : new boolean[] {false, true}) + params.add(new Object[] {atomicity, mode, gridCnt, persistenceEnabled}); return params; } @@ -139,9 +150,18 @@ public static Collection parameters() { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); cfg.setDataStorageConfiguration(new DataStorageConfiguration() - .setCdcEnabled(true) .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) - .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))); + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(persistenceEnabled)) + .setDataRegionConfigurations( + new DataRegionConfiguration() + .setName(CDC) + .setPersistenceEnabled(persistenceEnabled) + .setCdcEnabled(true), + new DataRegionConfiguration() + .setName(NOT_CDC) + .setPersistenceEnabled(false) + .setCdcEnabled(false))); cfg.setPluginProviders(new AbstractTestPluginProvider() { @Override public String name() { @@ -243,6 +263,7 @@ public void testConflictVersionWritten() throws Exception { new CacheConfiguration(DEFAULT_CACHE_NAME) .setCacheMode(cacheMode) .setAtomicityMode(atomicityMode) + .setDataRegionName(CDC) .setBackups(Integer.MAX_VALUE)); if (atomicityMode == ATOMIC) @@ -359,14 +380,20 @@ public void testOrderIncrease() throws Exception { IgniteCache cache = ign.getOrCreateCache( new CacheConfiguration(DEFAULT_CACHE_NAME) .setAtomicityMode(atomicityMode) + .setDataRegionName(CDC) .setCacheMode(cacheMode)); + IgniteCache notCdcCache = ign.getOrCreateCache( + new CacheConfiguration(NOT_CDC).setDataRegionName(NOT_CDC)); + walRecCheckedCntr.set(0); // Update the same key several time. // Expect {@link CacheEntryVersion#order()} will monotically increase. - for (int i = 0; i < KEYS_CNT; i++) + for (int i = 0; i < KEYS_CNT; i++) { cache.put(KEY_TO_UPD, createUser(i)); + notCdcCache.put(KEY_TO_UPD, createUser(i)); + } assertTrue(waitForCondition(() -> walRecCheckedCntr.get() == KEYS_CNT, getTestTimeout())); } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index 04e831cc1c2f1..2f3bcbe5ce4e1 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -31,6 +31,7 @@ import java.util.TreeMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.ignite.Ignite; @@ -53,8 +54,6 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.metric.MetricExporterSpi; -import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.junit.Test; import org.junit.runner.RunWith; @@ -91,24 +90,17 @@ public class CdcSelfTest extends AbstractCdcTest { /** */ @Parameterized.Parameter(2) - public Supplier metricExporter; + public boolean persistenceEnabled; /** */ - @Parameterized.Parameter(3) - public int pageSz; - - /** */ - @Parameterized.Parameters(name = "specificConsistentId={0},walMode={1},metricExporter={2},pageSz={3}") + @Parameterized.Parameters(name = "consistentId={0}, wal={1}, persistence={2}") public static Collection parameters() { List params = new ArrayList<>(); for (WALMode mode : EnumSet.of(WALMode.FSYNC, WALMode.LOG_ONLY, WALMode.BACKGROUND)) - for (boolean specificConsistentId : new boolean[] {false, true}) { - Supplier jmx = JmxMetricExporterSpi::new; - - params.add(new Object[] {specificConsistentId, mode, null, 0}); - params.add(new Object[] {specificConsistentId, mode, jmx, 8192}); - } + for (boolean specificConsistentId : new boolean[] {false, true}) + for (boolean persistenceEnabled : new boolean[] {true, false}) + params.add(new Object[] {specificConsistentId, mode, persistenceEnabled}); return params; } @@ -121,15 +113,13 @@ public static Collection parameters() { cfg.setConsistentId(igniteInstanceName); cfg.setDataStorageConfiguration(new DataStorageConfiguration() - .setCdcEnabled(true) .setWalMode(walMode) .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) - .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(persistenceEnabled) + .setCdcEnabled(true)) .setWalArchivePath(DFLT_WAL_ARCHIVE_PATH + "/" + U.maskForFileName(igniteInstanceName))); - if (pageSz != 0) - cfg.getDataStorageConfiguration().setPageSize(pageSz); - cfg.setCacheConfiguration( new CacheConfiguration<>(TX_CACHE_NAME) .setAtomicityMode(TRANSACTIONAL) @@ -141,10 +131,14 @@ public static Collection parameters() { /** Simplest CDC test. */ @Test - public void testReadAllKeys() throws Exception { + public void testReadAllKeysCommitAll() throws Exception { // Read all records from iterator. readAll(new UserCdcConsumer(), true); + } + /** Simplest CDC test but read one event at a time to check correct iterator work. */ + @Test + public void testReadAllKeysWithoutCommit() throws Exception { // Read one record per call. readAll(new UserCdcConsumer() { @Override public boolean onEvents(Iterator evts) { @@ -153,7 +147,11 @@ public void testReadAllKeys() throws Exception { return false; } }, false); + } + /** Simplest CDC test but commit every event to check correct state restore. */ + @Test + public void testReadAllKeysCommitEachEvent() throws Exception { // Read one record per call and commit. readAll(new UserCdcConsumer() { @Override public boolean onEvents(Iterator evts) { @@ -217,6 +215,8 @@ public void testReadOneByOneForBackup() throws Exception { IgniteCache txCache = ign.cache(TX_CACHE_NAME); + awaitPartitionMapExchange(); + int keysCnt = 3; Map batch = primaryKeys(txCache, keysCnt).stream() @@ -300,6 +300,76 @@ public void testReadOneByOneForBackup() throws Exception { } } + /** Test check that state restored correctly and next event read by CDC on each restart. */ + @Test + public void testReadFromNextEntry() throws Exception { + IgniteConfiguration cfg = getConfiguration("ignite-0"); + + IgniteEx ign = startGrid(cfg); + + ign.cluster().state(ACTIVE); + + IgniteCache cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); + + int keysCnt = 10; + + addData(cache, 0, keysCnt / 2); + + long segIdx = ign.context().cache().context().wal(true).lastArchivedSegment(); + + waitForCondition(() -> ign.context().cache().context().wal(true).lastArchivedSegment() > segIdx, getTestTimeout()); + + addData(cache, keysCnt / 2, keysCnt); + + AtomicInteger expKey = new AtomicInteger(); + int lastKey = 0; + + while (expKey.get() != keysCnt) { + String errMsg = "Expected fail"; + + IgniteInternalFuture fut = runAsync(createCdc(new CdcConsumer() { + boolean oneConsumed; + + @Override public boolean onEvents(Iterator evts) { + // Fail application after one event read AND state committed. + if (oneConsumed) + throw new RuntimeException(errMsg); + + CdcEvent evt = evts.next(); + + assertEquals(expKey.get(), evt.key()); + + expKey.incrementAndGet(); + + // Fail application if all expected data read e.g. next event doesn't exist. + if (expKey.get() == keysCnt) + throw new RuntimeException(errMsg); + + oneConsumed = true; + + return true; + } + + @Override public void stop() { + // No-op. + } + + @Override public void start(MetricRegistry mreg) { + // No-op. + } + }, cfg)); + + assertTrue(waitForCondition(fut::isDone, getTestTimeout())); + + if (!errMsg.equals(fut.error().getMessage())) + throw new RuntimeException(fut.error()); + + assertEquals(1, expKey.get() - lastKey); + + lastKey = expKey.get(); + } + } + /** */ @Test public void testReadBeforeGracefulShutdown() throws Exception { @@ -473,13 +543,32 @@ public void testCdcSingleton() throws Exception { /** */ @Test public void testReReadWhenStateWasNotStored() throws Exception { - IgniteEx ign = startGrid(getConfiguration("ignite-0")); + Supplier restart = () -> { + stopAllGrids(false); - ign.cluster().state(ACTIVE); + try { + IgniteEx ign = startGrid(getConfiguration("ignite-0")); + + ign.cluster().state(ACTIVE); + + return ign; + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + + IgniteEx ign = restart.get(); IgniteCache cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); - addData(cache, 0, KEYS_CNT); + addData(cache, 0, KEYS_CNT / 2); + + ign = restart.get(); + + cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); + + addData(cache, KEYS_CNT / 2, KEYS_CNT); for (int i = 0; i < 3; i++) { UserCdcConsumer cnsmr = new UserCdcConsumer() { @@ -566,14 +655,6 @@ public void testReReadWhenStateWasNotStored() throws Exception { assertTrue(cnsmr.stopped()); } - /** {@inheritDoc} */ - @Override public MetricExporterSpi[] metricExporters() { - if (metricExporter == null) - return null; - - return new MetricExporterSpi[] {metricExporter.get()}; - } - /** */ public static void addData(IgniteCache cache, int from, int to) { for (int i = from; i < to; i++) diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java new file mode 100644 index 0000000000000..953f88827f2c8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cdc.CdcSelfTest.WAL_ARCHIVE_TIMEOUT; +import static org.apache.ignite.testframework.GridTestUtils.getFieldValue; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** Check only {@link DataRecord} written to the WAL for in-memory cache. */ +@RunWith(Parameterized.class) +public class WalForCdcTest extends GridCommonAbstractTest { + /** */ + private static final int RECORD_COUNT = 10; + + /** */ + @Parameterized.Parameter + public CacheMode mode; + + /** */ + @Parameterized.Parameter(1) + public CacheAtomicityMode atomicityMode; + + /** */ + private boolean persistenceEnabled; + + /** */ + private boolean cdcEnabled; + + /** */ + @Parameterized.Parameters(name = "mode={0}, atomicityMode={1}") + public static Collection parameters() { + List params = new ArrayList<>(); + + for (CacheMode mode : Arrays.asList(REPLICATED, PARTITIONED)) + for (CacheAtomicityMode atomicityMode : Arrays.asList(ATOMIC, TRANSACTIONAL)) + params.add(new Object[] {mode, atomicityMode}); + + return params; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(persistenceEnabled) + .setCdcEnabled(cdcEnabled))); + + cfg.setConsistentId(igniteInstanceName); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + cdcEnabled = true; + persistenceEnabled = false; + } + + /** */ + @Test + public void testOnlyDataRecordWritten() throws Exception { + IgniteEx ignite1 = startGrid(0); + + ignite1.cluster().state(ClusterState.ACTIVE); + + AtomicInteger cntr = new AtomicInteger(); + + // Check only `DataRecords` written in WAL for in-memory cache with CDC enabled. + doTestWal(ignite1, cache -> { + for (int i = 0; i < RECORD_COUNT; i++) + cache.put(keyForNode(ignite1.affinity(DEFAULT_CACHE_NAME), cntr, ignite1.localNode()), i); + }, RECORD_COUNT); + + // Check no WAL written during rebalance. + IgniteEx ignite2 = startGrid(1); + + awaitPartitionMapExchange(false, true, null); + + // Can't use `waitForCondition` because if test passed + // then no `DataRecords` loged therefore no segment archivation. + Thread.sleep(3 * WAL_ARCHIVE_TIMEOUT); + + int walRecCnt = checkDataRecords(ignite2); + + assertEquals(0, walRecCnt); + + // Check `DataRecords` written on second node after rebalance. + doTestWal(ignite1, cache -> { + for (int i = 0; i < RECORD_COUNT; i++) + cache.put(keyForNode(ignite1.affinity(DEFAULT_CACHE_NAME), cntr, ignite1.localNode()), i); + }, RECORD_COUNT * 2); + + doTestWal(ignite2, cache -> { + for (int i = 0; i < RECORD_COUNT; i++) + cache.put(keyForNode(ignite2.affinity(DEFAULT_CACHE_NAME), cntr, ignite2.localNode()), i); + }, RECORD_COUNT * (mode == REPLICATED ? 2 : 1)); + } + + /** */ + @Test + public void testWalDisable() throws Exception { + persistenceEnabled = true; + + IgniteEx ignite = startGrid(0); + + ignite.cluster().state(ClusterState.ACTIVE); + + doTestWal(ignite, cache -> { + for (int i = 0; i < RECORD_COUNT / 2; i++) + cache.put(i, i); + + ignite.cluster().disableWal(DEFAULT_CACHE_NAME); + + for (int i = 0; i < RECORD_COUNT; i++) + cache.put(i, i); + + ignite.cluster().enableWal(DEFAULT_CACHE_NAME); + + for (int i = RECORD_COUNT / 2; i < RECORD_COUNT; i++) + cache.put(i, i); + }, RECORD_COUNT); + } + + /** */ + @Test + public void testWalDisabledIfPersistenceAndCdcDisabled() throws Exception { + persistenceEnabled = false; + cdcEnabled = false; + + IgniteEx ignite = startGrid(0); + + ignite.cluster().state(ClusterState.ACTIVE); + + ignite.getOrCreateCache(new CacheConfiguration(DEFAULT_CACHE_NAME) + .setCacheMode(mode) + .setAtomicityMode(atomicityMode)); + + assertNull(ignite.context().cache().context().wal()); + assertNull(getFieldValue(ignite.context().cache().context(), "cdcWalMgr")); + } + + /** */ + private void doTestWal( + IgniteEx ignite, + Consumer> putData, + int expWalRecCnt + ) throws Exception { + IgniteCache cache = ignite.getOrCreateCache( + new CacheConfiguration(DEFAULT_CACHE_NAME) + .setCacheMode(mode) + .setAtomicityMode(atomicityMode)); + + long archiveIdx = ignite.context().cache().context().wal(true).lastArchivedSegment(); + + putData.accept(cache); + + assertTrue(waitForCondition( + () -> archiveIdx < ignite.context().cache().context().wal(true).lastArchivedSegment(), + getTestTimeout() + )); + + int walRecCnt = checkDataRecords(ignite); + + assertEquals(expWalRecCnt, walRecCnt); + } + + /** */ + private int checkDataRecords(IgniteEx ignite) throws IgniteCheckedException { + String archive = U.resolveWorkDirectory( + U.defaultWorkDirectory(), + ignite.configuration().getDataStorageConfiguration().getWalArchivePath() + "/" + + U.maskForFileName(ignite.configuration().getIgniteInstanceName()), + false + ).getAbsolutePath(); + + WALIterator iter = new IgniteWalIteratorFactory(log).iterator(new IteratorParametersBuilder() + .ioFactory(new RandomAccessFileIOFactory()) + .filesOrDirs(archive)); + + int walRecCnt = 0; + + while (iter.hasNext()) { + IgniteBiTuple rec = iter.next(); + + if (persistenceEnabled && (!(rec.get2() instanceof DataRecord))) + continue; + + assertTrue(rec.get2() instanceof DataRecord); + + DataRecord dataRec = (DataRecord)rec.get2(); + + for (int i = 0; i < dataRec.entryCount(); i++) + assertEquals(CU.cacheId(DEFAULT_CACHE_NAME), dataRec.get(i).cacheId()); + + walRecCnt++; + } + + return walRecCnt; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java index 1a677165ba9bb..ce08f216495a1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java @@ -151,7 +151,7 @@ public void testLightCheckpointAbleToStoreOnlyGivenDataRegion() throws Exception grpId -> getPageMemoryForCacheGroup(grpId, db, context), PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY, context.cache().context().snapshot(), - db.persistentStoreMetricsImpl(), + db.dataStorageMetricsImpl(), context.longJvmPauseDetector(), context.failure(), context.cache() diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java index 8d65954cbeaa8..cf7b890f5d6b5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java @@ -167,7 +167,7 @@ private void checkInvariantSwitchSegmentSize(int serVer) throws Exception { null, null, null, - new IgniteCacheDatabaseSharedManager() { + new IgniteCacheDatabaseSharedManager(kctx) { @Override public int pageSize() { return DataStorageConfiguration.DFLT_PAGE_SIZE; } @@ -428,16 +428,19 @@ private T2 initiate( @Override protected IgniteConfiguration prepareIgniteConfiguration() { IgniteConfiguration cfg = super.prepareIgniteConfiguration(); - cfg.setDataStorageConfiguration( - new DataStorageConfiguration() - .setWalSegmentSize(SEGMENT_SIZE) - .setWalRecordIteratorBufferSize(SEGMENT_SIZE / 2) - .setWalMode(WALMode.FSYNC) - .setWalPath(workDir + WORK_SUB_DIR) - .setWalArchivePath(workDir + ARCHIVE_SUB_DIR) - .setFileIOFactory(new RandomAccessFileIOFactory()) - ); + DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); + if (dsCfg == null) + dsCfg = new DataStorageConfiguration(); + + dsCfg.setWalSegmentSize(SEGMENT_SIZE) + .setWalRecordIteratorBufferSize(SEGMENT_SIZE / 2) + .setWalMode(WALMode.FSYNC) + .setWalPath(workDir + WORK_SUB_DIR) + .setWalArchivePath(workDir + ARCHIVE_SUB_DIR) + .setFileIOFactory(new RandomAccessFileIOFactory()); + + cfg.setDataStorageConfiguration(dsCfg); cfg.setEventStorageSpi(new NoopEventStorageSpi()); return cfg; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java index 82ecd662ca5cd..290e8f9e56f34 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java @@ -66,7 +66,8 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest { cfg.setEncryptionSpi(new NoopEncryptionSpi()); cfg.setMetricExporterSpi(new NoopMetricExporterSpi()); cfg.setSystemViewExporterSpi(new JmxSystemViewExporterSpi()); - cfg.setDataStorageConfiguration(new DataStorageConfiguration()); + cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setPersistenceEnabled(true))); GridTestKernalContext cctx = new GridTestKernalContext(log, cfg); @@ -85,7 +86,7 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest { new NoOpPageStoreManager(), new NoOpWALManager(), null, - new IgniteCacheDatabaseSharedManager(), + new IgniteCacheDatabaseSharedManager(cctx), null, null, null, diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java index a9e38006935cd..668941f65a8d6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java @@ -65,7 +65,8 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest cfg.setEncryptionSpi(new NoopEncryptionSpi()); cfg.setMetricExporterSpi(new NoopMetricExporterSpi()); cfg.setSystemViewExporterSpi(new JmxSystemViewExporterSpi()); - cfg.setDataStorageConfiguration(new DataStorageConfiguration()); + cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setPersistenceEnabled(true))); GridTestKernalContext cctx = new GridTestKernalContext(log, cfg); @@ -84,7 +85,7 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest new NoOpPageStoreManager(), new NoOpWALManager(), null, - new IgniteCacheDatabaseSharedManager(), + new IgniteCacheDatabaseSharedManager(cctx), null, null, null, diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java index efccb297c76a9..1504bef1f65b4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java @@ -81,7 +81,8 @@ public class IndexStoragePageMemoryImplTest extends IndexStorageSelfTest { cfg.setEncryptionSpi(new NoopEncryptionSpi()); cfg.setMetricExporterSpi(new NoopMetricExporterSpi()); cfg.setSystemViewExporterSpi(new JmxSystemViewExporterSpi()); - cfg.setDataStorageConfiguration(new DataStorageConfiguration()); + cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setPersistenceEnabled(true))); GridTestKernalContext cctx = new GridTestKernalContext(log, cfg); @@ -100,7 +101,7 @@ public class IndexStoragePageMemoryImplTest extends IndexStorageSelfTest { new NoOpPageStoreManager(), new NoOpWALManager(), null, - new IgniteCacheDatabaseSharedManager(), + new IgniteCacheDatabaseSharedManager(cctx), null, null, null, diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java index afd127ce6b37f..6e837ba7d6b72 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java @@ -70,7 +70,8 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest { cfg.setEncryptionSpi(new NoopEncryptionSpi()); cfg.setMetricExporterSpi(new NoopMetricExporterSpi()); cfg.setSystemViewExporterSpi(new JmxSystemViewExporterSpi()); - cfg.setDataStorageConfiguration(new DataStorageConfiguration()); + cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setPersistenceEnabled(true))); GridTestKernalContext cctx = new GridTestKernalContext(log, cfg); @@ -89,7 +90,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest { new NoOpPageStoreManager(), new NoOpWALManager(), null, - new IgniteCacheDatabaseSharedManager(), + new IgniteCacheDatabaseSharedManager(cctx), null, null, null, diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index 89925274a0784..b682293870498 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.failure.NoOpFailureHandler; @@ -587,7 +588,8 @@ private PageMemoryImpl createPageMemory( DirectMemoryProvider provider = new UnsafeMemoryProvider(log); IgniteConfiguration igniteCfg = new IgniteConfiguration(); - igniteCfg.setDataStorageConfiguration(new DataStorageConfiguration()); + igniteCfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setPersistenceEnabled(true))); igniteCfg.setFailureHandler(new NoOpFailureHandler()); igniteCfg.setEncryptionSpi(new NoopEncryptionSpi()); igniteCfg.setMetricExporterSpi(new NoopMetricExporterSpi()); @@ -618,7 +620,7 @@ private PageMemoryImpl createPageMemory( mgr, new NoOpWALManager(), null, - new IgniteCacheDatabaseSharedManager(), + new IgniteCacheDatabaseSharedManager(kernalCtx), null, null, null, diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 38da7574f8838..0d765c115239d 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -74,7 +74,7 @@ public GridCacheTestContext(GridTestKernalContext ctx) throws Exception { null, null, new WalStateManager(null), - new IgniteCacheDatabaseSharedManager(), + new IgniteCacheDatabaseSharedManager(ctx), null, new IgniteCacheSnapshotManager(), new GridCacheDeploymentManager(), diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 1936c046b26b4..96a1ec075a00b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -23,6 +23,7 @@ import org.apache.ignite.cdc.CdcCacheVersionTest; import org.apache.ignite.cdc.CdcSelfTest; import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest; +import org.apache.ignite.cdc.WalForCdcTest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceHeuristicsTest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceTwoPartsInDifferentCheckpointsTest; import org.apache.ignite.internal.processors.cache.persistence.IgniteDataStorageMetricsSelfTest; @@ -150,6 +151,7 @@ public static void addRealPageStoreTests(List> suite, Collection GridTestUtils.addTestIfNeeded(suite, CdcSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CdcCacheVersionTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, RestartWithWalForceArchiveTimeoutTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, WalForCdcTest.class, ignoredTests); // new style folders with generated consistent ID test GridTestUtils.addTestIfNeeded(suite, IgniteUidAsConsistentIdMigrationTest.class, ignoredTests); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java index a6ce2a0d1ef9b..83e8ffc2b5496 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java @@ -30,6 +30,8 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE; @@ -39,6 +41,7 @@ import static org.apache.ignite.testframework.GridTestUtils.runAsync; /** */ +@RunWith(Parameterized.class) public class SqlCdcTest extends AbstractCdcTest { /** */ private static final String SARAH = "Sarah Connor"; @@ -55,14 +58,25 @@ public class SqlCdcTest extends AbstractCdcTest { /** */ public static final String MSK = "Moscow"; + /** */ + @Parameterized.Parameter + public boolean persistenceEnabled; + + /** */ + @Parameterized.Parameters(name = "persistence={0}") + public static Object[] parameters() { + return new Object[] {false, true}; + } + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); cfg.setDataStorageConfiguration(new DataStorageConfiguration() - .setCdcEnabled(true) .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) - .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))); + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(persistenceEnabled) + .setCdcEnabled(true))); return cfg; } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageRestartTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageRestartTest.java index cae82d3e6d5d7..178aa42fdea3e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageRestartTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageRestartTest.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.metastorage.persistence.ReadWriteMetaStorageMock; import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.junit.Test; import org.mockito.Mockito; @@ -60,7 +61,7 @@ public class StatisticsStorageRestartTest extends StatisticsAbstractTest { metastorage = new ReadWriteMetaStorageMock(); statStore = new IgniteStatisticsPersistenceStoreImpl( subscriptionProcessor, - new IgniteCacheDatabaseSharedManager(){}, + new IgniteCacheDatabaseSharedManager(new GridTestKernalContext(log)){}, cls -> log); } @@ -90,7 +91,7 @@ public void testRestart() throws IgniteCheckedException { assertEquals(stat2_3, statStore.getLocalPartitionStatistics(k2, 3)); IgniteStatisticsPersistenceStoreImpl statStore2 = new IgniteStatisticsPersistenceStoreImpl(subscriptionProcessor, - new IgniteCacheDatabaseSharedManager(){}, cls -> log); + new IgniteCacheDatabaseSharedManager(new GridTestKernalContext(log)){}, cls -> log); statStore2.onReadyForReadWrite(metastorage); @@ -128,7 +129,7 @@ public void testUnreadableStatistics() throws IgniteCheckedException { metastorage.write(outerStatKey, outerStatValue); IgniteStatisticsPersistenceStoreImpl statStore2 = new IgniteStatisticsPersistenceStoreImpl(subscriptionProcessor, - new IgniteCacheDatabaseSharedManager(){}, cls -> log); + new IgniteCacheDatabaseSharedManager(new GridTestKernalContext(log)){}, cls -> log); statStore2.onReadyForReadWrite(metastorage); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageUnitTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageUnitTest.java index 135c37b59c593..cb82c07bf687e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageUnitTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageUnitTest.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener; import org.apache.ignite.internal.processors.metastorage.persistence.ReadWriteMetaStorageMock; import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger; import org.junit.Test; import org.junit.runner.RunWith; @@ -70,9 +71,11 @@ public static Collection parameters() throws IgniteCheckedException { IgniteStatisticsRepository statsRepos = new IgniteStatisticsRepository(inMemoryStore, sysViewMgr, helper, cls -> log); - IgniteCacheDatabaseSharedManager dbMgr = new IgniteCacheDatabaseSharedManager(); + GridTestLog4jLogger log = new GridTestLog4jLogger(); + + IgniteCacheDatabaseSharedManager dbMgr = new IgniteCacheDatabaseSharedManager(new GridTestKernalContext(log)); IgniteStatisticsPersistenceStoreImpl persStore = new IgniteStatisticsPersistenceStoreImpl(subscriptionProcessor, - dbMgr, cls -> new GridTestLog4jLogger()); + dbMgr, cls -> log); ReadWriteMetaStorageMock metastorage = new ReadWriteMetaStorageMock(); lsnr[0].onReadyForReadWrite(metastorage); diff --git a/modules/spring/src/test/config/cdc/correct-cdc-config.xml b/modules/spring/src/test/config/cdc/correct-cdc-config.xml index 7ee68cbff81cf..b4755fe552a53 100644 --- a/modules/spring/src/test/config/cdc/correct-cdc-config.xml +++ b/modules/spring/src/test/config/cdc/correct-cdc-config.xml @@ -30,9 +30,9 @@ + -