diff --git a/actors/receive_context_test.go b/actors/receive_context_test.go index 4e0f8d83..2f2c2d33 100644 --- a/actors/receive_context_test.go +++ b/actors/receive_context_test.go @@ -2114,4 +2114,106 @@ func TestReceiveContext(t *testing.T) { assert.NoError(t, actorSystem.Stop(ctx)) }) }) + t.Run("With Stash when stash not set", func(t *testing.T) { + ctx := context.TODO() + // create an actor + opts := []pidOption{ + withInitMaxRetries(1), + withCustomLogger(log.DiscardLogger), + } + + ports := dynaport.Get(1) + // create the actor path + actor := &stasher{} + actorPath := address.New("stasher", "sys", "host", ports[0]) + pid, err := newPID(ctx, actorPath, actor, opts...) + require.NoError(t, err) + require.NotNil(t, pid) + + // wait for the actor to properly start + lib.Pause(5 * time.Millisecond) + + // create a receive context + receiveContext := &ReceiveContext{ + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + self: pid, + } + + receiveContext.Stash() + err = receiveContext.getError() + require.Error(t, err) + assert.EqualError(t, err, ErrStashBufferNotSet.Error()) + err = pid.Shutdown(ctx) + assert.NoError(t, err) + }) + t.Run("With Unstash when stash not set", func(t *testing.T) { + ctx := context.TODO() + // create an actor + opts := []pidOption{ + withInitMaxRetries(1), + withCustomLogger(log.DiscardLogger), + } + + ports := dynaport.Get(1) + // create the actor path + actor := &stasher{} + actorPath := address.New("stasher", "sys", "host", ports[0]) + pid, err := newPID(ctx, actorPath, actor, opts...) + require.NoError(t, err) + require.NotNil(t, pid) + + // wait for the actor to properly start + lib.Pause(5 * time.Millisecond) + + // create a receive context + receiveContext := &ReceiveContext{ + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + self: pid, + } + + receiveContext.Unstash() + err = receiveContext.getError() + require.Error(t, err) + assert.EqualError(t, err, ErrStashBufferNotSet.Error()) + err = pid.Shutdown(ctx) + assert.NoError(t, err) + }) + t.Run("With UnstashAll when stash not set", func(t *testing.T) { + ctx := context.TODO() + // create an actor + opts := []pidOption{ + withInitMaxRetries(1), + withCustomLogger(log.DiscardLogger), + } + + ports := dynaport.Get(1) + // create the actor path + actor := &stasher{} + actorPath := address.New("stasher", "sys", "host", ports[0]) + pid, err := newPID(ctx, actorPath, actor, opts...) + require.NoError(t, err) + require.NotNil(t, pid) + + // wait for the actor to properly start + lib.Pause(5 * time.Millisecond) + + // create a receive context + receiveContext := &ReceiveContext{ + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + self: pid, + } + + receiveContext.UnstashAll() + err = receiveContext.getError() + require.Error(t, err) + assert.EqualError(t, err, ErrStashBufferNotSet.Error()) + err = pid.Shutdown(ctx) + assert.NoError(t, err) + }) } diff --git a/actors/stash_test.go b/actors/stash_test.go index 4da6f897..61acd98f 100644 --- a/actors/stash_test.go +++ b/actors/stash_test.go @@ -112,7 +112,7 @@ func TestStash(t *testing.T) { }) t.Run("With stash failure", func(t *testing.T) { ctx := context.TODO() - // create a Ping actor + // create an actor opts := []pidOption{ withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), @@ -136,7 +136,7 @@ func TestStash(t *testing.T) { err = pid.Shutdown(ctx) assert.NoError(t, err) }) - t.Run("With unstash failure", func(t *testing.T) { + t.Run("With unstash when stash not set", func(t *testing.T) { ctx := context.TODO() // create a Ping actor opts := []pidOption{ @@ -162,6 +162,76 @@ func TestStash(t *testing.T) { assert.Error(t, err) assert.EqualError(t, err, ErrStashBufferNotSet.Error()) + err = pid.Shutdown(ctx) + assert.NoError(t, err) + }) + t.Run("With unstash when there is no stashed message", func(t *testing.T) { + ctx := context.TODO() + // create a Ping actor + opts := []pidOption{ + withInitMaxRetries(1), + withCustomLogger(log.DiscardLogger), + withStash(), + } + + ports := dynaport.Get(1) + + // create the actor path + actor := &stasher{} + actorPath := address.New("stasher", "sys", "host", ports[0]) + pid, err := newPID(ctx, actorPath, actor, opts...) + require.NoError(t, err) + require.NotNil(t, pid) + + // wait for the actor to properly start + lib.Pause(time.Second) + + // send a stash message to the actor + err = Tell(ctx, pid, new(testpb.TestStash)) + require.NoError(t, err) + + // add some pause here due to async calls + lib.Pause(time.Second) + require.EqualValues(t, 1, pid.StashSize()) + + // at this stage any message sent to the actor is stashed + for i := 0; i < 5; i++ { + assert.NoError(t, Tell(ctx, pid, new(testpb.TestSend))) + } + + // add some pause here due to async calls + lib.Pause(time.Second) + + // when we assert the actor received count it will only show 1 + require.EqualValues(t, 1, pid.StashSize()) + + // send another stash + require.NoError(t, Tell(ctx, pid, new(testpb.TestLogin))) + // add some pause here due to async calls + lib.Pause(time.Second) + require.EqualValues(t, 2, pid.StashSize()) + + // add some pause here due to async calls + lib.Pause(time.Second) + assert.NoError(t, Tell(ctx, pid, new(testpb.TestUnstash))) + + // add some pause here due to async calls + lib.Pause(time.Second) + require.EqualValues(t, 1, pid.StashSize()) + + // add some pause here due to async calls + lib.Pause(time.Second) + assert.NoError(t, Tell(ctx, pid, new(testpb.TestUnstashAll))) + + // add some pause here due to async calls + lib.Pause(time.Second) + + require.Zero(t, pid.StashSize()) + + err = pid.unstash() + assert.Error(t, err) + assert.EqualError(t, err, "stash buffer may be closed") + err = pid.Shutdown(ctx) assert.NoError(t, err) })