From 10dc734f3ca86b1689c8f3311923267a95572a12 Mon Sep 17 00:00:00 2001 From: Arsene Date: Tue, 24 Sep 2024 18:48:34 +0100 Subject: [PATCH] refactor: move stash back to mailbox (#469) --- actors/pid.go | 5 ++--- actors/pid_option.go | 3 +-- actors/stash.go | 12 ++++++------ 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/actors/pid.go b/actors/pid.go index cea58bae..77b88b55 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -46,7 +46,6 @@ import ( "github.com/tochemey/goakt/v2/address" "github.com/tochemey/goakt/v2/future" "github.com/tochemey/goakt/v2/goaktpb" - "github.com/tochemey/goakt/v2/internal/collection" "github.com/tochemey/goakt/v2/internal/errorschain" "github.com/tochemey/goakt/v2/internal/eventstream" "github.com/tochemey/goakt/v2/internal/http" @@ -161,7 +160,7 @@ type PID struct { behaviorStack *behaviorStack // stash settings - stashBuffer *collection.Queue + stashBuffer *mailbox stashLocker *sync.Mutex // define an events stream @@ -513,7 +512,7 @@ func (pid *PID) StashSize() uint64 { if pid.stashBuffer == nil { return 0 } - return uint64(pid.stashBuffer.Length()) + return uint64(pid.stashBuffer.Len()) } // PipeTo processes a long-running task and pipes the result to the provided actor. diff --git a/actors/pid_option.go b/actors/pid_option.go index 2b6731a1..1993806d 100644 --- a/actors/pid_option.go +++ b/actors/pid_option.go @@ -27,7 +27,6 @@ package actors import ( "time" - "github.com/tochemey/goakt/v2/internal/collection" "github.com/tochemey/goakt/v2/internal/eventstream" "github.com/tochemey/goakt/v2/log" "github.com/tochemey/goakt/v2/telemetry" @@ -104,7 +103,7 @@ func withTelemetry(telemetry *telemetry.Telemetry) pidOption { // withStash sets the actor's stash buffer func withStash() pidOption { return func(pid *PID) { - pid.stashBuffer = collection.NewQueue() + pid.stashBuffer = newMailbox() } } diff --git a/actors/stash.go b/actors/stash.go index d6160454..4228d4c7 100644 --- a/actors/stash.go +++ b/actors/stash.go @@ -31,7 +31,7 @@ func (pid *PID) stash(ctx *ReceiveContext) error { if pid.stashBuffer == nil { return ErrStashBufferNotSet } - pid.stashBuffer.Enqueue(ctx) + pid.stashBuffer.Push(ctx) return nil } @@ -41,11 +41,11 @@ func (pid *PID) unstash() error { return ErrStashBufferNotSet } - received := pid.stashBuffer.Dequeue() + received := pid.stashBuffer.Pop() if received == nil { return errors.New("stash buffer may be closed") } - pid.doReceive(received.(*ReceiveContext)) + pid.doReceive(received) return nil } @@ -59,12 +59,12 @@ func (pid *PID) unstashAll() error { pid.stashLocker.Lock() defer pid.stashLocker.Unlock() - for pid.stashBuffer.Length() > 0 { - received := pid.stashBuffer.Dequeue() + for !pid.stashBuffer.IsEmpty() { + received := pid.stashBuffer.Pop() if received == nil { return errors.New("stash buffer may be closed") } - pid.doReceive(received.(*ReceiveContext)) + pid.doReceive(received) } return nil