diff --git a/internal/mode/static/handler.go b/internal/mode/static/handler.go
index afb72a27aa..f37fbcdfbe 100644
--- a/internal/mode/static/handler.go
+++ b/internal/mode/static/handler.go
@@ -182,17 +182,40 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
 
     // TODO(sberman): if nginx Deployment is scaled down, we should remove the pod from the ConnectionsTracker
     // and Deployment.
-    // If fully deleted, then delete the deployment from the Store
-    var configApplied bool
-    deployment := h.cfg.nginxDeployments.GetOrStore(deploymentName, broadcast.NewDeploymentBroadcaster(ctx))
+    // If fully deleted, then delete the deployment from the Store and close the stopCh.
+    stopCh := make(chan struct{})
+    deployment := h.cfg.nginxDeployments.GetOrStore(deploymentName, broadcast.NewDeploymentBroadcaster(stopCh))
     if deployment == nil {
         panic("expected deployment, got nil")
     }
 
+    configApplied := h.processStateAndBuildConfig(ctx, logger, gr, changeType, deployment)
+
+    configErr := deployment.GetLatestConfigError()
+    upstreamErr := deployment.GetLatestUpstreamError()
+    err := errors.Join(configErr, upstreamErr)
+
+    if configApplied || err != nil {
+        obj := &status.QueueObject{
+            Error:      err,
+            Deployment: deploymentName,
+        }
+        h.cfg.statusQueue.Enqueue(obj)
+    }
+}
+
+func (h *eventHandlerImpl) processStateAndBuildConfig(
+    ctx context.Context,
+    logger logr.Logger,
+    gr *graph.Graph,
+    changeType state.ChangeType,
+    deployment *agent.Deployment,
+) bool {
+    var configApplied bool
     switch changeType {
     case state.NoChange:
         logger.Info("Handling events didn't result into NGINX configuration changes")
-        return
+        return false
     case state.EndpointsOnlyChange:
         h.version++
         cfg := dataplane.BuildConfiguration(ctx, gr, h.cfg.serviceResolver, h.version)
@@ -227,17 +250,7 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
         deployment.Lock.Unlock()
     }
 
-    configErr := deployment.GetLatestConfigError()
-    upstreamErr := deployment.GetLatestUpstreamError()
-    err := errors.Join(configErr, upstreamErr)
-
-    if configApplied || err != nil {
-        obj := &status.QueueObject{
-            Error:      err,
-            Deployment: deploymentName,
-        }
-        h.cfg.statusQueue.Enqueue(obj)
-    }
+    return configApplied
 }
 
 func (h *eventHandlerImpl) waitForStatusUpdates(ctx context.Context) {
diff --git a/internal/mode/static/nginx/agent/agent.go b/internal/mode/static/nginx/agent/agent.go
index 83aef4f99d..107daba49b 100644
--- a/internal/mode/static/nginx/agent/agent.go
+++ b/internal/mode/static/nginx/agent/agent.go
@@ -87,16 +87,10 @@ func (n *NginxUpdaterImpl) UpdateConfig(
 ) bool {
     n.logger.Info("Sending nginx configuration to agent")
 
-    // reset the latest error to nil now that we're applying new config
-    deployment.SetLatestConfigError(nil)
-
     msg := deployment.SetFiles(files)
     applied := deployment.GetBroadcaster().Send(msg)
 
-    latestStatus := deployment.GetConfigurationStatus()
-    if latestStatus != nil {
-        deployment.SetLatestConfigError(latestStatus)
-    }
+    deployment.SetLatestConfigError(deployment.GetConfigurationStatus())
 
     return applied
 }
@@ -131,18 +125,6 @@ func (n *NginxUpdaterImpl) UpdateUpstreamServers(
             },
         }
         actions = append(actions, action)
