Skip to content

Commit

Permalink
Merge branch 'main' into log-output
Browse files Browse the repository at this point in the history
  • Loading branch information
wdbaruni authored Oct 7, 2024
2 parents e44cdae + 7285a99 commit 285e549
Show file tree
Hide file tree
Showing 7 changed files with 597 additions and 164 deletions.
137 changes: 112 additions & 25 deletions pkg/lib/collections/hashed_priority_queue.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,60 @@
package collections

import "sync"

import (
"sync"
)

// HashedPriorityQueue is a priority queue that maintains only a single item per unique key.
// It combines the functionality of a hash map and a priority queue to provide efficient
// operations with the following key characteristics:
//
// 1. Single Item Per Key: The queue maintains only the latest version of an item for each
// unique key. When a new item with an existing key is enqueued, it replaces the old item
// instead of adding a duplicate.
//
// 2. Lazy Dequeuing: Outdated items (those that have been replaced by a newer version) are
// not immediately removed from the underlying queue. Instead, they are filtered out
// during dequeue operations. This approach improves enqueue performance
// at the cost of potentially slower dequeue operations.
//
// 3. Higher Enqueue Throughput: By avoiding immediate removal of outdated items, the
// HashedPriorityQueue achieves higher enqueue throughput. This makes it particularly
// suitable for scenarios with frequent updates to existing items.
//
// 4. Eventually Consistent: The queue becomes consistent over time as outdated items are
// lazily removed during dequeue operations. This means that the queue's length and the
// items it contains become accurate as items are dequeued.
//
// 5. Memory Consideration: Due to the lazy removal of outdated items, the underlying queue
// may temporarily hold more items than there are unique keys. This trade-off allows for
// better performance but may use more memory compared to a strictly consistent queue.
//
// Use HashedPriorityQueue when you need a priority queue that efficiently handles updates
// to existing items and can tolerate some latency in removing outdated entries in favor
// of higher enqueue performance.
type HashedPriorityQueue[K comparable, T any] struct {
identifiers map[K]struct{}
queue *PriorityQueue[T]
identifiers map[K]int64
queue *PriorityQueue[versionedItem[T]]
mu sync.RWMutex
indexer IndexerFunc[K, T]
}

// versionedItem wraps the actual data item with a version number.
// This structure is used internally by HashedPriorityQueue to implement
// the versioning mechanism that allows for efficient updates and
// lazy removal of outdated items. The queue is only interested in
// the latest version of an item for each unique key:
// - data: The actual item of type T stored in the queue.
// - version: A monotonically increasing number representing the
// version of this item. When an item with the same key is enqueued,
// its version is incremented. This allows the queue to identify
// the most recent version during dequeue operations and discard
// any older versions of the same item.
type versionedItem[T any] struct {
data T
version int64
}

// IndexerFunc is used to find the key (of type K) from the provided
// item (T). This will be used for the item lookup in `Contains`
type IndexerFunc[K comparable, T any] func(item T) K
Expand All @@ -18,12 +64,24 @@ type IndexerFunc[K comparable, T any] func(item T) K
// be used on Enqueue/Dequeue to keep the index up to date.
func NewHashedPriorityQueue[K comparable, T any](indexer IndexerFunc[K, T]) *HashedPriorityQueue[K, T] {
return &HashedPriorityQueue[K, T]{
identifiers: make(map[K]struct{}),
queue: NewPriorityQueue[T](),
identifiers: make(map[K]int64),
queue: NewPriorityQueue[versionedItem[T]](),
indexer: indexer,
}
}

// isLatestVersion checks if the given item is the latest version
func (q *HashedPriorityQueue[K, T]) isLatestVersion(item versionedItem[T]) bool {
k := q.indexer(item.data)
currentVersion := q.identifiers[k]
return item.version == currentVersion
}

// unwrapQueueItem converts a versionedItem to a QueueItem
func (q *HashedPriorityQueue[K, T]) unwrapQueueItem(item *QueueItem[versionedItem[T]]) *QueueItem[T] {
return &QueueItem[T]{Value: item.Value.data, Priority: item.Priority}
}

