Skip to content

Commit fc4a09b

Browse files
authored
Reduce resource usage during traversal.
1 parent 797623f commit fc4a09b

File tree

1 file changed

+45
-15
lines changed

1 file changed

+45
-15
lines changed

structure/tree.go

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@ import (
1515
)
1616

1717
const (
18-
workerTimeout = time.Second * 2
19-
workerReset = time.Second
18+
workerTimeout = time.Second
2019
childPathBufSize = 512
21-
bfsQueueSize = 64
20+
bfsQueueSize = 1024
2221
)
2322

2423
// TreeOpt defines a custom type for configuring a *Tree instance.
@@ -244,16 +243,44 @@ func (t *Tree) PersistCache() (chan struct{}, error) {
244243
return t.cache.SetAsync(t.root.Path, t.root)
245244
}
246245

247-
func (t *Tree) TraverseAsync(skipCache bool) (chan struct{}, chan error) {
248-
var wg sync.WaitGroup
246+
// scanQueue represents a queue for *Entry instances scheduled for traversal.
247+
type scanQueue struct {
248+
entries []*Entry
249+
mx sync.RWMutex
250+
}
251+
252+
func (sq *scanQueue) Push(val *Entry) {
253+
if val == nil {
254+
return
255+
}
256+
257+
sq.mx.Lock()
258+
defer sq.mx.Unlock()
259+
260+
sq.entries = append(sq.entries, val)
261+
}
262+
263+
func (sq *scanQueue) Get() (*Entry, bool) {
264+
sq.mx.Lock()
265+
defer sq.mx.Unlock()
266+
267+
if len(sq.entries) == 0 {
268+
return nil, false
269+
}
270+
271+
entry := sq.entries[0]
272+
sq.entries = sq.entries[1:]
273+
274+
return entry, true
275+
}
249276

277+
func (t *Tree) TraverseAsync(skipCache bool) (chan struct{}, chan error) {
250278
drive.InoFilterInstance.Reset()
251279

252280
if t.root == nil || !t.root.IsDir {
253281
return nil, nil
254282
}
255283

256-
queue := make(chan *Entry, bfsQueueSize)
257284
done, errChan := make(chan struct{}), make(chan error, 1)
258285

259286
if !skipCache && t.cachingEnabled() && t.cache.Has(t.root.Path) {
@@ -266,9 +293,12 @@ func (t *Tree) TraverseAsync(skipCache bool) (chan struct{}, chan error) {
266293
return done, errChan
267294
}
268295

296+
var wg sync.WaitGroup
297+
269298
t.dirty = true
270299

271-
queue <- t.root
300+
queue := scanQueue{entries: make([]*Entry, 0, bfsQueueSize)}
301+
queue.Push(t.root)
272302

273303
worker := func() {
274304
timeoutTimer := time.NewTimer(workerTimeout)
@@ -282,21 +312,22 @@ func (t *Tree) TraverseAsync(skipCache bool) (chan struct{}, chan error) {
282312

283313
for {
284314
select {
285-
case entry, ok := <-queue:
315+
case <-timeoutTimer.C:
316+
return
317+
default:
318+
item, ok := queue.Get()
286319
if !ok {
287-
return
320+
continue
288321
}
289322

290323
t.handleEntry(
291324
ba,
292-
entry,
293-
func(newDir *Entry) { go func() { queue <- newDir }() },
325+
item,
326+
func(newDir *Entry) { queue.Push(newDir) },
294327
func(err error) { errChan <- err },
295328
)
296329

297-
timeoutTimer.Reset(workerReset)
298-
case <-timeoutTimer.C:
299-
return
330+
timeoutTimer.Reset(workerTimeout)
300331
}
301332
}
302333
}
@@ -310,7 +341,6 @@ func (t *Tree) TraverseAsync(skipCache bool) (chan struct{}, chan error) {
310341
wg.Wait()
311342

312343
close(done)
313-
close(queue)
314344
close(errChan)
315345
}()
316346

0 commit comments

Comments
 (0)