Skip to content

Commit

Permalink
IGNITE-15126 Fixed in-memory cluster hanging when error is thrown on …
Browse files Browse the repository at this point in the history
…activation
  • Loading branch information
petrov-mg committed Dec 9, 2024
1 parent bb4dc19 commit fb27a71
Show file tree
Hide file tree
Showing 12 changed files with 361 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ public final class IgniteNodeAttributes {
public static final String ATTR_REBALANCE_POOL_SIZE = ATTR_PREFIX + ".rebalance.pool.size";

/** Internal attribute name constant. */
@Deprecated
public static final String ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED = ATTR_PREFIX + ".dynamic.cache.start.rollback.supported";

/** Supported features. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ public void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNo
* @param failMsg Dynamic change request fail message.
* @param topVer Current topology version.
*/
public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, AffinityTopologyVersion topVer) {
public void onCacheChangeRequested(ExchangeFailureMessage failMsg, AffinityTopologyVersion topVer) {
AffinityTopologyVersion actualTopVer = failMsg.exchangeId().topologyVersion();

ExchangeActions exchangeActions = new ExchangeActions();
Expand Down Expand Up @@ -610,7 +610,7 @@ public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, Aff
processStopCacheRequest(exchangeActions, req, res, req.cacheName(), cacheDesc, actualTopVer, true);
}

failMsg.exchangeActions(exchangeActions);
failMsg.exchangeRollbackActions(exchangeActions);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.cache;

import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
Expand All @@ -34,49 +36,48 @@
/**
* This class represents a discovery message that is used to provide information about an exchange failure (for example, a dynamic cache start failure).
*/
public class DynamicCacheChangeFailureMessage implements DiscoveryCustomMessage {
public class ExchangeFailureMessage implements DiscoveryCustomMessage {
/** */
private static final long serialVersionUID = 0L;

/** Cache names. */
@GridToStringInclude
private Collection<String> cacheNames;
private final Collection<String> cacheNames;

/** Custom message ID. */
private IgniteUuid id;
private final IgniteUuid id;

/** */
private GridDhtPartitionExchangeId exchId;
private final GridDhtPartitionExchangeId exchId;

/** */
@GridToStringInclude
private IgniteCheckedException cause;
private final Map<UUID, Exception> exchangeErrors;

/** Cache updates to be executed on exchange. */
private transient ExchangeActions exchangeActions;
/** Actions to be done to rollback changes done before the exchange failure. */
private transient ExchangeActions exchangeRollbackActions;

/**
* Creates new ExchangeFailureMessage instance.
*
* @param locNode Local node.
* @param exchId Exchange Id.
* @param cause Cache start error.
* @param cacheNames Cache names.
* @param exchangeErrors Errors that caused PME to fail.
*/
public DynamicCacheChangeFailureMessage(
public ExchangeFailureMessage(
ClusterNode locNode,
GridDhtPartitionExchangeId exchId,
IgniteCheckedException cause,
Map<UUID, Exception> exchangeErrors,
Collection<String> cacheNames

) {
assert exchId != null;
assert cause != null;
assert !F.isEmpty(cacheNames) : cacheNames;
assert !F.isEmpty(exchangeErrors);

this.id = IgniteUuid.fromUuid(locNode.id());
this.exchId = exchId;
this.cause = cause;
this.cacheNames = cacheNames;
this.exchangeErrors = exchangeErrors;
}

/** {@inheritDoc} */
Expand All @@ -91,27 +92,42 @@ public Collection<String> cacheNames() {
return cacheNames;
}

/**
* Returns the exchange errors this message was created with.
*
* @return Errors that caused the exchange to fail. NOTE(review): keys are presumably IDs of the nodes that reported
* the corresponding error - confirm against the code that builds the map. The internal map is returned as is,
* without copying.
*/
public Map<UUID, Exception> exchangeErrors() {
return exchangeErrors;
}

/**
* @return Cache start error.
* @return Actions to be done to roll back changes made before the exchange failure.
*/
public IgniteCheckedException error() {
return cause;
public ExchangeActions exchangeRollbackActions() {
return exchangeRollbackActions;
}

/**
* @return Cache updates to be executed on exchange.
* @param exchangeRollbackActions Actions to be done to roll back changes made before the exchange failure.
*/
public ExchangeActions exchangeActions() {
return exchangeActions;
public void exchangeRollbackActions(ExchangeActions exchangeRollbackActions) {
assert exchangeRollbackActions != null && !exchangeRollbackActions.empty() : exchangeRollbackActions;

this.exchangeRollbackActions = exchangeRollbackActions;
}

/**
* @param exchangeActions Cache updates to be executed on exchange.
* Creates an IgniteCheckedException that is used as root cause of the exchange initialization failure. This method
* aggregates all the exceptions provided from all participating nodes.
*
* @return Exception that represents a cause of the exchange initialization failure.
*/
public void exchangeActions(ExchangeActions exchangeActions) {
assert exchangeActions != null && !exchangeActions.empty() : exchangeActions;
public IgniteCheckedException createFailureCompoundException() {
IgniteCheckedException ex = new IgniteCheckedException("Failed to complete exchange process.");

for (Map.Entry<UUID, Exception> entry : exchangeErrors.entrySet()) {
if (ex != entry.getValue())
ex.addSuppressed(entry.getValue());
}

this.exchangeActions = exchangeActions;
return ex;
}

/**
Expand Down Expand Up @@ -141,6 +157,6 @@ public void exchangeActions(ExchangeActions exchangeActions) {

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheChangeFailureMessage.class, this);
return S.toString(ExchangeFailureMessage.class, this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,13 @@
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterGroupEmptyException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.BaselineChangedEvent;
import org.apache.ignite.events.ClusterActivationEvent;
import org.apache.ignite.events.ClusterStateChangeEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDiagnosticAware;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
Expand All @@ -83,7 +75,6 @@
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
Expand Down Expand Up @@ -116,7 +107,6 @@
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.BooleanMetricImpl;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
Expand All @@ -141,7 +131,6 @@
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand All @@ -162,9 +151,6 @@
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_ACTIVATED;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_DEACTIVATED;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_STATE_CHANGED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
Expand Down Expand Up @@ -613,23 +599,6 @@ private void onDiscoveryEvent(DiscoveryEvent evt, DiscoCache cache) {
exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);

exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);

boolean baselineChanging;
if (stateChangeMsg.forceChangeBaselineTopology())
baselineChanging = true;
else {
DiscoveryDataClusterState state = cctx.kernalContext().state().clusterState();

assert state.transition() : state;

baselineChanging = exchActions.changedBaseline()
// Or it is the first activation.
|| state.state() != ClusterState.INACTIVE
&& !state.previouslyActive()
&& state.previousBaselineTopology() == null;
}

exchFut.listen(f -> onClusterStateChangeFinish(exchActions, baselineChanging));
}
}
else if (customMsg instanceof DynamicCacheChangeBatch) {
Expand Down Expand Up @@ -657,13 +626,14 @@ else if (msg.exchangeId().topologyVersion().topologyVersion() >= cctx.discovery(
exchangeFuture(msg.exchangeId(), null, null, null, null)
.onAffinityChangeMessage(evt.eventNode(), msg);
}
else if (customMsg instanceof DynamicCacheChangeFailureMessage) {
DynamicCacheChangeFailureMessage msg = (DynamicCacheChangeFailureMessage)customMsg;
else if (customMsg instanceof ExchangeFailureMessage) {
ExchangeFailureMessage msg = (ExchangeFailureMessage)customMsg;

if (msg.exchangeId().topologyVersion().topologyVersion() >=
affinityTopologyVersion(cctx.discovery().localJoinEvent()).topologyVersion())
exchangeFuture(msg.exchangeId(), null, null, null, null)
.onDynamicCacheChangeFail(evt.eventNode(), msg);
long exchangeTopVer = msg.exchangeId().topologyVersion().topologyVersion();
long locNodeJoinTopVer = affinityTopologyVersion(cctx.discovery().localJoinEvent()).topologyVersion();

if (exchangeTopVer >= locNodeJoinTopVer)
exchangeFuture(msg.exchangeId(), null, null, null, null).onExchangeFailureMessage(evt.eventNode(), msg);
}
else if (customMsg instanceof SnapshotDiscoveryMessage
&& ((SnapshotDiscoveryMessage)customMsg).needExchange()) {
Expand Down Expand Up @@ -734,74 +704,6 @@ else if (customMsg instanceof WalStateAbstractMessage
}
}

/**
 * Records cluster state change related events after the state change exchange future has completed.
 * <p>
 * Every recordable event out of {@code EVT_CLUSTER_ACTIVATED}, {@code EVT_CLUSTER_DEACTIVATED} and
 * {@code EVT_CLUSTER_STATE_CHANGED} that matches the given exchange actions is created here and recorded
 * from the system executor service. If {@code baselineChanging} is {@code true}, a task is additionally
 * submitted to the striped executor service that records {@code EVT_BASELINE_CHANGED} if it is recordable.
 *
 * @param exchActions Exchange actions of the finished exchange, must not be {@code null}.
 * @param baselineChanging Whether a baseline changed event should be recorded.
 */
private void onClusterStateChangeFinish(ExchangeActions exchActions, boolean baselineChanging) {
    A.notNull(exchActions, "exchActions");

    GridKernalContext ctx = cctx.kernalContext();

    GridEventStorageManager evtMngr = ctx.event();

    // Evaluate each "action requested and event recordable" condition once and reuse it below.
    boolean recordActivated = exchActions.activate() && evtMngr.isRecordable(EVT_CLUSTER_ACTIVATED);
    boolean recordDeactivated = exchActions.deactivate() && evtMngr.isRecordable(EVT_CLUSTER_DEACTIVATED);
    boolean recordStateChanged = exchActions.changedClusterState() && evtMngr.isRecordable(EVT_CLUSTER_STATE_CHANGED);

    if (recordActivated || recordDeactivated || recordStateChanged) {
        List<Event> evts = new ArrayList<>(2);

        ClusterNode locNode = ctx.discovery().localNode();

        Collection<BaselineNode> bltNodes = ctx.cluster().get().currentBaselineTopology();

        // Tracks whether the baseline collection instance has already been handed to an event.
        boolean collectionUsed = false;

        if (recordActivated) {
            assert !exchActions.deactivate() : exchActions;

            collectionUsed = true;

            evts.add(new ClusterActivationEvent(locNode, "Cluster activated.", EVT_CLUSTER_ACTIVATED, bltNodes));
        }

        if (recordDeactivated) {
            assert !exchActions.activate() : exchActions;

            collectionUsed = true;

            evts.add(new ClusterActivationEvent(locNode, "Cluster deactivated.", EVT_CLUSTER_DEACTIVATED, bltNodes));
        }

        if (recordStateChanged) {
            StateChangeRequest req = exchActions.stateChangeRequest();

            // Give the state change event its own copy so that two events do not share one collection instance.
            if (collectionUsed && bltNodes != null)
                bltNodes = new ArrayList<>(bltNodes);

            evts.add(new ClusterStateChangeEvent(req.prevState(), req.state(), bltNodes, locNode, "Cluster state changed."));
        }

        A.notEmpty(evts, "events " + exchActions);

        // Events are recorded asynchronously, not in the thread that completes the exchange future.
        ctx.pools().getSystemExecutorService()
            .submit(() -> evts.forEach(e -> evtMngr.record(e)));
    }

    if (baselineChanging) {
        ctx.pools().getStripedExecutorService().execute(() -> {
            // The recordable check is intentionally performed inside the task, at execution time.
            if (ctx.event().isRecordable(EventType.EVT_BASELINE_CHANGED)) {
                ctx.event().record(new BaselineChangedEvent(
                    ctx.discovery().localNode(),
                    "Baseline changed.",
                    EventType.EVT_BASELINE_CHANGED,
                    ctx.cluster().get().currentBaselineTopology()
                ));
            }
        });
    }
}

/**
* @param task Task to run in exchange worker thread.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4271,8 +4271,8 @@ else if (msg0 instanceof WalStateFinishMessage)
return changeRequested;
}

if (msg instanceof DynamicCacheChangeFailureMessage)
cachesInfo.onCacheChangeRequested((DynamicCacheChangeFailureMessage)msg, topVer);
if (msg instanceof ExchangeFailureMessage)
cachesInfo.onCacheChangeRequested((ExchangeFailureMessage)msg, topVer);

if (msg instanceof ClientCacheChangeDiscoveryMessage)
cachesInfo.onClientCacheChange((ClientCacheChangeDiscoveryMessage)msg, node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class StateChangeRequest {
/** */
private final AffinityTopologyVersion topVer;

/** */
private final boolean isBaselineChangeRequest;

/**
* @param msg Message.
* @param bltHistItem Baseline history item.
Expand All @@ -53,12 +56,14 @@ public StateChangeRequest(
ChangeGlobalStateMessage msg,
BaselineTopologyHistoryItem bltHistItem,
ClusterState prevState,
AffinityTopologyVersion topVer
AffinityTopologyVersion topVer,
boolean isBaselineChangeRequest
) {
this.msg = msg;
prevBltHistItem = bltHistItem;
this.prevState = prevState;
this.topVer = topVer;
this.isBaselineChangeRequest = isBaselineChangeRequest;
}

/**
Expand Down Expand Up @@ -98,6 +103,11 @@ public ClusterState state() {
return msg.state();
}

/**
* @return Value of the {@code isBaselineChangeRequest} flag this request was created with.
*/
public boolean isBaselineChangeRequest() {
return isBaselineChangeRequest;
}

/**
* @return Previous cluster state.
*/
Expand Down
Loading

0 comments on commit fb27a71

Please sign in to comment.