Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23079 Make Raft storages destruction durable #4987

Merged
merged 27 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.raft.jraft.storage.impl.NoopGroupStoragesDestructionIntents;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Table;
Expand Down Expand Up @@ -1051,7 +1052,8 @@ class Node {
raftConfiguration,
hybridClock,
raftGroupEventsClientListener,
new NoOpFailureManager()
new NoOpFailureManager(),
new NoopGroupStoragesDestructionIntents()
);

var clusterStateStorage = new TestClusterStateStorage();
Expand Down
1 change: 1 addition & 0 deletions modules/raft/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dependencies {
implementation project(':ignite-rocksdb-common')
implementation project(':ignite-metrics')
implementation project(':ignite-failure-handler')
implementation project(':ignite-vault')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
implementation libs.disruptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.GroupStoragesDestructionIntents;
import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.NamedThreadFactory;
Expand Down Expand Up @@ -117,7 +118,8 @@ public Loza(
RaftConfiguration raftConfiguration,
HybridClock clock,
RaftGroupEventsClientListener raftGroupEventsClientListener,
FailureManager failureManager
FailureManager failureManager,
GroupStoragesDestructionIntents groupStorageDestructionIntents
) {
this.clusterNetSvc = clusterNetSvc;
this.raftConfiguration = raftConfiguration;
Expand All @@ -130,7 +132,13 @@ public Loza(

this.opts = options;

this.raftServer = new JraftServerImpl(clusterNetSvc, options, raftGroupEventsClientListener, failureManager);
this.raftServer = new JraftServerImpl(
clusterNetSvc,
options,
raftGroupEventsClientListener,
failureManager,
groupStorageDestructionIntents
);

this.executor = new ScheduledThreadPoolExecutor(
CLIENT_POOL_SIZE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.raft.server.impl;

import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
Expand Down Expand Up @@ -68,6 +69,7 @@
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.CommittedConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.storage.GroupStoragesDestructionIntents;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
Expand Down Expand Up @@ -119,6 +121,8 @@ public class JraftServerImpl implements RaftServer {
/** Failure processor that is used to handle critical errors. */
private final FailureManager failureManager;

private final GroupStoragesDestructionIntents groupStoragesDestructionIntents;

/** Server instance. */
private IgniteRpcServer rpcServer;

Expand Down Expand Up @@ -164,10 +168,12 @@ public JraftServerImpl(
ClusterService service,
NodeOptions opts,
RaftGroupEventsClientListener raftGroupEventsClientListener,
FailureManager failureManager
FailureManager failureManager,
GroupStoragesDestructionIntents groupStoragesDestructionIntents
) {
this.service = service;
this.nodeManager = new NodeManager();
this.groupStoragesDestructionIntents = groupStoragesDestructionIntents;

this.opts = opts;
this.raftGroupEventsClientListener = raftGroupEventsClientListener;
Expand Down Expand Up @@ -340,7 +346,7 @@ public CompletableFuture<Void> startAsync(ComponentContext componentContext) {

rpcServer.init(null);

return nullCompletedFuture();
return completeRaftGroupStoragesDestruction(componentContext.executor());
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -411,7 +417,11 @@ public ClusterService clusterService() {
}

public static Path getServerDataPath(Path basePath, RaftNodeId nodeId) {
return basePath.resolve(nodeId.nodeIdStringForStorage());
return getServerDataPath(basePath, nodeId.nodeIdStringForStorage());
}

private static Path getServerDataPath(Path basePath, String nodeIdStringForStorage) {
return basePath.resolve(nodeIdStringForStorage);
}

@Override
Expand Down Expand Up @@ -522,6 +532,10 @@ public boolean startRaftNode(
}

private static Path serverDataPathForNodeId(RaftNodeId nodeId, RaftGroupOptions groupOptions) {
return serverDataPathForNodeId(nodeId.nodeIdStringForStorage(), groupOptions);
}

private static Path serverDataPathForNodeId(String nodeId, RaftGroupOptions groupOptions) {
Path dataPath = groupOptions.serverDataPath();

assert dataPath != null : "Raft metadata path was not set, nodeId is " + nodeId;
Expand Down Expand Up @@ -573,16 +587,26 @@ public boolean stopRaftNodes(ReplicationGroupId groupId) {

@Override
public void destroyRaftNodeStorages(RaftNodeId nodeId, RaftGroupOptions groupOptions) {
// TODO: IGNITE-23079 - improve on what we do if it was not possible to destroy any of the storages.
groupStoragesDestructionIntents.saveDestroyStorageIntent(nodeId, groupOptions);

destroyStorage(nodeId.nodeIdStringForStorage(), groupOptions, true);
}

private void destroyStorage(String nodeId, RaftGroupOptions groupOptions, boolean destroyVolatile) {
LogStorageFactory logStorageFactory = groupOptions.getLogStorageFactory();

try {
String logUri = nodeId.nodeIdStringForStorage();
groupOptions.getLogStorageFactory().destroyLogStorage(logUri);
} finally {
Path serverDataPath = serverDataPathForNodeId(nodeId, groupOptions);
if (destroyVolatile || !groupOptions.volatileStores()) {
logStorageFactory.destroyLogStorage(nodeId);
}
rpuch marked this conversation as resolved.
Show resolved Hide resolved

// This destroys both meta storage and snapshots storage as they are stored under serverDataPath.
IgniteUtils.deleteIfExists(serverDataPath);
// This destroys both meta storage and snapshots storage as they are stored under nodeDataPath.
IgniteUtils.deleteIfExistsThrowable(serverDataPathForNodeId(nodeId, groupOptions));
} catch (IOException e) {
throw new IgniteInternalException("Failed to delete storage for node: " + nodeId, e);
}

groupStoragesDestructionIntents.removeDestroyStorageIntent(nodeId);
}

@Override
Expand Down Expand Up @@ -703,6 +727,11 @@ private Object startNodeMonitor(RaftNodeId nodeId) {
return startGroupInProgressMonitors.get(Math.abs(nodeId.hashCode() % SIMULTANEOUS_GROUP_START_PARALLELISM));
}

private CompletableFuture<Void> completeRaftGroupStoragesDestruction(ExecutorService executor) {
return runAsync(() -> groupStoragesDestructionIntents.readGroupOptionsByNodeIdForDestruction()
.forEach((nodeId, groupOptions) -> destroyStorage(nodeId, groupOptions, false)), executor);
}

/**
* Wrapper of {@link StateMachineAdapter}.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.raft.storage;

import java.util.Map;
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.replicator.ReplicationGroupId;

/** Persists and retrieves intent to complete storages destruction on node start. */
rpuch marked this conversation as resolved.
Show resolved Hide resolved
public interface GroupStoragesDestructionIntents {
/** Add configurer for CMG or metastorage raft storages. */
void addGroupOptionsConfigurer(ReplicationGroupId groupId, RaftGroupOptionsConfigurer groupOptionsConfigurer);

/** Add configurer for partitions raft storages. */
void addPartitionGroupOptionsConfigurer(RaftGroupOptionsConfigurer partitionRaftConfigurer);
rpuch marked this conversation as resolved.
Show resolved Hide resolved

/** Save intent to destroy raft storages. */
void saveDestroyStorageIntent(RaftNodeId nodeId, RaftGroupOptions groupOptions);
rpuch marked this conversation as resolved.
Show resolved Hide resolved

/** Remove intent to destroy raft storages. */
void removeDestroyStorageIntent(String nodeId);

/** Returns group options needed to destroy raft storages, mapped by node id represented by String. */
Map<String, RaftGroupOptions> readGroupOptionsByNodeIdForDestruction();
rpuch marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.apache.ignite.internal.raft.storage.impl;

import java.util.Collections;
import java.util.Map;
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.storage.GroupStoragesDestructionIntents;
import org.apache.ignite.internal.replicator.ReplicationGroupId;

// Storage that doesn't save intents to destroy group storages.
public class NoopGroupStoragesDestructionIntents implements GroupStoragesDestructionIntents {
@Override
public void addGroupOptionsConfigurer(ReplicationGroupId groupId, RaftGroupOptionsConfigurer groupOptionsConfigurer) {
// No-op.
}

@Override
public void addPartitionGroupOptionsConfigurer(RaftGroupOptionsConfigurer partitionRaftConfigurer) {
// No-op.
}

@Override
public void saveDestroyStorageIntent(RaftNodeId nodeId, RaftGroupOptions groupOptions) {
// No-op.
}

@Override
public void removeDestroyStorageIntent(String nodeId) {
// No-op.
}

@Override
public Map<String, RaftGroupOptions> readGroupOptionsByNodeIdForDestruction() {
return Collections.emptyMap();
}
}
Loading