Skip to content

Commit 46379c9

Browse files
committed
Read all entries from WAL with no consideration for snapshots
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 5e543d7 commit 46379c9

File tree

3 files changed

+320
-14
lines changed

3 files changed

+320
-14
lines changed

server/storage/wal/wal.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,72 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
396396
return w, nil
397397
}
398398

399+
func ReadAllEntries(lg *zap.Logger, dirpath string) (state raftpb.HardState, ents []raftpb.Entry, err error) {
400+
names, nameIndex, err := selectWALFiles(lg, dirpath, walpb.Snapshot{Term: 0, Index: 0})
401+
if err != nil {
402+
return state, nil, fmt.Errorf("[ReadAllEntries] selectWALFiles failed: %w", err)
403+
}
404+
405+
rs, _, closer, err := openWALFiles(lg, dirpath, names, nameIndex, false)
406+
if err != nil {
407+
return state, nil, fmt.Errorf("[ReadAllEntries] openWALFiles failed: %w", err)
408+
}
409+
defer closer()
410+
rec := &walpb.Record{}
411+
decoder := NewDecoder(rs...)
412+
var firstIndex uint64
413+
var firstEntry = true
414+
for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) {
415+
switch rec.Type {
416+
case EntryType:
417+
e := MustUnmarshalEntry(rec.Data)
418+
if firstEntry {
419+
firstEntry = false
420+
firstIndex = e.Index
421+
ents = append(ents, e)
422+
continue
423+
}
424+
if e.Index > firstIndex {
425+
offset := e.Index - firstIndex
426+
if offset > uint64(len(ents)) {
427+
ents = append(ents, make([]raftpb.Entry, offset-uint64(len(ents)))...)
428+
}
429+
// The line below is potentially overriding some 'uncommitted' entries.
430+
ents = append(ents[:offset], e)
431+
}
432+
case StateType:
433+
state = MustUnmarshalState(rec.Data)
434+
case MetadataType:
435+
case CrcType:
436+
crc := decoder.LastCRC()
437+
// current crc of decoder must match the crc of the record.
438+
// do no need to match 0 crc, since the decoder is a new one at this case.
439+
if crc != 0 && rec.Validate(crc) != nil {
440+
state.Reset()
441+
return state, nil, ErrCRCMismatch
442+
}
443+
decoder.UpdateCRC(rec.Crc)
444+
case SnapshotType:
445+
default:
446+
return state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
447+
}
448+
}
449+
if err != nil && !errors.Is(err, io.EOF) {
450+
return state, nil, err
451+
}
452+
// Filter empty entries
453+
i := 0
454+
for _, e := range ents {
455+
if e.Index == 0 {
456+
continue
457+
}
458+
ents[i] = e
459+
i++
460+
}
461+
ents = ents[:i]
462+
return state, ents, nil
463+
}
464+
399465
func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) {
400466
names, err := readWALNames(lg, dirpath)
401467
if err != nil {

server/storage/wal/wal_test.go

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,3 +1180,256 @@ func TestLastRecordLengthExceedFileEnd(t *testing.T) {
11801180
// environment, but only once.
11811181
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
11821182
}
1183+
1184+
func TestWriteRead(t *testing.T) {
1185+
type batch struct {
1186+
state *raftpb.HardState
1187+
entries []raftpb.Entry
1188+
snapshot *walpb.Snapshot
1189+
}
1190+
type want struct {
1191+
wantState raftpb.HardState
1192+
wantEntries []raftpb.Entry
1193+
wantError string
1194+
}
1195+
1196+
tcs := []struct {
1197+
name string
1198+
operations []batch
1199+
readAt walpb.Snapshot
1200+
walReadAll want
1201+
readAllEntries want
1202+
}{
1203+
{
1204+
name: "single batch",
1205+
operations: []batch{
1206+
{
1207+
state: &raftpb.HardState{Commit: 5},
1208+
entries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("c")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1209+
},
1210+
},
1211+
walReadAll: want{
1212+
wantState: raftpb.HardState{Commit: 5},
1213+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("c")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1214+
},
1215+
readAllEntries: want{
1216+
wantState: raftpb.HardState{Commit: 5},
1217+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("c")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1218+
},
1219+
},
1220+
{
1221+
name: "multiple committed batches",
1222+
operations: []batch{
1223+
{
1224+
state: &raftpb.HardState{Commit: 2},
1225+
entries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}},
1226+
},
1227+
{
1228+
state: &raftpb.HardState{Commit: 4},
1229+
entries: []raftpb.Entry{{Index: 3, Data: []byte("c")}, {Index: 4, Data: []byte("d")}},
1230+
},
1231+
{
1232+
state: &raftpb.HardState{Commit: 5},
1233+
entries: []raftpb.Entry{{Index: 5, Data: []byte("e")}},
1234+
},
1235+
},
1236+
walReadAll: want{
1237+
wantState: raftpb.HardState{Commit: 5},
1238+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("c")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1239+
},
1240+
readAllEntries: want{
1241+
wantState: raftpb.HardState{Commit: 5},
1242+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("c")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1243+
},
1244+
},
1245+
{
1246+
name: "uncommitted ovewritten entries",
1247+
operations: []batch{
1248+
{
1249+
state: &raftpb.HardState{Commit: 1},
1250+
entries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("a")}},
1251+
},
1252+
{
1253+
state: &raftpb.HardState{Commit: 3},
1254+
entries: []raftpb.Entry{{Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("b")}, {Index: 4, Data: []byte("b")}},
1255+
},
1256+
{
1257+
state: &raftpb.HardState{Commit: 4},
1258+
entries: []raftpb.Entry{{Index: 4, Data: []byte("c")}, {Index: 5, Data: []byte("c")}},
1259+
},
1260+
},
1261+
walReadAll: want{
1262+
wantState: raftpb.HardState{Commit: 4},
1263+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("b")}, {Index: 4, Data: []byte("c")}, {Index: 5, Data: []byte("c")}},
1264+
},
1265+
readAllEntries: want{
1266+
wantState: raftpb.HardState{Commit: 4},
1267+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("b")}, {Index: 4, Data: []byte("c")}, {Index: 5, Data: []byte("c")}},
1268+
},
1269+
},
1270+
{
1271+
name: "entries before first index",
1272+
operations: []batch{
1273+
{
1274+
state: &raftpb.HardState{Commit: 6},
1275+
entries: []raftpb.Entry{{Index: 5, Data: []byte("a")}, {Index: 6, Data: []byte("b")}},
1276+
},
1277+
{
1278+
state: &raftpb.HardState{Commit: 2},
1279+
entries: []raftpb.Entry{{Index: 1, Data: []byte("c")}, {Index: 2, Data: []byte("d")}},
1280+
},
1281+
{
1282+
state: &raftpb.HardState{Commit: 4},
1283+
entries: []raftpb.Entry{{Index: 3, Data: []byte("e")}, {Index: 4, Data: []byte("f")}},
1284+
},
1285+
},
1286+
walReadAll: want{
1287+
wantError: "slice bounds out of range",
1288+
},
1289+
readAllEntries: want{
1290+
wantState: raftpb.HardState{Commit: 4},
1291+
wantEntries: []raftpb.Entry{{Index: 5, Data: []byte("a")}, {Index: 6, Data: []byte("b")}},
1292+
},
1293+
},
1294+
{
1295+
name: "read before snapshot",
1296+
operations: []batch{
1297+
{
1298+
state: &raftpb.HardState{Commit: 1},
1299+
entries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}},
1300+
},
1301+
{
1302+
snapshot: &walpb.Snapshot{Index: 3, ConfState: &raftpb.ConfState{}},
1303+
},
1304+
{
1305+
state: &raftpb.HardState{Commit: 5},
1306+
entries: []raftpb.Entry{{Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1307+
},
1308+
},
1309+
walReadAll: want{
1310+
wantError: "slice bounds out of range",
1311+
},
1312+
readAllEntries: want{
1313+
wantState: raftpb.HardState{Commit: 5},
1314+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1315+
},
1316+
},
1317+
{
1318+
name: "read at snapshot",
1319+
operations: []batch{
1320+
{
1321+
state: &raftpb.HardState{Commit: 1},
1322+
entries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}},
1323+
},
1324+
{
1325+
snapshot: &walpb.Snapshot{Index: 3, ConfState: &raftpb.ConfState{}},
1326+
},
1327+
{
1328+
state: &raftpb.HardState{Commit: 5},
1329+
entries: []raftpb.Entry{{Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1330+
},
1331+
},
1332+
readAt: walpb.Snapshot{Index: 3},
1333+
walReadAll: want{
1334+
wantState: raftpb.HardState{Commit: 5},
1335+
wantEntries: []raftpb.Entry{{Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1336+
},
1337+
readAllEntries: want{
1338+
wantState: raftpb.HardState{Commit: 5},
1339+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1340+
},
1341+
},
1342+
{
1343+
name: "entries preceding snapshot",
1344+
operations: []batch{
1345+
{
1346+
snapshot: &walpb.Snapshot{Index: 4, ConfState: &raftpb.ConfState{}},
1347+
},
1348+
{
1349+
state: &raftpb.HardState{Commit: 6},
1350+
entries: []raftpb.Entry{{Index: 5, Data: []byte("a")}, {Index: 6, Data: []byte("b")}},
1351+
},
1352+
{
1353+
state: &raftpb.HardState{Commit: 4},
1354+
entries: []raftpb.Entry{{Index: 3, Data: []byte("c")}, {Index: 4, Data: []byte("d")}},
1355+
},
1356+
{
1357+
state: &raftpb.HardState{Commit: 2},
1358+
entries: []raftpb.Entry{{Index: 1, Data: []byte("e")}, {Index: 2, Data: []byte("f")}},
1359+
},
1360+
},
1361+
readAt: walpb.Snapshot{Index: 4},
1362+
walReadAll: want{
1363+
wantState: raftpb.HardState{Commit: 2},
1364+
wantEntries: []raftpb.Entry{{Index: 5, Data: []byte("a")}, {Index: 6, Data: []byte("b")}},
1365+
},
1366+
readAllEntries: want{
1367+
wantState: raftpb.HardState{Commit: 2},
1368+
wantEntries: []raftpb.Entry{{Index: 5, Data: []byte("a")}, {Index: 6, Data: []byte("b")}},
1369+
},
1370+
},
1371+
{
1372+
name: "read after snapshot",
1373+
operations: []batch{
1374+
{
1375+
state: &raftpb.HardState{Commit: 1},
1376+
entries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}},
1377+
},
1378+
{
1379+
snapshot: &walpb.Snapshot{Index: 3, ConfState: &raftpb.ConfState{}},
1380+
},
1381+
{
1382+
state: &raftpb.HardState{Commit: 5},
1383+
entries: []raftpb.Entry{{Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1384+
},
1385+
},
1386+
readAt: walpb.Snapshot{Index: 4},
1387+
walReadAll: want{
1388+
wantError: "snapshot not found",
1389+
},
1390+
readAllEntries: want{
1391+
wantState: raftpb.HardState{Commit: 5},
1392+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1393+
},
1394+
},
1395+
}
1396+
for _, tc := range tcs {
1397+
t.Run(tc.name, func(t *testing.T) {
1398+
dir := t.TempDir()
1399+
lg := zaptest.NewLogger(t)
1400+
w, err := Create(lg, dir, nil)
1401+
require.NoError(t, err)
1402+
for _, op := range tc.operations {
1403+
if op.state != nil {
1404+
err = w.Save(*op.state, op.entries)
1405+
require.NoError(t, err)
1406+
}
1407+
if op.snapshot != nil {
1408+
err = w.SaveSnapshot(*op.snapshot)
1409+
require.NoError(t, err)
1410+
}
1411+
}
1412+
w.Close()
1413+
1414+
w2, err := OpenForRead(lg, dir, tc.readAt)
1415+
require.NoError(t, err)
1416+
defer w2.Close()
1417+
t.Run("wal.ReadAll", func(t *testing.T) {
1418+
_, state, entries, err := w2.ReadAll()
1419+
if tc.walReadAll.wantError != "" {
1420+
require.ErrorContains(t, err, tc.walReadAll.wantError)
1421+
return
1422+
}
1423+
require.NoError(t, err)
1424+
assert.Equal(t, tc.walReadAll.wantState, state)
1425+
assert.Equal(t, tc.walReadAll.wantEntries, entries)
1426+
})
1427+
t.Run("ReadAllEntries", func(t *testing.T) {
1428+
state, entries, err := ReadAllEntries(lg, dir)
1429+
require.NoError(t, err)
1430+
assert.Equal(t, tc.readAllEntries.wantState, state)
1431+
assert.Equal(t, tc.readAllEntries.wantEntries, entries)
1432+
})
1433+
})
1434+
}
1435+
}

