Skip to content

Commit 4f7f341

Browse files
authored
IGNITE-23597 Cache latest term values in log manager (#5005)
1 parent 3739f46 commit 4f7f341

File tree

3 files changed

+247
-9
lines changed

3 files changed

+247
-9
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.raft.storage;
19+
20+
import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
21+
22+
import org.apache.ignite.raft.jraft.entity.LogId;
23+
24+
/**
25+
* Cyclic buffer to cache several last term values for log storage.
26+
*/
27+
public class TermCache {
28+
private final int mask;
29+
private final long[] indexes;
30+
private final long[] terms;
31+
32+
// Head position. -1 means the cache is empty.
33+
private int head = -1;
34+
35+
// Tail position. Might be equal to head if the cache only has a single term.
36+
private int tail;
37+
38+
/**
39+
* Constructor.
40+
*
41+
* @param capacity Cache capacity. Must be a power of 2. Should be a small value, term update is a rare operation.
42+
*/
43+
public TermCache(int capacity) {
44+
assert isPow2(capacity) : "Capacity must be a power of 2";
45+
46+
this.mask = capacity - 1;
47+
this.indexes = new long[capacity];
48+
this.terms = new long[capacity];
49+
}
50+
51+
/**
52+
* Should be called when appending a new log entry.
53+
*/
54+
public void append(LogId id) {
55+
if (isEmpty()) {
56+
head = 0;
57+
indexes[tail] = id.getIndex();
58+
terms[tail] = id.getTerm();
59+
60+
return;
61+
}
62+
63+
// Term has not changed, nothing to update.
64+
if (terms[tail] == id.getTerm()) {
65+
return;
66+
}
67+
68+
tail = next(tail);
69+
indexes[tail] = id.getIndex();
70+
terms[tail] = id.getTerm();
71+
72+
// Handle buffer overflow by moving head to the next position.
73+
if (tail == head) {
74+
head = next(head);
75+
}
76+
}
77+
78+
private int prev(int i) {
79+
return (i - 1) & mask;
80+
}
81+
82+
private int next(int i) {
83+
return (i + 1) & mask;
84+
}
85+
86+
private boolean isEmpty() {
87+
return head == -1;
88+
}
89+
90+
private int findIndex(long idx) {
91+
// Could be replaced with a binary search, but why bother for such a small cache.
92+
for (int i = tail; i != head; i = prev(i)) {
93+
if (idx >= indexes[i]) {
94+
return i;
95+
}
96+
}
97+
98+
return head;
99+
}
100+
101+
/**
102+
* Lookup term for the given index. Returns {@code -1} if the index is not found in the cache.
103+
*/
104+
public long lookup(long idx) {
105+
if (isEmpty() || idx < indexes[head]) {
106+
return -1;
107+
}
108+
109+
return terms[findIndex(idx)];
110+
}
111+
112+
/**
113+
* Resets the cache to the initial state.
114+
*/
115+
public void reset() {
116+
head = -1;
117+
tail = 0;
118+
}
119+
120+
/**
121+
* Truncates the cache to the given index, deleting all information for indexes greater than or equal to the given one.
122+
*/
123+
public void truncateTail(long idx) {
124+
if (isEmpty() || idx < indexes[head]) {
125+
reset();
126+
127+
return;
128+
}
129+
130+
tail = findIndex(idx);
131+
132+
if (indexes[tail] == idx) {
133+
if (head == tail) {
134+
reset();
135+
} else {
136+
tail = prev(tail);
137+
}
138+
}
139+
}
140+
}

modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java

+17-9
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.apache.ignite.raft.jraft.storage.impl;
1818

1919
import com.lmax.disruptor.EventHandler;
20-
import com.lmax.disruptor.EventTranslator;
2120
import com.lmax.disruptor.RingBuffer;
2221
import java.util.ArrayList;
2322
import java.util.HashMap;
@@ -30,13 +29,13 @@
3029
import java.util.concurrent.locks.ReentrantReadWriteLock;
3130
import org.apache.ignite.internal.logger.IgniteLogger;
3231
import org.apache.ignite.internal.logger.Loggers;
32+
import org.apache.ignite.internal.raft.storage.TermCache;
3333
import org.apache.ignite.raft.jraft.FSMCaller;
3434
import org.apache.ignite.raft.jraft.Status;
3535
import org.apache.ignite.raft.jraft.conf.Configuration;
3636
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
3737
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
3838
import org.apache.ignite.raft.jraft.core.NodeMetrics;
39-
import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType;
4039
import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
4140
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
4241
import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
@@ -59,7 +58,6 @@
5958
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
6059
import org.apache.ignite.raft.jraft.util.Requires;
6160
import org.apache.ignite.raft.jraft.util.SegmentList;
62-
import org.apache.ignite.raft.jraft.util.ThreadHelper;
6361
import org.apache.ignite.raft.jraft.util.Utils;
6462

6563
/**
@@ -83,6 +81,7 @@ public class LogManagerImpl implements LogManager {
8381
private LogId diskId = new LogId(0, 0); // Last log entry written to disk.
8482
private LogId appliedId = new LogId(0, 0);
8583
private final SegmentList<LogEntry> logsInMemory = new SegmentList<>(true);
84+
private final TermCache termCache = new TermCache(8);
8685
private volatile long firstLogIndex;
8786
private volatile long lastLogIndex;
8887
private volatile LogId lastSnapshotId = new LogId(0, 0);
@@ -321,6 +320,10 @@ public void appendEntries(final List<LogEntry> entries, final StableClosure done
321320
if (!entries.isEmpty()) {
322321
done.setFirstLogIndex(entries.get(0).getId().getIndex());
323322
this.logsInMemory.addAll(entries);
323+
324+
for (LogEntry entry : entries) {
325+
this.termCache.append(entry.getId());
326+
}
324327
}
325328
done.setEntries(entries);
326329

@@ -821,9 +824,10 @@ public long getTerm(final long index) {
821824
if (index > this.lastLogIndex || index < this.firstLogIndex) {
822825
return 0;
823826
}
824-
final LogEntry entry = getEntryFromMemory(index);
825-
if (entry != null) {
826-
return entry.getId().getTerm();
827+
828+
long term = termCache.lookup(index);
829+
if (term != -1) {
830+
return term;
827831
}
828832
}
829833
finally {
@@ -913,10 +917,12 @@ private long unsafeGetTerm(final long index) {
913917
if (index > this.lastLogIndex || index < this.firstLogIndex) {
914918
return 0;
915919
}
916-
final LogEntry entry = getEntryFromMemory(index);
917-
if (entry != null) {
918-
return entry.getId().getTerm();
920+
921+
long term = termCache.lookup(index);
922+
if (term != -1) {
923+
return term;
919924
}
925+
920926
return getTermFromLogStorage(index);
921927
}
922928

@@ -1029,6 +1035,7 @@ private boolean reset(final long nextLogIndex) {
10291035
this.writeLock.lock();
10301036
try {
10311037
this.logsInMemory.clear();
1038+
this.termCache.reset();
10321039
this.firstLogIndex = nextLogIndex;
10331040
this.lastLogIndex = nextLogIndex - 1;
10341041
this.configManager.truncatePrefix(this.firstLogIndex);
@@ -1050,6 +1057,7 @@ private void unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) {
10501057
}
10511058

10521059
this.logsInMemory.removeFromLastWhen(entry -> entry.getId().getIndex() > lastIndexKept);
1060+
termCache.truncateTail(lastIndexKept + 1);
10531061

10541062
this.lastLogIndex = lastIndexKept;
10551063
final long lastTermKept = unsafeGetTerm(lastIndexKept);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.raft.storage;
19+
20+
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
23+
import org.apache.ignite.raft.jraft.entity.LogId;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
27+
class TermCacheTest {
28+
private TermCache termCache;
29+
30+
@BeforeEach
31+
void setUp() {
32+
termCache = new TermCache(4);
33+
}
34+
35+
@Test
36+
void testAppendAndLookup() {
37+
termCache.append(new LogId(1, 1));
38+
termCache.append(new LogId(2, 2));
39+
termCache.append(new LogId(3, 3));
40+
41+
assertEquals(-1, termCache.lookup(0));
42+
assertEquals(1, termCache.lookup(1));
43+
assertEquals(2, termCache.lookup(2));
44+
assertEquals(3, termCache.lookup(3));
45+
assertEquals(3, termCache.lookup(4));
46+
}
47+
48+
@Test
49+
void testAppendSameTerm() {
50+
termCache.append(new LogId(1, 1));
51+
termCache.append(new LogId(2, 1));
52+
53+
assertEquals(1, termCache.lookup(1));
54+
assertEquals(1, termCache.lookup(2));
55+
}
56+
57+
@Test
58+
void testReset() {
59+
termCache.append(new LogId(1, 1));
60+
termCache.reset();
61+
62+
assertEquals(-1, termCache.lookup(1));
63+
}
64+
65+
@Test
66+
void testTruncateTail() {
67+
termCache.append(new LogId(1, 1));
68+
termCache.append(new LogId(2, 1));
69+
termCache.append(new LogId(3, 2));
70+
71+
termCache.truncateTail(2);
72+
73+
assertEquals(1, termCache.lookup(1));
74+
assertEquals(1, termCache.lookup(2));
75+
assertEquals(1, termCache.lookup(3));
76+
}
77+
78+
@Test
79+
void testTruncateTailExactMatch() {
80+
termCache.append(new LogId(1, 1));
81+
termCache.append(new LogId(2, 2));
82+
termCache.append(new LogId(3, 2));
83+
84+
termCache.truncateTail(2);
85+
86+
assertEquals(1, termCache.lookup(1));
87+
assertEquals(1, termCache.lookup(2));
88+
assertEquals(1, termCache.lookup(3));
89+
}
90+
}

0 commit comments

Comments
 (0)