-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Feat(WIP): prioritized generic queue
- Loading branch information
1 parent
7acdbf9
commit e4b3dd7
Showing
4 changed files
with
137 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,6 @@ | ||
git.tcp.direct/kayos/common v0.9.2 h1:T7XzvmtPTmhXMTIb8v5W7a4DAV8ekusRkCe1WJ/qGBY= | ||
git.tcp.direct/kayos/common v0.9.2/go.mod h1:rMExTem3JjB5XrwL+i+/QBv3agRFB8mdsoQ97wzBa+8= | ||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= | ||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= | ||
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= | ||
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
package queue | ||
|
||
import "errors" | ||
|
||
var ErrQueueFull = errors.New("bounded queue is full") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
package queue | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"sync/atomic" | ||
|
||
"git.tcp.direct/kayos/common/list" | ||
"golang.org/x/exp/constraints" | ||
) | ||
|
||
type Item[P constraints.Ordered, V any] struct { | ||
value *V | ||
priority P | ||
index int | ||
} | ||
|
||
func NewItem[P constraints.Ordered, V any](v *V, p P) *Item[P, V] { | ||
return &Item[P, V]{ | ||
value: v, | ||
priority: p, | ||
index: 0, | ||
} | ||
} | ||
|
||
func (i *Item[P, V]) Less(j any) bool { | ||
return i.priority < j.(Item[P, V]).priority | ||
} | ||
|
||
func (i *Item[P, V]) Value() *V { | ||
return i.value | ||
} | ||
|
||
func (i *Item[P, V]) Priority() P { | ||
return i.priority | ||
} | ||
|
||
func (i *Item[P, V]) Set(v *V) { | ||
i.value = v | ||
} | ||
|
||
func (i *Item[P, V]) SetPriority(p P) { | ||
i.priority = p | ||
} | ||
|
||
type boundedQueue[P constraints.Ordered, V any] struct { | ||
items []*Item[P, V] | ||
qcap int64 | ||
qlen *atomic.Int64 | ||
mu *sync.RWMutex | ||
errs *list.LockingList | ||
} | ||
|
||
type BoundedQueue[P constraints.Ordered, V any] struct { | ||
// making sure container/heap can access boundedQueue without the normal API being exposed to the user | ||
inner *boundedQueue[P, V] | ||
} | ||
|
||
func NewBoundedQueue[P constraints.Ordered, V any](cap int64) *BoundedQueue[P, V] { | ||
bq := &BoundedQueue[P, V]{} | ||
bq.inner = &boundedQueue[P, V]{ | ||
items: make([]*Item[P, V], 0, cap), | ||
qcap: cap, | ||
qlen: &atomic.Int64{}, | ||
mu: &sync.RWMutex{}, | ||
errs: list.New(), | ||
} | ||
bq.inner.qlen.Store(0) | ||
return bq | ||
} | ||
|
||
func (b *boundedQueue[P, V]) err(err error) { | ||
b.errs.PushBack(err) | ||
} | ||
|
||
func (b *BoundedQueue[P, V]) Err() error { | ||
errLen := b.inner.errs.Len() | ||
switch errLen { | ||
case 0: | ||
return nil | ||
case 1: | ||
return b.inner.errs.Front().Value().(error) | ||
default: | ||
return fmt.Errorf("%w | (%d more errors in queue)", b.inner.errs.Front().Value().(error), errLen-1) | ||
} | ||
} | ||
|
||
func (b *BoundedQueue[P, V]) Len() int { | ||
return int(b.inner.qlen.Load()) | ||
} | ||
|
||
func (b *boundedQueue[P, V]) Less(i, j int) bool { | ||
b.mu.RLock() | ||
less := b.items[P, V][i].priority < b.items[P, V][j].priority | ||
b.mu.RUnlock() | ||
return less | ||
} | ||
|
||
func (b *boundedQueue[P, V]) Swap(i, j int) { | ||
b.mu.Lock() | ||
b.items[P, V][i], b.items[P, V][j] = b.items[P, V][j], b.items[P, V][i] | ||
b.items[P, V][i].index = i | ||
b.items[P, V][j].index = j | ||
b.mu.Unlock() | ||
} | ||
|
||
func (b *boundedQueue[P, V]) Push(x any) { | ||
if b.qlen.Load() >= b.qcap { | ||
b.err(fmt.Errorf("%w: %v dropped", ErrQueueFull, x)) | ||
return | ||
} | ||
defer b.qlen.Add(1) | ||
b.mu.Lock() | ||
b.items[P, V] = append(b.items[P, V], x.(*Item[P, V])) | ||
b.qlen.Add(1) | ||
b.mu.Unlock() | ||
} | ||
|
||
func (b *boundedQueue[P, V]) Pop() any { | ||
// todo | ||
return nil | ||
} |