Skip to content

Commit

Permalink
Add comments; send response when context canceled
Browse files Browse the repository at this point in the history
  • Loading branch information
sjberman committed Jan 28, 2025
1 parent 096d6a9 commit 8019ab3
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 7 deletions.
11 changes: 11 additions & 0 deletions internal/mode/static/nginx/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,19 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
defer broadcaster.CancelSubscription(channels.ID)

for {
// When a message is received over the ListenCh, it is assumed and required that the
// deployment object is already LOCKED. This lock is acquired by the event handler before calling
// `updateNginxConfig`. The entire transaction (as described in above in the function comment)
// must be locked to prevent the deployment files from changing during the transaction.
// This means that the lock is held until we receive either an error or response from agent
// (via msgr.Errors() or msgr.Mesages()) and respond back, finally returning to the event handler
// which releases the lock.
select {
case <-ctx.Done():
select {
case channels.ResponseCh <- struct{}{}:
default:
}
return grpcStatus.Error(codes.Canceled, context.Cause(ctx).Error())
case msg := <-channels.ListenCh:
var req *pb.ManagementPlaneRequest
Expand Down
14 changes: 7 additions & 7 deletions internal/mode/static/nginx/agent/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ are locked by the caller, hence why the functions themselves do not set the lock
*/

// GetFile gets the requested file for the deployment and returns its contents.
// Caller MUST lock the deployment before calling this function.
// The deployment MUST already be locked before calling this function.
func (d *Deployment) GetFile(name, hash string) []byte {
for _, file := range d.files {
if name == file.Meta.GetName() && hash == file.Meta.GetHash() {
Expand All @@ -124,7 +124,7 @@ func (d *Deployment) GetFile(name, hash string) []byte {
}

// SetFiles updates the nginx files and fileOverviews for the deployment and returns the message to send.
// Caller MUST lock the deployment before calling this function.
// The deployment MUST already be locked before calling this function.
func (d *Deployment) SetFiles(files []File) broadcast.NginxAgentMessage {
d.files = files

Expand Down Expand Up @@ -158,32 +158,32 @@ func (d *Deployment) SetFiles(files []File) broadcast.NginxAgentMessage {

// SetNGINXPlusActions updates the deployment's latest NGINX Plus Actions to perform if using NGINX Plus.
// Used by a Subscriber when it first connects.
// Caller MUST lock the deployment before calling this function.
// The deployment MUST already be locked before calling this function.
func (d *Deployment) SetNGINXPlusActions(actions []*pb.NGINXPlusAction) {
d.nginxPlusActions = actions
}

// SetPodErrorStatus sets the error status of a Pod in this Deployment if applying the config failed.
// Caller MUST lock the deployment before calling this function.
// The deployment MUST already be locked before calling this function.
func (d *Deployment) SetPodErrorStatus(pod string, err error) {
d.podStatuses[pod] = err
}

// SetLatestConfigError sets the latest config apply error for the deployment.
// Caller MUST lock the deployment before calling this function.
// The deployment MUST already be locked before calling this function.
func (d *Deployment) SetLatestConfigError(err error) {
d.latestConfigError = err
}

// SetLatestUpstreamError sets the latest upstream update error for the deployment.
// Caller MUST lock the deployment before calling this function.
// The deployment MUST already be locked before calling this function.
func (d *Deployment) SetLatestUpstreamError(err error) {
d.latestUpstreamError = err
}

// GetConfigurationStatus returns the current config status for this Deployment. It combines
// the most recent errors (if they exist) for all Pods in the Deployment into a single error.
// Caller MUST lock the deployment before calling this function.
// The deployment MUST already be locked before calling this function.
func (d *Deployment) GetConfigurationStatus() error {
errs := make([]error, 0, len(d.podStatuses))
for _, err := range d.podStatuses {
Expand Down
2 changes: 2 additions & 0 deletions internal/mode/static/nginx/agent/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func (fs *fileService) Register(server *grpc.Server) {
}

// GetFile is called by the agent when it needs to download a file for a ConfigApplyRequest.
// The deployment object used to get the files is already LOCKED when this function is called,
// before the ConfigApply transaction is started.
func (fs *fileService) GetFile(
ctx context.Context,
req *pb.GetFileRequest,
Expand Down

0 comments on commit 8019ab3

Please sign in to comment.