-
Notifications
You must be signed in to change notification settings - Fork 0
/
workpool.go
80 lines (56 loc) · 1.22 KB
/
workpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package workpool
import "golang.org/x/sync/semaphore"
import "context"
import "sync"
// WorkerPool bounds how many submitted functions execute at the same time and
// lets callers wait for all of them to finish.
//
// NOTE(review): Run and Wait dereference wg and sem unconditionally, so a
// zero-value WorkerPool looks unusable; obtain one from Create.
type WorkerPool struct {
// sem gates task start-up: Run acquires one unit before launching a task
// and releases it when the task returns.
sem *semaphore.Weighted
// wg counts tasks submitted through Run; Wait blocks on it.
wg *sync.WaitGroup
// size is the capacity sem was created with; Map also uses it as the
// buffer length of its output channel.
size int64
}
// Run executes f on the pool.
//
// The call blocks until a worker slot can be acquired, then starts f on a new
// goroutine and returns without waiting for f to finish. When f returns, the
// task is marked as done for Wait and its slot is handed back to the pool.
func (pool *WorkerPool) Run(f func()) {
	// Register the task before waiting for a slot so that a concurrent Wait
	// already accounts for it while it is still queued.
	pool.wg.Add(1)

	// The error is deliberately discarded: Acquire only fails when its
	// context is done, and the background context never is.
	_ = pool.sem.Acquire(context.Background(), 1)

	worker := func() {
		// Deferred calls run last-in-first-out: the task is marked done
		// first, then the slot is released.
		defer pool.sem.Release(1)
		defer pool.wg.Done()
		f()
	}
	go worker()
}
// Spawn runs f on pool and returns a channel that acts as a future for its
// result.
//
// Like Run, Spawn blocks until a worker slot is available. The returned
// channel receives exactly one value, the return value of f, and is never
// closed. It has a buffer of one so the worker can deliver the result and
// exit even if nobody ever reads it.
func Spawn[T any](pool *WorkerPool, f func() T) chan T {
	result := make(chan T, 1)
	pool.Run(func() { result <- f() })
	return result
}
// Create returns a WorkerPool that runs at most n_workers tasks concurrently.
//
// Values of n_workers below 1 are treated as 1. A semaphore without any
// capacity can never be acquired, so every call to Run on such a pool would
// block forever, and a negative size would make Map panic when it allocates
// its output channel.
func Create(n_workers int) WorkerPool {
	size := int64(n_workers)
	if size < 1 {
		size = 1
	}
	// Keyed fields keep this constructor correct even if the struct's field
	// order changes.
	return WorkerPool{
		sem:  semaphore.NewWeighted(size),
		wg:   &sync.WaitGroup{},
		size: size,
	}
}
// Wait blocks the caller until every task that has been submitted to the pool
// has returned.
func (pool *WorkerPool) Wait() {
	pool.wg.Wait()
}
// Map applies f to every value received from ch, running each call as a task
// on wp, and streams the results on the returned channel.
//
// Map returns immediately; ch is drained on a separate goroutine so the caller
// can consume results while work is still being submitted. Results are
// delivered in completion order, which is not necessarily input order. The
// returned channel is closed once ch has been closed and every task started
// for it has delivered its result, so the caller must eventually close ch and
// should keep reading the returned channel until it is closed.
func Map[T, U any](wp *WorkerPool, ch chan T, f func(x T) U) chan U {
	out := make(chan U, wp.size)

	// Hold a count on the pool's WaitGroup for as long as input is still being
	// fed, so that wp.Wait called right after Map also covers tasks that have
	// not been submitted yet.
	wp.wg.Add(1)

	// Feeding from a goroutine is what prevents a deadlock: if the input were
	// drained before returning, workers would fill out's buffer, block on
	// sending while holding their slots, and Run would then block forever with
	// nobody able to read from out.
	go func() {
		defer wp.wg.Done()

		var pending sync.WaitGroup
		for x := range ch {
			x := x // per-iteration copy; before Go 1.22 the loop variable is shared
			pending.Add(1)
			wp.Run(func() {
				defer pending.Done()
				out <- f(x)
			})
		}

		// Only close out after every task has sent its result.
		pending.Wait()
		close(out)
	}()

	return out
}