code cleanup; channel instead of ctx; log additions
sjberman committed Jan 27, 2025
1 parent 4fa25b9 commit 1e9562c
Showing 12 changed files with 148 additions and 147 deletions.
43 changes: 28 additions & 15 deletions internal/mode/static/handler.go
@@ -182,17 +182,40 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log

// TODO(sberman): if nginx Deployment is scaled down, we should remove the pod from the ConnectionsTracker
// and Deployment.
// If fully deleted, then delete the deployment from the Store
var configApplied bool
deployment := h.cfg.nginxDeployments.GetOrStore(deploymentName, broadcast.NewDeploymentBroadcaster(ctx))
// If fully deleted, then delete the deployment from the Store and close the stopCh.
stopCh := make(chan struct{})
deployment := h.cfg.nginxDeployments.GetOrStore(deploymentName, broadcast.NewDeploymentBroadcaster(stopCh))
if deployment == nil {
panic("expected deployment, got nil")
}

configApplied := h.processStateAndBuildConfig(ctx, logger, gr, changeType, deployment)

configErr := deployment.GetLatestConfigError()
upstreamErr := deployment.GetLatestUpstreamError()
err := errors.Join(configErr, upstreamErr)

if configApplied || err != nil {
obj := &status.QueueObject{
Error: err,
Deployment: deploymentName,
}
h.cfg.statusQueue.Enqueue(obj)
}
}

func (h *eventHandlerImpl) processStateAndBuildConfig(
ctx context.Context,
logger logr.Logger,
gr *graph.Graph,
changeType state.ChangeType,
deployment *agent.Deployment,
) bool {
var configApplied bool
switch changeType {
case state.NoChange:
logger.Info("Handling events didn't result into NGINX configuration changes")
return
return false
case state.EndpointsOnlyChange:
h.version++
cfg := dataplane.BuildConfiguration(ctx, gr, h.cfg.serviceResolver, h.version)
@@ -227,17 +250,7 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
deployment.Lock.Unlock()
}

configErr := deployment.GetLatestConfigError()
upstreamErr := deployment.GetLatestUpstreamError()
err := errors.Join(configErr, upstreamErr)

if configApplied || err != nil {
obj := &status.QueueObject{
Error: err,
Deployment: deploymentName,
}
h.cfg.statusQueue.Enqueue(obj)
}
return configApplied
}

func (h *eventHandlerImpl) waitForStatusUpdates(ctx context.Context) {
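
The handler.go hunks split HandleEventBatch in two: processStateAndBuildConfig reports whether new configuration was applied, and the caller joins the latest config and upstream errors and enqueues a status object only when something changed or failed. Below is a minimal standalone sketch of that shape, using simplified, hypothetical types (queueObject, statusQueue, buildConfig) rather than the project's real ones:

```go
package main

import (
	"errors"
	"fmt"
)

// queueObject mirrors the shape of the enqueued status object
// (hypothetical, simplified fields; not the project's real definition).
type queueObject struct {
	Err        error
	Deployment string
}

type statusQueue struct{ items []queueObject }

func (q *statusQueue) Enqueue(o queueObject) { q.items = append(q.items, o) }

// buildConfig stands in for processStateAndBuildConfig: it reports whether
// new configuration was actually applied.
func buildConfig(changed bool) bool { return changed }

func handleBatch(q *statusQueue, deployment string, changed bool, configErr, upstreamErr error) {
	applied := buildConfig(changed)

	// errors.Join returns nil when both arguments are nil, so the enqueue
	// below is skipped entirely on a clean no-op batch.
	err := errors.Join(configErr, upstreamErr)

	if applied || err != nil {
		q.Enqueue(queueObject{Err: err, Deployment: deployment})
	}
}

func main() {
	q := &statusQueue{}
	handleBatch(q, "nginx-deployment", true, nil, fmt.Errorf("upstream update failed"))
	fmt.Println(len(q.items)) // 1
}
```
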
24 changes: 4 additions & 20 deletions internal/mode/static/nginx/agent/agent.go
@@ -87,16 +87,10 @@ func (n *NginxUpdaterImpl) UpdateConfig(
) bool {
n.logger.Info("Sending nginx configuration to agent")

// reset the latest error to nil now that we're applying new config
deployment.SetLatestConfigError(nil)

msg := deployment.SetFiles(files)
applied := deployment.GetBroadcaster().Send(msg)

latestStatus := deployment.GetConfigurationStatus()
if latestStatus != nil {
deployment.SetLatestConfigError(latestStatus)
}
deployment.SetLatestConfigError(deployment.GetConfigurationStatus())

return applied
}
@@ -131,18 +125,6 @@ func (n *NginxUpdaterImpl) UpdateUpstreamServers(
},
}
actions = append(actions, action)

msg := broadcast.NginxAgentMessage{
Type: broadcast.APIRequest,
NGINXPlusAction: action,
}

requestApplied, err := n.sendRequest(broadcaster, msg, deployment)
if err != nil {
errs = append(errs, fmt.Errorf(
"couldn't update upstream %q via the API: %w", upstream.Name, deployment.GetConfigurationStatus()))
}
applied = applied || requestApplied
}

for _, upstream := range conf.StreamUpstreams {
@@ -152,7 +134,9 @@ func (n *NginxUpdaterImpl) UpdateUpstreamServers(
},
}
actions = append(actions, action)
}

for _, action := range actions {
msg := broadcast.NginxAgentMessage{
Type: broadcast.APIRequest,
NGINXPlusAction: action,
@@ -161,7 +145,7 @@ func (n *NginxUpdaterImpl) UpdateUpstreamServers(
requestApplied, err := n.sendRequest(broadcaster, msg, deployment)
if err != nil {
errs = append(errs, fmt.Errorf(
"couldn't update upstream %q via the API: %w", upstream.Name, deployment.GetConfigurationStatus()))
"couldn't update upstream via the API: %w", deployment.GetConfigurationStatus()))
}
applied = applied || requestApplied
}
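
The agent.go hunks collect the HTTP and stream upstream actions into one slice and send them in a single loop, dropping the per-upstream name from the error message in the process. A rough sketch of that collect-then-send pattern, with made-up action, message, and send stand-ins rather than the real broadcast types:

```go
package main

import (
	"errors"
	"fmt"
)

// action and message are simplified stand-ins for the NGINX Plus API action
// and broadcast message types touched in this hunk (hypothetical names).
type action struct{ upstream string }
type message struct{ act action }

// send simulates sending one API request; it fails for one upstream here
// purely to demonstrate error accumulation.
func send(m message) (bool, error) {
	if m.act.upstream == "stream-backend" {
		return false, fmt.Errorf("agent rejected update")
	}
	return true, nil
}

func updateUpstreams(httpUpstreams, streamUpstreams []string) (bool, error) {
	var actions []action
	for _, u := range httpUpstreams {
		actions = append(actions, action{upstream: u})
	}
	for _, u := range streamUpstreams {
		actions = append(actions, action{upstream: u})
	}

	var (
		applied bool
		errs    []error
	)
	// One send loop for every collected action, regardless of upstream kind.
	for _, a := range actions {
		ok, err := send(message{act: a})
		if err != nil {
			errs = append(errs, fmt.Errorf("couldn't update upstream via the API: %w", err))
		}
		applied = applied || ok
	}
	return applied, errors.Join(errs...)
}

func main() {
	applied, err := updateUpstreams([]string{"backend"}, []string{"stream-backend"})
	fmt.Println(applied, err)
}
```
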
4 changes: 2 additions & 2 deletions internal/mode/static/nginx/agent/agent_test.go
@@ -227,8 +227,8 @@ func TestUpdateUpstreamServers(t *testing.T) {

if test.expErr {
expErr := errors.Join(
fmt.Errorf("couldn't update upstream \"test-upstream\" via the API: %w", testErr),
fmt.Errorf("couldn't update upstream \"test-stream-upstream\" via the API: %w", testErr),
fmt.Errorf("couldn't update upstream via the API: %w", testErr),
fmt.Errorf("couldn't update upstream via the API: %w", testErr),
)

g.Expect(deployment.GetLatestUpstreamError()).To(Equal(expErr))
11 changes: 5 additions & 6 deletions internal/mode/static/nginx/agent/broadcast/broadcast.go
@@ -1,7 +1,6 @@
package broadcast

import (
"context"
"sync"

pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
@@ -49,15 +48,15 @@ type DeploymentBroadcaster struct {
}

// NewDeploymentBroadcaster returns a new instance of a DeploymentBroadcaster.
func NewDeploymentBroadcaster(ctx context.Context) *DeploymentBroadcaster {
func NewDeploymentBroadcaster(stopCh chan struct{}) *DeploymentBroadcaster {
broadcaster := &DeploymentBroadcaster{
listeners: make(map[string]storedChannels),
publishCh: make(chan NginxAgentMessage),
subCh: make(chan storedChannels),
unsubCh: make(chan string),
doneCh: make(chan struct{}),
}
go broadcaster.run(ctx)
go broadcaster.run(stopCh)

return broadcaster
}
Expand Down Expand Up @@ -99,14 +98,14 @@ func (b *DeploymentBroadcaster) CancelSubscription(id string) {
}

// run starts the broadcaster loop. It handles the following events:
// - if context is canceled, return.
// - if stopCh is closed, return.
// - if receiving a new subscriber, add it to the subscriber list.
// - if receiving a canceled subscription, remove it from the subscriber list.
// - if receiving a message to publish, send it to all subscribers.
func (b *DeploymentBroadcaster) run(ctx context.Context) {
func (b *DeploymentBroadcaster) run(stopCh chan struct{}) {
for {
select {
case <-ctx.Done():
case <-stopCh:
return
case channels := <-b.subCh:
b.listeners[channels.id] = channels
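
The broadcaster now stops when a caller-owned channel is closed instead of when a context is canceled, which lets the handler tie the broadcaster's lifetime to the deployment rather than to a request context. A small, self-contained illustration of the stop-channel idiom, independent of the broadcaster itself: closing the channel unblocks every goroutine selecting on it at once.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// run loops until stopCh is closed, mirroring the broadcaster's select on
// <-stopCh instead of <-ctx.Done().
func run(stopCh chan struct{}, id int, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-stopCh:
			fmt.Printf("worker %d: stop channel closed, exiting\n", id)
			return
		case <-time.After(10 * time.Millisecond):
			// Stand-in for handling subscribe/unsubscribe/publish events.
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go run(stopCh, i, &wg)
	}

	// Closing the channel signals every goroutine at once, which is the
	// behaviour the handler relies on when the deployment is deleted.
	close(stopCh)
	wg.Wait()
}
```
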
25 changes: 12 additions & 13 deletions internal/mode/static/nginx/agent/broadcast/broadcast_test.go
@@ -1,7 +1,6 @@
package broadcast_test

import (
"context"
"testing"

. "github.com/onsi/gomega"
@@ -13,10 +12,10 @@ func TestSubscribe(t *testing.T) {
t.Parallel()
g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stopCh := make(chan struct{})
defer close(stopCh)

broadcaster := broadcast.NewDeploymentBroadcaster(ctx)
broadcaster := broadcast.NewDeploymentBroadcaster(stopCh)

subscriber := broadcaster.Subscribe()
g.Expect(subscriber.ID).NotTo(BeEmpty())
@@ -38,10 +37,10 @@ func TestSubscribe_MultipleListeners(t *testing.T) {
t.Parallel()
g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stopCh := make(chan struct{})
defer close(stopCh)

broadcaster := broadcast.NewDeploymentBroadcaster(ctx)
broadcaster := broadcast.NewDeploymentBroadcaster(stopCh)

subscriber1 := broadcaster.Subscribe()
subscriber2 := broadcaster.Subscribe()
@@ -67,10 +66,10 @@ func TestSubscribe_NoListeners(t *testing.T) {
t.Parallel()
g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stopCh := make(chan struct{})
defer close(stopCh)

broadcaster := broadcast.NewDeploymentBroadcaster(ctx)
broadcaster := broadcast.NewDeploymentBroadcaster(stopCh)

message := broadcast.NginxAgentMessage{
ConfigVersion: "v1",
@@ -85,10 +84,10 @@ func TestCancelSubscription(t *testing.T) {
t.Parallel()
g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stopCh := make(chan struct{})
defer close(stopCh)

broadcaster := broadcast.NewDeploymentBroadcaster(ctx)
broadcaster := broadcast.NewDeploymentBroadcaster(stopCh)

subscriber := broadcaster.Subscribe()

12 changes: 8 additions & 4 deletions internal/mode/static/nginx/agent/command.go
@@ -93,6 +93,7 @@ func (cs *commandService) CreateConnection(
Error: err.Error(),
},
}
cs.logger.Error(err, "error getting pod owner")
return response, grpcStatus.Errorf(codes.Internal, "error getting pod owner %s", err.Error())
}

@@ -145,12 +146,12 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error

return err
}
deployment.Lock.RUnlock()

// subscribe to the deployment broadcaster to get file updates
broadcaster := deployment.GetBroadcaster()
channels := broadcaster.Subscribe()
defer broadcaster.CancelSubscription(channels.ID)
deployment.Lock.RUnlock()

for {
select {
@@ -175,7 +176,10 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
return grpcStatus.Error(codes.Internal, err.Error())
}
case err = <-msgr.Errors():
cs.logger.Error(err, "connection error")
cs.logger.Error(err, "connection error", "pod", conn.PodName)
deployment.SetPodErrorStatus(conn.PodName, err)
channels.ResponseCh <- struct{}{}

if errors.Is(err, io.EOF) {
return grpcStatus.Error(codes.Aborted, err.Error())
}
@@ -214,7 +218,7 @@ func (cs *commandService) waitForConnection(
case <-timer.C:
return nil, nil, err
case <-ticker.C:
if conn, ok := cs.connTracker.ConnectionIsReady(gi.IPAddress); ok {
if conn, ok := cs.connTracker.Ready(gi.IPAddress); ok {
// connection has been established, now ensure that the deployment exists in the store
if deployment := cs.nginxDeployments.Get(conn.Parent); deployment != nil {
return &conn, deployment, nil
@@ -332,7 +336,7 @@ func (cs *commandService) logAndSendErrorStatus(deployment *Deployment, conn *ag
if err != nil {
cs.logger.Error(err, "error sending request to agent")
} else {
cs.logger.Info(fmt.Sprintf("Successfully configured nginx for new subscription %q", conn.PodName))
cs.logger.Info("Successfully configured nginx for new subscription", "pod", conn.PodName)
}
deployment.SetPodErrorStatus(conn.PodName, err)

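
Several log calls in command.go switch from fmt.Sprintf-formatted messages to logr key-value pairs (for example "pod", conn.PodName), which keeps the message constant and makes the pod name a queryable field. A tiny sketch of that style, using logr's funcr helper as the sink; the sink choice is illustrative only, not how the project wires its logger:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/go-logr/logr/funcr"
)

func main() {
	// funcr provides a simple logr.Logger for demonstration purposes.
	logger := funcr.New(func(prefix, args string) {
		fmt.Println(prefix, args)
	}, funcr.Options{})

	podName := "nginx-pod-abc123" // hypothetical pod name

	// Key-value pairs instead of fmt.Sprintf: the message stays constant and
	// the pod name becomes a structured field.
	logger.Info("Successfully configured nginx for new subscription", "pod", podName)

	err := errors.New("stream closed by peer")
	logger.Error(err, "connection error", "pod", podName)
}
```
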
4 changes: 2 additions & 2 deletions internal/mode/static/nginx/agent/command_test.go
@@ -295,7 +295,7 @@ func TestSubscribe(t *testing.T) {
PodName: "nginx-pod",
InstanceID: "nginx-id",
}
connTracker.ConnectionIsReadyReturns(conn, true)
connTracker.ReadyReturns(conn, true)

cs := newCommandService(
logr.Discard(),
@@ -438,7 +438,7 @@ func TestSubscribe_Errors(t *testing.T) {
cs *commandService,
ct *agentgrpcfakes.FakeConnectionsTracker,
) {
ct.ConnectionIsReadyReturns(agentgrpc.Connection{}, true)
ct.ReadyReturns(agentgrpc.Connection{}, true)
cs.connectionTimeout = 1100 * time.Millisecond
},
errString: "timed out waiting for nginx deployment to be added to store",
1 change: 1 addition & 0 deletions internal/mode/static/nginx/agent/deployment.go
@@ -164,6 +164,7 @@ func (d *Deployment) SetNGINXPlusActions(actions []*pb.NGINXPlusAction) {
}

// SetPodErrorStatus sets the error status of a Pod in this Deployment if applying the config failed.
// Caller MUST lock the deployment before calling this function.
func (d *Deployment) SetPodErrorStatus(pod string, err error) {
d.podStatuses[pod] = err
}
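
The new doc comment makes the locking contract explicit: SetPodErrorStatus writes shared state without taking the lock itself, so the caller must hold the deployment's lock. A simplified sketch of that caller-locks pattern, with a stand-in type rather than the real Deployment:

```go
package main

import (
	"fmt"
	"sync"
)

// deployment is a trimmed-down stand-in for the agent Deployment type:
// a map of pod error statuses guarded by an exported lock, with a setter
// that assumes the caller already holds that lock.
type deployment struct {
	Lock        sync.RWMutex
	podStatuses map[string]error
}

// setPodErrorStatus mirrors SetPodErrorStatus: it writes the map without
// locking, so the caller MUST hold Lock for writing.
func (d *deployment) setPodErrorStatus(pod string, err error) {
	d.podStatuses[pod] = err
}

func main() {
	d := &deployment{podStatuses: make(map[string]error)}

	// Caller-side locking, as the added doc comment requires.
	d.Lock.Lock()
	d.setPodErrorStatus("nginx-pod", fmt.Errorf("config apply failed"))
	d.Lock.Unlock()

	d.Lock.RLock()
	fmt.Println(d.podStatuses["nginx-pod"])
	d.Lock.RUnlock()
}
```
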
4 changes: 2 additions & 2 deletions internal/mode/static/nginx/agent/grpc/connections.go
@@ -15,7 +15,7 @@ import (
type ConnectionsTracker interface {
Track(key string, conn Connection)
GetConnection(key string) Connection
ConnectionIsReady(key string) (Connection, bool)
Ready(key string) (Connection, bool)
SetInstanceID(key, id string)
UntrackConnectionsForParent(parent types.NamespacedName)
}
@@ -63,7 +63,7 @@ func (c *AgentConnectionsTracker) GetConnection(key string) Connection {

// ConnectionIsReady returns if the connection is ready to be used. In other words, agent
// has registered itself and an nginx instance with the control plane.
func (c *AgentConnectionsTracker) ConnectionIsReady(key string) (Connection, bool) {
func (c *AgentConnectionsTracker) Ready(key string) (Connection, bool) {
c.lock.RLock()
defer c.lock.RUnlock()

10 changes: 5 additions & 5 deletions internal/mode/static/nginx/agent/grpc/connections_test.go
@@ -29,7 +29,7 @@ func TestGetConnection(t *testing.T) {
g.Expect(nonExistent).To(Equal(agentgrpc.Connection{}))
}

func TestConnectionIsReady(t *testing.T) {
func TestReady(t *testing.T) {
t.Parallel()
g := NewWithT(t)

@@ -42,7 +42,7 @@ }
}
tracker.Track("key1", conn)

trackedConn, ready := tracker.ConnectionIsReady("key1")
trackedConn, ready := tracker.Ready("key1")
g.Expect(ready).To(BeTrue())
g.Expect(trackedConn).To(Equal(conn))
}
@@ -59,7 +59,7 @@ func TestConnectionIsNotReady(t *testing.T) {
}
tracker.Track("key1", conn)

_, ready := tracker.ConnectionIsReady("key1")
_, ready := tracker.Ready("key1")
g.Expect(ready).To(BeFalse())
}

@@ -74,12 +74,12 @@ func TestSetInstanceID(t *testing.T) {
}
tracker.Track("key1", conn)

_, ready := tracker.ConnectionIsReady("key1")
_, ready := tracker.Ready("key1")
g.Expect(ready).To(BeFalse())

tracker.SetInstanceID("key1", "instance1")

trackedConn, ready := tracker.ConnectionIsReady("key1")
trackedConn, ready := tracker.Ready("key1")
g.Expect(ready).To(BeTrue())
g.Expect(trackedConn.InstanceID).To(Equal("instance1"))
}
1 change: 1 addition & 0 deletions internal/mode/static/nginx/agent/grpc/grpc.go
@@ -82,6 +82,7 @@ func (g *Server) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
g.logger.Info("Shutting down GRPC Server")
// Since we use a long-lived stream, GracefulStop does not terminate. Therefore we use Stop.
server.Stop()
}()

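
The comment added in grpc.go explains why shutdown uses Stop rather than GracefulStop: GracefulStop waits for active RPCs to finish, and the long-lived Subscribe stream never does. A minimal, self-contained illustration of that shutdown path follows; no services are registered and the timeout is chosen purely for the demo.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"time"

	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}

	server := grpc.NewServer()

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	go func() {
		<-ctx.Done()
		fmt.Println("Shutting down GRPC Server")
		// GracefulStop waits for all active RPCs to finish; with a
		// long-lived stream that never returns it would hang, so Stop is
		// used to close connections immediately.
		server.Stop()
	}()

	// Serve returns once Stop is called.
	if err := server.Serve(lis); err != nil {
		fmt.Println("serve returned:", err)
	}
	fmt.Println("server stopped")
}
```
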