Skip to content

Commit

Permalink
indices: speed up ProcessSlice / ProcessSliceFn
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdecaf committed Jan 15, 2025
1 parent 993a402 commit 7d2112c
Showing 1 changed file with 62 additions and 31 deletions.
93 changes: 62 additions & 31 deletions internal/indices/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,60 +32,91 @@ func New(total, groups int) []int {
return append(xs, total)
}

// ProcessSlice processes input slice concurrently using the provided function
func ProcessSlice[T any, F any](in []T, groups int, f func(T) F) []F {
results := make(chan F, len(in))
// ProcessSlice processes items concurrently using a worker pool
func ProcessSlice[T any, F any](in []T, workers int, f func(T) F) []F {
if len(in) == 0 {
return nil
}

var wg sync.WaitGroup
wg.Add(len(in))
// For very small slices, process sequentially
if len(in) < workers {
out := make([]F, len(in))
for i, item := range in {
out[i] = f(item)
}
return out
}

for _, elm := range in {
go func(v T) {
defer wg.Done()
results <- f(v)
}(elm)
// Calculate chunk size
chunkSize := len(in) / workers
if chunkSize < 1 {
chunkSize = 1
}

go func() {
wg.Wait()
close(results)
}()
// Pre-allocate output slice
out := make([]F, len(in))
var wg sync.WaitGroup

// Process chunks directly, writing to pre-allocated slice
for i := 0; i < len(in); i += chunkSize {
wg.Add(1)
start := i
end := i + chunkSize
if end > len(in) {
end = len(in)
}

out := make([]F, 0, len(in))
for result := range results {
out = append(out, result)
go func(start, end int) {
defer wg.Done()
// Process chunk and write directly to output slice
for i, item := range in[start:end] {
out[start+i] = f(item)
}
}(start, end)
}

wg.Wait()
return out
}

func ProcessSliceFn[T any](in []T, groups int, f func(T)) {
// ProcessSliceFn processes items in chunks concurrently
func ProcessSliceFn[T any](in []T, workers int, f func(T)) {
if len(in) == 0 {
return
}

indices := New(len(in), groups)
numGroups := len(indices) - 1 // Number of actual chunks
// For very small slices, process sequentially
if len(in) < workers {
for _, item := range in {
f(item)
}
return
}

// Calculate chunk size
chunkSize := len(in) / workers
if chunkSize < 1 {
chunkSize = 1
}

// Use WaitGroup for synchronization
var wg sync.WaitGroup
wg.Add(numGroups)

// Process each chunk concurrently
for i := 0; i < numGroups; i++ {
start := indices[i]
end := indices[i+1]
// Process chunks directly
for i := 0; i < len(in); i += chunkSize {
wg.Add(1)
start := i
end := i + chunkSize
if end > len(in) {
end = len(in)
}

go func(start, end int) {
defer wg.Done()

// Process chunk and write directly to pre-allocated output slice
for _, v := range in[start:end] {
f(v)
for _, item := range in[start:end] {
f(item)
}
}(start, end)
}

// Wait for all goroutines to complete
wg.Wait()
}

0 comments on commit 7d2112c

Please sign in to comment.