Skip to content

Commit

Permalink
refactor: move pause into small lib
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Sep 14, 2024
1 parent b81888a commit f596f8b
Show file tree
Hide file tree
Showing 18 changed files with 387 additions and 338 deletions.
83 changes: 42 additions & 41 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (

"github.com/tochemey/goakt/v2/address"
"github.com/tochemey/goakt/v2/goaktpb"
"github.com/tochemey/goakt/v2/internal/lib"
"github.com/tochemey/goakt/v2/internal/types"
"github.com/tochemey/goakt/v2/log"
clustermocks "github.com/tochemey/goakt/v2/mocks/cluster"
Expand Down Expand Up @@ -101,7 +102,7 @@ func TestActorSystem(t *testing.T) {
assert.NotNil(t, actorRef)

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
err = sys.Stop(ctx)
Expand All @@ -123,7 +124,7 @@ func TestActorSystem(t *testing.T) {
assert.Nil(t, actorRef)

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
err = sys.Stop(ctx)
Expand Down Expand Up @@ -151,7 +152,7 @@ func TestActorSystem(t *testing.T) {
assert.True(t, ref1 == ref2)

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
err = sys.Stop(ctx)
Expand Down Expand Up @@ -196,7 +197,7 @@ func TestActorSystem(t *testing.T) {
require.NoError(t, err)

// wait for the cluster to start
pause(time.Second)
lib.Pause(time.Second)

// create an actor
actorName := uuid.NewString()
Expand All @@ -207,7 +208,7 @@ func TestActorSystem(t *testing.T) {

// wait for a while for replication to take effect
// otherwise the subsequent test will return actor not found
pause(time.Second)
lib.Pause(time.Second)

// get the actor
addr, _, err := newActorSystem.ActorOf(ctx, actorName)
Expand Down Expand Up @@ -242,7 +243,7 @@ func TestActorSystem(t *testing.T) {
require.Nil(t, remoteAddr)

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
err = newActorSystem.Stop(ctx)
Expand Down Expand Up @@ -270,7 +271,7 @@ func TestActorSystem(t *testing.T) {
require.NoError(t, err)

// wait for the cluster to fully start
pause(time.Second)
lib.Pause(time.Second)

// create an actor
actorName := uuid.NewString()
Expand Down Expand Up @@ -306,7 +307,7 @@ func TestActorSystem(t *testing.T) {
require.NotNil(t, addr)

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
err = sys.Stop(ctx)
Expand All @@ -329,7 +330,7 @@ func TestActorSystem(t *testing.T) {
require.Nil(t, addr)

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
err = sys.Stop(ctx)
Expand Down Expand Up @@ -387,14 +388,14 @@ func TestActorSystem(t *testing.T) {
require.True(t, actorRef.IsRunning())

// wait for a while for the system to stop
pause(time.Second)
lib.Pause(time.Second)
// restart the actor
_, err = sys.ReSpawn(ctx, actorName)
require.NoError(t, err)

// wait for the actor to complete start
// TODO we can add a callback for complete start
pause(time.Second)
lib.Pause(time.Second)
require.True(t, actorRef.IsRunning())

var items []*goaktpb.ActorRestarted
Expand Down Expand Up @@ -437,7 +438,7 @@ func TestActorSystem(t *testing.T) {
require.True(t, actorRef.IsRunning())

// wait for a while for the system to stop
pause(time.Second)
lib.Pause(time.Second)
// restart the actor
pid, err := sys.ReSpawn(ctx, actorName)
require.Error(t, err)
Expand Down Expand Up @@ -471,13 +472,13 @@ func TestActorSystem(t *testing.T) {
require.True(t, proto.Equal(expected, reply))
require.True(t, actorRef.IsRunning())
// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

err = sys.Kill(ctx, actorName)
require.NoError(t, err)

// wait for a while for the system to stop
pause(time.Second)
lib.Pause(time.Second)
// restart the actor
_, err = sys.ReSpawn(ctx, actorName)
require.Error(t, err)
Expand Down Expand Up @@ -526,15 +527,15 @@ func TestActorSystem(t *testing.T) {
require.True(t, proto.Equal(expected, reply))
require.True(t, actorRef.IsRunning())
// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

// restart the actor
_, err = newActorSystem.ReSpawn(ctx, actorName)
require.NoError(t, err)

// wait for the actor to complete start
// TODO we can add a callback for complete start
pause(time.Second)
lib.Pause(time.Second)
require.True(t, actorRef.IsRunning())

t.Cleanup(func() {
Expand All @@ -556,7 +557,7 @@ func TestActorSystem(t *testing.T) {
assert.NotNil(t, actorRef)

// wait for the start of the actor to be complete
pause(time.Second)
lib.Pause(time.Second)

assert.EqualValues(t, 1, sys.NumActors())

Expand Down Expand Up @@ -584,7 +585,7 @@ func TestActorSystem(t *testing.T) {
err = newActorSystem.Start(ctx)
require.NoError(t, err)

pause(time.Second)
lib.Pause(time.Second)

actorName := "some-actor"
addr, err := RemoteLookup(ctx, host, remotingPort, actorName)
Expand All @@ -603,7 +604,7 @@ func TestActorSystem(t *testing.T) {
require.Nil(t, reply)

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
err = newActorSystem.Stop(ctx)
Expand Down Expand Up @@ -677,7 +678,7 @@ func TestActorSystem(t *testing.T) {
require.Nil(t, remoteAddr)

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
err = newActorSystem.Stop(ctx)
Expand Down Expand Up @@ -706,7 +707,7 @@ func TestActorSystem(t *testing.T) {
require.Equal(t, ref.Address().String(), local.Address().String())

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
err = sys.Stop(ctx)
Expand All @@ -728,7 +729,7 @@ func TestActorSystem(t *testing.T) {
require.EqualError(t, err, ErrActorNotFound("some-name").Error())

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
err = sys.Stop(ctx)
Expand Down Expand Up @@ -780,7 +781,7 @@ func TestActorSystem(t *testing.T) {
assert.NoError(t, err)

// wait for the system to properly start
pause(time.Second)
lib.Pause(time.Second)

actorName := "HousekeeperActor"
actorHandler := newTestActor()
Expand All @@ -789,7 +790,7 @@ func TestActorSystem(t *testing.T) {
require.NotNil(t, actorRef)

// wait for the actor to properly start
pause(time.Second)
lib.Pause(time.Second)

// locate the actor
ref, err := sys.LocalActor(actorName)
Expand All @@ -798,7 +799,7 @@ func TestActorSystem(t *testing.T) {
require.EqualError(t, err, ErrActorNotFound(actorName).Error())

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
err = sys.Stop(ctx)
Expand All @@ -816,7 +817,7 @@ func TestActorSystem(t *testing.T) {
assert.NoError(t, err)

// wait for the system to properly start
pause(time.Second)
lib.Pause(time.Second)

partition := sys.GetPartition("some-actor")
assert.Zero(t, partition)
Expand All @@ -840,7 +841,7 @@ func TestActorSystem(t *testing.T) {
assert.NotNil(t, actorRef)

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
assert.Error(t, sys.Stop(ctx))
Expand All @@ -855,7 +856,7 @@ func TestActorSystem(t *testing.T) {
assert.NoError(t, err)

// wait for complete start
pause(time.Second)
lib.Pause(time.Second)

// create a deadletter subscriber
consumer, err := sys.Subscribe()
Expand All @@ -869,14 +870,14 @@ func TestActorSystem(t *testing.T) {
assert.NotNil(t, actorRef)

// wait a while
pause(time.Second)
lib.Pause(time.Second)

// every message sent to the actor will result in deadletters
for i := 0; i < 5; i++ {
require.NoError(t, Tell(ctx, actorRef, new(testpb.TestSend)))
}

pause(time.Second)
lib.Pause(time.Second)

var items []*goaktpb.Deadletter
for message := range consumer.Iterator() {
Expand Down Expand Up @@ -922,7 +923,7 @@ func TestActorSystem(t *testing.T) {
// stop the actor system
assert.NoError(t, sys.Stop(ctx))

pause(time.Second)
lib.Pause(time.Second)

// create a deadletter subscriber
err = sys.Unsubscribe(consumer)
Expand Down Expand Up @@ -966,7 +967,7 @@ func TestActorSystem(t *testing.T) {
require.NoError(t, err)

// wait for the cluster to start
pause(time.Second)
lib.Pause(time.Second)

// create an actor
actorName := uuid.NewString()
Expand All @@ -977,7 +978,7 @@ func TestActorSystem(t *testing.T) {

// wait for a while for replication to take effect
// otherwise the subsequent test will return actor not found
pause(time.Second)
lib.Pause(time.Second)

// get the actor
addr, pid, err := newActorSystem.ActorOf(ctx, actorName)
Expand All @@ -993,7 +994,7 @@ func TestActorSystem(t *testing.T) {
require.Nil(t, remoteAddr)

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
err = newActorSystem.Stop(ctx)
Expand Down Expand Up @@ -1057,7 +1058,7 @@ func TestActorSystem(t *testing.T) {
assert.ElementsMatch(t, expected, actual)

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

t.Cleanup(func() {
err = sys.Stop(ctx)
Expand Down Expand Up @@ -1091,7 +1092,7 @@ func TestActorSystem(t *testing.T) {
require.NotNil(t, subscriber2)

// wait for some time
pause(time.Second)
lib.Pause(time.Second)

// capture the joins
var joins []*goaktpb.NodeJoined
Expand All @@ -1110,15 +1111,15 @@ func TestActorSystem(t *testing.T) {
require.Equal(t, peerAddress2, joins[0].GetAddress())

// wait for some time
pause(time.Second)
lib.Pause(time.Second)

// stop the node
require.NoError(t, cl1.Unsubscribe(subscriber1))
assert.NoError(t, cl1.Stop(ctx))
assert.NoError(t, sd1.Close())

// wait for some time
pause(time.Second)
lib.Pause(time.Second)

var lefts []*goaktpb.NodeLeft
for event := range subscriber2.Iterator() {
Expand Down Expand Up @@ -1203,7 +1204,7 @@ func TestActorSystem(t *testing.T) {
assert.NotNil(t, actorRef)

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

// send a message to the actor
require.NoError(t, Tell(ctx, actorRef, &testpb.Reply{Content: "test spawn from func"}))
Expand Down Expand Up @@ -1264,7 +1265,7 @@ func TestActorSystem(t *testing.T) {
assert.NotNil(t, actorRef)

// stop the actor after some time
pause(time.Second)
lib.Pause(time.Second)

// send a message to the actor
require.NoError(t, Tell(ctx, actorRef, &testpb.Reply{Content: "test spawn from func"}))
Expand Down Expand Up @@ -1449,7 +1450,7 @@ func TestActorSystem(t *testing.T) {
require.NoError(t, err)

// wait for the cluster to start
pause(time.Second)
lib.Pause(time.Second)

// create an actor
actorName := "actorID"
Expand Down
Loading

0 comments on commit f596f8b

Please sign in to comment.