chore: Add more logs with timing information to application controller [argoproj#18923] (argoproj#18926)

Closes argoproj#18923
There are gaps in the debugging information for long reconciliations. Fill many of those gaps by adding debug logs with timing information for the different code execution steps.
Also, fix a flaky test in app_test.go.
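
For illustration, the pattern the diff applies to each instrumented function is roughly the following. This is a minimal, self-contained sketch, not the project's util/stats implementation: the timingStats type below is a simplified stand-in for stats.TimingStats (NewTimingStats / AddCheckpoint / Timings / StartTime, as used in the diff), the checkpoint and application names are illustrative, and since the diff does not show whether the real helper records per-step deltas or cumulative times, this sketch records per-step deltas.

package main

import (
	"time"

	log "github.com/sirupsen/logrus"
)

// timingStats is a simplified stand-in for util/stats.TimingStats.
// It records the elapsed time between consecutive checkpoints.
type timingStats struct {
	StartTime time.Time
	last      time.Time
	timings   map[string]time.Duration
}

func newTimingStats() *timingStats {
	now := time.Now()
	return &timingStats{StartTime: now, last: now, timings: map[string]time.Duration{}}
}

// AddCheckpoint stores the duration since the previous checkpoint under key.
func (ts *timingStats) AddCheckpoint(key string) {
	now := time.Now()
	ts.timings[key] = now.Sub(ts.last)
	ts.last = now
}

func (ts *timingStats) Timings() map[string]time.Duration { return ts.timings }

// reconcileStep stands in for a real unit of controller work.
func reconcileStep() { time.Sleep(10 * time.Millisecond) }

func main() {
	log.SetLevel(log.DebugLevel)
	ts := newTimingStats()
	defer func() {
		// Mirror the deferred logging added throughout the diff: one debug line
		// with a field per checkpoint plus the total elapsed time.
		logCtx := log.WithField("application", "example-app")
		for k, v := range ts.Timings() {
			logCtx = logCtx.WithField(k, v.Milliseconds())
		}
		logCtx = logCtx.WithField("time_ms", time.Since(ts.StartTime).Milliseconds())
		logCtx.Debug("Finished processing app refresh queue item")
	}()

	reconcileStep()
	ts.AddCheckpoint("compare_app_state_ms")
	reconcileStep()
	ts.AddCheckpoint("persist_app_status_ms")
}

With logrus's default text formatter, each reconciliation then emits a single debug line along the lines of level=debug msg="Finished processing app refresh queue item" compare_app_state_ms=... persist_app_status_ms=... time_ms=... (field names taken from the diff, values illustrative), which makes slow steps easy to spot without adding log volume at the info level.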

Signed-off-by: Andrii Korotkov <[email protected]>
andrii-korotkov-verkada authored and crenshaw-dev committed Jul 17, 2024
1 parent 0704aa6 commit 48e6f13
Showing 1 changed file with 94 additions and 2 deletions.
96 changes: 94 additions & 2 deletions controller/appcontroller.go
@@ -56,6 +56,7 @@ import (
argodiff "github.com/argoproj/argo-cd/v2/util/argo/diff"
"github.com/argoproj/argo-cd/v2/util/argo/normalizers"
"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/argo-cd/v2/util/stats"

kubeerrors "k8s.io/apimachinery/pkg/api/errors"

@@ -453,19 +454,32 @@ func (ctrl *ApplicationController) handleObjectUpdated(managedByApp map[string]b
// setAppManagedResources will build a list of ResourceDiff based on the provided comparisonResult
// and persist app resources related data in the cache. Will return the persisted ApplicationTree.
func (ctrl *ApplicationController) setAppManagedResources(a *appv1.Application, comparisonResult *comparisonResult) (*appv1.ApplicationTree, error) {
ts := stats.NewTimingStats()
defer func() {
logCtx := getAppLog(a)
for k, v := range ts.Timings() {
logCtx = logCtx.WithField(k, v.Milliseconds())
}
logCtx = logCtx.WithField("time_ms", time.Since(ts.StartTime).Milliseconds())
logCtx.Debug("Finished setting app managed resources")
}()
managedResources, err := ctrl.hideSecretData(a, comparisonResult)
ts.AddCheckpoint("hide_secret_data_ms")
if err != nil {
return nil, fmt.Errorf("error getting managed resources: %w", err)
}
tree, err := ctrl.getResourceTree(a, managedResources)
ts.AddCheckpoint("get_resource_tree_ms")
if err != nil {
return nil, fmt.Errorf("error getting resource tree: %w", err)
}
err = ctrl.cache.SetAppResourcesTree(a.InstanceName(ctrl.namespace), tree)
ts.AddCheckpoint("set_app_resources_tree_ms")
if err != nil {
return nil, fmt.Errorf("error setting app resource tree: %w", err)
}
err = ctrl.cache.SetAppManagedResources(a.InstanceName(ctrl.namespace), managedResources)
ts.AddCheckpoint("set_app_managed_resources_ms")
if err != nil {
return nil, fmt.Errorf("error setting app managed resources: %w", err)
}
@@ -497,8 +511,18 @@ func isKnownOrphanedResourceExclusion(key kube.ResourceKey, proj *appv1.AppProje
}

func (ctrl *ApplicationController) getResourceTree(a *appv1.Application, managedResources []*appv1.ResourceDiff) (*appv1.ApplicationTree, error) {
ts := stats.NewTimingStats()
defer func() {
logCtx := getAppLog(a)
for k, v := range ts.Timings() {
logCtx = logCtx.WithField(k, v.Milliseconds())
}
logCtx = logCtx.WithField("time_ms", time.Since(ts.StartTime).Milliseconds())
logCtx.Debug("Finished getting resource tree")
}()
nodes := make([]appv1.ResourceNode, 0)
proj, err := ctrl.getAppProj(a)
ts.AddCheckpoint("get_app_proj_ms")
if err != nil {
return nil, fmt.Errorf("failed to get project: %w", err)
}
@@ -512,6 +536,7 @@ func (ctrl *ApplicationController) getResourceTree(a *appv1.Application, managed
}
warnOrphaned = proj.Spec.OrphanedResources.IsWarn()
}
ts.AddCheckpoint("get_orphaned_resources_ms")
for i := range managedResources {
managedResource := managedResources[i]
delete(orphanedNodesMap, kube.NewResourceKey(managedResource.Group, managedResource.Kind, managedResource.Namespace, managedResource.Name))
@@ -556,6 +581,7 @@ func (ctrl *ApplicationController) getResourceTree(a *appv1.Application, managed
}
}
}
ts.AddCheckpoint("process_managed_resources_ms")
orphanedNodes := make([]appv1.ResourceNode, 0)
for k := range orphanedNodesMap {
if k.Namespace != "" && proj.IsGroupKindPermitted(k.GroupKind(), true) && !isKnownOrphanedResourceExclusion(k, proj) {
@@ -598,15 +624,26 @@ func (ctrl *ApplicationController) getResourceTree(a *appv1.Application, managed
sort.Slice(orphanedNodes, func(i, j int) bool {
return orphanedNodes[i].ResourceRef.String() < orphanedNodes[j].ResourceRef.String()
})
ts.AddCheckpoint("process_orphaned_resources_ms")

hosts, err := ctrl.getAppHosts(a, nodes)
if err != nil {
return nil, fmt.Errorf("failed to get app hosts: %w", err)
}
ts.AddCheckpoint("get_app_hosts_ms")
return &appv1.ApplicationTree{Nodes: nodes, OrphanedNodes: orphanedNodes, Hosts: hosts}, nil
}

func (ctrl *ApplicationController) getAppHosts(a *appv1.Application, appNodes []appv1.ResourceNode) ([]appv1.HostInfo, error) {
ts := stats.NewTimingStats()
defer func() {
logCtx := getAppLog(a)
for k, v := range ts.Timings() {
logCtx = logCtx.WithField(k, v.Milliseconds())
}
logCtx = logCtx.WithField("time_ms", time.Since(ts.StartTime).Milliseconds())
logCtx.Debug("Finished getting app hosts")
}()
supportedResourceNames := map[v1.ResourceName]bool{
v1.ResourceCPU: true,
v1.ResourceStorage: true,
@@ -636,6 +673,7 @@ func (ctrl *ApplicationController) getAppHosts(a *appv1.Application, appNodes []
}
}
})
ts.AddCheckpoint("iterate_resources_ms")
if err != nil {
return nil, err
}
@@ -691,6 +729,7 @@ func (ctrl *ApplicationController) getAppHosts(a *appv1.Application, appNodes []
})
hosts = append(hosts, appv1.HostInfo{Name: nodeName, SystemInfo: node.SystemInfo, ResourcesInfo: resourcesInfo})
}
ts.AddCheckpoint("process_app_pods_by_node_ms")
return hosts, nil
}

@@ -913,21 +952,32 @@ func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext b
return
}
app := origApp.DeepCopy()
logCtx := getAppLog(app)
ts := stats.NewTimingStats()
defer func() {
for k, v := range ts.Timings() {
logCtx = logCtx.WithField(k, v.Milliseconds())
}
logCtx = logCtx.WithField("time_ms", time.Since(ts.StartTime).Milliseconds())
logCtx.Debug("Finished processing app operation queue item")
}()

if app.Operation != nil {
// If we get here, we are about to process an operation, but we cannot rely on informer since it might have stale data.
// So always retrieve the latest version to ensure it is not stale to avoid unnecessary syncing.
// We cannot rely on informer since applications might be updated by both application controller and api server.
freshApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace).Get(context.Background(), app.ObjectMeta.Name, metav1.GetOptions{})
if err != nil {
getAppLog(app).Errorf("Failed to retrieve latest application state: %v", err)
logCtx.Errorf("Failed to retrieve latest application state: %v", err)
return
}
app = freshApp
}
ts.AddCheckpoint("get_fresh_app_ms")

if app.Operation != nil {
ctrl.processRequestedAppOperation(app)
ts.AddCheckpoint("process_requested_app_operation_ms")
} else if app.DeletionTimestamp != nil {
if err = ctrl.finalizeApplicationDeletion(app, func(project string) ([]*appv1.Cluster, error) {
return ctrl.db.GetProjectClusters(context.Background(), project)
@@ -939,6 +989,7 @@ func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext b
message := fmt.Sprintf("Unable to delete application resources: %v", err.Error())
ctrl.logAppEvent(app, argo.EventInfo{Reason: argo.EventReasonStatusRefreshed, Type: v1.EventTypeWarning}, message, context.TODO())
}
ts.AddCheckpoint("finalize_application_deletion_ms")
}
return
}
@@ -1271,6 +1322,14 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
ctrl.setOperationState(app, state)
}
}()
ts := stats.NewTimingStats()
defer func() {
for k, v := range ts.Timings() {
logCtx = logCtx.WithField(k, v.Milliseconds())
}
logCtx = logCtx.WithField("time_ms", time.Since(ts.StartTime).Milliseconds())
logCtx.Debug("Finished processing requested app operation")
}()
terminating := false
if isOperationInProgress(app) {
state = app.Status.OperationState.DeepCopy()
@@ -1305,16 +1364,19 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
ctrl.setOperationState(app, state)
logCtx.Infof("Initialized new operation: %v", *app.Operation)
}
ts.AddCheckpoint("initial_operation_stage_ms")

if err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, ctrl.db); err != nil {
state.Phase = synccommon.OperationFailed
state.Message = err.Error()
} else {
ctrl.appStateManager.SyncAppState(app, state)
}
ts.AddCheckpoint("validate_and_sync_app_state_ms")

// Check whether application is allowed to use project
_, err := ctrl.getAppProj(app)
ts.AddCheckpoint("get_app_proj_ms")
if err != nil {
state.Phase = synccommon.OperationError
state.Message = err.Error()
@@ -1357,6 +1419,7 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
}

ctrl.setOperationState(app, state)
ts.AddCheckpoint("final_set_operation_state")
if state.Phase.Completed() && (app.Operation.Sync != nil && !app.Operation.Sync.DryRun) {
// if we just completed an operation, force a refresh so that UI will report up-to-date
// sync/health information
@@ -1367,6 +1430,7 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
logCtx.Warnf("Fails to requeue application: %v", err)
}
}
ts.AddCheckpoint("request_app_refresh_ms")
}

func (ctrl *ApplicationController) setOperationState(app *appv1.Application, state *appv1.OperationState) {
@@ -1467,6 +1531,7 @@ func (ctrl *ApplicationController) PatchAppWithWriteBack(ctx context.Context, na
}

func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext bool) {
ts := stats.NewTimingStats()
patchMs := time.Duration(0) // time spent in doing patch/update calls
setOpMs := time.Duration(0) // time spent in doing Operation patch calls in autosync
appKey, shutdown := ctrl.appRefreshQueue.Get()
@@ -1513,6 +1578,9 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
defer func() {
reconcileDuration := time.Since(startTime)
ctrl.metricsServer.IncReconcile(origApp, reconcileDuration)
for k, v := range ts.Timings() {
logCtx = logCtx.WithField(k, v.Milliseconds())
}
logCtx.WithFields(log.Fields{
"time_ms": reconcileDuration.Milliseconds(),
"patch_ms": patchMs.Milliseconds(),
@@ -1538,8 +1606,10 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
return
}
}
ts.AddCheckpoint("comparison_with_nothing_ms")

project, hasErrors := ctrl.refreshAppConditions(app)
ts.AddCheckpoint("refresh_app_conditions_ms")
if hasErrors {
app.Status.Sync.Status = appv1.SyncStatusCodeUnknown
app.Status.Health.Status = health.HealthStatusUnknown
@@ -1551,6 +1621,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
if err := ctrl.cache.SetAppManagedResources(app.InstanceName(ctrl.namespace), nil); err != nil {
logCtx.Warnf("failed to set app managed resources tree: %v", err)
}
ts.AddCheckpoint("process_refresh_app_conditions_errors_ms")
return
}

@@ -1590,6 +1661,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
compareResult, err := ctrl.appStateManager.CompareAppState(app, project, revisions, sources,
refreshType == appv1.RefreshTypeHard,
comparisonLevel == CompareWithLatestForceResolve, localManifests, hasMultipleSources, false)
ts.AddCheckpoint("compare_app_state_ms")

if goerrors.Is(err, CompareStateRepoError) {
logCtx.Warnf("Ignoring temporary failed attempt to compare app state against repo: %v", err)
@@ -1601,8 +1673,10 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
}

ctrl.normalizeApplication(origApp, app)
ts.AddCheckpoint("normalize_application_ms")

tree, err := ctrl.setAppManagedResources(app, compareResult)
ts.AddCheckpoint("set_app_managed_resources_ms")
if err != nil {
logCtx.Errorf("Failed to cache app resources: %v", err)
} else {
@@ -1626,6 +1700,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
} else {
logCtx.Info("Sync prevented by sync window")
}
ts.AddCheckpoint("auto_sync_ms")

if app.Status.ReconciledAt == nil || comparisonLevel >= CompareWithLatest {
app.Status.ReconciledAt = &now
@@ -1639,7 +1714,10 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
app.Status.SourceType = compareResult.appSourceType
app.Status.SourceTypes = compareResult.appSourceTypes
app.Status.ControllerNamespace = ctrl.namespace
ts.AddCheckpoint("app_status_update_ms")
patchMs = ctrl.persistAppStatus(origApp, &app.Status)
// This is partly a duplicate of patch_ms, but it is more descriptive and provides a measurement for the next step.
ts.AddCheckpoint("persist_app_status_ms")
if (compareResult.hasPostDeleteHooks != app.HasPostDeleteFinalizer() || compareResult.hasPostDeleteHooks != app.HasPostDeleteFinalizer("cleanup")) &&
app.GetDeletionTimestamp() == nil {
if compareResult.hasPostDeleteHooks {
Expand All @@ -1654,6 +1732,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
logCtx.Errorf("Failed to update finalizers: %v", err)
}
}
ts.AddCheckpoint("process_finalizers_ms")
return
}

Expand Down Expand Up @@ -1813,10 +1892,18 @@ func (ctrl *ApplicationController) persistAppStatus(orig *appv1.Application, new

// autoSync will initiate a sync operation for an application configured with automated sync
func (ctrl *ApplicationController) autoSync(app *appv1.Application, syncStatus *appv1.SyncStatus, resources []appv1.ResourceStatus) (*appv1.ApplicationCondition, time.Duration) {
logCtx := getAppLog(app)
ts := stats.NewTimingStats()
defer func() {
for k, v := range ts.Timings() {
logCtx = logCtx.WithField(k, v.Milliseconds())
}
logCtx = logCtx.WithField("time_ms", time.Since(ts.StartTime).Milliseconds())
logCtx.Debug("Finished auto sync")
}()
if app.Spec.SyncPolicy == nil || app.Spec.SyncPolicy.Automated == nil {
return nil, 0
}
logCtx := getAppLog(app)

if app.Operation != nil {
logCtx.Infof("Skipping auto-sync: another operation is in progress")
@@ -1851,6 +1938,7 @@ func (ctrl *ApplicationController) autoSync(app *appv1.Application, syncStatus *
desiredCommitSHA := syncStatus.Revision
desiredCommitSHAsMS := syncStatus.Revisions
alreadyAttempted, attemptPhase := alreadyAttemptedSync(app, desiredCommitSHA, desiredCommitSHAsMS, app.Spec.HasMultipleSources())
ts.AddCheckpoint("already_attempted_sync_ms")
selfHeal := app.Spec.SyncPolicy.Automated.SelfHeal
op := appv1.Operation{
Sync: &appv1.SyncOperation{
@@ -1894,6 +1982,7 @@ func (ctrl *ApplicationController) autoSync(app *appv1.Application, syncStatus *
return nil, 0
}
}
ts.AddCheckpoint("already_attempted_check_ms")

if app.Spec.SyncPolicy.Automated.Prune && !app.Spec.SyncPolicy.Automated.AllowEmpty {
bAllNeedPrune := true
@@ -1910,8 +1999,10 @@ func (ctrl *ApplicationController) autoSync(app *appv1.Application, syncStatus *
}

appIf := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace)
ts.AddCheckpoint("get_applications_ms")
start := time.Now()
updatedApp, err := argo.SetAppOperation(appIf, app.Name, &op)
ts.AddCheckpoint("set_app_operation_ms")
setOpTime := time.Since(start)
if err != nil {
if goerrors.Is(err, argo.ErrAnotherOperationInProgress) {
@@ -1926,6 +2017,7 @@ func (ctrl *ApplicationController) autoSync(app *appv1.Application, syncStatus *
} else {
ctrl.writeBackToInformer(updatedApp)
}
ts.AddCheckpoint("write_back_to_informer_ms")

var target string
if updatedApp.Spec.HasMultipleSources() {