Skip to content

Commit

Permalink
IGNITE-15117 CDC for in-memory caches (apache#9345)
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov authored Mar 15, 2022
1 parent 0cd4c08 commit 8b3f6eb
Show file tree
Hide file tree
Showing 40 changed files with 769 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.ignite.DataRegionMetrics;
import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.mem.MemoryAllocator;
import org.apache.ignite.mxbean.DataRegionMetricsMXBean;
import org.apache.ignite.mxbean.MetricsMxBean;
Expand Down Expand Up @@ -156,6 +157,10 @@ public final class DataRegionConfiguration implements Serializable {
/** Memory allocator. */
@Nullable private MemoryAllocator memoryAllocator = null;

/** Change Data Capture enabled flag. */
@IgniteExperimental
private boolean cdcEnabled;

/**
* Gets data region name.
*
Expand Down Expand Up @@ -556,6 +561,30 @@ public DataRegionConfiguration setWarmUpConfiguration(@Nullable WarmUpConfigurat
return warmUpCfg;
}

/**
* Sets flag indicating whether CDC enabled.
*
* @param cdcEnabled CDC enabled flag.
* @return {@code this} for chaining.
*/
@IgniteExperimental
public DataRegionConfiguration setCdcEnabled(boolean cdcEnabled) {
this.cdcEnabled = cdcEnabled;

return this;
}

/**
* Gets flag indicating whether CDC is enabled.
* Default value is {@code false}.
*
* @return CDC enabled flag.
*/
@IgniteExperimental
public boolean isCdcEnabled() {
return cdcEnabled;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataRegionConfiguration.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,6 @@ public class DataStorageConfiguration implements Serializable {
@IgniteExperimental
private String cdcWalPath = DFLT_WAL_CDC_PATH;

/** Change Data Capture enabled flag. */
@IgniteExperimental
private boolean cdcEnabled;

/** Metrics enabled flag. */
private boolean metricsEnabled = DFLT_METRICS_ENABLED;

Expand Down Expand Up @@ -800,30 +796,6 @@ public DataStorageConfiguration setCdcWalPath(String cdcWalPath) {
return this;
}

/**
* Sets flag indicating whether CDC enabled.
*
* @param cdcEnabled CDC enabled flag.
* @return {@code this} for chaining.
*/
@IgniteExperimental
public DataStorageConfiguration setCdcEnabled(boolean cdcEnabled) {
this.cdcEnabled = cdcEnabled;

return this;
}

/**
* Gets flag indicating whether CDC is enabled.
* Default value is {@code false}.
*
* @return Metrics enabled flag.
*/
@IgniteExperimental
public boolean isCdcEnabled() {
return cdcEnabled;
}

/**
* Gets flag indicating whether persistence metrics collection is enabled.
* Default value is {@link #DFLT_METRICS_ENABLED}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridComponent;
Expand Down Expand Up @@ -74,7 +75,7 @@
*
* Ignite node should be explicitly configured for using {@link CdcMain}.
* <ol>
* <li>Set {@link DataStorageConfiguration#setCdcEnabled(boolean)} to true.</li>
* <li>Set {@link DataRegionConfiguration#setCdcEnabled(boolean)} to true.</li>
* <li>Optional: Set {@link DataStorageConfiguration#setCdcWalPath(String)} to path to the directory
* to store WAL segments for CDC.</li>
* <li>Optional: Set {@link DataStorageConfiguration#setWalForceArchiveTimeout(long)} to configure timeout for
Expand All @@ -101,7 +102,7 @@
* <li>Infinitely waits for new available segment and processes it.</li>
* </ol>
*
* @see DataStorageConfiguration#setCdcEnabled(boolean)
* @see DataRegionConfiguration#setCdcEnabled(boolean)
* @see DataStorageConfiguration#setCdcWalPath(String)
* @see DataStorageConfiguration#setWalForceArchiveTimeout(long)
* @see CdcCommandLineStartup
Expand All @@ -110,7 +111,7 @@
*/
public class CdcMain implements Runnable {
/** */
public static final String ERR_MSG = "Persistence disabled. Capture Data Change can't run!";
public static final String ERR_MSG = "Persistence and CDC disabled. Capture Data Change can't run!";

/** State dir. */
public static final String STATE_DIR = "state";
Expand Down Expand Up @@ -239,7 +240,7 @@ public CdcMain(
public void runX() throws Exception {
ackAsciiLogo();

if (!CU.isPersistenceEnabled(igniteCfg)) {
if (!CU.isCdcEnabled(igniteCfg)) {
log.error(ERR_MSG);

throw new IllegalArgumentException(ERR_MSG);
Expand Down Expand Up @@ -355,7 +356,7 @@ private CdcFileLockHolder lockPds() throws IgniteCheckedException {
new PdsFolderResolver<>(igniteCfg, log, igniteCfg.getConsistentId(), this::tryLock).resolve();

if (settings == null) {
throw new IgniteException("Can't find folder to read WAL segments from based on provided configuration! " +
throw new IgniteException("Can't find the folder to read WAL segments from! " +
"[workDir=" + igniteCfg.getWorkDirectory() + ", consistentId=" + igniteCfg.getConsistentId() + ']');
}

Expand Down Expand Up @@ -524,12 +525,6 @@ private void consumeSegment(Path segment) {
* @return Lock or null if lock failed.
*/
private CdcFileLockHolder tryLock(File dbStoreDirWithSubdirectory) {
if (!dbStoreDirWithSubdirectory.exists()) {
log.warning("DB store directory not exists [dir=" + dbStoreDirWithSubdirectory + ']');

return null;
}

File cdcRoot = new File(igniteCfg.getDataStorageConfiguration().getCdcWalPath());

if (!cdcRoot.isAbsolute()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -27,7 +26,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -2902,7 +2900,7 @@ void validateCacheGroup(CacheGroupDescriptor grpDesc) {
if (partsNum == 0)
return;

DataRegionConfiguration drCfg = findDataRegion(dsCfg, grpCfg.getDataRegionName());
DataRegionConfiguration drCfg = CU.findDataRegion(dsCfg, grpCfg.getDataRegionName());

if (drCfg == null)
return;
Expand Down Expand Up @@ -2941,27 +2939,5 @@ private String buildWarningMessage(CacheGroupDescriptor grpDesc,
U.sizeInMegabytes(drCfg.getMaxSize())
);
}

/**
* Finds data region by name.
*
* @param dsCfg Data storage configuration.
* @param drName Data region name.
*
* @return Found data region.
*/
@Nullable private DataRegionConfiguration findDataRegion(DataStorageConfiguration dsCfg, String drName) {
if (dsCfg.getDataRegionConfigurations() == null || drName == null)
return dsCfg.getDefaultDataRegionConfiguration();

if (dsCfg.getDefaultDataRegionConfiguration().getName().equals(drName))
return dsCfg.getDefaultDataRegionConfiguration();

Optional<DataRegionConfiguration> cfgOpt = Arrays.stream(dsCfg.getDataRegionConfigurations())
.filter(drCfg -> drCfg.getName().equals(drName))
.findFirst();

return cfgOpt.isPresent() ? cfgOpt.get() : null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
Expand All @@ -40,6 +41,8 @@
import org.apache.ignite.internal.metric.IoStatisticsHolderCache;
import org.apache.ignite.internal.metric.IoStatisticsHolderIndex;
import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
Expand Down Expand Up @@ -1095,6 +1098,19 @@ public boolean persistenceEnabled() {
return persistenceEnabled;
}

/**
* @return {@code True} if {@link DataRecord} should be loged in the WAL.
*/
public boolean logDataRecords() {
return walEnabled() && (persistenceEnabled || cdcEnabled());
}

/** @return {@code True} if CDC enabled. */
public boolean cdcEnabled() {
// Data region is null for client and non affinity nodes.
return dataRegion != null && dataRegion.config().isCdcEnabled();
}

/**
* @param nodeId Node ID.
* @param req Request.
Expand Down Expand Up @@ -1204,7 +1220,11 @@ public void onReconnected() {
}

/**
* WAL enabled flag.
* Value returned by this method can be changed runtime by the user or during rebalance.
*
* @return WAL enabled flag.
* @see IgniteCluster#disableWal(String)
* @see IgniteCluster#enableWal(String)
*/
public boolean walEnabled() {
return localWalEnabled && globalWalEnabled;
Expand Down Expand Up @@ -1321,6 +1341,13 @@ public void removeIOStatistic(boolean destroy) {
ctx.kernalContext().metric().remove(statHolderIdx.metricRegistryName(), destroy);
}

/**
* @return Write ahead log manager.
*/
public IgniteWriteAheadLogManager wal() {
return ctx.wal(cdcEnabled());
}

/**
* @param ccfg Cache configuration.
* @param plugins Ignite plugin processor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.GridCachePluginContext;
Expand Down Expand Up @@ -2306,13 +2307,19 @@ private CacheGroupDescriptor registerCacheGroup(
Map<String, Integer> caches = Collections.singletonMap(startedCacheCfg.getName(), cacheId);

boolean persistent = resolvePersistentFlag(exchActions, startedCacheCfg);
boolean walGloballyEnabled = false;
boolean walGloballyEnabled;

// client nodes cannot read wal enabled/disabled status so they should use default one
if (ctx.clientNode())
walGloballyEnabled = persistent;
else if (persistent)
walGloballyEnabled = ctx.cache().context().database().walEnabled(grpId, false);
else {
DataRegionConfiguration drCfg =
CU.findDataRegion(ctx.config().getDataStorageConfiguration(), startedCacheCfg.getDataRegionName());

walGloballyEnabled = drCfg != null && drCfg.isCdcEnabled();
}

CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
startedCacheCfg,
Expand Down
Loading

0 comments on commit 8b3f6eb

Please sign in to comment.