tests/robustness/report/wal.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"go.etcd.io/etcd/pkg/v3/pbutil"
3131
"go.etcd.io/etcd/server/v3/storage/datadir"
3232
"go.etcd.io/etcd/server/v3/storage/wal"
33-
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
3433
"go.etcd.io/etcd/tests/v3/framework/e2e"
3534
"go.etcd.io/etcd/tests/v3/robustness/model"
3635
"go.etcd.io/raft/v3/raftpb"
@@ -193,20 +192,8 @@ func ReadWAL(lg *zap.Logger, dataDir string) (state raftpb.HardState, ents []raf
193192
walDir := datadir.ToWALDir(dataDir)
194193
repaired := false
195194
for {
196-
w, err := wal.OpenForRead(lg, walDir, walpb.Snapshot{Index: 0})
195+
state, ents, err = wal.ReadAllEntries(lg, walDir)
197196
if err != nil {
198-
return state, nil, fmt.Errorf("failed to open WAL, err: %w", err)
199-
}
200-
_, state, ents, err = w.ReadAll()
201-
w.Close()
202-
if err != nil {
203-
if errors.Is(err, wal.ErrSnapshotNotFound) {
204-
lg.Info("Error occurred when reading WAL entries", zap.Error(err))
205-
return state, ents, nil
206-
}
207-
if errors.Is(err, wal.ErrSliceOutOfRange) {
208-
return state, nil, fmt.Errorf("failed to read WAL, err: %w", err)
209-
}
210197
// we can only repair ErrUnexpectedEOF and we never repair twice.
211198
if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
212199
return state, nil, fmt.Errorf("failed to read WAL, cannot be repaired, err: %w", err)

0 commit comments

Comments
 (0)