-
-        msg := broadcast.NginxAgentMessage{
-            Type:            broadcast.APIRequest,
-            NGINXPlusAction: action,
-        }
-
-        requestApplied, err := n.sendRequest(broadcaster, msg, deployment)
-        if err != nil {
-            errs = append(errs, fmt.Errorf(
-                "couldn't update upstream %q via the API: %w", upstream.Name, deployment.GetConfigurationStatus()))
-        }
-        applied = applied || requestApplied
     }
 
     for _, upstream := range conf.StreamUpstreams {
@@ -152,7 +134,9 @@ func (n *NginxUpdaterImpl) UpdateUpstreamServers(
             },
         }
         actions = append(actions, action)
+    }
+    for _, action := range actions {
         msg := broadcast.NginxAgentMessage{
             Type:            broadcast.APIRequest,
             NGINXPlusAction: action,
         }
@@ -161,7 +145,7 @@ func (n *NginxUpdaterImpl) UpdateUpstreamServers(
         requestApplied, err := n.sendRequest(broadcaster, msg, deployment)
         if err != nil {
             errs = append(errs, fmt.Errorf(
-                "couldn't update upstream %q via the API: %w", upstream.Name, deployment.GetConfigurationStatus()))
+                "couldn't update upstream via the API: %w", deployment.GetConfigurationStatus()))
         }
         applied = applied || requestApplied
     }
