Skip to content

Commit

Permalink
Ready connection; broadcaster initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
sjberman committed Jan 28, 2025
1 parent 1d4d025 commit 096d6a9
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 122 deletions.
3 changes: 1 addition & 2 deletions internal/mode/static/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
ngfConfig "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/config"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/licensing"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/broadcast"
ngxConfig "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane"
Expand Down Expand Up @@ -184,7 +183,7 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
// and Deployment.
// If fully deleted, then delete the deployment from the Store and close the stopCh.
stopCh := make(chan struct{})
deployment := h.cfg.nginxDeployments.GetOrStore(deploymentName, broadcast.NewDeploymentBroadcaster(stopCh))
deployment := h.cfg.nginxDeployments.GetOrStore(deploymentName, stopCh)
if deployment == nil {
panic("expected deployment, got nil")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/mode/static/nginx/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (cs *commandService) waitForConnection(
case <-timer.C:
return nil, nil, err
case <-ticker.C:
if conn, ok := cs.connTracker.Ready(gi.IPAddress); ok {
if conn := cs.connTracker.GetConnection(gi.IPAddress); conn.Ready() {
// connection has been established, now ensure that the deployment exists in the store
if deployment := cs.nginxDeployments.Get(conn.Parent); deployment != nil {
return &conn, deployment, nil
Expand Down
12 changes: 6 additions & 6 deletions internal/mode/static/nginx/agent/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,13 @@ func TestSubscribe(t *testing.T) {
PodName: "nginx-pod",
InstanceID: "nginx-id",
}
connTracker.ReadyReturns(conn, true)
connTracker.GetConnectionReturns(conn)

store := NewDeploymentStore(&connTracker)
cs := newCommandService(
logr.Discard(),
fake.NewFakeClient(),
NewDeploymentStore(&connTracker),
store,
&connTracker,
status.NewQueue(),
)
Expand All @@ -315,7 +316,7 @@ func TestSubscribe(t *testing.T) {
broadcaster.SubscribeReturns(subChannels)

// set the initial files and actions to be applied by the Subscription
deployment := cs.nginxDeployments.GetOrStore(conn.Parent, broadcaster)
deployment := store.StoreWithBroadcaster(conn.Parent, broadcaster)
files := []File{
{
Meta: &pb.FileMeta{
Expand Down Expand Up @@ -438,7 +439,7 @@ func TestSubscribe_Errors(t *testing.T) {
cs *commandService,
ct *agentgrpcfakes.FakeConnectionsTracker,
) {
ct.ReadyReturns(agentgrpc.Connection{}, true)
ct.GetConnectionReturns(agentgrpc.Connection{InstanceID: "nginx-id"})
cs.connectionTimeout = 1100 * time.Millisecond
},
errString: "timed out waiting for nginx deployment to be added to store",
Expand Down Expand Up @@ -581,8 +582,7 @@ func TestSetInitialConfig_Errors(t *testing.T) {
InstanceID: "nginx-id",
}

broadcaster := &broadcastfakes.FakeBroadcaster{}
deployment := cs.nginxDeployments.GetOrStore(conn.Parent, broadcaster)
deployment := newDeployment(&broadcastfakes.FakeBroadcaster{})

if test.setup != nil {
test.setup(msgr, deployment)
Expand Down
16 changes: 14 additions & 2 deletions internal/mode/static/nginx/agent/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type Deployment struct {
Lock sync.RWMutex
}

// newDeployment returns a new deployment object.
// newDeployment returns a new Deployment object.
func newDeployment(broadcaster broadcast.Broadcaster) *Deployment {
return &Deployment{
broadcaster: broadcaster,
Expand Down Expand Up @@ -227,11 +227,23 @@ func (d *DeploymentStore) Get(nsName types.NamespacedName) *Deployment {

// GetOrStore returns the existing value for the key if present.
// Otherwise, it stores and returns the given value.
func (d *DeploymentStore) GetOrStore(nsName types.NamespacedName, broadcaster broadcast.Broadcaster) *Deployment {
func (d *DeploymentStore) GetOrStore(nsName types.NamespacedName, stopCh chan struct{}) *Deployment {
if deployment := d.Get(nsName); deployment != nil {
return deployment
}

deployment := newDeployment(broadcast.NewDeploymentBroadcaster(stopCh))
d.deployments.Store(nsName, deployment)

return deployment
}

// StoreWithBroadcaster creates a new Deployment with the supplied broadcaster and stores it.
// Used in unit tests to provide a mock broadcaster.
func (d *DeploymentStore) StoreWithBroadcaster(
nsName types.NamespacedName,
broadcaster broadcast.Broadcaster,
) *Deployment {
deployment := newDeployment(broadcaster)
d.deployments.Store(nsName, deployment)

Expand Down
4 changes: 2 additions & 2 deletions internal/mode/static/nginx/agent/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ func TestDeploymentStore(t *testing.T) {

nsName := types.NamespacedName{Namespace: "default", Name: "test-deployment"}

deployment := store.GetOrStore(nsName, &broadcastfakes.FakeBroadcaster{})
deployment := store.GetOrStore(nsName, nil)
g.Expect(deployment).ToNot(BeNil())

fetchedDeployment := store.Get(nsName)
g.Expect(fetchedDeployment).To(Equal(deployment))

deployment = store.GetOrStore(nsName, &broadcastfakes.FakeBroadcaster{})
deployment = store.GetOrStore(nsName, nil)
g.Expect(fetchedDeployment).To(Equal(deployment))

store.Remove(nsName)
Expand Down
5 changes: 2 additions & 3 deletions internal/mode/static/nginx/agent/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/types"

"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/broadcast/broadcastfakes"
agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc"
grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context"
agentgrpcfakes "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/grpcfakes"
Expand All @@ -32,7 +31,7 @@ func TestGetFile(t *testing.T) {
connTracker.GetConnectionReturns(conn)

depStore := NewDeploymentStore(connTracker)
dep := depStore.GetOrStore(deploymentName, &broadcastfakes.FakeBroadcaster{})
dep := depStore.GetOrStore(deploymentName, nil)

fileMeta := &pb.FileMeta{
Name: "test.conf",
Expand Down Expand Up @@ -155,7 +154,7 @@ func TestGetFile_FileNotFound(t *testing.T) {
connTracker.GetConnectionReturns(conn)

depStore := NewDeploymentStore(connTracker)
depStore.GetOrStore(deploymentName, &broadcastfakes.FakeBroadcaster{})
depStore.GetOrStore(deploymentName, nil)

fs := newFileService(logr.Discard(), depStore, connTracker)

Expand Down
17 changes: 6 additions & 11 deletions internal/mode/static/nginx/agent/grpc/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
type ConnectionsTracker interface {
Track(key string, conn Connection)
GetConnection(key string) Connection
Ready(key string) (Connection, bool)
SetInstanceID(key, id string)
UntrackConnectionsForParent(parent types.NamespacedName)
}
Expand All @@ -27,6 +26,12 @@ type Connection struct {
Parent types.NamespacedName
}

// Ready returns if the connection is ready to be used. In other words, agent
// has registered itself and an nginx instance with the control plane.
func (c *Connection) Ready() bool {
return c.InstanceID != ""
}

// AgentConnectionsTracker keeps track of all connections between the control plane and nginx agents.
type AgentConnectionsTracker struct {
// connections contains a map of all IP addresses that have connected and their connection info.
Expand Down Expand Up @@ -61,16 +66,6 @@ func (c *AgentConnectionsTracker) GetConnection(key string) Connection {
return c.connections[key]
}

// ConnectionIsReady returns if the connection is ready to be used. In other words, agent
// has registered itself and an nginx instance with the control plane.
func (c *AgentConnectionsTracker) Ready(key string) (Connection, bool) {
c.lock.RLock()
defer c.lock.RUnlock()

conn, ok := c.connections[key]
return conn, ok && conn.InstanceID != ""
}

// SetInstanceID sets the nginx instanceID for a connection.
func (c *AgentConnectionsTracker) SetInstanceID(key, id string) {
c.lock.Lock()
Expand Down
23 changes: 7 additions & 16 deletions internal/mode/static/nginx/agent/grpc/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,38 +29,29 @@ func TestGetConnection(t *testing.T) {
g.Expect(nonExistent).To(Equal(agentgrpc.Connection{}))
}

func TestReady(t *testing.T) {
func TestConnectionIsReady(t *testing.T) {
t.Parallel()
g := NewWithT(t)

tracker := agentgrpc.NewConnectionsTracker()

conn := agentgrpc.Connection{
PodName: "pod1",
InstanceID: "instance1",
Parent: types.NamespacedName{Namespace: "default", Name: "parent1"},
}
tracker.Track("key1", conn)

trackedConn, ready := tracker.Ready("key1")
g.Expect(ready).To(BeTrue())
g.Expect(trackedConn).To(Equal(conn))
g.Expect(conn.Ready()).To(BeTrue())
}

func TestConnectionIsNotReady(t *testing.T) {
t.Parallel()
g := NewWithT(t)

tracker := agentgrpc.NewConnectionsTracker()

conn := agentgrpc.Connection{
PodName: "pod1",
Parent: types.NamespacedName{Namespace: "default", Name: "parent1"},
}
tracker.Track("key1", conn)

_, ready := tracker.Ready("key1")
g.Expect(ready).To(BeFalse())
g.Expect(conn.Ready()).To(BeFalse())
}

func TestSetInstanceID(t *testing.T) {
Expand All @@ -74,13 +65,13 @@ func TestSetInstanceID(t *testing.T) {
}
tracker.Track("key1", conn)

_, ready := tracker.Ready("key1")
g.Expect(ready).To(BeFalse())
trackedConn := tracker.GetConnection("key1")
g.Expect(trackedConn.Ready()).To(BeFalse())

tracker.SetInstanceID("key1", "instance1")

trackedConn, ready := tracker.Ready("key1")
g.Expect(ready).To(BeTrue())
trackedConn = tracker.GetConnection("key1")
g.Expect(trackedConn.Ready()).To(BeTrue())
g.Expect(trackedConn.InstanceID).To(Equal("instance1"))
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 096d6a9

Please sign in to comment.