// Contains will return true if the provided identifier (of type K)
// will be found in this queue, false if it is not present.
func (q *HashedPriorityQueue[K, T]) Contains(id K) bool {
Expand All @@ -40,9 +98,9 @@ func (q *HashedPriorityQueue[K, T]) Enqueue(data T, priority int64) {
defer q.mu.Unlock()

k := q.indexer(data)

q.identifiers[k] = struct{}{}
q.queue.Enqueue(data, priority)
version := q.identifiers[k] + 1
q.identifiers[k] = version
q.queue.Enqueue(versionedItem[T]{data: data, version: version}, priority)
}

// Dequeue returns the next highest priority item, returning both
Expand All @@ -53,16 +111,39 @@ func (q *HashedPriorityQueue[K, T]) Dequeue() *QueueItem[T] {
q.mu.Lock()
defer q.mu.Unlock()

item := q.queue.Dequeue()
if item == nil {
return nil
for {
item := q.queue.Dequeue()
if item == nil {
return nil
}

if q.isLatestVersion(item.Value) {
k := q.indexer(item.Value.data)
delete(q.identifiers, k)
return q.unwrapQueueItem(item)
}
}
}

// Peek returns the next highest priority item without removing it from the queue.
// It returns nil if the queue is empty.
func (q *HashedPriorityQueue[K, T]) Peek() *QueueItem[T] {
q.mu.RLock()
defer q.mu.RUnlock()

// Find the key for the item and delete it from the presence map
k := q.indexer(item.Value)
delete(q.identifiers, k)
for {
item := q.queue.Peek()
if item == nil {
return nil
}

return item
if q.isLatestVersion(item.Value) {
return q.unwrapQueueItem(item)
}

// If the peeked item is outdated, remove it and continue
q.queue.Dequeue()
}
}

// DequeueWhere allows the caller to iterate through the queue, in priority order, and
Expand All @@ -74,26 +155,32 @@ func (q *HashedPriorityQueue[K, T]) DequeueWhere(matcher MatchingFunction[T]) *Q
q.mu.Lock()
defer q.mu.Unlock()

item := q.queue.DequeueWhere(matcher)
if item == nil {
return nil
}
for {
item := q.queue.DequeueWhere(func(vi versionedItem[T]) bool {
return matcher(vi.data)
})

k := q.indexer(item.Value)
delete(q.identifiers, k)
if item == nil {
return nil
}

return item
if q.isLatestVersion(item.Value) {
k := q.indexer(item.Value.data)
delete(q.identifiers, k)
return q.unwrapQueueItem(item)
}
}
}

// Len returns the number of items currently in the queue
func (q *HashedPriorityQueue[K, T]) Len() int {
return q.queue.Len()
return len(q.identifiers)
}

// IsEmpty returns a boolean denoting whether the queue is
// currently empty or not.
func (q *HashedPriorityQueue[K, T]) IsEmpty() bool {
return q.queue.Len() == 0
return q.Len() == 0
}

var _ PriorityQueueInterface[struct{}] = (*HashedPriorityQueue[string, struct{}])(nil)
135 changes: 122 additions & 13 deletions pkg/lib/collections/hashed_priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,142 @@ package collections_test
import (
"testing"

"github.com/bacalhau-project/bacalhau/pkg/lib/collections"
"github.com/stretchr/testify/suite"

"github.com/bacalhau-project/bacalhau/pkg/lib/collections"
)

type HashedPriorityQueueSuite struct {
suite.Suite
PriorityQueueTestSuite
}

func (s *HashedPriorityQueueSuite) SetupTest() {
s.NewQueue = func() collections.PriorityQueueInterface[TestData] {
return collections.NewHashedPriorityQueue[string, TestData](func(t TestData) string {
return t.id
})
}
}

func TestHashedPriorityQueueSuite(t *testing.T) {
suite.Run(t, new(HashedPriorityQueueSuite))
}

func (s *HashedPriorityQueueSuite) TestContains() {
type TestData struct {
id string
data int
}

indexer := func(t TestData) string {
return t.id
}

q := collections.NewHashedPriorityQueue[string, TestData](indexer)
q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData])

s.Require().False(q.Contains("A"))
q.Enqueue(TestData{id: "A", data: 0}, 1)
q.Enqueue(TestData{"A", 0}, 1)
s.Require().True(q.Contains("A"))
_ = q.Dequeue()
s.Require().False(q.Contains("A"))
}

func (s *HashedPriorityQueueSuite) TestPeek() {
q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData])

q.Enqueue(TestData{"A", 1}, 1)
q.Enqueue(TestData{"B", 2}, 2)

item := q.Peek()
s.Require().NotNil(item)
s.Require().Equal(TestData{"B", 2}, item.Value)
s.Require().True(q.Contains("A"), "Item A should still be in the queue after Peek")
s.Require().True(q.Contains("B"), "Item B should still be in the queue after Peek")

_ = q.Dequeue()
s.Require().False(q.Contains("B"), "Item B should not be in the queue after Dequeue")
s.Require().True(q.Contains("A"), "Item A should still be in the queue after Dequeue")
}

