diff --git a/blocks/randompruner.go b/blocks/randompruner.go index 0cfdc13..db24154 100644 --- a/blocks/randompruner.go +++ b/blocks/randompruner.go @@ -35,6 +35,11 @@ type RandomPrunerConfig struct { PinDuration time.Duration } +type cidStatus struct { + pinned bool + pinTime time.Time +} + // RandomPruner is a blockstore wrapper which removes blocks at random when disk // space is exhausted type RandomPruner struct { @@ -48,12 +53,25 @@ type RandomPruner struct { // A list of "hot" CIDs which should not be deleted, and when they were last // used - pins map[cid.Cid]time.Time - pinsLk sync.Mutex + allCids map[cid.Cid]*cidStatus + allCidsLk sync.Mutex + lastAllCidsUpdate time.Time pruneLk sync.Mutex } +var cidStatusPool = sync.Pool{ + New: func() any { + return new(cidStatus) + }, +} + +// 128K +const approxBlockSizeBytes = 1 << 17 + +// 40 Bytes +const approxCidSizeBytes = 40 + // The datastore that was used to create the blockstore is a required parameter // used for calculating remaining disk space - the Blockstore interface itself // does not provide this information @@ -82,6 +100,8 @@ func NewRandomPruner( log.Infof("Initialized pruner's tracked size as %s", humanize.IBytes(size)) + var cidMapApproxSize uint64 = cfg.Threshold * approxCidSizeBytes / approxBlockSizeBytes + pruner := &RandomPruner{ Blockstore: inner, datastore: datastore, @@ -89,8 +109,7 @@ func NewRandomPruner( pruneBytes: cfg.PruneBytes, pinDuration: cfg.PinDuration, size: size, - - pins: make(map[cid.Cid]time.Time), + allCids: make(map[cid.Cid]*cidStatus, cidMapApproxSize), } // Poll immediately on startup and then periodically @@ -115,6 +134,13 @@ func (pruner *RandomPruner) DeleteBlock(ctx context.Context, cid cid.Cid) error } pruner.size -= uint64(blockSize) + pruner.allCidsLk.Lock() + cs, ok := pruner.allCids[cid] + if ok { + delete(pruner.allCids, cid) + cidStatusPool.Put(cs) + } + pruner.allCidsLk.Unlock() return nil } @@ -190,12 +216,40 @@ func (pruner *RandomPruner) prune(ctx context.Context, bytesToPrune uint64) erro log.Infof("Starting prune operation with original datastore size of %s", humanize.IBytes(pruner.size)) start := time.Now() - // Get all the keys in the blockstore - allCids, err := pruner.AllKeysChan(ctx) - if err != nil { - return err + // periodically sync the in memory list of all cids with disk + if time.Since(pruner.lastAllCidsUpdate) > time.Hour { + diskCids, err := pruner.AllKeysChan(ctx) + if err != nil { + return err + } + pruner.allCidsLk.Lock() + inMemoryCids := make(map[cid.Cid]struct{}, len(pruner.allCids)) + for cid := range pruner.allCids { + inMemoryCids[cid] = struct{}{} + } + for diskCid := range diskCids { + if ctx.Err() != nil { + pruner.allCidsLk.Unlock() + return ctx.Err() + } + _, existing := pruner.allCids[diskCid] + if existing { + delete(inMemoryCids, diskCid) + } else { + cs := cidStatusPool.Get().(*cidStatus) + cs.pinned = false + pruner.allCids[diskCid] = cs + } + } + // delete remaining in memory cids that aren't on disk + for inMemoryCid := range inMemoryCids { + cs := pruner.allCids[inMemoryCid] + delete(pruner.allCids, inMemoryCid) + cidStatusPool.Put(cs) + } + pruner.allCidsLk.Unlock() + pruner.lastAllCidsUpdate = time.Now() } - tmpFile, err := os.Create(path.Join(os.TempDir(), "autoretrieve-prune.txt")) if err != nil { return err @@ -206,18 +260,21 @@ func (pruner *RandomPruner) prune(ctx context.Context, bytesToPrune uint64) erro const cidPadLength = 64 writer := bufio.NewWriter(tmpFile) cidCount := 0 - for cid := range allCids { - if ctx.Err() != nil { - return ctx.Err() - } - // Don't consider pinned blocks for deletion - if pruner.isPinned(cid) { + pruner.allCidsLk.Lock() + for cid, status := range pruner.allCids { + if status.pinned && time.Since(status.pinTime) < pruner.pinDuration { continue } + status.pinned = false + paddedCidStr := fmt.Sprintf("%-*s", cidPadLength, cid.String()) + if len(paddedCidStr) > cidPadLength { + paddedCidStr = paddedCidStr[:cidPadLength] + } if _, err := writer.WriteString(paddedCidStr + "\n"); err != nil { + pruner.allCidsLk.Unlock() return fmt.Errorf("failed to write cid to tmp file: %w", err) } cidCount++ @@ -295,25 +352,13 @@ func (pruner *RandomPruner) updatePin(pin cid.Cid) { emptyPinClone := make([]byte, 0, len(pin.Bytes())) pinClone := append(emptyPinClone, pin.Bytes()...) pin = cid.MustParse(pinClone) - pruner.pinsLk.Lock() - pruner.pins[pin] = time.Now() - pruner.pinsLk.Unlock() -} - -func (pruner *RandomPruner) isPinned(cid cid.Cid) bool { - pruner.pinsLk.Lock() - defer pruner.pinsLk.Unlock() - - lastUse, ok := pruner.pins[cid] + pruner.allCidsLk.Lock() + cs, ok := pruner.allCids[pin] if !ok { - return false + cs = cidStatusPool.Get().(*cidStatus) + pruner.allCids[pin] = cs } - - if time.Since(lastUse) < pruner.pinDuration { - return true - } - - delete(pruner.pins, cid) - return false - + cs.pinned = true + cs.pinTime = time.Now() + pruner.allCidsLk.Unlock() }