Skip to content

Commit

Permalink
[pool] fixed task scheduling timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
shmel1k committed Dec 17, 2018
1 parent 50c2b33 commit 4523cfd
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (p *Pool) add(t TaskFn) error {
return ErrPoolFull
}

left := time.Since(started) - p.conf.TaskScheduleTimeout
left := p.conf.TaskScheduleTimeout - time.Since(started)
if left <= 0 {
return ErrScheduleTimeout
}
Expand Down
56 changes: 43 additions & 13 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,40 @@ func TestPoolQueueOverfilled(t *testing.T) {
}

func TestPoolScheduleTimeout(t *testing.T) {
pool := NewPool(Config{
MaxQueueSize: 1,
MaxWorkers: 1,
UnstoppableWorkers: 1,
TaskScheduleTimeout: 10 * time.Millisecond,
})

for i := 0; i < 5; i++ {
pool.Add(TaskFn(func() {
time.Sleep(10 * time.Second)
}))
var testData = []struct {
pool *Pool
expectedErr error
}{
{
pool: NewPool(Config{
MaxQueueSize: 1,
MaxWorkers: 1,
UnstoppableWorkers: 1,
TaskScheduleTimeout: 10 * time.Millisecond,
}),
expectedErr: ErrScheduleTimeout,
},
{
pool: NewPool(Config{
MaxQueueSize: 1,
MaxWorkers: 1,
UnstoppableWorkers: 1,
TaskScheduleTimeout: 5 * time.Nanosecond,
}),
expectedErr: ErrScheduleTimeout,
},
}

if err := pool.Add(TaskFn(func() {})); err != ErrScheduleTimeout {
t.Fatalf("add task: want err %v, got %v", ErrScheduleTimeout, err)
for i, v := range testData {
for i := 0; i < 5; i++ {
v.pool.Add(TaskFn(func() {
time.Sleep(10 * time.Second)
}))
}

if err := v.pool.Add(TaskFn(func() {})); err != v.expectedErr {
t.Errorf("TestPoolScheduleTimeout[%d]: add task: want err %v, got %v", i, v.expectedErr, err)
}
}
}

Expand Down Expand Up @@ -226,6 +245,17 @@ func TestPoolWithAdditionalWorkersClose(t *testing.T) {
pool.Shutdown()
}

func TestPoolShutdown(t *testing.T) {
pool := NewPool(Config{
MaxQueueSize: 1,
MaxWorkers: 1,
UnstoppableWorkers: 1,
})
time.Sleep(5 * time.Millisecond)

pool.Shutdown()
}

func BenchmarkPool(b *testing.B) {
pool := NewPool(Config{
MaxQueueSize: 0,
Expand Down

0 comments on commit 4523cfd

Please sign in to comment.