Skip to content

Commit

Permalink
refactor: move stash back to mailbox (#469)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Sep 24, 2024
1 parent fb3d3a4 commit 10dc734
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 11 deletions.
5 changes: 2 additions & 3 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/tochemey/goakt/v2/address"
"github.com/tochemey/goakt/v2/future"
"github.com/tochemey/goakt/v2/goaktpb"
"github.com/tochemey/goakt/v2/internal/collection"
"github.com/tochemey/goakt/v2/internal/errorschain"
"github.com/tochemey/goakt/v2/internal/eventstream"
"github.com/tochemey/goakt/v2/internal/http"
Expand Down Expand Up @@ -161,7 +160,7 @@ type PID struct {
behaviorStack *behaviorStack

// stash settings
stashBuffer *collection.Queue
stashBuffer *mailbox
stashLocker *sync.Mutex

// define an events stream
Expand Down Expand Up @@ -513,7 +512,7 @@ func (pid *PID) StashSize() uint64 {
if pid.stashBuffer == nil {
return 0
}
return uint64(pid.stashBuffer.Length())
return uint64(pid.stashBuffer.Len())
}

// PipeTo processes a long-running task and pipes the result to the provided actor.
Expand Down
3 changes: 1 addition & 2 deletions actors/pid_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package actors
import (
"time"

"github.com/tochemey/goakt/v2/internal/collection"
"github.com/tochemey/goakt/v2/internal/eventstream"
"github.com/tochemey/goakt/v2/log"
"github.com/tochemey/goakt/v2/telemetry"
Expand Down Expand Up @@ -104,7 +103,7 @@ func withTelemetry(telemetry *telemetry.Telemetry) pidOption {
// withStash sets the actor's stash buffer
func withStash() pidOption {
return func(pid *PID) {
pid.stashBuffer = collection.NewQueue()
pid.stashBuffer = newMailbox()
}
}

Expand Down
12 changes: 6 additions & 6 deletions actors/stash.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (pid *PID) stash(ctx *ReceiveContext) error {
if pid.stashBuffer == nil {
return ErrStashBufferNotSet
}
pid.stashBuffer.Enqueue(ctx)
pid.stashBuffer.Push(ctx)
return nil
}

Expand All @@ -41,11 +41,11 @@ func (pid *PID) unstash() error {
return ErrStashBufferNotSet
}

received := pid.stashBuffer.Dequeue()
received := pid.stashBuffer.Pop()
if received == nil {
return errors.New("stash buffer may be closed")
}
pid.doReceive(received.(*ReceiveContext))
pid.doReceive(received)
return nil
}

Expand All @@ -59,12 +59,12 @@ func (pid *PID) unstashAll() error {
pid.stashLocker.Lock()
defer pid.stashLocker.Unlock()

for pid.stashBuffer.Length() > 0 {
received := pid.stashBuffer.Dequeue()
for !pid.stashBuffer.IsEmpty() {
received := pid.stashBuffer.Pop()
if received == nil {
return errors.New("stash buffer may be closed")
}
pid.doReceive(received.(*ReceiveContext))
pid.doReceive(received)
}

return nil
Expand Down

0 comments on commit 10dc734

Please sign in to comment.