Skip to content

Commit acec9f1

Browse files
committed
Read all entries from WAL with no consideration for snapshots
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 5e543d7 commit acec9f1

File tree

3 files changed

+260
-14
lines changed

3 files changed

+260
-14
lines changed

server/storage/wal/wal.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,65 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
396396
return w, nil
397397
}
398398

399+
func ReadAllEntries(lg *zap.Logger, dirpath string) (state raftpb.HardState, ents []raftpb.Entry, err error) {
400+
names, nameIndex, err := selectWALFiles(lg, dirpath, walpb.Snapshot{Term: 0, Index: 0})
401+
if err != nil {
402+
return state, nil, fmt.Errorf("[ReadAllEntries] selectWALFiles failed: %w", err)
403+
}
404+
405+
rs, _, closer, err := openWALFiles(lg, dirpath, names, nameIndex, false)
406+
if err != nil {
407+
return state, nil, fmt.Errorf("[ReadAllEntries] openWALFiles failed: %w", err)
408+
}
409+
defer closer()
410+
rec := &walpb.Record{}
411+
decoder := NewDecoder(rs...)
412+
var startIndex uint64
413+
for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) {
414+
switch rec.Type {
415+
case EntryType:
416+
e := MustUnmarshalEntry(rec.Data)
417+
if e.Index == 0 {
418+
return state, nil , fmt.Errorf("[ReadAllEntries] entry with index 0 found")
419+
}
420+
if startIndex == 0 {
421+
startIndex = e.Index
422+
ents = append(ents, e)
423+
continue
424+
}
425+
if e.Index > startIndex {
426+
offset := e.Index - startIndex
427+
if offset > uint64(len(ents)) {
428+
ents = append(ents, make([]raftpb.Entry, offset-uint64(len(ents)))...)
429+
}
430+
// The line below is potentially overriding some 'uncommitted' entries.
431+
ents = append(ents[:offset], e)
432+
}
433+
case StateType:
434+
state = MustUnmarshalState(rec.Data)
435+
case MetadataType:
436+
case CrcType:
437+
case SnapshotType:
438+
default:
439+
return state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
440+
}
441+
}
442+
if err != nil && !errors.Is(err, io.EOF) {
443+
return state, ents, err
444+
}
445+
// Filter empty entries
446+
i := 0
447+
for _, e := range ents {
448+
if e.Index == 0 {
449+
continue
450+
}
451+
ents[i] = e
452+
i++
453+
}
454+
ents = ents[:i]
455+
return state, ents, nil
456+
}
457+
399458
func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) {
400459
names, err := readWALNames(lg, dirpath)
401460
if err != nil {

server/storage/wal/wal_test.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,3 +1180,203 @@ func TestLastRecordLengthExceedFileEnd(t *testing.T) {
11801180
// environment, but only once.
11811181
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
11821182
}
1183+
1184+
func TestWriteRead(t *testing.T) {
1185+
type batch struct {
1186+
state *raftpb.HardState
1187+
entries []raftpb.Entry
1188+
snapshot *walpb.Snapshot
1189+
}
1190+
type want struct {
1191+
wantState raftpb.HardState
1192+
wantEntries []raftpb.Entry
1193+
wantError string
1194+
}
1195+
1196+
tcs := []struct {
1197+
name string
1198+
operations []batch
1199+
readAt walpb.Snapshot
1200+
walReadAll want
1201+
readAllEntries want
1202+
}{
1203+
{
1204+
name: "single batch",
1205+
operations: []batch{
1206+
{
1207+
state: &raftpb.HardState{Commit: 5},
1208+
entries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("c")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1209+
},
1210+
},
1211+
walReadAll: want{
1212+
wantState: raftpb.HardState{Commit: 5},
1213+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("c")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1214+
},
1215+
readAllEntries: want{
1216+
wantState: raftpb.HardState{Commit: 5},
1217+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("c")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1218+
},
1219+
},
1220+
{
1221+
name: "multiple committed batches",
1222+
operations: []batch{
1223+
{
1224+
state: &raftpb.HardState{Commit: 2},
1225+
entries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}},
1226+
},
1227+
{
1228+
state: &raftpb.HardState{Commit: 4},
1229+
entries: []raftpb.Entry{{Index: 3, Data: []byte("c")}, {Index: 4, Data: []byte("d")}},
1230+
},
1231+
{
1232+
state: &raftpb.HardState{Commit: 5},
1233+
entries: []raftpb.Entry{{Index: 5, Data: []byte("e")}},
1234+
},
1235+
},
1236+
walReadAll: want{
1237+
wantState: raftpb.HardState{Commit: 5},
1238+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("c")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1239+
},
1240+
readAllEntries: want{
1241+
wantState: raftpb.HardState{Commit: 5},
1242+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("c")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1243+
},
1244+
},
1245+
{
1246+
name: "uncommitted ovewritten entries",
1247+
operations: []batch{
1248+
{
1249+
state: &raftpb.HardState{Commit: 1},
1250+
entries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("a")}},
1251+
},
1252+
{
1253+
state: &raftpb.HardState{Commit: 3},
1254+
entries: []raftpb.Entry{{Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("b")}, {Index: 4, Data: []byte("b")}},
1255+
},
1256+
{
1257+
state: &raftpb.HardState{Commit: 4},
1258+
entries: []raftpb.Entry{{Index: 4, Data: []byte("c")}, {Index: 5, Data: []byte("c")}},
1259+
},
1260+
},
1261+
walReadAll: want{
1262+
wantState: raftpb.HardState{Commit: 4},
1263+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("b")}, {Index: 4, Data: []byte("c")}, {Index: 5, Data: []byte("c")}},
1264+
},
1265+
readAllEntries: want{
1266+
wantState: raftpb.HardState{Commit: 4},
1267+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 3, Data: []byte("b")}, {Index: 4, Data: []byte("c")}, {Index: 5, Data: []byte("c")}},
1268+
},
1269+
},
1270+
{
1271+
name: "before snapshot",
1272+
operations: []batch{
1273+
{
1274+
state: &raftpb.HardState{Commit: 1},
1275+
entries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}},
1276+
},
1277+
{
1278+
snapshot: &walpb.Snapshot{Index: 3, ConfState: &raftpb.ConfState{}},
1279+
},
1280+
{
1281+
state: &raftpb.HardState{Commit: 5},
1282+
entries: []raftpb.Entry{{Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1283+
},
1284+
},
1285+
walReadAll: want{
1286+
wantError: "slice bounds out of range",
1287+
},
1288+
readAllEntries: want{
1289+
wantState: raftpb.HardState{Commit: 5},
1290+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1291+
},
1292+
},
1293+
{
1294+
name: "at snapshot",
1295+
operations: []batch{
1296+
{
1297+
state: &raftpb.HardState{Commit: 1},
1298+
entries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}},
1299+
},
1300+
{
1301+
snapshot: &walpb.Snapshot{Index: 3, ConfState: &raftpb.ConfState{}},
1302+
},
1303+
{
1304+
state: &raftpb.HardState{Commit: 5},
1305+
entries: []raftpb.Entry{{Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1306+
},
1307+
},
1308+
readAt: walpb.Snapshot{Index: 3},
1309+
walReadAll: want{
1310+
wantState: raftpb.HardState{Commit: 5},
1311+
wantEntries: []raftpb.Entry{{Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1312+
},
1313+
readAllEntries: want{
1314+
wantState: raftpb.HardState{Commit: 5},
1315+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1316+
},
1317+
},
1318+
{
1319+
name: "after snapshot",
1320+
operations: []batch{
1321+
{
1322+
state: &raftpb.HardState{Commit: 1},
1323+
entries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}},
1324+
},
1325+
{
1326+
snapshot: &walpb.Snapshot{Index: 3, ConfState: &raftpb.ConfState{}},
1327+
},
1328+
{
1329+
state: &raftpb.HardState{Commit: 5},
1330+
entries: []raftpb.Entry{{Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1331+
},
1332+
},
1333+
readAt: walpb.Snapshot{Index: 4},
1334+
walReadAll: want{
1335+
wantError: "snapshot not found",
1336+
},
1337+
readAllEntries: want{
1338+
wantState: raftpb.HardState{Commit: 5},
1339+
wantEntries: []raftpb.Entry{{Index: 1, Data: []byte("a")}, {Index: 2, Data: []byte("b")}, {Index: 4, Data: []byte("d")}, {Index: 5, Data: []byte("e")}},
1340+
},
1341+
},
1342+
}
1343+
for _, tc := range tcs {
1344+
t.Run(tc.name, func(t *testing.T) {
1345+
dir := t.TempDir()
1346+
lg := zaptest.NewLogger(t)
1347+
w, err := Create(lg, dir, nil)
1348+
require.NoError(t, err)
1349+
for _, op := range tc.operations {
1350+
if op.state != nil {
1351+
err = w.Save(*op.state, op.entries)
1352+
require.NoError(t, err)
1353+
}
1354+
if op.snapshot != nil {
1355+
err = w.SaveSnapshot(*op.snapshot)
1356+
require.NoError(t, err)
1357+
}
1358+
}
1359+
w.Close()
1360+
1361+
w2, err := OpenForRead(lg, dir, tc.readAt)
1362+
require.NoError(t, err)
1363+
defer w2.Close()
1364+
t.Run("wal.ReadAll", func(t *testing.T) {
1365+
_, state, entries, err := w2.ReadAll()
1366+
if tc.walReadAll.wantError != "" {
1367+
require.ErrorContains(t, err, tc.walReadAll.wantError)
1368+
return
1369+
}
1370+
require.NoError(t, err)
1371+
assert.Equal(t, tc.walReadAll.wantState, state)
1372+
assert.Equal(t, tc.walReadAll.wantEntries, entries)
1373+
})
1374+
t.Run("ReadAllEntries", func(t *testing.T) {
1375+
state, entries, err := ReadAllEntries(lg, dir)
1376+
require.NoError(t, err)
1377+
assert.Equal(t, tc.readAllEntries.wantState, state)
1378+
assert.Equal(t, tc.readAllEntries.wantEntries, entries)
1379+
})
1380+
})
1381+
}
1382+
}

