Skip to content

Commit

Permalink
HW6 Pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
bimboterminator1 committed Jan 18, 2025
1 parent eb79325 commit 785910c
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 3 deletions.
37 changes: 35 additions & 2 deletions hw06_pipeline_execution/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,39 @@ type (
type Stage func(in In) (out Out)

func ExecutePipeline(in In, done In, stages ...Stage) Out {
// Place your code here.
return nil
if len(stages) == 0 {
return in
}

proxyChan := func(inChan In, doneChan In) Out {
out := make(Bi)

go func() {
defer func() {
close(out)
//nolint:revive
for range inChan {
}
}()
for {
select {
case <-doneChan:
return
case v, ok := <-inChan:
if !ok {
return
}
out <- v
}
}
}()
return out
}

outChan := in
for i := 0; i < len(stages); i++ {
outChan = stages[i](proxyChan(outChan, done))
}

return outChan
}
47 changes: 46 additions & 1 deletion hw06_pipeline_execution/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,51 @@ func TestAllStageStop(t *testing.T) {
wg.Wait()

require.Len(t, result, 0)

})
}

func TestEdgeInputs(t *testing.T) {
stages := []Stage{
func(in In) Out {
out := make(Bi)
go func() {
defer close(out)
for v := range in {
out <- v
}
}()
return out
},
}

// closed channel
in := make(Bi)
close(in)

result := make([]interface{}, 0)
for s := range ExecutePipeline(in, nil, stages...) {
result = append(result, s)
}

require.Len(t, result, 0)

// empty stages
in = make(Bi)
go func() {
for i := 0; i < 10; i++ {
in <- i
}
close(in)
}()

stages = []Stage{}
resultEmpty := make([]int, 0)
for s := range ExecutePipeline(in, nil, stages...) {
resultEmpty = append(resultEmpty, s.(int))
}

require.Len(t, resultEmpty, 10)
for i := 0; i < 10; i++ {
require.Equal(t, i, resultEmpty[i])
}
}

0 comments on commit 785910c

Please sign in to comment.