func (s *HashedPriorityQueueSuite) TestSingleItemPerKey() {
q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData])

q.Enqueue(TestData{"A", 1}, 1)
q.Enqueue(TestData{"A", 2}, 2)
q.Enqueue(TestData{"A", 3}, 3)

s.Require().Equal(1, q.Len(), "Queue should only contain one item for key 'A'")

item := q.Dequeue()
s.Require().NotNil(item)
s.Require().Equal(TestData{"A", 3}, item.Value, "Should return the latest version of item 'A'")
s.Require().Equal(int64(3), item.Priority, "Should have the priority of the latest enqueue")

s.Require().Nil(q.Dequeue(), "Queue should be empty after dequeuing the single item")
}

func (s *HashedPriorityQueueSuite) TestPeekReturnsLatestVersion() {
q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData])

q.Enqueue(TestData{"A", 1}, 1)
q.Enqueue(TestData{"B", 1}, 3)
q.Enqueue(TestData{"A", 2}, 2)

item := q.Peek()
s.Require().NotNil(item)
s.Require().Equal(TestData{"B", 1}, item.Value, "Peek should return 'B' as it has the highest priority")
s.Require().Equal(int64(3), item.Priority)

q.Enqueue(TestData{"B", 2}, 1) // Lower priority, but newer version

item = q.Peek()
s.Require().NotNil(item)
s.Require().Equal(TestData{"A", 2}, item.Value, "Peek should now return 'A' as 'B' has lower priority")
s.Require().Equal(int64(2), item.Priority)
}

func (s *HashedPriorityQueueSuite) TestDequeueWhereReturnsLatestVersion() {
q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData])

q.Enqueue(TestData{"A", 1}, 1)
q.Enqueue(TestData{"B", 1}, 2)
q.Enqueue(TestData{"A", 2}, 3)

item := q.DequeueWhere(func(td TestData) bool {
return td.id == "A"
})

s.Require().NotNil(item)
s.Require().Equal(TestData{"A", 2}, item.Value, "DequeueWhere should return the latest version of 'A'")
s.Require().Equal(int64(3), item.Priority)

s.Require().False(q.Contains("A"), "A should no longer be in the queue")
s.Require().True(q.Contains("B"), "B should still be in the queue")
}

func (s *HashedPriorityQueueSuite) TestDuplicateKeys() {
inputs := []struct {
v TestData
p int64
}{
{TestData{"A", 1}, 3},
{TestData{"B", 2}, 2},
{TestData{"A", 3}, 1}, // Duplicate key with lower priority
{TestData{"C", 4}, 4},
{TestData{"B", 5}, 5}, // Duplicate key with higher priority
}

pq := s.NewQueue()
for _, tc := range inputs {
pq.Enqueue(tc.v, tc.p)
}

expected := []struct {
v TestData
p int64
}{
{TestData{"B", 5}, 5},
{TestData{"C", 4}, 4},
{TestData{"A", 3}, 1},
}

for _, exp := range expected {
qitem := pq.Dequeue()
s.Require().NotNil(qitem)
s.Require().Equal(exp.v, qitem.Value)
s.Require().Equal(exp.p, qitem.Priority)
}

s.Require().True(pq.IsEmpty())
}
20 changes: 20 additions & 0 deletions pkg/lib/collections/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type PriorityQueueInterface[T any] interface {
// extra PriorityQueue) for the dequeued items.
DequeueWhere(matcher MatchingFunction[T]) *QueueItem[T]

// Peek returns the next highest priority item without removing it from the queue.
// It returns nil if the queue is empty.
Peek() *QueueItem[T]

// Len returns the number of items currently in the queue
Len() int

Expand Down Expand Up @@ -117,6 +121,22 @@ func (pq *PriorityQueue[T]) dequeue() *QueueItem[T] {
return &QueueItem[T]{Value: item, Priority: heapItem.priority}
}

// Peek returns the next highest priority item without removing it from the queue.
// It returns nil if the queue is empty.
func (pq *PriorityQueue[T]) Peek() *QueueItem[T] {
pq.mu.Lock()
defer pq.mu.Unlock()

if pq.IsEmpty() {
return nil
}

heapItem := pq.internalQueue[0]
item, _ := heapItem.value.(T)

return &QueueItem[T]{Value: item, Priority: heapItem.priority}
}

// DequeueWhere allows the caller to iterate through the queue, in priority order, and
// attempt to match an item using the provided `MatchingFunction`. This method has a high
// time cost as dequeued but non-matching items must be held and requeued once the process
Expand Down
Loading

0 comments on commit 285e549

Please sign in to comment.