tests/robustness/report/wal.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"go.etcd.io/etcd/pkg/v3/pbutil"
3131
"go.etcd.io/etcd/server/v3/storage/datadir"
3232
"go.etcd.io/etcd/server/v3/storage/wal"
33-
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
3433
"go.etcd.io/etcd/tests/v3/framework/e2e"
3534
"go.etcd.io/etcd/tests/v3/robustness/model"
3635
"go.etcd.io/raft/v3/raftpb"
@@ -193,20 +192,8 @@ func ReadWAL(lg *zap.Logger, dataDir string) (state raftpb.HardState, ents []raf
193192
walDir := datadir.ToWALDir(dataDir)
194193
repaired := false
195194
for {
196-
w, err := wal.OpenForRead(lg, walDir, walpb.Snapshot{Index: 0})
195+
state, ents, err = wal.ReadAllEntries(lg, walDir)
197196
if err != nil {
198-
return state, nil, fmt.Errorf("failed to open WAL, err: %w", err)
199-
}
200-
_, state, ents, err = w.ReadAll()
201-
w.Close()
202-
if err != nil {
203-
if errors.Is(err, wal.ErrSnapshotNotFound) {
204-
lg.Info("Error occurred when reading WAL entries", zap.Error(err))
205-
return state, ents, nil
206-
}
207-
if errors.Is(err, wal.ErrSliceOutOfRange) {
208-
return state, nil, fmt.Errorf("failed to read WAL, err: %w", err)
209-
}
210197
// we can only repair ErrUnexpectedEOF and we never repair twice.
211198
if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
212199
return state, nil, fmt.Errorf("failed to read WAL, cannot be repaired, err: %w", err)

0 commit comments

Comments
 (0)