diff --git a/internal/mode/static/manager.go b/internal/mode/static/manager.go index 0c72745345..b6da949c52 100644 --- a/internal/mode/static/manager.go +++ b/internal/mode/static/manager.go @@ -50,6 +50,7 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/licensing" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics/collectors" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent" + agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc" ngxcfg "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies/clientsettings" @@ -180,14 +181,14 @@ func StartManager(cfg config.Config) error { nginxUpdater := agent.NewNginxUpdater(cfg.Logger.WithName("nginxUpdater"), cfg.Plus) - grpcServer := &agent.GRPCServer{ - Logger: cfg.Logger.WithName("agentGRPCServer"), - RegisterServices: []func(*grpc.Server){ + grpcServer := agentgrpc.NewServer( + cfg.Logger.WithName("agentGRPCServer"), + grpcServerPort, + []func(*grpc.Server){ nginxUpdater.CommandService.Register, nginxUpdater.FileService.Register, }, - Port: grpcServerPort, - } + ) if err = mgr.Add(&runnables.LeaderOrNonLeader{Runnable: grpcServer}); err != nil { return fmt.Errorf("cannot register grpc server: %w", err) diff --git a/internal/mode/static/nginx/agent/agent.go b/internal/mode/static/nginx/agent/agent.go index c6955040cb..1ce5d21b0b 100644 --- a/internal/mode/static/nginx/agent/agent.go +++ b/internal/mode/static/nginx/agent/agent.go @@ -18,30 +18,31 @@ type NginxUpdater interface { type NginxUpdaterImpl struct { CommandService *commandService FileService *fileService - Logger logr.Logger - Plus bool + logger logr.Logger + plus bool } +// NewNginxUpdater returns a new NginxUpdaterImpl instance. func NewNginxUpdater(logger logr.Logger, plus bool) *NginxUpdaterImpl { return &NginxUpdaterImpl{ - Logger: logger, - Plus: plus, - CommandService: newCommandService(), - FileService: newFileService(), + logger: logger, + plus: plus, + CommandService: newCommandService(logger.WithName("commandService")), + FileService: newFileService(logger.WithName("fileService")), } } // UpdateConfig sends the nginx configuration to the agent. func (n *NginxUpdaterImpl) UpdateConfig(files int) { - n.Logger.Info("Sending nginx configuration to agent", "numFiles", files) + n.logger.Info("Sending nginx configuration to agent", "numFiles", files) } // UpdateUpstreamServers sends an APIRequest to the agent to update upstream servers using the NGINX Plus API. // Only applicable when using NGINX Plus. func (n *NginxUpdaterImpl) UpdateUpstreamServers() { - if !n.Plus { + if !n.plus { return } - n.Logger.Info("Updating upstream servers using NGINX Plus API") + n.logger.Info("Updating upstream servers using NGINX Plus API") } diff --git a/internal/mode/static/nginx/agent/command.go b/internal/mode/static/nginx/agent/command.go index 3cdf6ce101..9eabd8680e 100644 --- a/internal/mode/static/nginx/agent/command.go +++ b/internal/mode/static/nginx/agent/command.go @@ -6,32 +6,51 @@ import ( "fmt" "time" + "github.com/go-logr/logr" pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" "google.golang.org/grpc" + + agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc" + grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context" ) -// commandService handles the connection and subscription to the agent. +// commandService handles the connection and subscription to the data plane agent. type commandService struct { pb.CommandServiceServer + connTracker *agentgrpc.ConnectionsTracker + // TODO(sberman): all logs are at Info level right now. Adjust appropriately. + logger logr.Logger } -func newCommandService() *commandService { - return &commandService{} +func newCommandService(logger logr.Logger) *commandService { + return &commandService{ + logger: logger, + connTracker: agentgrpc.NewConnectionsTracker(), + } } func (cs *commandService) Register(server *grpc.Server) { pb.RegisterCommandServiceServer(server, cs) } +// CreateConnection registers a data plane agent with the control plane. func (cs *commandService) CreateConnection( - _ context.Context, + ctx context.Context, req *pb.CreateConnectionRequest, ) (*pb.CreateConnectionResponse, error) { if req == nil { return nil, errors.New("empty connection request") } - fmt.Printf("Creating connection for nginx pod: %s\n", req.GetResource().GetContainerInfo().GetHostname()) + gi, ok := grpcContext.GrpcInfoFromContext(ctx) + if !ok { + return nil, agentgrpc.ErrStatusInvalidConnection + } + + podName := req.GetResource().GetContainerInfo().GetHostname() + + cs.logger.Info(fmt.Sprintf("Creating connection for nginx pod: %s", podName)) + cs.connTracker.Track(gi.IPAddress, podName) return &pb.CreateConnectionResponse{ Response: &pb.CommandResponse{ @@ -40,50 +59,99 @@ func (cs *commandService) CreateConnection( }, nil } +// Subscribe is a decoupled communication mechanism between the data plane agent and control plane. func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error { - fmt.Println("Received subscribe request") - ctx := in.Context() + gi, ok := grpcContext.GrpcInfoFromContext(ctx) + if !ok { + return agentgrpc.ErrStatusInvalidConnection + } + + cs.logger.Info(fmt.Sprintf("Received subscribe request from %q", gi.IPAddress)) + + go cs.listenForDataPlaneResponse(ctx, in) + + // wait for the agent to report itself + podName, err := cs.waitForConnection(ctx, gi) + if err != nil { + cs.logger.Error(err, "error waiting for connection") + return err + } + + cs.logger.Info(fmt.Sprintf("Handling subscription for %s/%s", podName, gi.IPAddress)) for { select { case <-ctx.Done(): return ctx.Err() case <-time.After(1 * time.Minute): dummyRequest := &pb.ManagementPlaneRequest{ - Request: &pb.ManagementPlaneRequest_StatusRequest{ - StatusRequest: &pb.StatusRequest{}, + Request: &pb.ManagementPlaneRequest_HealthRequest{ + HealthRequest: &pb.HealthRequest{}, }, } - if err := in.Send(dummyRequest); err != nil { // will likely need retry logic - fmt.Printf("ERROR: %v\n", err) + if err := in.Send(dummyRequest); err != nil { // TODO(sberman): will likely need retry logic + cs.logger.Error(err, "error sending request to agent") } } } } -func (cs *commandService) UpdateDataPlaneStatus( - _ context.Context, - req *pb.UpdateDataPlaneStatusRequest, -) (*pb.UpdateDataPlaneStatusResponse, error) { - fmt.Println("Updating data plane status") +// TODO(sberman): current issue: when control plane restarts, agent doesn't re-establish a CreateConnection call, +// so this fails. +func (cs *commandService) waitForConnection(ctx context.Context, gi grpcContext.GrpcInfo) (string, error) { + var podName string + ticker := time.NewTicker(time.Second) + defer ticker.Stop() - if req == nil { - return nil, errors.New("empty update data plane status request") + timer := time.NewTimer(30 * time.Second) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-timer.C: + return "", errors.New("timed out waiting for agent connection") + case <-ticker.C: + if podName = cs.connTracker.GetConnection(gi.IPAddress); podName != "" { + return podName, nil + } + } } +} - return &pb.UpdateDataPlaneStatusResponse{}, nil +func (cs *commandService) listenForDataPlaneResponse(ctx context.Context, in pb.CommandService_SubscribeServer) { + for { + select { + case <-ctx.Done(): + return + default: + dataPlaneResponse, err := in.Recv() + cs.logger.Info(fmt.Sprintf("Received data plane response: %v", dataPlaneResponse)) + if err != nil { + cs.logger.Error(err, "failed to receive data plane response") + return + } + } + } } +// UpdateDataPlaneHealth includes full health information about the data plane as reported by the agent. +// TODO(sberman): Is health monitoring the data planes something useful for us to do? func (cs *commandService) UpdateDataPlaneHealth( _ context.Context, - req *pb.UpdateDataPlaneHealthRequest, + _ *pb.UpdateDataPlaneHealthRequest, ) (*pb.UpdateDataPlaneHealthResponse, error) { - fmt.Println("Updating data plane health") - - if req == nil { - return nil, errors.New("empty update dataplane health request") - } - return &pb.UpdateDataPlaneHealthResponse{}, nil } + +// UpdateDataPlaneStatus is called by agent on startup and upon any change in agent metadata, +// instance metadata, or configurations. Since directly changing nginx configuration on the instance +// is not supported, this is a no-op for NGF. +func (cs *commandService) UpdateDataPlaneStatus( + _ context.Context, + _ *pb.UpdateDataPlaneStatusRequest, +) (*pb.UpdateDataPlaneStatusResponse, error) { + return &pb.UpdateDataPlaneStatusResponse{}, nil +} diff --git a/internal/mode/static/nginx/agent/file.go b/internal/mode/static/nginx/agent/file.go index 9a3df38c4e..296e1705ee 100644 --- a/internal/mode/static/nginx/agent/file.go +++ b/internal/mode/static/nginx/agent/file.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/go-logr/logr" pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" "google.golang.org/grpc" ) @@ -11,52 +12,57 @@ import ( // fileService handles file management between the control plane and the agent. type fileService struct { pb.FileServiceServer + // TODO(sberman): all logs are at Info level right now. Adjust appropriately. + logger logr.Logger } -func newFileService() *fileService { - return &fileService{} +func newFileService(logger logr.Logger) *fileService { + return &fileService{logger: logger} } func (fs *fileService) Register(server *grpc.Server) { pb.RegisterFileServiceServer(server, fs) } +// GetOverview gets the overview of files for a particular configuration version of an instance. +// Agent calls this if it's missing an overview when a ConfigApplyRequest is called by the control plane. func (fs *fileService) GetOverview( _ context.Context, _ *pb.GetOverviewRequest, ) (*pb.GetOverviewResponse, error) { - fmt.Println("Get overview request") + fs.logger.Info("Get overview request") return &pb.GetOverviewResponse{ Overview: &pb.FileOverview{}, }, nil } -func (fs *fileService) UpdateOverview( - _ context.Context, - _ *pb.UpdateOverviewRequest, -) (*pb.UpdateOverviewResponse, error) { - fmt.Println("Update overview request") - - return &pb.UpdateOverviewResponse{}, nil -} - +// GetFile is called by the agent when it needs to download a file for a ConfigApplyRequest. func (fs *fileService) GetFile( _ context.Context, req *pb.GetFileRequest, ) (*pb.GetFileResponse, error) { filename := req.GetFileMeta().GetName() hash := req.GetFileMeta().GetHash() - fmt.Printf("Getting file: %s, %s\n", filename, hash) + fs.logger.Info(fmt.Sprintf("Getting file: %s, %s", filename, hash)) return &pb.GetFileResponse{}, nil } +// UpdateOverview is called by agent on startup and whenever any files change on the instance. +// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF. +func (fs *fileService) UpdateOverview( + _ context.Context, + _ *pb.UpdateOverviewRequest, +) (*pb.UpdateOverviewResponse, error) { + return &pb.UpdateOverviewResponse{}, nil +} + +// UpdateFile is called by agent whenever any files change on the instance. +// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF. func (fs *fileService) UpdateFile( _ context.Context, - req *pb.UpdateFileRequest, + _ *pb.UpdateFileRequest, ) (*pb.UpdateFileResponse, error) { - fmt.Println("Update file request for: ", req.GetFile().GetFileMeta().GetName()) - return &pb.UpdateFileResponse{}, nil } diff --git a/internal/mode/static/nginx/agent/grpc.go b/internal/mode/static/nginx/agent/grpc.go deleted file mode 100644 index 6c558da2f3..0000000000 --- a/internal/mode/static/nginx/agent/grpc.go +++ /dev/null @@ -1,59 +0,0 @@ -package agent - -import ( - "context" - "fmt" - "net" - "time" - - "github.com/go-logr/logr" - "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" - "sigs.k8s.io/controller-runtime/pkg/manager" -) - -const ( - keepAliveTime = 1 * time.Minute - keepAliveTimeout = 15 * time.Second -) - -// GRPCServer is a gRPC server for communicating with the nginx agent. -type GRPCServer struct { - Logger logr.Logger - // RegisterServices is a list of functions to register gRPC services to the gRPC server. - RegisterServices []func(*grpc.Server) - // Port is the port that the server is listening on. - // Must be exposed in the control plane deployment/service. - Port int -} - -// Start is a runnable that starts the gRPC server for communicating with the nginx agent. -func (g *GRPCServer) Start(ctx context.Context) error { - listener, err := net.Listen("tcp", fmt.Sprintf(":%d", g.Port)) - if err != nil { - return err - } - - server := grpc.NewServer( - grpc.KeepaliveParams( - keepalive.ServerParameters{ - Time: keepAliveTime, - Timeout: keepAliveTimeout, - }, - ), - ) - - for _, registerSvc := range g.RegisterServices { - registerSvc(server) - } - - go func() { - <-ctx.Done() - g.Logger.Info("Shutting down GRPC Server") - server.GracefulStop() - }() - - return server.Serve(listener) -} - -var _ manager.Runnable = &GRPCServer{} diff --git a/internal/mode/static/nginx/agent/grpc/connections.go b/internal/mode/static/nginx/agent/grpc/connections.go new file mode 100644 index 0000000000..af99b84002 --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/connections.go @@ -0,0 +1,50 @@ +package grpc + +import ( + "sync" +) + +// ConnectionsTracker keeps track of all connections between the control plane and nginx agents. +type ConnectionsTracker struct { + // connections contains a map of all IP addresses that have connected and their associated pod names. + // TODO(sberman): we'll likely need to create a channel for each connection that can be stored in this map. + // Then the Subscription listens on the channel for its connection, while the nginxUpdater sends the config + // for the pod over that channel. + connections map[string]string + + lock sync.Mutex +} + +// NewConnectionsTracker returns a new ConnectionsTracker instance. +func NewConnectionsTracker() *ConnectionsTracker { + return &ConnectionsTracker{ + connections: make(map[string]string), + } +} + +// Track adds a connection to the tracking map. +// TODO(sberman): we need to handle the case when the token expires (once we support the token). +// This likely involves setting a callback to cancel a context when the token expires, which triggers +// the connection to be removed from the tracking list. +func (c *ConnectionsTracker) Track(address, hostname string) { + c.lock.Lock() + defer c.lock.Unlock() + + c.connections[address] = hostname +} + +// GetConnections returns all connections that are currently tracked. +func (c *ConnectionsTracker) GetConnections() map[string]string { + c.lock.Lock() + defer c.lock.Unlock() + + return c.connections +} + +// GetConnection returns the hostname of the requested connection. +func (c *ConnectionsTracker) GetConnection(address string) string { + c.lock.Lock() + defer c.lock.Unlock() + + return c.connections[address] +} diff --git a/internal/mode/static/nginx/agent/grpc/context/context.go b/internal/mode/static/nginx/agent/grpc/context/context.go new file mode 100644 index 0000000000..f8daf457eb --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/context/context.go @@ -0,0 +1,24 @@ +package context + +import ( + "context" +) + +// GrpcInfo for storing identity information for the gRPC client. +type GrpcInfo struct { + IPAddress string `json:"ip_address"` // ip address of the agent +} + +type contextGRPCKey struct{} + +// NewGrpcContext returns a new context.Context that has the provided GrpcInfo attached. +func NewGrpcContext(ctx context.Context, r GrpcInfo) context.Context { + return context.WithValue(ctx, contextGRPCKey{}, r) +} + +// GrpcInfoFromContext returns the GrpcInfo saved in ctx if it exists. +// Returns false if there's no GrpcInfo in the context. +func GrpcInfoFromContext(ctx context.Context) (GrpcInfo, bool) { + v, ok := ctx.Value(contextGRPCKey{}).(GrpcInfo) + return v, ok +} diff --git a/internal/mode/static/nginx/agent/grpc/context/doc.go b/internal/mode/static/nginx/agent/grpc/context/doc.go new file mode 100644 index 0000000000..689a126cf7 --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/context/doc.go @@ -0,0 +1,4 @@ +/* +Package context contains the functions for storing extra information in the gRPC context. +*/ +package context diff --git a/internal/mode/static/nginx/agent/grpc/doc.go b/internal/mode/static/nginx/agent/grpc/doc.go new file mode 100644 index 0000000000..b98f0af8b6 --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/doc.go @@ -0,0 +1,4 @@ +/* +Package grpc contains the functionality for the gRPC server for communicating with the nginx agent. +*/ +package grpc diff --git a/internal/mode/static/nginx/agent/grpc/grpc.go b/internal/mode/static/nginx/agent/grpc/grpc.go new file mode 100644 index 0000000000..0bac99f1b8 --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/grpc.go @@ -0,0 +1,91 @@ +package grpc + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/go-logr/logr" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/status" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/interceptor" +) + +const ( + keepAliveTime = 10 * time.Second + keepAliveTimeout = 10 * time.Second +) + +var ErrStatusInvalidConnection = status.Error(codes.Unauthenticated, "invalid connection") + +// Interceptor provides hooks to intercept the execution of an RPC on the server. +type Interceptor interface { + Stream() grpc.StreamServerInterceptor + Unary() grpc.UnaryServerInterceptor +} + +// Server is a gRPC server for communicating with the nginx agent. +type Server struct { + // Interceptor provides hooks to intercept the execution of an RPC on the server. + interceptor Interceptor + + logger logr.Logger + // RegisterServices is a list of functions to register gRPC services to the gRPC server. + registerServices []func(*grpc.Server) + // Port is the port that the server is listening on. + // Must be exposed in the control plane deployment/service. + port int +} + +func NewServer(logger logr.Logger, port int, registerSvcs []func(*grpc.Server)) *Server { + return &Server{ + logger: logger, + port: port, + registerServices: registerSvcs, + interceptor: interceptor.NewContextSetter(), + } +} + +// Start is a runnable that starts the gRPC server for communicating with the nginx agent. +func (g *Server) Start(ctx context.Context) error { + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", g.port)) + if err != nil { + return err + } + + server := grpc.NewServer( + grpc.KeepaliveParams( + keepalive.ServerParameters{ + Time: keepAliveTime, + Timeout: keepAliveTimeout, + }, + ), + grpc.KeepaliveEnforcementPolicy( + keepalive.EnforcementPolicy{ + MinTime: keepAliveTime, + PermitWithoutStream: true, + }, + ), + grpc.ChainStreamInterceptor(g.interceptor.Stream()), + grpc.ChainUnaryInterceptor(g.interceptor.Unary()), + ) + + for _, registerSvc := range g.registerServices { + registerSvc(server) + } + + go func() { + <-ctx.Done() + g.logger.Info("Shutting down GRPC Server") + server.GracefulStop() + }() + + return server.Serve(listener) +} + +var _ manager.Runnable = &Server{} diff --git a/internal/mode/static/nginx/agent/grpc/interceptor/doc.go b/internal/mode/static/nginx/agent/grpc/interceptor/doc.go new file mode 100644 index 0000000000..e5175664b9 --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/interceptor/doc.go @@ -0,0 +1,4 @@ +/* +Package interceptor contains the middleware for intercepting an RPC call. +*/ +package interceptor diff --git a/internal/mode/static/nginx/agent/grpc/interceptor/interceptor.go b/internal/mode/static/nginx/agent/grpc/interceptor/interceptor.go new file mode 100644 index 0000000000..3b36c23eef --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/interceptor/interceptor.go @@ -0,0 +1,83 @@ +package interceptor + +import ( + "context" + "fmt" + "net" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" + + grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context" +) + +// streamHandler is a struct that implements StreamHandler, allowing the interceptor to replace the context. +type streamHandler struct { + grpc.ServerStream + ctx context.Context +} + +func (sh *streamHandler) Context() context.Context { + return sh.ctx +} + +type ContextSetter struct{} + +func NewContextSetter() ContextSetter { + return ContextSetter{} +} + +func (c ContextSetter) Stream() grpc.StreamServerInterceptor { + return func( + srv interface{}, + ss grpc.ServerStream, + _ *grpc.StreamServerInfo, + handler grpc.StreamHandler, + ) error { + ctx, err := setContext(ss.Context()) + if err != nil { + return err + } + return handler(srv, &streamHandler{ + ServerStream: ss, + ctx: ctx, + }) + } +} + +func (c ContextSetter) Unary() grpc.UnaryServerInterceptor { + return func( + ctx context.Context, + req interface{}, + _ *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (resp interface{}, err error) { + if ctx, err = setContext(ctx); err != nil { + return nil, err + } + return handler(ctx, req) + } +} + +// TODO(sberman): for now, we'll just use the IP address of the agent to link a Connection +// to a Subscription by setting it in the context. Once we support auth, we can likely change this +// interceptor to instead set the uuid. +func setContext(ctx context.Context) (context.Context, error) { + p, ok := peer.FromContext(ctx) + if !ok { + return nil, status.Error(codes.InvalidArgument, "no peer data") + } + + addr, ok := p.Addr.(*net.TCPAddr) + if !ok { + panic(fmt.Sprintf("address %q was not of type net.TCPAddr", p.Addr.String())) + } + + gi := &grpcContext.GrpcInfo{ + IPAddress: addr.IP.String(), + } + + return grpcContext.NewGrpcContext(ctx, *gi), nil +}