@@ -1180,3 +1180,256 @@ func TestLastRecordLengthExceedFileEnd(t *testing.T) {
11801180 // environment, but only once.
11811181 require .ErrorIs (t , err , io .ErrUnexpectedEOF )
11821182}
1183+
1184+ func TestWriteRead (t * testing.T ) {
1185+ type batch struct {
1186+ state * raftpb.HardState
1187+ entries []raftpb.Entry
1188+ snapshot * walpb.Snapshot
1189+ }
1190+ type want struct {
1191+ wantState raftpb.HardState
1192+ wantEntries []raftpb.Entry
1193+ wantError string
1194+ }
1195+
1196+ tcs := []struct {
1197+ name string
1198+ operations []batch
1199+ readAt walpb.Snapshot
1200+ walReadAll want
1201+ readAllEntries want
1202+ }{
1203+ {
1204+ name : "single batch" ,
1205+ operations : []batch {
1206+ {
1207+ state : & raftpb.HardState {Commit : 5 },
1208+ entries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}, {Index : 3 , Data : []byte ("c" )}, {Index : 4 , Data : []byte ("d" )}, {Index : 5 , Data : []byte ("e" )}},
1209+ },
1210+ },
1211+ walReadAll : want {
1212+ wantState : raftpb.HardState {Commit : 5 },
1213+ wantEntries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}, {Index : 3 , Data : []byte ("c" )}, {Index : 4 , Data : []byte ("d" )}, {Index : 5 , Data : []byte ("e" )}},
1214+ },
1215+ readAllEntries : want {
1216+ wantState : raftpb.HardState {Commit : 5 },
1217+ wantEntries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}, {Index : 3 , Data : []byte ("c" )}, {Index : 4 , Data : []byte ("d" )}, {Index : 5 , Data : []byte ("e" )}},
1218+ },
1219+ },
1220+ {
1221+ name : "multiple committed batches" ,
1222+ operations : []batch {
1223+ {
1224+ state : & raftpb.HardState {Commit : 2 },
1225+ entries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}},
1226+ },
1227+ {
1228+ state : & raftpb.HardState {Commit : 4 },
1229+ entries : []raftpb.Entry {{Index : 3 , Data : []byte ("c" )}, {Index : 4 , Data : []byte ("d" )}},
1230+ },
1231+ {
1232+ state : & raftpb.HardState {Commit : 5 },
1233+ entries : []raftpb.Entry {{Index : 5 , Data : []byte ("e" )}},
1234+ },
1235+ },
1236+ walReadAll : want {
1237+ wantState : raftpb.HardState {Commit : 5 },
1238+ wantEntries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}, {Index : 3 , Data : []byte ("c" )}, {Index : 4 , Data : []byte ("d" )}, {Index : 5 , Data : []byte ("e" )}},
1239+ },
1240+ readAllEntries : want {
1241+ wantState : raftpb.HardState {Commit : 5 },
1242+ wantEntries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}, {Index : 3 , Data : []byte ("c" )}, {Index : 4 , Data : []byte ("d" )}, {Index : 5 , Data : []byte ("e" )}},
1243+ },
1244+ },
1245+ {
1246+ name : "uncommitted ovewritten entries" ,
1247+ operations : []batch {
1248+ {
1249+ state : & raftpb.HardState {Commit : 1 },
1250+ entries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("a" )}},
1251+ },
1252+ {
1253+ state : & raftpb.HardState {Commit : 3 },
1254+ entries : []raftpb.Entry {{Index : 2 , Data : []byte ("b" )}, {Index : 3 , Data : []byte ("b" )}, {Index : 4 , Data : []byte ("b" )}},
1255+ },
1256+ {
1257+ state : & raftpb.HardState {Commit : 4 },
1258+ entries : []raftpb.Entry {{Index : 4 , Data : []byte ("c" )}, {Index : 5 , Data : []byte ("c" )}},
1259+ },
1260+ },
1261+ walReadAll : want {
1262+ wantState : raftpb.HardState {Commit : 4 },
1263+ wantEntries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}, {Index : 3 , Data : []byte ("b" )}, {Index : 4 , Data : []byte ("c" )}, {Index : 5 , Data : []byte ("c" )}},
1264+ },
1265+ readAllEntries : want {
1266+ wantState : raftpb.HardState {Commit : 4 },
1267+ wantEntries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}, {Index : 3 , Data : []byte ("b" )}, {Index : 4 , Data : []byte ("c" )}, {Index : 5 , Data : []byte ("c" )}},
1268+ },
1269+ },
1270+ {
1271+ name : "entries before first index" ,
1272+ operations : []batch {
1273+ {
1274+ state : & raftpb.HardState {Commit : 6 },
1275+ entries : []raftpb.Entry {{Index : 5 , Data : []byte ("a" )}, {Index : 6 , Data : []byte ("b" )}},
1276+ },
1277+ {
1278+ state : & raftpb.HardState {Commit : 2 },
1279+ entries : []raftpb.Entry {{Index : 1 , Data : []byte ("c" )}, {Index : 2 , Data : []byte ("d" )}},
1280+ },
1281+ {
1282+ state : & raftpb.HardState {Commit : 4 },
1283+ entries : []raftpb.Entry {{Index : 3 , Data : []byte ("e" )}, {Index : 4 , Data : []byte ("f" )}},
1284+ },
1285+ },
1286+ walReadAll : want {
1287+ wantError : "slice bounds out of range" ,
1288+ },
1289+ readAllEntries : want {
1290+ wantState : raftpb.HardState {Commit : 4 },
1291+ wantEntries : []raftpb.Entry {{Index : 5 , Data : []byte ("a" )}, {Index : 6 , Data : []byte ("b" )}},
1292+ },
1293+ },
1294+ {
1295+ name : "read before snapshot" ,
1296+ operations : []batch {
1297+ {
1298+ state : & raftpb.HardState {Commit : 1 },
1299+ entries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}},
1300+ },
1301+ {
1302+ snapshot : & walpb.Snapshot {Index : 3 , ConfState : & raftpb.ConfState {}},
1303+ },
1304+ {
1305+ state : & raftpb.HardState {Commit : 5 },
1306+ entries : []raftpb.Entry {{Index : 4 , Data : []byte ("d" )}, {Index : 5 , Data : []byte ("e" )}},
1307+ },
1308+ },
1309+ walReadAll : want {
1310+ wantError : "slice bounds out of range" ,
1311+ },
1312+ readAllEntries : want {
1313+ wantState : raftpb.HardState {Commit : 5 },
1314+ wantEntries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}, {Index : 4 , Data : []byte ("d" )}, {Index : 5 , Data : []byte ("e" )}},
1315+ },
1316+ },
1317+ {
1318+ name : "read at snapshot" ,
1319+ operations : []batch {
1320+ {
1321+ state : & raftpb.HardState {Commit : 1 },
1322+ entries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}},
1323+ },
1324+ {
1325+ snapshot : & walpb.Snapshot {Index : 3 , ConfState : & raftpb.ConfState {}},
1326+ },
1327+ {
1328+ state : & raftpb.HardState {Commit : 5 },
1329+ entries : []raftpb.Entry {{Index : 4 , Data : []byte ("d" )}, {Index : 5 , Data : []byte ("e" )}},
1330+ },
1331+ },
1332+ readAt : walpb.Snapshot {Index : 3 },
1333+ walReadAll : want {
1334+ wantState : raftpb.HardState {Commit : 5 },
1335+ wantEntries : []raftpb.Entry {{Index : 4 , Data : []byte ("d" )}, {Index : 5 , Data : []byte ("e" )}},
1336+ },
1337+ readAllEntries : want {
1338+ wantState : raftpb.HardState {Commit : 5 },
1339+ wantEntries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}, {Index : 4 , Data : []byte ("d" )}, {Index : 5 , Data : []byte ("e" )}},
1340+ },
1341+ },
1342+ {
1343+ name : "entries preceeding snapshot" ,
1344+ operations : []batch {
1345+ {
1346+ snapshot : & walpb.Snapshot {Index : 4 , ConfState : & raftpb.ConfState {}},
1347+ },
1348+ {
1349+ state : & raftpb.HardState {Commit : 6 },
1350+ entries : []raftpb.Entry {{Index : 5 , Data : []byte ("a" )}, {Index : 6 , Data : []byte ("b" )}},
1351+ },
1352+ {
1353+ state : & raftpb.HardState {Commit : 4 },
1354+ entries : []raftpb.Entry {{Index : 3 , Data : []byte ("c" )}, {Index : 4 , Data : []byte ("d" )}},
1355+ },
1356+ {
1357+ state : & raftpb.HardState {Commit : 2 },
1358+ entries : []raftpb.Entry {{Index : 1 , Data : []byte ("e" )}, {Index : 2 , Data : []byte ("f" )}},
1359+ },
1360+ },
1361+ readAt : walpb.Snapshot {Index : 4 },
1362+ walReadAll : want {
1363+ wantState : raftpb.HardState {Commit : 2 },
1364+ wantEntries : []raftpb.Entry {{Index : 5 , Data : []byte ("a" )}, {Index : 6 , Data : []byte ("b" )}},
1365+ },
1366+ readAllEntries : want {
1367+ wantState : raftpb.HardState {Commit : 2 },
1368+ wantEntries : []raftpb.Entry {{Index : 5 , Data : []byte ("a" )}, {Index : 6 , Data : []byte ("b" )}},
1369+ },
1370+ },
1371+ {
1372+ name : "read after snapshot" ,
1373+ operations : []batch {
1374+ {
1375+ state : & raftpb.HardState {Commit : 1 },
1376+ entries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}},
1377+ },
1378+ {
1379+ snapshot : & walpb.Snapshot {Index : 3 , ConfState : & raftpb.ConfState {}},
1380+ },
1381+ {
1382+ state : & raftpb.HardState {Commit : 5 },
1383+ entries : []raftpb.Entry {{Index : 4 , Data : []byte ("d" )}, {Index : 5 , Data : []byte ("e" )}},
1384+ },
1385+ },
1386+ readAt : walpb.Snapshot {Index : 4 },
1387+ walReadAll : want {
1388+ wantError : "snapshot not found" ,
1389+ },
1390+ readAllEntries : want {
1391+ wantState : raftpb.HardState {Commit : 5 },
1392+ wantEntries : []raftpb.Entry {{Index : 1 , Data : []byte ("a" )}, {Index : 2 , Data : []byte ("b" )}, {Index : 4 , Data : []byte ("d" )}, {Index : 5 , Data : []byte ("e" )}},
1393+ },
1394+ },
1395+ }
1396+ for _ , tc := range tcs {
1397+ t .Run (tc .name , func (t * testing.T ) {
1398+ dir := t .TempDir ()
1399+ lg := zaptest .NewLogger (t )
1400+ w , err := Create (lg , dir , nil )
1401+ require .NoError (t , err )
1402+ for _ , op := range tc .operations {
1403+ if op .state != nil {
1404+ err = w .Save (* op .state , op .entries )
1405+ require .NoError (t , err )
1406+ }
1407+ if op .snapshot != nil {
1408+ err = w .SaveSnapshot (* op .snapshot )
1409+ require .NoError (t , err )
1410+ }
1411+ }
1412+ w .Close ()
1413+
1414+ w2 , err := OpenForRead (lg , dir , tc .readAt )
1415+ require .NoError (t , err )
1416+ defer w2 .Close ()
1417+ t .Run ("wal.ReadAll" , func (t * testing.T ) {
1418+ _ , state , entries , err := w2 .ReadAll ()
1419+ if tc .walReadAll .wantError != "" {
1420+ require .ErrorContains (t , err , tc .walReadAll .wantError )
1421+ return
1422+ }
1423+ require .NoError (t , err )
1424+ assert .Equal (t , tc .walReadAll .wantState , state )
1425+ assert .Equal (t , tc .walReadAll .wantEntries , entries )
1426+ })
1427+ t .Run ("ReadAllEntries" , func (t * testing.T ) {
1428+ state , entries , err := ReadAllEntries (lg , dir )
1429+ require .NoError (t , err )
1430+ assert .Equal (t , tc .readAllEntries .wantState , state )
1431+ assert .Equal (t , tc .readAllEntries .wantEntries , entries )
1432+ })
1433+ })
1434+ }
1435+ }
0 commit comments