Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24206 Add index and term to RaftGroupConfiguration #5083

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public MetaStorageSnapshotStorageFactory(KeyValueStorage storage) {
assert configuration != null;

return new RaftMessagesFactory().snapshotMeta()
.cfgIndex(configuration.index())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Metastorage, these fields are not needed, so now it looks like we are just conforming to a ceremony. It would be nice if we could avoid using these new 2 fields here.

.cfgTerm(configuration.term())
.lastIncludedIndex(indexWithTerm.index())
.lastIncludedTerm(indexWithTerm.term())
.peersList(configuration.peers())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@
public class RaftGroupConfiguration implements Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class now seems to be identical to CommittedConfiguration. Do we need to keep both?

private static final long serialVersionUID = 0;

private final long index;
private final long term;

@IgniteToStringInclude
private final List<String> peers;
@IgniteToStringInclude
private final List<String> learners;



Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra blank line

@IgniteToStringInclude
private final @Nullable List<String> oldPeers;
@IgniteToStringInclude
Expand All @@ -46,11 +51,16 @@ public class RaftGroupConfiguration implements Serializable {
* Creates a new instance.
*/
public RaftGroupConfiguration(
long index,
long term,
Collection<String> peers,
Collection<String> learners,
@Nullable Collection<String> oldPeers,
@Nullable Collection<String> oldLearners

) {
this.index = index;
this.term = term;
this.peers = List.copyOf(peers);
this.learners = List.copyOf(learners);
this.oldPeers = oldPeers == null ? null : List.copyOf(oldPeers);
Expand All @@ -62,13 +72,23 @@ public RaftGroupConfiguration(
*/
public static RaftGroupConfiguration fromCommittedConfiguration(CommittedConfiguration config) {
return new RaftGroupConfiguration(
config.index(),
config.term(),
config.peers(),
config.learners(),
config.oldPeers(),
config.oldLearners()
);
}

public long index() {
return index;
}

public long term() {
return term;
}

/**
* Returns peers of the current configuration.
*
Expand Down Expand Up @@ -121,7 +141,7 @@ public boolean equals(Object o) {
return false;
}
RaftGroupConfiguration that = (RaftGroupConfiguration) o;
return Objects.equals(peers, that.peers) && Objects.equals(learners, that.learners)
return index == that.index && term == that.term && Objects.equals(peers, that.peers) && Objects.equals(learners, that.learners)
&& Objects.equals(oldPeers, that.oldPeers) && Objects.equals(oldLearners, that.oldLearners);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class RaftGroupConfigurationSerializer extends VersionedSerializer<RaftGr

@Override
protected void writeExternalData(RaftGroupConfiguration config, IgniteDataOutput out) throws IOException {
out.writeLong(config.index());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is not going to go to release 3.0 and this is a breaking change. Let's create second version of the serializer. Another option is to agree on the mailing list to include this issue to the release.

out.writeLong(config.term());
writeStringList(config.peers(), out);
writeStringList(config.learners(), out);
writeNullableStringList(config.oldPeers(), out);
Expand All @@ -57,12 +59,14 @@ private static void writeNullableStringList(@Nullable List<String> strings, Igni

@Override
protected RaftGroupConfiguration readExternalData(byte protoVer, IgniteDataInput in) throws IOException {
long index = in.readLong();
long term = in.readLong();
List<String> peers = readStringList(in);
List<String> learners = readStringList(in);
List<String> oldPeers = readNullableStringList(in);
List<String> oldLearners = readNullableStringList(in);

return new RaftGroupConfiguration(peers, learners, oldPeers, oldLearners);
return new RaftGroupConfiguration(index, term, peers, learners, oldPeers, oldLearners);
}

private static List<String> readStringList(IgniteDataInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class RaftGroupConfigurationSerializerTest {
@Test
void serializationAndDeserializationWithoutNulls() {
RaftGroupConfiguration originalConfig = new RaftGroupConfiguration(
13L,
37L,
List.of("peer1", "peer2"),
List.of("learner1", "learner2"),
List.of("old-peer1", "old-peer2"),
Expand All @@ -47,6 +49,8 @@ void serializationAndDeserializationWithoutNulls() {
@Test
void serializationAndDeserializationWithNulls() {
RaftGroupConfiguration originalConfig = new RaftGroupConfiguration(
13L,
37L,
List.of("peer1", "peer2"),
List.of("learner1", "learner2"),
null,
Expand All @@ -61,10 +65,12 @@ void serializationAndDeserializationWithNulls() {

@Test
void v1CanBeDeserialized() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this PR is not going to be included in the 3.0 release, then changes to this test need to be reverted and another test needs to be added (v2CanBeDeserialized())

byte[] bytes = Base64.getDecoder().decode("Ae++QwMGcGVlcjEGcGVlcjIDCWxlYXJuZXIxCWxlYXJuZXIyAwpvbGQtcGVlcjEKb2xkLXBlZXIyAw1vbGQ"
+ "tbGVhcm5lcjENb2xkLWxlYXJuZXIy");
byte[] bytes = Base64.getDecoder().decode("Ae++Qw0AAAAAAAAAJQAAAAAAAAADBnBlZXIxBnBlZXIyAwlsZWFybmVyMQlsZWFybmVyMgMKb2xkLXBl"
+ "ZXIxCm9sZC1wZWVyMgMNb2xkLWxlYXJuZXIxDW9sZC1sZWFybmVyMg==");
RaftGroupConfiguration restoredConfig = VersionedSerialization.fromBytes(bytes, serializer);

assertThat(restoredConfig.index(), is(13L));
assertThat(restoredConfig.term(), is(37L));
assertThat(restoredConfig.peers(), is(List.of("peer1", "peer2")));
assertThat(restoredConfig.learners(), is(List.of("learner1", "learner2")));
assertThat(restoredConfig.oldPeers(), is(List.of("old-peer1", "old-peer2")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.entity.Task;
import org.apache.ignite.raft.jraft.entity.UserLog;
import org.apache.ignite.raft.jraft.error.LogIndexOutOfBoundsException;
Expand Down Expand Up @@ -3506,6 +3507,102 @@ public void onRawConfigurationCommitted(ConfigurationEntry conf) {
);
}

@Test
public void testIndexAndTermOfCfgArePropagatedToSnapshotMeta() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scenario is pretty lengthy and complex. Could you please elaborate on it in the javadoc? What it tests for, what the scenario is, what verifications are made...

TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);

AtomicLong term = new AtomicLong(-1);
AtomicLong index = new AtomicLong(-1);
Comment on lines +3514 to +3515
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
AtomicLong term = new AtomicLong(-1);
AtomicLong index = new AtomicLong(-1);
AtomicLong configTerm = new AtomicLong(-1);
AtomicLong configIndex = new AtomicLong(-1);


AtomicLong metaTerm = new AtomicLong(-1);
AtomicLong metaIndex = new AtomicLong(-1);
Comment on lines +3517 to +3518
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
AtomicLong metaTerm = new AtomicLong(-1);
AtomicLong metaIndex = new AtomicLong(-1);
AtomicLong metaConfigTerm = new AtomicLong(-1);
AtomicLong metaConfigIndex = new AtomicLong(-1);


cluster = new TestCluster(
"testIndexAndTermOfCfgArePropagatedToSnapshotMeta",
dataPath,
Collections.singletonList(peer0),
new LinkedHashSet<>(),
ELECTION_TIMEOUT_MILLIS,
(peerId, opts) -> {
opts.setFsm(new MockStateMachine(peerId) {
@Override
public boolean onSnapshotLoad(SnapshotReader reader) {
SnapshotMeta meta = reader.load();

metaTerm.set(meta.cfgTerm());
metaIndex.set(meta.cfgIndex());
Comment on lines +3532 to +3533
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can any of 3 nodes write its index+term to atomics or just the leader? How about restricting this explicitly by peerId?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we only need indexes and terms from the leader, then why do we need other nodes? Why one of them is stopped?


return super.onSnapshotLoad(reader);
}

@Override
public void onRawConfigurationCommitted(ConfigurationEntry conf) {
term.set(conf.getId().getTerm());
index.set(conf.getId().getIndex());
System.out.println("Index + term = " + index + " " + term);
super.onRawConfigurationCommitted(conf);
}
});
},
testInfo
);

assertTrue(cluster.start(peer0));

Node leader = cluster.waitAndGetLeader();

assertEquals(1, term.get());
assertEquals(1, index.get());

TestPeer newPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 1);
TestPeer otherPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 2);

assertTrue(cluster.start(newPeer, false, 300));
assertTrue(cluster.start(otherPeer, false, 300));

// Wait until new node node sees every other node, otherwise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Wait until new node node sees every other node, otherwise
// Wait until new node sees every other node, otherwise

// changePeersAndLearnersAsync can fail.
waitForTopologyOnEveryNode(1, cluster);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we seem to wait till every node sees at least 1 node in its physical topology. But physical topology includes the node itself, so how is it about seeing other nodes, let alone seeing each other node?


SynchronizedClosure done = new SynchronizedClosure();
leader.changePeersAndLearnersAsync(
new Configuration(List.of(peer0.getPeerId(), newPeer.getPeerId(), otherPeer.getPeerId()), List.of()), leader.getCurrentTerm(),
done
);

assertEquals(done.await(), Status.OK());

assertTrue(waitForCondition(() -> cluster.getLeader().listAlivePeers().contains(newPeer.getPeerId()), 10_000));

assertTrue(cluster.stop(newPeer.getPeerId()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we stop newPeer? Also, is there a guarantee it will not be a leader? Or that it will be a leader?


// apply something more
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// apply something more
// Apply something more.

sendTestTaskAndWait(leader);
triggerLeaderSnapshot(cluster, leader);

sendTestTaskAndWait(leader, 10);

triggerLeaderSnapshot(cluster, leader, 2);

//restart follower.
cluster.clean(newPeer.getPeerId());
assertTrue(cluster.start(newPeer, false, 300));

cluster.ensureSame();

assertEquals(3, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(20, fsm.getLogs().size(), fsm.getPeerId().toString());
Comment on lines +3593 to +3595
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these assertions needed? They don't seem to relate to the thing the test is testing, so they just serve as an obstacle when trying to understand what happens here


// Leader hasn't been changed, term must stay the same
assertEquals(1, term.get());
// idx_2 == joint consensus, idx_3 is expected final cfg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm not mistaken, the scenario starts with having just [A] as voting set and then expands it to [ABC]. It looks like joint config is the final configuration. Does it really require 2 steps?

assertEquals(3, index.get());

assertEquals(1, metaTerm.get());
assertEquals(3, metaIndex.get());
}

@Test
public void testOnNewPeersConfigurationAppliedIsNotCalledAfterResetPeers() throws Exception {
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ private void doSnapshotSave(final SaveSnapshotClosure done) {
}

SnapshotMetaBuilder metaBuilder = msgFactory.snapshotMeta()
.cfgIndex(confEntry.getId().getIndex())
.cfgTerm(confEntry.getId().getTerm())
Comment on lines +606 to +607
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 2 fields are only needed for partitions, and for partitions we don't save snapshot files. Hence, a question: do we really need to fill them here?

.lastIncludedIndex(lastAppliedIndex)
.lastIncludedTerm(this.lastAppliedTerm)
.peersList(confEntry.getConf().getPeers().stream().map(Object::toString).collect(toList()))
Expand Down Expand Up @@ -700,7 +702,7 @@ private void doSnapshotLoad(final LoadSnapshotClosure done) {
// so we have to protect from this. In production, these methods never return null.
if (meta.peersList() != null && meta.learnersList() != null) {
ConfigurationEntry configurationEntry = new ConfigurationEntry(
snapshotId.copy(),
new LogId(meta.cfgIndex(), meta.cfgTerm()),
new Configuration(
meta.peersList().stream().map(PeerId::parsePeer).collect(toList()),
meta.learnersList().stream().map(PeerId::parsePeer).collect(toList())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public interface SnapshotMeta extends Message {

long lastIncludedTerm();

long cfgIndex();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need these fields for all raft groups or just for partitions (or for partitions + metastorage)? If not for all of them, then probably they don't need to be added here as they are not part of the Raft protocol. If only partitions need them, PartitionSnapshotMeta seems to be a more suitable place to add these fields.


long cfgTerm();

@Nullable Collection<String> peersList();

@Nullable Collection<String> oldPeersList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,8 @@ private CompletableFuture<Void> completeRebalance(@Nullable Throwable throwable)
PartitionSnapshotMeta meta = snapshotMeta;

RaftGroupConfiguration raftGroupConfig = new RaftGroupConfiguration(
meta.cfgIndex(),
meta.cfgTerm(),
meta.peersList(),
meta.learnersList(),
meta.oldPeersList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public static PartitionSnapshotMeta snapshotMetaAt(
@Nullable String primaryReplicaNodeName
) {
PartitionSnapshotMetaBuilder metaBuilder = new PartitionReplicationMessagesFactory().partitionSnapshotMeta()
.cfgIndex(config.index())
.cfgTerm(config.term())
.lastIncludedIndex(logIndex)
.lastIncludedTerm(term)
.peersList(config.peers())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,11 +620,15 @@ void locksOnCommandApplication() {

@Test
void updatesGroupConfigurationOnConfigCommit() {
long index = raftIndex.incrementAndGet();

commandListener.onConfigurationCommitted(new CommittedConfiguration(
raftIndex.incrementAndGet(), 2, List.of("peer"), List.of("learner"), List.of("old-peer"), List.of("old-learner")
index, 2, List.of("peer"), List.of("learner"), List.of("old-peer"), List.of("old-learner")
));

RaftGroupConfiguration expectedConfig = new RaftGroupConfiguration(
index,
2,
List.of("peer"),
List.of("learner"),
List.of("old-peer"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class RaftGroupConfigurationConverterTest {
@Test
void convertsAndParses() {
RaftGroupConfiguration configuration = new RaftGroupConfiguration(
13L,
37L,
List.of("peer"),
List.of("learner"),
List.of("old-peer"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,8 @@ private static List<UUID> generateTxIds() {

private static RaftGroupConfiguration generateRaftGroupConfig() {
return new RaftGroupConfiguration(
13L,
37L,
List.of("peer"),
List.of("learner"),
List.of("old-peer"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ void sendsSnapshotMeta() {
when(partitionAccess.maxLastAppliedIndex()).thenReturn(100L);
when(partitionAccess.maxLastAppliedTerm()).thenReturn(3L);
when(partitionAccess.committedGroupConfiguration()).thenReturn(new RaftGroupConfiguration(
13L,
37L,
List.of("peer1:3000", "peer2:3000"),
List.of("learner1:3000", "learner2:3000"),
List.of("peer1:3000"),
Expand All @@ -89,6 +91,8 @@ void sendsSnapshotMeta() {

SnapshotMetaResponse response = getSnapshotMetaResponse();

assertThat(response.meta().cfgIndex(), is(13L));
assertThat(response.meta().cfgTerm(), is(37L));
assertThat(response.meta().lastIncludedIndex(), is(100L));
assertThat(response.meta().lastIncludedTerm(), is(3L));
assertThat(response.meta().peersList(), is(List.of("peer1:3000", "peer2:3000")));
Expand Down Expand Up @@ -121,7 +125,7 @@ private SnapshotMetaResponse getNullableSnapshotMetaResponse() {
@Test
void doesNotSendOldConfigWhenItIsNotThere() {
when(partitionAccess.committedGroupConfiguration()).thenReturn(new RaftGroupConfiguration(
List.of(), List.of(), null, null
13L, 37L, List.of(), List.of(), null, null
));

snapshot.freezeScopeUnderMvLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ void delegatesLastAppliedSetter() {
@Test
void convertsCommittedGroupConfigurationOnSave() {
RaftGroupConfiguration config = new RaftGroupConfiguration(
13L,
37L,
List.of("peer"),
List.of("learner"),
List.of("old-peer"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class SnapshotMetaUtilsTest extends BaseIgniteAbstractTest {
@Test
void buildsSnapshotMeta() {
RaftGroupConfiguration config = new RaftGroupConfiguration(
13L,
37L,
List.of("peer1:3000", "peer2:3000"), List.of("learner1:3000", "learner2:3000"),
List.of("peer1:3000"), List.of("learner1:3000")
);
Expand All @@ -78,6 +80,8 @@ void buildsSnapshotMeta() {
"primary"
);

assertThat(meta.cfgIndex(), is(13L));
assertThat(meta.cfgTerm(), is(37L));
assertThat(meta.lastIncludedIndex(), is(100L));
assertThat(meta.lastIncludedTerm(), is(3L));
assertThat(meta.peersList(), is(List.of("peer1:3000", "peer2:3000")));
Expand All @@ -96,7 +100,7 @@ void doesNotIncludeOldConfigWhenItIsNotThere() {
PartitionSnapshotMeta meta = snapshotMetaAt(
100,
3,
new RaftGroupConfiguration(List.of(), List.of(), null, null),
new RaftGroupConfiguration(13, 37, List.of(), List.of(), null, null),
42,
Map.of(),
777L,
Expand Down