diff --git a/internal/mode/static/nginx/agent/agent_test.go b/internal/mode/static/nginx/agent/agent_test.go
index bd318953bb..494d6f68ea 100644
--- a/internal/mode/static/nginx/agent/agent_test.go
+++ b/internal/mode/static/nginx/agent/agent_test.go
@@ -227,8 +227,8 @@ func TestUpdateUpstreamServers(t *testing.T) {
 
             if test.expErr {
                 expErr := errors.Join(
-                    fmt.Errorf("couldn't update upstream \"test-upstream\" via the API: %w", testErr),
-                    fmt.Errorf("couldn't update upstream \"test-stream-upstream\" via the API: %w", testErr),
+                    fmt.Errorf("couldn't update upstream via the API: %w", testErr),
+                    fmt.Errorf("couldn't update upstream via the API: %w", testErr),
                 )
 
                 g.Expect(deployment.GetLatestUpstreamError()).To(Equal(expErr))
diff --git a/internal/mode/static/nginx/agent/broadcast/broadcast.go b/internal/mode/static/nginx/agent/broadcast/broadcast.go
index 2543399555..972e56963f 100644
--- a/internal/mode/static/nginx/agent/broadcast/broadcast.go
+++ b/internal/mode/static/nginx/agent/broadcast/broadcast.go
@@ -1,7 +1,6 @@
 package broadcast
 
 import (
-    "context"
     "sync"
 
     pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
@@ -49,7 +48,7 @@ type DeploymentBroadcaster struct {
 }
 
 // NewDeploymentBroadcaster returns a new instance of a DeploymentBroadcaster.
-func NewDeploymentBroadcaster(ctx context.Context) *DeploymentBroadcaster {
+func NewDeploymentBroadcaster(stopCh chan struct{}) *DeploymentBroadcaster {
     broadcaster := &DeploymentBroadcaster{
         listeners: make(map[string]storedChannels),
         publishCh: make(chan NginxAgentMessage),
@@ -57,7 +56,7 @@ func NewDeploymentBroadcaster(ctx context.Context) *DeploymentBroadcaster {
         unsubCh:   make(chan string),
         doneCh:    make(chan struct{}),
     }
-    go broadcaster.run(ctx)
+    go broadcaster.run(stopCh)
 
     return broadcaster
 }
@@ -99,14 +98,14 @@ func (b *DeploymentBroadcaster) CancelSubscription(id string) {
 }
 
 // run starts the broadcaster loop. It handles the following events:
-// - if context is canceled, return.
+// - if stopCh is closed, return.
 // - if receiving a new subscriber, add it to the subscriber list.
 // - if receiving a canceled subscription, remove it from the subscriber list.
 // - if receiving a message to publish, send it to all subscribers.
-func (b *DeploymentBroadcaster) run(ctx context.Context) {
+func (b *DeploymentBroadcaster) run(stopCh chan struct{}) {
     for {
         select {
-        case <-ctx.Done():
+        case <-stopCh:
             return
         case channels := <-b.subCh:
             b.listeners[channels.id] = channels
diff --git a/internal/mode/static/nginx/agent/broadcast/broadcast_test.go b/internal/mode/static/nginx/agent/broadcast/broadcast_test.go
index d27d723547..5f69360bb4 100644
--- a/internal/mode/static/nginx/agent/broadcast/broadcast_test.go
+++ b/internal/mode/static/nginx/agent/broadcast/broadcast_test.go
@@ -1,7 +1,6 @@
 package broadcast_test
 
 import (
-    "context"
     "testing"
 
     . "github.com/onsi/gomega"
@@ -13,10 +12,10 @@ func TestSubscribe(t *testing.T) {
     t.Parallel()
     g := NewWithT(t)
 
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
+    stopCh := make(chan struct{})
+    defer close(stopCh)
 
-    broadcaster := broadcast.NewDeploymentBroadcaster(ctx)
+    broadcaster := broadcast.NewDeploymentBroadcaster(stopCh)
 
     subscriber := broadcaster.Subscribe()
     g.Expect(subscriber.ID).NotTo(BeEmpty())
@@ -38,10 +37,10 @@ func TestSubscribe_MultipleListeners(t *testing.T) {
     t.Parallel()
     g := NewWithT(t)
 
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
+    stopCh := make(chan struct{})
+    defer close(stopCh)
 
-    broadcaster := broadcast.NewDeploymentBroadcaster(ctx)
+    broadcaster := broadcast.NewDeploymentBroadcaster(stopCh)
 
     subscriber1 := broadcaster.Subscribe()
     subscriber2 := broadcaster.Subscribe()
@@ -67,10 +66,10 @@ func TestSubscribe_NoListeners(t *testing.T) {
     t.Parallel()
     g := NewWithT(t)
 
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
+    stopCh := make(chan struct{})
+    defer close(stopCh)
 
-    broadcaster := broadcast.NewDeploymentBroadcaster(ctx)
+    broadcaster := broadcast.NewDeploymentBroadcaster(stopCh)
 
     message := broadcast.NginxAgentMessage{
         ConfigVersion: "v1",
@@ -85,10 +84,10 @@ func TestCancelSubscription(t *testing.T) {
     t.Parallel()
     g := NewWithT(t)
 
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
+    stopCh := make(chan struct{})
+    defer close(stopCh)
 
-    broadcaster := broadcast.NewDeploymentBroadcaster(ctx)
+    broadcaster := broadcast.NewDeploymentBroadcaster(stopCh)
 
     subscriber := broadcaster.Subscribe()
 
diff --git a/internal/mode/static/nginx/agent/command.go b/internal/mode/static/nginx/agent/command.go
index c196c177b8..796583e0e0 100644
--- a/internal/mode/static/nginx/agent/command.go
+++ b/internal/mode/static/nginx/agent/command.go
@@ -93,6 +93,7 @@ func (cs *commandService) CreateConnection(
                 Error: err.Error(),
             },
         }
+        cs.logger.Error(err, "error getting pod owner")
         return response, grpcStatus.Errorf(codes.Internal, "error getting pod owner %s", err.Error())
     }
 
@@ -145,12 +146,12 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
         return err
     }
 
+    deployment.Lock.RUnlock()
+
     // subscribe to the deployment broadcaster to get file updates
     broadcaster := deployment.GetBroadcaster()
     channels := broadcaster.Subscribe()
     defer broadcaster.CancelSubscription(channels.ID)
 
-    deployment.Lock.RUnlock()
-
     for {
         select {
@@ -175,7 +176,10 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
                 return grpcStatus.Error(codes.Internal, err.Error())
             }
         case err = <-msgr.Errors():
-            cs.logger.Error(err, "connection error")
+            cs.logger.Error(err, "connection error", "pod", conn.PodName)
+            deployment.SetPodErrorStatus(conn.PodName, err)
+            channels.ResponseCh <- struct{}{}
+
             if errors.Is(err, io.EOF) {
                 return grpcStatus.Error(codes.Aborted, err.Error())
             }
@@ -214,7 +218,7 @@ func (cs *commandService) waitForConnection(
         case <-timer.C:
             return nil, nil, err
         case <-ticker.C:
-            if conn, ok := cs.connTracker.ConnectionIsReady(gi.IPAddress); ok {
+            if conn, ok := cs.connTracker.Ready(gi.IPAddress); ok {
                 // connection has been established, now ensure that the deployment exists in the store
                 if deployment := cs.nginxDeployments.Get(conn.Parent); deployment != nil {
                     return &conn, deployment, nil
@@ -332,7 +336,7 @@ func (cs *commandService) logAndSendErrorStatus(deployment *Deployment, conn *ag
     if err != nil {
         cs.logger.Error(err, "error sending request to agent")
     } else {
-        cs.logger.Info(fmt.Sprintf("Successfully configured nginx for new subscription %q", conn.PodName))
+        cs.logger.Info("Successfully configured nginx for new subscription", "pod", conn.PodName)
     }
 
     deployment.SetPodErrorStatus(conn.PodName, err)
diff --git a/internal/mode/static/nginx/agent/command_test.go b/internal/mode/static/nginx/agent/command_test.go
index 897650a26d..e1f4cb59ad 100644
--- a/internal/mode/static/nginx/agent/command_test.go
+++ b/internal/mode/static/nginx/agent/command_test.go
@@ -295,7 +295,7 @@ func TestSubscribe(t *testing.T) {
         PodName:    "nginx-pod",
         InstanceID: "nginx-id",
     }
-    connTracker.ConnectionIsReadyReturns(conn, true)
+    connTracker.ReadyReturns(conn, true)
 
     cs := newCommandService(
         logr.Discard(),
@@ -438,7 +438,7 @@ func TestSubscribe_Errors(t *testing.T) {
                 cs *commandService,
                 ct *agentgrpcfakes.FakeConnectionsTracker,
             ) {
-                ct.ConnectionIsReadyReturns(agentgrpc.Connection{}, true)
+                ct.ReadyReturns(agentgrpc.Connection{}, true)
                 cs.connectionTimeout = 1100 * time.Millisecond
             },
             errString: "timed out waiting for nginx deployment to be added to store",
diff --git a/internal/mode/static/nginx/agent/deployment.go b/internal/mode/static/nginx/agent/deployment.go
index eb71abdfed..2ede9388a6 100644
--- a/internal/mode/static/nginx/agent/deployment.go
+++ b/internal/mode/static/nginx/agent/deployment.go
@@ -164,6 +164,7 @@ func (d *Deployment) SetNGINXPlusActions(actions []*pb.NGINXPlusAction) {
 }
 
 // SetPodErrorStatus sets the error status of a Pod in this Deployment if applying the config failed.
+// Caller MUST lock the deployment before calling this function.
 func (d *Deployment) SetPodErrorStatus(pod string, err error) {
     d.podStatuses[pod] = err
 }
diff --git a/internal/mode/static/nginx/agent/grpc/connections.go b/internal/mode/static/nginx/agent/grpc/connections.go
index 534d3729c6..bb6c7c52ba 100644
--- a/internal/mode/static/nginx/agent/grpc/connections.go
+++ b/internal/mode/static/nginx/agent/grpc/connections.go
@@ -15,7 +15,7 @@ import (
 type ConnectionsTracker interface {
     Track(key string, conn Connection)
     GetConnection(key string) Connection
-    ConnectionIsReady(key string) (Connection, bool)
+    Ready(key string) (Connection, bool)
     SetInstanceID(key, id string)
     UntrackConnectionsForParent(parent types.NamespacedName)
 }
@@ -63,7 +63,7 @@ func (c *AgentConnectionsTracker) GetConnection(key string) Connection {
 
 // ConnectionIsReady returns if the connection is ready to be used. In other words, agent
 // has registered itself and an nginx instance with the control plane.
-func (c *AgentConnectionsTracker) ConnectionIsReady(key string) (Connection, bool) {
+func (c *AgentConnectionsTracker) Ready(key string) (Connection, bool) {
     c.lock.RLock()
     defer c.lock.RUnlock()
 
diff --git a/internal/mode/static/nginx/agent/grpc/connections_test.go b/internal/mode/static/nginx/agent/grpc/connections_test.go
index c13dfa8bb0..b407273532 100644
--- a/internal/mode/static/nginx/agent/grpc/connections_test.go
+++ b/internal/mode/static/nginx/agent/grpc/connections_test.go
@@ -29,7 +29,7 @@ func TestGetConnection(t *testing.T) {
     g.Expect(nonExistent).To(Equal(agentgrpc.Connection{}))
 }
 
-func TestConnectionIsReady(t *testing.T) {
+func TestReady(t *testing.T) {
     t.Parallel()
     g := NewWithT(t)
 
@@ -42,7 +42,7 @@ func TestConnectionIsReady(t *testing.T) {
     }
     tracker.Track("key1", conn)
 
-    trackedConn, ready := tracker.ConnectionIsReady("key1")
+    trackedConn, ready := tracker.Ready("key1")
     g.Expect(ready).To(BeTrue())
     g.Expect(trackedConn).To(Equal(conn))
 }
@@ -59,7 +59,7 @@ func TestConnectionIsNotReady(t *testing.T) {
     }
     tracker.Track("key1", conn)
 
-    _, ready := tracker.ConnectionIsReady("key1")
+    _, ready := tracker.Ready("key1")
     g.Expect(ready).To(BeFalse())
 }
 
@@ -74,12 +74,12 @@ func TestSetInstanceID(t *testing.T) {
     }
     tracker.Track("key1", conn)
 
-    _, ready := tracker.ConnectionIsReady("key1")
+    _, ready := tracker.Ready("key1")
     g.Expect(ready).To(BeFalse())
 
     tracker.SetInstanceID("key1", "instance1")
 
-    trackedConn, ready := tracker.ConnectionIsReady("key1")
+    trackedConn, ready := tracker.Ready("key1")
     g.Expect(ready).To(BeTrue())
     g.Expect(trackedConn.InstanceID).To(Equal("instance1"))
 }
diff --git a/internal/mode/static/nginx/agent/grpc/grpc.go b/internal/mode/static/nginx/agent/grpc/grpc.go
index c476d5855d..b7bfe29ff8 100644
--- a/internal/mode/static/nginx/agent/grpc/grpc.go
+++ b/internal/mode/static/nginx/agent/grpc/grpc.go
@@ -82,6 +82,7 @@ func (g *Server) Start(ctx context.Context) error {
     go func() {
         <-ctx.Done()
         g.logger.Info("Shutting down GRPC Server")
+        // Since we use a long-lived stream, GracefulStop does not terminate. Therefore we use Stop.
         server.Stop()
     }()
 
diff --git a/internal/mode/static/nginx/agent/grpc/grpcfakes/fake_connections_tracker.go b/internal/mode/static/nginx/agent/grpc/grpcfakes/fake_connections_tracker.go
index 6824df1a87..9b0ae34f75 100644
--- a/internal/mode/static/nginx/agent/grpc/grpcfakes/fake_connections_tracker.go
+++ b/internal/mode/static/nginx/agent/grpc/grpcfakes/fake_connections_tracker.go
@@ -9,29 +9,29 @@ import (
 )
 
 type FakeConnectionsTracker struct {
-    ConnectionIsReadyStub        func(string) (grpc.Connection, bool)
-    connectionIsReadyMutex       sync.RWMutex
-    connectionIsReadyArgsForCall []struct {
+    GetConnectionStub        func(string) grpc.Connection
+    getConnectionMutex       sync.RWMutex
+    getConnectionArgsForCall []struct {
         arg1 string
     }
-    connectionIsReadyReturns struct {
+    getConnectionReturns struct {
         result1 grpc.Connection
-        result2 bool
     }
-    connectionIsReadyReturnsOnCall map[int]struct {
+    getConnectionReturnsOnCall map[int]struct {
         result1 grpc.Connection
-        result2 bool
     }
-    GetConnectionStub        func(string) grpc.Connection
-    getConnectionMutex       sync.RWMutex
-    getConnectionArgsForCall []struct {
+    ReadyStub        func(string) (grpc.Connection, bool)
+    readyMutex       sync.RWMutex
+    readyArgsForCall []struct {
         arg1 string
     }
-    getConnectionReturns struct {
+    readyReturns struct {
         result1 grpc.Connection
+        result2 bool
     }
-    getConnectionReturnsOnCall map[int]struct {
+    readyReturnsOnCall map[int]struct {
         result1 grpc.Connection
+        result2 bool
     }
     SetInstanceIDStub        func(string, string)
     setInstanceIDMutex       sync.RWMutex
@@ -54,70 +54,6 @@ type FakeConnectionsTracker struct {
     invocationsMutex sync.RWMutex
 }
 
-func (fake *FakeConnectionsTracker) ConnectionIsReady(arg1 string) (grpc.Connection, bool) {
-    fake.connectionIsReadyMutex.Lock()
-    ret, specificReturn := fake.connectionIsReadyReturnsOnCall[len(fake.connectionIsReadyArgsForCall)]
-    fake.connectionIsReadyArgsForCall = append(fake.connectionIsReadyArgsForCall, struct {
-        arg1 string
-    }{arg1})
-    stub := fake.ConnectionIsReadyStub
-    fakeReturns := fake.connectionIsReadyReturns
-    fake.recordInvocation("ConnectionIsReady", []interface{}{arg1})
-    fake.connectionIsReadyMutex.Unlock()
-    if stub != nil {
-        return stub(arg1)
-    }
-    if specificReturn {
-        return ret.result1, ret.result2
-    }
-    return fakeReturns.result1, fakeReturns.result2
-}
-
-func (fake *FakeConnectionsTracker) ConnectionIsReadyCallCount() int {
-    fake.connectionIsReadyMutex.RLock()
-    defer fake.connectionIsReadyMutex.RUnlock()
-    return len(fake.connectionIsReadyArgsForCall)
-}
-
-func (fake *FakeConnectionsTracker) ConnectionIsReadyCalls(stub func(string) (grpc.Connection, bool)) {
-    fake.connectionIsReadyMutex.Lock()
-    defer fake.connectionIsReadyMutex.Unlock()
-    fake.ConnectionIsReadyStub = stub
-}
-
-func (fake *FakeConnectionsTracker) ConnectionIsReadyArgsForCall(i int) string {
-    fake.connectionIsReadyMutex.RLock()
-    defer fake.connectionIsReadyMutex.RUnlock()
-    argsForCall := fake.connectionIsReadyArgsForCall[i]
-    return argsForCall.arg1
-}
-
-func (fake *FakeConnectionsTracker) ConnectionIsReadyReturns(result1 grpc.Connection, result2 bool) {
-    fake.connectionIsReadyMutex.Lock()
-    defer fake.connectionIsReadyMutex.Unlock()
-    fake.ConnectionIsReadyStub = nil
-    fake.connectionIsReadyReturns = struct {
-        result1 grpc.Connection
-        result2 bool
-    }{result1, result2}
-}
-
-func (fake *FakeConnectionsTracker) ConnectionIsReadyReturnsOnCall(i int, result1 grpc.Connection, result2 bool) {
-    fake.connectionIsReadyMutex.Lock()
-    defer fake.connectionIsReadyMutex.Unlock()
-    fake.ConnectionIsReadyStub = nil
-    if fake.connectionIsReadyReturnsOnCall == nil {
-        fake.connectionIsReadyReturnsOnCall = make(map[int]struct {
-            result1 grpc.Connection
-            result2 bool
-        })
-    }
-    fake.connectionIsReadyReturnsOnCall[i] = struct {
-        result1 grpc.Connection
-        result2 bool
-    }{result1, result2}
-}
-
 func (fake *FakeConnectionsTracker) GetConnection(arg1 string) grpc.Connection {
     fake.getConnectionMutex.Lock()
     ret, specificReturn := fake.getConnectionReturnsOnCall[len(fake.getConnectionArgsForCall)]
@@ -179,6 +115,70 @@ func (fake *FakeConnectionsTracker) GetConnectionReturnsOnCall(i int, result1 gr
     }{result1}
 }
 
+func (fake *FakeConnectionsTracker) Ready(arg1 string) (grpc.Connection, bool) {
+    fake.readyMutex.Lock()
+    ret, specificReturn := fake.readyReturnsOnCall[len(fake.readyArgsForCall)]
+    fake.readyArgsForCall = append(fake.readyArgsForCall, struct {
+        arg1 string
+    }{arg1})
+    stub := fake.ReadyStub
+    fakeReturns := fake.readyReturns
+    fake.recordInvocation("Ready", []interface{}{arg1})
+    fake.readyMutex.Unlock()
+    if stub != nil {
+        return stub(arg1)
+    }
+    if specificReturn {
+        return ret.result1, ret.result2
+    }
+    return fakeReturns.result1, fakeReturns.result2
+}
+
+func (fake *FakeConnectionsTracker) ReadyCallCount() int {
+    fake.readyMutex.RLock()
+    defer fake.readyMutex.RUnlock()
+    return len(fake.readyArgsForCall)
+}
+
+func (fake *FakeConnectionsTracker) ReadyCalls(stub func(string) (grpc.Connection, bool)) {
+    fake.readyMutex.Lock()
+    defer fake.readyMutex.Unlock()
+    fake.ReadyStub = stub
+}
+
+func (fake *FakeConnectionsTracker) ReadyArgsForCall(i int) string {
+    fake.readyMutex.RLock()
+    defer fake.readyMutex.RUnlock()
+    argsForCall := fake.readyArgsForCall[i]
+    return argsForCall.arg1
+}
+
+func (fake *FakeConnectionsTracker) ReadyReturns(result1 grpc.Connection, result2 bool) {
+    fake.readyMutex.Lock()
+    defer fake.readyMutex.Unlock()
+    fake.ReadyStub = nil
+    fake.readyReturns = struct {
+        result1 grpc.Connection
+        result2 bool
+    }{result1, result2}
+}
+
+func (fake *FakeConnectionsTracker) ReadyReturnsOnCall(i int, result1 grpc.Connection, result2 bool) {
+    fake.readyMutex.Lock()
+    defer fake.readyMutex.Unlock()
+    fake.ReadyStub = nil
+    if fake.readyReturnsOnCall == nil {
+        fake.readyReturnsOnCall = make(map[int]struct {
+            result1 grpc.Connection
+            result2 bool
+        })
+    }
+    fake.readyReturnsOnCall[i] = struct {
+        result1 grpc.Connection
+        result2 bool
+    }{result1, result2}
+}
+
 func (fake *FakeConnectionsTracker) SetInstanceID(arg1 string, arg2 string) {
     fake.setInstanceIDMutex.Lock()
     fake.setInstanceIDArgsForCall = append(fake.setInstanceIDArgsForCall, struct {
@@ -280,10 +280,10 @@ func (fake *FakeConnectionsTracker) UntrackConnectionsForParentArgsForCall(i int
 func (fake *FakeConnectionsTracker) Invocations() map[string][][]interface{} {
     fake.invocationsMutex.RLock()
     defer fake.invocationsMutex.RUnlock()
-    fake.connectionIsReadyMutex.RLock()
-    defer fake.connectionIsReadyMutex.RUnlock()
     fake.getConnectionMutex.RLock()
     defer fake.getConnectionMutex.RUnlock()
+    fake.readyMutex.RLock()
+    defer fake.readyMutex.RUnlock()
     fake.setInstanceIDMutex.RLock()
     defer fake.setInstanceIDMutex.RUnlock()
     fake.trackMutex.RLock()