Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache all cids in memory in pruner #175

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 77 additions & 34 deletions blocks/randompruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type RandomPrunerConfig struct {
PinDuration time.Duration
}

// cidStatus holds the pin bookkeeping kept for a single CID.
type cidStatus struct {
	// pinned is true while the CID is marked as pinned.
	pinned bool
	// pinTime is the time at which the CID was most recently pinned.
	pinTime time.Time
}

// RandomPruner is a blockstore wrapper which removes blocks at random when disk
// space is exhausted
type RandomPruner struct {
Expand All @@ -48,12 +53,25 @@ type RandomPruner struct {

// A list of "hot" CIDs which should not be deleted, and when they were last
// used
pins map[cid.Cid]time.Time
pinsLk sync.Mutex
allCids map[cid.Cid]*cidStatus
allCidsLk sync.Mutex
lastAllCidsUpdate time.Time

pruneLk sync.Mutex
}

// cidStatusPool hands out reusable *cidStatus values; when the pool is empty
// it falls back to allocating a fresh zero-valued cidStatus.
var cidStatusPool = sync.Pool{
	New: func() any { return &cidStatus{} },
}

const (
	// approxBlockSizeBytes is the approximate size of one block: 128 KiB.
	approxBlockSizeBytes = 128 * 1024

	// approxCidSizeBytes is the approximate size of one CID: 40 bytes.
	approxCidSizeBytes = 40
)

// The datastore that was used to create the blockstore is a required parameter
// used for calculating remaining disk space - the Blockstore interface itself
// does not provide this information
Expand Down Expand Up @@ -82,15 +100,16 @@ func NewRandomPruner(

log.Infof("Initialized pruner's tracked size as %s", humanize.IBytes(size))

var cidMapApproxSize uint64 = cfg.Threshold * approxCidSizeBytes / approxBlockSizeBytes

pruner := &RandomPruner{
Blockstore: inner,
datastore: datastore,
threshold: cfg.Threshold,
pruneBytes: cfg.PruneBytes,
pinDuration: cfg.PinDuration,
size: size,

pins: make(map[cid.Cid]time.Time),
allCids: make(map[cid.Cid]*cidStatus, cidMapApproxSize),
}

// Poll immediately on startup and then periodically
Expand All @@ -115,6 +134,13 @@ func (pruner *RandomPruner) DeleteBlock(ctx context.Context, cid cid.Cid) error
}

pruner.size -= uint64(blockSize)
pruner.allCidsLk.Lock()
cs, ok := pruner.allCids[cid]
delete(pruner.allCids, cid)
if ok {
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
cidStatusPool.Put(cs)
}
pruner.allCidsLk.Unlock()

return nil
}
Expand Down Expand Up @@ -190,12 +216,40 @@ func (pruner *RandomPruner) prune(ctx context.Context, bytesToPrune uint64) erro
log.Infof("Starting prune operation with original datastore size of %s", humanize.IBytes(pruner.size))
start := time.Now()

// Get all the keys in the blockstore
allCids, err := pruner.AllKeysChan(ctx)
if err != nil {
return err
// periodically sync the in memory list of all cids with disk
if time.Since(pruner.lastAllCidsUpdate) > time.Hour {
diskCids, err := pruner.AllKeysChan(ctx)
if err != nil {
return err
}
pruner.allCidsLk.Lock()
inMemoryCids := make(map[cid.Cid]struct{}, len(pruner.allCids))
for cid := range pruner.allCids {
inMemoryCids[cid] = struct{}{}
}
for diskCid := range diskCids {
if ctx.Err() != nil {
pruner.allCidsLk.Unlock()
return ctx.Err()
}
_, existing := pruner.allCids[diskCid]
if existing {
delete(inMemoryCids, diskCid)
} else {
cs := cidStatusPool.Get().(*cidStatus)
cs.pinned = false
pruner.allCids[diskCid] = cs
}
}
// delete remaining in memory cids that aren't on disk
for inMemoryCid := range inMemoryCids {
cs := pruner.allCids[inMemoryCid]
delete(pruner.allCids, inMemoryCid)
cidStatusPool.Put(cs)
}
pruner.allCidsLk.Unlock()
pruner.lastAllCidsUpdate = time.Now()
}

tmpFile, err := os.Create(path.Join(os.TempDir(), "autoretrieve-prune.txt"))
if err != nil {
return err
Expand All @@ -206,18 +260,19 @@ func (pruner *RandomPruner) prune(ctx context.Context, bytesToPrune uint64) erro
const cidPadLength = 64
writer := bufio.NewWriter(tmpFile)
cidCount := 0
for cid := range allCids {
if ctx.Err() != nil {
return ctx.Err()
}

// Don't consider pinned blocks for deletion
if pruner.isPinned(cid) {
pruner.allCidsLk.Lock()
for cid, status := range pruner.allCids {

if status.pinned && time.Since(status.pinTime) < pruner.pinDuration {
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
continue
}

status.pinned = false

paddedCidStr := fmt.Sprintf("%-*s", cidPadLength, cid.String())
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
if _, err := writer.WriteString(paddedCidStr + "\n"); err != nil {
pruner.allCidsLk.Unlock()
return fmt.Errorf("failed to write cid to tmp file: %w", err)
}
cidCount++
Expand Down Expand Up @@ -295,25 +350,13 @@ func (pruner *RandomPruner) updatePin(pin cid.Cid) {
emptyPinClone := make([]byte, 0, len(pin.Bytes()))
pinClone := append(emptyPinClone, pin.Bytes()...)
pin = cid.MustParse(pinClone)
pruner.pinsLk.Lock()
pruner.pins[pin] = time.Now()
pruner.pinsLk.Unlock()
}

func (pruner *RandomPruner) isPinned(cid cid.Cid) bool {
pruner.pinsLk.Lock()
defer pruner.pinsLk.Unlock()

lastUse, ok := pruner.pins[cid]
pruner.allCidsLk.Lock()
cs, ok := pruner.allCids[pin]
if !ok {
return false
cs = cidStatusPool.Get().(*cidStatus)
pruner.allCids[pin] = cs
}

if time.Since(lastUse) < pruner.pinDuration {
return true
}

delete(pruner.pins, cid)
return false

cs.pinned = true
cs.pinTime = time.Now()
pruner.allCidsLk.Unlock()
}