diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go index 0ded1aa2fc5..0b90cb7a31c 100644 --- a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go +++ b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go @@ -276,10 +276,12 @@ func TestMultiSSTWriterInitSST(t *testing.T) { EndKey: roachpb.RKeyMax, } keySpans := rditer.MakeReplicatedKeySpans(&desc) + localSpans := keySpans[:len(keySpans)-1] + mvccSpan := keySpans[len(keySpans)-1] msstw, err := newMultiSSTWriter( - ctx, cluster.MakeTestingClusterSettings(), scratch, keySpans, 0, - false, /* skipRangeDelForLastSpan */ + ctx, cluster.MakeTestingClusterSettings(), scratch, localSpans, mvccSpan, 0, + false, /* skipRangeDelForMVCCSpan */ ) require.NoError(t, err) _, err = msstw.Finish(ctx) @@ -315,6 +317,51 @@ func TestMultiSSTWriterInitSST(t *testing.T) { } } +// TestMultiSSTWriterSize tests the effect of lowering the max size +// of sstables in a multiSSTWriter, and ensuring that the produced sstables +// are still correct. 
+func TestMultiSSTWriterSize(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + testRangeID := roachpb.RangeID(1) + testSnapUUID := uuid.Must(uuid.FromBytes([]byte("foobar1234567890"))) + testLimiter := rate.NewLimiter(rate.Inf, 0) + + cleanup, eng := newOnDiskEngine(ctx, t) + defer cleanup() + defer eng.Close() + + sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter) + scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID) + settings := cluster.MakeTestingClusterSettings() + MaxSnapshotSSTableSize.Override(ctx, &settings.SV, 100) + + desc := roachpb.RangeDescriptor{ + StartKey: roachpb.RKey("d"), + EndKey: roachpb.RKeyMax, + } + keySpans := rditer.MakeReplicatedKeySpans(&desc) + localSpans := keySpans[:len(keySpans)-1] + mvccSpan := keySpans[len(keySpans)-1] + + multiSSTWriter, err := newMultiSSTWriter(ctx, settings, scratch, localSpans, mvccSpan, 0, false) + require.NoError(t, err) + require.Equal(t, int64(0), multiSSTWriter.dataSize) + + for i := range localSpans { + require.NoError(t, multiSSTWriter.Put(ctx, storage.EngineKey{Key: localSpans[i].Key}, []byte("foo"))) + } + + for i := 0; i < 100; i++ { + require.NoError(t, multiSSTWriter.Put(ctx, storage.EngineKey{Key: roachpb.Key(append(desc.StartKey, byte(i)))}, []byte("foobarbaz"))) + } + + _, err = multiSSTWriter.Finish(ctx) + require.NoError(t, err) + require.Greater(t, len(scratch.SSTs()), len(keySpans)) +} + // TestMultiSSTWriterAddLastSpan tests that multiSSTWriter initializes each of // the SST files associated with the replicated key ranges by writing a range // deletion tombstone that spans the entire range of each respectively, except @@ -342,14 +389,16 @@ func TestMultiSSTWriterAddLastSpan(t *testing.T) { EndKey: roachpb.RKeyMax, } keySpans := rditer.MakeReplicatedKeySpans(&desc) + localSpans := keySpans[:len(keySpans)-1] + mvccSpan := keySpans[len(keySpans)-1] msstw, err := newMultiSSTWriter( - ctx, cluster.MakeTestingClusterSettings(), 
scratch, keySpans, 0, - true, /* skipRangeDelForLastSpan */ + ctx, cluster.MakeTestingClusterSettings(), scratch, localSpans, mvccSpan, 0, + true, /* skipRangeDelForMVCCSpan */ ) require.NoError(t, err) if addRangeDel { - require.NoError(t, msstw.addRangeDelForLastSpan()) + require.NoError(t, msstw.addClearForMVCCSpan()) } testKey := storage.MVCCKey{Key: roachpb.RKey("d1").AsRawKey(), Timestamp: hlc.Timestamp{WallTime: 1}} testEngineKey, _ := storage.DecodeEngineKey(storage.EncodeMVCCKey(testKey)) diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 997132b4411..130f600f97f 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -11,6 +11,7 @@ package kvserver import ( + "bytes" "context" "io" "time" @@ -62,6 +63,16 @@ const ( tagSnapshotTiming = "snapshot_timing_tag" ) +// MaxSnapshotSSTableSize is the maximum size of an sstable containing MVCC/user keys +// in a snapshot before we truncate and write a new snapshot sstable. +var MaxSnapshotSSTableSize = settings.RegisterByteSizeSetting( + settings.SystemOnly, + "kv.snapshot_rebalance.max_sst_size", + "maximum size of a rebalance or recovery SST size", + 128<<20, // 128 MB + settings.PositiveInt, +) + // snapshotMetrics contains metrics on the number and size of snapshots in // progress or in the snapshot queue. type snapshotMetrics struct { @@ -115,10 +126,21 @@ type kvBatchSnapshotStrategy struct { // multiSSTWriter is a wrapper around an SSTWriter and SSTSnapshotStorageScratch // that handles chunking SSTs and persisting them to disk. type multiSSTWriter struct { - st *cluster.Settings - scratch *SSTSnapshotStorageScratch - currSST storage.SSTWriter - keySpans []roachpb.Span + st *cluster.Settings + scratch *SSTSnapshotStorageScratch + currSST storage.SSTWriter + // localKeySpans are key spans that are considered unsplittable across sstables. + // mvccKeySpan can be split across multiple sstables if one of them exceeds + // maxSSTSize. 
+ localKeySpans []roachpb.Span + mvccKeySpan roachpb.Span + // mvccSSTSpans reflects the actual split of the mvccKeySpan into constituent + // sstables. + mvccSSTSpans []storage.EngineKeyRange + // currSpan is the index of the current span being written to. The first + // len(localKeySpans) spans are localKeySpans, and the rest are mvccSSTSpans. + // In a sense, currSpan indexes into a slice composed of + // append(localKeySpans, mvccSSTSpans). currSpan int // The approximate size of the SST chunk to buffer in memory on the receiver // before flushing to disk. @@ -128,33 +150,84 @@ type multiSSTWriter struct { dataSize int64 // The total size of the SSTs. sstSize int64 - // if skipRangeDelForLastSpan is true, the last span is not ClearRanged in the - // same sstable. We rely on the caller to take care of clearing this span - // through a different process (eg. IngestAndExcise on pebble). - skipRangeDelForLastSpan bool + // if skipClearForMVCCSpan is true, the MVCC span is not ClearEngineRange()d in + // the same sstable. We rely on the caller to take care of clearing this span + // through a different process (eg. IngestAndExcise on pebble). Note that + // setting this bool to true also disables all range key fragmentation + // and splitting of sstables in the mvcc span. + skipClearForMVCCSpan bool + // maxSSTSize is the maximum size to use for SSTs containing MVCC/user keys. + // Once the sstable writer reaches this size, it will be finalized and a new + // sstable will be created. + maxSSTSize int64 + // rangeKeyFrag is used to fragment range keys across the mvcc key spans. 
+ rangeKeyFrag rangekey.Fragmenter } func newMultiSSTWriter( ctx context.Context, st *cluster.Settings, scratch *SSTSnapshotStorageScratch, - keySpans []roachpb.Span, + localKeySpans []roachpb.Span, + mvccKeySpan roachpb.Span, sstChunkSize int64, - skipRangeDelForLastSpan bool, -) (multiSSTWriter, error) { - msstw := multiSSTWriter{ - st: st, - scratch: scratch, - keySpans: keySpans, - sstChunkSize: sstChunkSize, - skipRangeDelForLastSpan: skipRangeDelForLastSpan, + skipClearForMVCCSpan bool, +) (*multiSSTWriter, error) { + msstw := &multiSSTWriter{ + st: st, + scratch: scratch, + localKeySpans: localKeySpans, + mvccKeySpan: mvccKeySpan, + mvccSSTSpans: []storage.EngineKeyRange{{ + Start: storage.EngineKey{Key: mvccKeySpan.Key}, + End: storage.EngineKey{Key: mvccKeySpan.EndKey}, + }}, + sstChunkSize: sstChunkSize, + skipClearForMVCCSpan: skipClearForMVCCSpan, + } + if !skipClearForMVCCSpan { + // If skipClearForMVCCSpan is true, we don't split the MVCC span across + // multiple sstables, as addClearForMVCCSpan could be called by the caller + // at any time. + msstw.maxSSTSize = MaxSnapshotSSTableSize.Get(&st.SV) + } + msstw.rangeKeyFrag = rangekey.Fragmenter{ + Cmp: storage.EngineComparer.Compare, + Format: storage.EngineComparer.FormatKey, + Emit: msstw.emitRangeKey, } + if err := msstw.initSST(ctx); err != nil { return msstw, err } return msstw, nil } +func (msstw *multiSSTWriter) emitRangeKey(key rangekey.Span) { + for i := range key.Keys { + if err := msstw.currSST.PutInternalRangeKey(key.Start, key.End, key.Keys[i]); err != nil { + panic("failed to put range key in sst") + } + } +} + +// currentSpan returns the current user-provided span that +// is being written to. Note that this does not account for +// mvcc keys being split across multiple sstables. 
+func (msstw *multiSSTWriter) currentSpan() roachpb.Span { + if msstw.currSpan >= len(msstw.localKeySpans) { + if msstw.currSpan >= len(msstw.localKeySpans)+len(msstw.mvccSSTSpans) { + panic("current span is out of bounds") + } + return msstw.mvccKeySpan + } + return msstw.localKeySpans[msstw.currSpan] +} + +func (msstw *multiSSTWriter) currSpanIsMVCCSpan() bool { + return msstw.currSpan >= len(msstw.localKeySpans) +} + func (msstw *multiSSTWriter) initSST(ctx context.Context) error { newSSTFile, err := msstw.scratch.NewFile(ctx, msstw.sstChunkSize) if err != nil { @@ -162,26 +235,95 @@ func (msstw *multiSSTWriter) initSST(ctx context.Context) error { } newSST := storage.MakeIngestionSSTWriter(ctx, msstw.st, newSSTFile) msstw.currSST = newSST - if msstw.skipRangeDelForLastSpan && msstw.currSpan == len(msstw.keySpans)-1 { - // Skip this ClearRange, as it will be excised at ingestion time in the - // engine instead. - return nil - } - if err := msstw.currSST.ClearRawRange( - msstw.keySpans[msstw.currSpan].Key, msstw.keySpans[msstw.currSpan].EndKey, - true /* pointKeys */, true, /* rangeKeys */ - ); err != nil { - msstw.currSST.Close() - return errors.Wrap(err, "failed to clear range on sst file writer") + if msstw.currSpan < len(msstw.localKeySpans) || (!msstw.skipClearForMVCCSpan && msstw.currSpan <= len(msstw.localKeySpans)) { + // We're either in a local key span, or we're in the first MVCC sstable + // span (before any splits). Add a RangeKeyDel for the whole MVCC span + // to the fragmenter; we don't need to keep re-adding it as the fragmenter + // will take care of splits. + startKey := storage.EngineKey{Key: msstw.currentSpan().Key}.Encode() + endKey := storage.EngineKey{Key: msstw.currentSpan().EndKey}.Encode() + // TODO(bilal): Export pebble's MakeTrailer so we don't have to create an internal key, + // and use that here. 
+ ikey := pebble.MakeInternalKey(startKey, 0, pebble.InternalKeyKindRangeKeyDelete) + s := rangekey.Span{Start: startKey, End: endKey, Keys: []rangekey.Key{{Trailer: ikey.Trailer}}} + msstw.rangeKeyFrag.Add(s) } return nil } -func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error { +func (msstw *multiSSTWriter) finalizeSST(ctx context.Context, nextKey *storage.EngineKey) error { + currSpan := msstw.currentSpan() + if msstw.currSpanIsMVCCSpan() { + // We're in the MVCC span (ie. MVCC / user keys). If skipClearForMVCCSpan + // is true, we don't write a clearRange for the last span at all. Otherwise, + // we need to write a clearRange for all keys leading up to the current key + // we're writing. + currEngineSpan := msstw.mvccSSTSpans[msstw.currSpan-len(msstw.localKeySpans)] + if !msstw.skipClearForMVCCSpan { + if err := msstw.currSST.ClearEngineRange( + currEngineSpan.Start, currEngineSpan.End, + ); err != nil { + msstw.currSST.Close() + return errors.Wrap(err, "failed to clear range on sst file writer") + } + } + } else { + if err := msstw.currSST.ClearRawRange( + currSpan.Key, currSpan.EndKey, + true /* pointKeys */, false, /* rangeKeys */ + ); err != nil { + msstw.currSST.Close() + return errors.Wrap(err, "failed to clear range on sst file writer") + } + } + + // If we're at the last span, call Finish on the fragmenter. If we're not at the + // last span, call Truncate. 
+ if msstw.currSpan == len(msstw.localKeySpans)+len(msstw.mvccSSTSpans)-1 { + msstw.rangeKeyFrag.Finish() + } else { + endKey := storage.EngineKey{Key: currSpan.EndKey} + if msstw.currSpanIsMVCCSpan() { + endKey = msstw.mvccSSTSpans[msstw.currSpan-len(msstw.localKeySpans)].End + } + msstw.rangeKeyFrag.Truncate(endKey.Encode()) + } + err := msstw.currSST.Finish() if err != nil { return errors.Wrap(err, "failed to finish sst") } + if nextKey != nil { + meta := msstw.currSST.Meta + encodedNextKey := nextKey.Encode() + if meta.HasPointKeys && storage.EngineKeyCompare(meta.LargestPoint.UserKey, encodedNextKey) > 0 { + metaEndKey, ok := storage.DecodeEngineKey(meta.LargestPoint.UserKey) + if !ok { + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest point key %s > next sstable start key %s", + meta.LargestPoint.UserKey, nextKey) + } + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest point key %s > next sstable start key %s", + metaEndKey, nextKey) + } + if meta.HasRangeDelKeys && storage.EngineKeyCompare(meta.LargestRangeDel.UserKey, encodedNextKey) > 0 { + metaEndKey, ok := storage.DecodeEngineKey(meta.LargestRangeDel.UserKey) + if !ok { + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest range del %s > next sstable start key %s", + meta.LargestRangeDel.UserKey, nextKey) + } + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest range del %s > next sstable start key %s", + metaEndKey, nextKey) + } + if meta.HasRangeKeys && storage.EngineKeyCompare(meta.LargestRangeKey.UserKey, encodedNextKey) > 0 { + metaEndKey, ok := storage.DecodeEngineKey(meta.LargestRangeKey.UserKey) + if !ok { + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest range key %s > next sstable start key %s", + meta.LargestRangeKey.UserKey, nextKey) + } + return errors.Errorf("multiSSTWriter 
created overlapping ingestion sstables: sstable largest range key %s > next sstable start key %s", + metaEndKey, nextKey) + } + } msstw.dataSize += msstw.currSST.DataSize msstw.sstSize += int64(msstw.currSST.Meta.Size) msstw.currSpan++ @@ -189,22 +331,22 @@ func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error { return nil } -// addRangeDelForLastSpan allows us to explicitly add a deletion tombstone -// for the last span in the msstw, if it was instantiated with the expectation +// addClearForMVCCSpan allows us to explicitly add a deletion tombstone +// for the mvcc span in the msstw, if it was instantiated with the expectation // that no tombstone was necessary. -func (msstw *multiSSTWriter) addRangeDelForLastSpan() error { - if !msstw.skipRangeDelForLastSpan { +func (msstw *multiSSTWriter) addClearForMVCCSpan() error { + if !msstw.skipClearForMVCCSpan { // Nothing to do. return nil } - if msstw.currSpan < len(msstw.keySpans)-1 { - // When we switch to the last key span, we will just add a rangedel for it. - // Set skipRangeDelForLastSpan to false. - msstw.skipRangeDelForLastSpan = false + if msstw.currSpan < len(msstw.localKeySpans) { + // When we switch to the mvcc key span, we will just add a rangedel for it. + // Set skipClearForMVCCSpan to false. + msstw.skipClearForMVCCSpan = false return nil } - if msstw.currSpan > len(msstw.keySpans)-1 { - panic("cannot addRangeDel if sst writer has moved past user keys") + if msstw.currSpan > len(msstw.localKeySpans) { + panic("cannot clearEngineRange if sst writer has moved past user keys") } panic("multiSSTWriter already added keys to sstable that cannot be deleted by a rangedel/rangekeydel within it") } @@ -213,31 +355,57 @@ func (msstw *multiSSTWriter) addRangeDelForLastSpan() error { // writer for writing a point/range key at key. For point keys, endKey and key // must equal each other. 
func (msstw *multiSSTWriter) rolloverSST( - ctx context.Context, key roachpb.Key, endKey roachpb.Key, + ctx context.Context, key storage.EngineKey, endKey storage.EngineKey, ) error { - for msstw.keySpans[msstw.currSpan].EndKey.Compare(key) <= 0 { + for msstw.currentSpan().EndKey.Compare(key.Key) <= 0 { // Finish the current SST, write to the file, and move to the next key // range. - if err := msstw.finalizeSST(ctx); err != nil { + if err := msstw.finalizeSST(ctx, &key); err != nil { return err } if err := msstw.initSST(ctx); err != nil { return err } } - if msstw.keySpans[msstw.currSpan].Key.Compare(key) > 0 || - msstw.keySpans[msstw.currSpan].EndKey.Compare(endKey) < 0 { - if !key.Equal(endKey) { - return errors.AssertionFailedf("client error: expected %s to fall in one of %s", - roachpb.Span{Key: key, EndKey: endKey}, msstw.keySpans) + currSpan := msstw.currentSpan() + if currSpan.Key.Compare(key.Key) > 0 || currSpan.EndKey.Compare(endKey.Key) < 0 { + if !key.Key.Equal(endKey.Key) { + return errors.AssertionFailedf("client error: expected %s to fall in one of %s or %s", + roachpb.Span{Key: key.Key, EndKey: endKey.Key}, msstw.localKeySpans, msstw.mvccKeySpan) + } + return errors.AssertionFailedf("client error: expected %s to fall in one of %s or %s", key, msstw.localKeySpans, msstw.mvccKeySpan) + } + if msstw.currSpanIsMVCCSpan() && msstw.maxSSTSize > 0 && msstw.currSST.DataSize > msstw.maxSSTSize { + // We're in an MVCC / user keys span, and the current sstable has exceeded + // the max size for MVCC sstables that we should be creating. Split this + // sstable into smaller ones. We do this by splitting the mvccKeySpan + // from [oldStartKey, oldEndKey) to [oldStartKey, key) and [key, oldEndKey). + // The split spans are added to msstw.mvccSSTSpans. 
+ currSpan := &msstw.mvccSSTSpans[msstw.currSpan-len(msstw.localKeySpans)] + if bytes.Equal(currSpan.Start.Key, key.Key) && bytes.Equal(currSpan.Start.Version, key.Version) { + panic("unexpectedly reached max sstable size at start of an mvcc sstable span") + } + oldEndKey := currSpan.End + currSpan.End = key.Copy() + newSpan := storage.EngineKeyRange{Start: currSpan.End, End: oldEndKey} + msstw.mvccSSTSpans = append(msstw.mvccSSTSpans, newSpan) + if msstw.currSpan < len(msstw.localKeySpans)+len(msstw.mvccSSTSpans)-2 { + // This should never happen; we only split sstables when we're at the end + // of mvccSSTSpans. + panic("unexpectedly split an earlier mvcc sstable span in multiSSTWriter") + } + if err := msstw.finalizeSST(ctx, &key); err != nil { + return err + } + if err := msstw.initSST(ctx); err != nil { + return err } - return errors.AssertionFailedf("client error: expected %s to fall in one of %s", key, msstw.keySpans) } return nil } func (msstw *multiSSTWriter) Put(ctx context.Context, key storage.EngineKey, value []byte) error { - if err := msstw.rolloverSST(ctx, key.Key, key.Key); err != nil { + if err := msstw.rolloverSST(ctx, key, key); err != nil { return err } if err := msstw.currSST.PutEngineKey(key, value); err != nil { @@ -253,7 +421,7 @@ func (msstw *multiSSTWriter) PutInternalPointKey( if !ok { return errors.New("cannot decode engine key") } - if err := msstw.rolloverSST(ctx, decodedKey.Key, decodedKey.Key); err != nil { + if err := msstw.rolloverSST(ctx, decodedKey, decodedKey); err != nil { return err } var err error @@ -290,11 +458,14 @@ func decodeRangeStartEnd( } func (msstw *multiSSTWriter) PutInternalRangeDelete(ctx context.Context, start, end []byte) error { + if !msstw.skipClearForMVCCSpan { + panic("can only add internal range deletes to multiSSTWriter if skipClearForMVCCSpan is true") + } decodedStart, decodedEnd, err := decodeRangeStartEnd(start, end) if err != nil { return err } - if err := msstw.rolloverSST(ctx, decodedStart.Key, 
decodedEnd.Key); err != nil { + if err := msstw.rolloverSST(ctx, decodedStart, decodedEnd); err != nil { return err } if err := msstw.currSST.ClearRawEncodedRange(start, end); err != nil { @@ -306,11 +477,14 @@ func (msstw *multiSSTWriter) PutInternalRangeDelete(ctx context.Context, start, func (msstw *multiSSTWriter) PutInternalRangeKey( ctx context.Context, start, end []byte, key rangekey.Key, ) error { + if !msstw.skipClearForMVCCSpan { + panic("can only add internal range keys to multiSSTWriter if skipClearForMVCCSpan is true") + } decodedStart, decodedEnd, err := decodeRangeStartEnd(start, end) if err != nil { return err } - if err := msstw.rolloverSST(ctx, decodedStart.Key, decodedEnd.Key); err != nil { + if err := msstw.rolloverSST(ctx, decodedStart, decodedEnd); err != nil { return err } if err := msstw.currSST.PutInternalRangeKey(start, end, key); err != nil { @@ -325,22 +499,34 @@ func (msstw *multiSSTWriter) PutRangeKey( if start.Compare(end) >= 0 { return errors.AssertionFailedf("start key %s must be before end key %s", end, start) } - if err := msstw.rolloverSST(ctx, start, end); err != nil { + if err := msstw.rolloverSST(ctx, storage.EngineKey{Key: start}, storage.EngineKey{Key: end}); err != nil { return err } - if err := msstw.currSST.PutEngineRangeKey(start, end, suffix, value); err != nil { - return errors.Wrap(err, "failed to put range key in sst") + if msstw.skipClearForMVCCSpan { + // Skip the fragmenter. See the comment in skipClearForMVCCSpan. + if err := msstw.currSST.PutEngineRangeKey(start, end, suffix, value); err != nil { + return errors.Wrap(err, "failed to put range key in sst") + } + return nil } + startKey, endKey := storage.EngineKey{Key: start}.Encode(), storage.EngineKey{Key: end}.Encode() + // TODO(bilal): Use pebble.MakeTrailer instead of this, once that method is exported. 
+ startIKey := pebble.MakeInternalKey(startKey, 0, pebble.InternalKeyKindRangeKeySet) + msstw.rangeKeyFrag.Add(rangekey.Span{ + Start: startKey, + End: endKey, + Keys: []rangekey.Key{{Trailer: startIKey.Trailer, Suffix: suffix, Value: value}}, + }) return nil } func (msstw *multiSSTWriter) Finish(ctx context.Context) (int64, error) { - if msstw.currSpan < len(msstw.keySpans) { + if msstw.currSpan < (len(msstw.localKeySpans) + len(msstw.mvccSSTSpans)) { for { - if err := msstw.finalizeSST(ctx); err != nil { + if err := msstw.finalizeSST(ctx, nil /* nextKey */); err != nil { return 0, err } - if msstw.currSpan >= len(msstw.keySpans) { + if msstw.currSpan >= (len(msstw.localKeySpans) + len(msstw.mvccSSTSpans)) { break } if err := msstw.initSST(ctx); err != nil { @@ -506,8 +692,11 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( // TODO(aaditya): Remove once we support flushableIngests for shared and // external files in the engine. - skipRangeDelForLastSpan := doExcise && (header.SharedReplicate || header.ExternalReplicate) - msstw, err := newMultiSSTWriter(ctx, kvSS.st, kvSS.scratch, keyRanges, kvSS.sstChunkSize, skipRangeDelForLastSpan) + skipClearForMVCCSpan := doExcise && (header.SharedReplicate || header.ExternalReplicate) + // The last key range is the user key span. 
+ localRanges := keyRanges[:len(keyRanges)-1] + mvccRange := keyRanges[len(keyRanges)-1] + msstw, err := newMultiSSTWriter(ctx, kvSS.st, kvSS.scratch, localRanges, mvccRange, kvSS.sstChunkSize, skipClearForMVCCSpan) if err != nil { return noSnap, err } @@ -533,7 +722,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( doExcise = false sharedSSTs = nil externalSSTs = nil - if err := msstw.addRangeDelForLastSpan(); err != nil { + if err := msstw.addClearForMVCCSpan(); err != nil { return noSnap, errors.Wrap(err, "adding tombstone for last span") } } @@ -706,7 +895,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( sharedSSTs: sharedSSTs, externalSSTs: externalSSTs, doExcise: doExcise, - includesRangeDelForLastSpan: !skipRangeDelForLastSpan, + includesRangeDelForLastSpan: !skipClearForMVCCSpan, clearedSpans: keyRanges, } diff --git a/pkg/storage/engine_key.go b/pkg/storage/engine_key.go index 91bb63747cd..19180566610 100644 --- a/pkg/storage/engine_key.go +++ b/pkg/storage/engine_key.go @@ -419,3 +419,8 @@ func decodeMVCCMetaAndVerify(key roachpb.Key, value []byte) error { } return decodeMVCCValueAndVerify(key, meta.RawBytes) } + +// EngineKeyRange is a key range composed of EngineKeys. +type EngineKeyRange struct { + Start, End EngineKey +} diff --git a/pkg/storage/sst_writer.go b/pkg/storage/sst_writer.go index 4c4179e15bd..66ee08e83fa 100644 --- a/pkg/storage/sst_writer.go +++ b/pkg/storage/sst_writer.go @@ -291,6 +291,17 @@ func (fw *SSTWriter) ClearEngineRangeKey(start, end roachpb.Key, suffix []byte) return fw.fw.RangeKeyUnset(EngineKey{Key: start}.Encode(), EngineKey{Key: end}.Encode(), suffix) } +// ClearEngineRange clears point keys in the specified EngineKey range. 
+func (fw *SSTWriter) ClearEngineRange(start, end EngineKey) error { + fw.scratch = start.EncodeToBuf(fw.scratch[:0]) + endRaw := end.Encode() + fw.DataSize += int64(len(start.Key)) + int64(len(end.Key)) + if err := fw.fw.DeleteRange(fw.scratch, endRaw); err != nil { + return err + } + return nil +} + // ClearRawEncodedRange implements the InternalWriter interface. func (fw *SSTWriter) ClearRawEncodedRange(start, end []byte) error { startEngine, ok := DecodeEngineKey(start)