diff --git a/api/event.go b/api/event.go index 19f109ba2..6d97f9b61 100644 --- a/api/event.go +++ b/api/event.go @@ -130,6 +130,7 @@ func EventToHealth(event string) models.Health { type Event struct { ID uuid.UUID `gorm:"default:generate_ulid()"` + EventID uuid.UUID `json:"event_id"` Name string `json:"name"` CreatedAt time.Time `json:"created_at"` Properties types.JSONStringMap `json:"properties"` @@ -142,6 +143,7 @@ type Event struct { func (t Event) ToPostQEvent() models.Event { return models.Event{ ID: t.ID, + EventID: t.EventID, Name: t.Name, Error: t.Error, Attempts: t.Attempts, diff --git a/cmd/notification.go b/cmd/notification.go index a2a8febd1..2a83b0e5e 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -110,13 +110,17 @@ func triggerExistingNotification(ctx context.Context, flags *sendFlags) error { } event := models.Event{ - Name: flags.Event, + Name: flags.Event, + EventID: configID, Properties: map[string]string{ "id": configID.String(), "description": lo.FromPtr(config.Description), "status": lo.FromPtr(config.Status), }, } + if flags.Event == api.EventConfigChanged || flags.Event == api.EventConfigUpdated { + event.Properties["config_id"] = configID.String() + } celEnv, err := notification.GetEnvForEvent(ctx, event) if err != nil { @@ -158,7 +162,8 @@ func triggerExistingNotification(ctx context.Context, flags *sendFlags) error { } payload := notification.NotificationEventPayload{ - ID: configID, + ResourceID: configID, + EventID: event.EventID, EventName: flags.Event, NotificationID: notifID, EventCreatedAt: time.Now(), @@ -195,12 +200,13 @@ func triggerExistingNotification(ctx context.Context, flags *sendFlags) error { func enqueueNotification(ctx context.Context, payload notification.NotificationEventPayload) error { event := models.Event{ Name: api.EventNotificationSend, + EventID: payload.GenerateEventID(), Properties: payload.AsMap(), } if err := ctx.DB().Clauses(events.EventQueueOnConflictClause).Create(&event).Error; err != nil { return fmt.Errorf("failed to enqueue notification: %w", err) } - fmt.Printf("Notification enqueued (event_id=%s) - will be processed by running instance\n", event.ID) + fmt.Printf("Notification enqueued (event_id=%s) - will be processed by running instance\n", event.EventID) return nil } diff --git a/events/event_queue.go b/events/event_queue.go index eb76b4846..30135cf67 100644 --- a/events/event_queue.go +++ b/events/event_queue.go @@ -132,7 +132,7 @@ func StartConsumers(ctx context.Context) { // on conflict clause when inserting new events to the `event_queue` table var EventQueueOnConflictClause = clause.OnConflict{ - Columns: []clause.Column{{Name: "name"}, {Name: "properties"}}, + Columns: models.EventQueueUniqueConstraint(), DoUpdates: clause.Assignments(map[string]any{ "attempts": 0, "last_attempt": nil, diff --git a/events/resource.go b/events/resource.go new file mode 100644 index 000000000..89cf0b6f6 --- /dev/null +++ b/events/resource.go @@ -0,0 +1,114 @@ +package events + +import ( + "github.com/flanksource/duty" + dutyAPI "github.com/flanksource/duty/api" + "github.com/flanksource/duty/context" + "github.com/flanksource/duty/models" + "github.com/google/uuid" + + "github.com/flanksource/incident-commander/api" +) + +type EventResource struct { + Component *models.Component `json:"component,omitempty"` + Config *models.ConfigItem `json:"config,omitempty"` + Check *models.Check `json:"check,omitempty"` + CheckSummary *models.CheckSummary `json:"check_summary,omitempty"` + Canary *models.Canary `json:"canary,omitempty"` +} + +func (t *EventResource) AsMap() map[string]any { + output := map[string]any{} + + if t.Component != nil { + output["component"] = t.Component.AsMap() + } + if t.Config != nil { + output["config"] = t.Config.AsMap() + } + if t.Check != nil { + output["check"] = t.Check.AsMap() + } + if t.Canary != nil { + output["canary"] = t.Canary.AsMap() + } + if t.CheckSummary != nil { + output["check_summary"] = t.CheckSummary.AsMap() + } + + return output +} + +// BuildEventResource creates an EventResource from an event by fetching the appropriate models from the database +func BuildEventResource(ctx context.Context, event models.Event) (EventResource, error) { + var eventResource EventResource + switch event.Name { + case api.EventCheckFailed, api.EventCheckPassed: + checkID := event.EventID + var check models.Check + if err := ctx.DB().Where("id = ?", checkID).Limit(1).Find(&check).Error; err != nil { + return eventResource, err + } + if check.ID == uuid.Nil { + return eventResource, dutyAPI.Errorf(dutyAPI.ENOTFOUND, "check(id=%s) not found", checkID) + } + eventResource.Check = &check + + if summary, err := duty.CheckSummary(ctx, checkID.String()); err != nil { + return eventResource, err + } else if summary != nil { + eventResource.CheckSummary = summary + } + + var canary models.Canary + if err := ctx.DB().Where("id = ?", eventResource.Check.CanaryID).Limit(1).Find(&canary).Error; err != nil { + return eventResource, err + } + if canary.ID == uuid.Nil { + return eventResource, dutyAPI.Errorf(dutyAPI.ENOTFOUND, "canary(id=%s) not found", eventResource.Check.CanaryID) + } + eventResource.Canary = &canary + + case api.EventComponentHealthy, api.EventComponentUnhealthy, api.EventComponentWarning, api.EventComponentUnknown: + var component models.Component + if err := ctx.DB().Model(&models.Component{}).Where("id = ?", event.EventID).Limit(1).Find(&component).Error; err != nil { + return eventResource, err + } + if component.ID == uuid.Nil { + return eventResource, dutyAPI.Errorf(dutyAPI.ENOTFOUND, "component(id=%s) not found", event.EventID) + } + eventResource.Component = &component + + case api.EventConfigHealthy, api.EventConfigUnhealthy, api.EventConfigWarning, api.EventConfigUnknown, api.EventConfigDegraded: + var config models.ConfigItem + if err := ctx.DB().Model(&models.ConfigItem{}).Where("id = ?", event.EventID).Limit(1).Find(&config).Error; err != nil { + return eventResource, err + } + if config.ID == uuid.Nil { + return eventResource, dutyAPI.Errorf(dutyAPI.ENOTFOUND, "config(id=%s) not found", event.EventID) + } + eventResource.Config = &config + + case api.EventConfigCreated, api.EventConfigDeleted: + var config models.ConfigItem + if err := ctx.DB().Model(&models.ConfigItem{}).Where("id = ?", event.EventID).Limit(1).Find(&config).Error; err != nil { + return eventResource, err + } + if config.ID == uuid.Nil { + return eventResource, dutyAPI.Errorf(dutyAPI.ENOTFOUND, "config(id=%s) not found", event.EventID) + } + eventResource.Config = &config + + case api.EventConfigChanged, api.EventConfigUpdated: + var config models.ConfigItem + if err := ctx.DB().Model(&models.ConfigItem{}).Where("id = ?", event.Properties["config_id"]).Limit(1).Find(&config).Error; err != nil { + return eventResource, err + } + if config.ID == uuid.Nil { + return eventResource, dutyAPI.Errorf(dutyAPI.ENOTFOUND, "config(id=%s) not found", event.Properties["config_id"]) + } + eventResource.Config = &config + } + return eventResource, nil +} diff --git a/go.mod b/go.mod index 837b98589..430ad6b6e 100644 --- a/go.mod +++ b/go.mod @@ -436,7 +436,7 @@ require ( sigs.k8s.io/yaml v1.6.0 ) -// replace github.com/flanksource/duty => ../duty +replace github.com/flanksource/duty => ../duty // replace github.com/flanksource/clicky => ../clicky diff --git a/incidents/responder/responder_events.go b/incidents/responder/responder_events.go index 3c0eca373..9ace05a48 100644 --- a/incidents/responder/responder_events.go +++ b/incidents/responder/responder_events.go @@ -29,7 +29,7 @@ func RegisterEvents(ctx context.Context) { // generateResponderAddedAsyncEvent generates async events for each of the configured responder clients // in the associated team. func generateResponderAddedAsyncEvent(ctx context.Context, event models.Event) error { - responderID := event.Properties["id"] + responderID := event.EventID.String() var responder api.Responder err := ctx.DB().Where("id = ? AND external_id is NULL", responderID).Preload("Incident").Preload("Team").Find(&responder).Error @@ -43,13 +43,21 @@ func generateResponderAddedAsyncEvent(ctx context.Context, event models.Event) e } if spec.ResponderClients.Jira != nil { - if err := ctx.DB().Clauses(events.EventQueueOnConflictClause).Create(&api.Event{Name: api.EventJiraResponderAdded, Properties: map[string]string{"id": responderID}}).Error; err != nil { + if err := ctx.DB().Clauses(events.EventQueueOnConflictClause).Create(&api.Event{ + Name: api.EventJiraResponderAdded, + EventID: event.EventID, + Properties: map[string]string{"id": responderID}, + }).Error; err != nil { return err } } if spec.ResponderClients.MSPlanner != nil { - if err := ctx.DB().Clauses(events.EventQueueOnConflictClause).Create(&api.Event{Name: api.EventMSPlannerResponderAdded, Properties: map[string]string{"id": responderID}}).Error; err != nil { + if err := ctx.DB().Clauses(events.EventQueueOnConflictClause).Create(&api.Event{ + Name: api.EventMSPlannerResponderAdded, + EventID: event.EventID, + Properties: map[string]string{"id": responderID}, + }).Error; err != nil { return err } } @@ -59,7 +67,7 @@ func generateResponderAddedAsyncEvent(ctx context.Context, event models.Event) e // generateCommentAddedAsyncEvent generates comment.add async events for each of the configured responder clients. func generateCommentAddedAsyncEvent(ctx context.Context, event models.Event) error { - commentID := event.Properties["id"] + commentID := event.EventID.String() var comment api.Comment err := ctx.DB().Where("id = ? AND external_id IS NULL", commentID).First(&comment).Error @@ -86,17 +94,25 @@ func generateCommentAddedAsyncEvent(ctx context.Context, event models.Event) err for _, responder := range responders { switch responder.Type { case "jira": - if err := ctx.DB().Clauses(events.EventQueueOnConflictClause).Create(&api.Event{Name: api.EventJiraCommentAdded, Properties: map[string]string{ - "responder_id": responder.ID.String(), - "id": commentID, - }}).Error; err != nil { + if err := ctx.DB().Clauses(events.EventQueueOnConflictClause).Create(&api.Event{ + Name: api.EventJiraCommentAdded, + EventID: event.EventID, + Properties: map[string]string{ + "responder_id": responder.ID.String(), + "id": commentID, + }, + }).Error; err != nil { return err } case "ms_planner": - if err := ctx.DB().Clauses(events.EventQueueOnConflictClause).Create(&api.Event{Name: api.EventMSPlannerCommentAdded, Properties: map[string]string{ - "responder_id": responder.ID.String(), - "id": commentID, - }}).Error; err != nil { + if err := ctx.DB().Clauses(events.EventQueueOnConflictClause).Create(&api.Event{ + Name: api.EventMSPlannerCommentAdded, + EventID: event.EventID, + Properties: map[string]string{ + "responder_id": responder.ID.String(), + "id": commentID, + }, + }).Error; err != nil { return err } } @@ -130,7 +146,7 @@ func handleResponderEvent(ctx context.Context, event models.Event) error { // TODO: Modify this such that it only notifies the responder mentioned in the event. func reconcileResponderEvent(ctx context.Context, event models.Event) error { - responderID := event.Properties["id"] + responderID := event.EventID.String() var responder api.Responder err := ctx.DB().Where("id = ? AND external_id is NULL", responderID).Preload("Incident").Preload("Team").Find(&responder).Error @@ -157,7 +173,7 @@ func reconcileResponderEvent(ctx context.Context, event models.Event) error { // TODO: Modify this such that it only adds the comment to the particular responder mentioned in the event. func reconcileCommentEvent(ctx context.Context, event models.Event) error { - commentID := event.Properties["id"] + commentID := event.EventID.String() var comment api.Comment err := ctx.DB().Where("id = ? AND external_id IS NULL", commentID).First(&comment).Error diff --git a/notification/events.go b/notification/events.go index fc094b37a..68ac6e1c3 100644 --- a/notification/events.go +++ b/notification/events.go @@ -136,11 +136,16 @@ func (t *notificationHandler) addNotificationEvent(ctx context.Context, event mo celEnv, err := GetEnvForEvent(ctx, event) if err != nil { - return ctx.Oops().Wrapf(err, "failed to get env for event") + return ctx.Oops().Wrapf(err, "failed to get env for event %s %s", event.ID, event.Name) } if lo.Contains(api.ConfigEvents, event.Name) { - if err := resolveGroupMembership(ctx, celEnv, event.Properties["id"]); err != nil { + configID := event.EventID.String() + if celEnv.ConfigItem != nil { + configID = celEnv.ConfigItem.ID.String() + } + + if err := resolveGroupMembership(ctx, celEnv, configID); err != nil { return ctx.Oops().Wrapf(err, "failed to resolve group membership for event") } } @@ -215,7 +220,7 @@ func resolveGroupMembershipForNotification(ctx context.Context, celEnv *celVaria if notification.Filter != "" { valid, err := ctx.RunTemplateBool(gomplate.Template{Expression: notification.Filter}, celEnv.AsMap(ctx, celVarGetLatestHealthStatus)) if err != nil { - return false, ctx.Oops().Wrapf(err, "failed to validate notification filter for notification %s", notificationID) + return false, ctx.Oops().Wrapf(err, "failed to validate notification filter for notification:%s with config: %s", notificationID, configID) } if !valid { @@ -280,7 +285,7 @@ func addNotificationEvent(ctx context.Context, id string, celEnv *celVariables, } history := models.NotificationSendHistory{ NotificationID: n.ID, - ResourceID: payload.ID, + ResourceID: payload.ResourceID, ResourceHealth: payload.ResourceHealth, ResourceStatus: payload.ResourceStatus, ResourceHealthDescription: payload.ResourceHealthDescription, @@ -306,7 +311,7 @@ func addNotificationEvent(ctx context.Context, id string, celEnv *celVariables, if !rateLimiter.Allow() { // rate limited notifications are simply dropped. ctx.Warnf("notification rate limited event=%s notification=%s resource=%s (health=%s, status=%s, description=%s)", - event.Name, id, payload.ID, payload.ResourceHealth, payload.ResourceStatus, payload.ResourceHealthDescription) + event.Name, id, payload.ResourceID, payload.ResourceHealth, payload.ResourceStatus, payload.ResourceHealthDescription) ctx.Counter("notification_rate_limited", "id", id).Add(1) continue } @@ -320,7 +325,7 @@ func addNotificationEvent(ctx context.Context, id string, celEnv *celVariables, } pendingHistory := models.NotificationSendHistory{ NotificationID: n.ID, - ResourceID: payload.ID, + ResourceID: payload.ResourceID, ResourceHealth: payload.ResourceHealth, ResourceStatus: payload.ResourceStatus, ResourceHealthDescription: payload.ResourceHealthDescription, @@ -342,6 +347,7 @@ func addNotificationEvent(ctx context.Context, id string, celEnv *celVariables, } else { newEvent := models.Event{ Name: api.EventNotificationSend, + EventID: payload.GenerateEventID(), Properties: payload.AsMap(), } if err := ctx.DB().Clauses(events.EventQueueOnConflictClause).Create(&newEvent).Error; err != nil { @@ -368,7 +374,7 @@ func buildNotificationHistoryPayload(ctx context.Context, payload NotificationEv env := *celEnv if payload.GroupID != nil { - groupedResources, err := db.GetGroupedResources(ctx, *payload.GroupID, payload.ID.String()) + groupedResources, err := db.GetGroupedResources(ctx, *payload.GroupID, payload.ResourceID.String()) if err != nil { return nil, nil, fmt.Errorf("failed to get grouped resources for notification[%s]: %w", payload.NotificationID, err) } @@ -431,7 +437,7 @@ func processNotificationConstraints(ctx context.Context, // Repeat interval check if n.RepeatInterval != nil { - blockingSendHistory, err := checkRepeatInterval(ctx, n, payload.GroupID, payload.ID.String(), sourceEvent) + blockingSendHistory, err := checkRepeatInterval(ctx, n, payload.GroupID, payload.ResourceID.String(), sourceEvent) if err != nil { // If there are any errors in calculating interval, we send the notification and log the error ctx.Errorf("error checking repeat interval for notification[%s]: %v", n.ID, err) @@ -439,7 +445,7 @@ func processNotificationConstraints(ctx context.Context, if blockingSendHistory != nil { ctx.Logger.V(6).Infof("skipping notification[%s] due to repeat interval", n.ID) - ctx.Counter("notification_skipped_by_repeat_interval", "id", n.ID.String(), "resource", payload.ID.String(), "source_event", sourceEvent).Add(1) + ctx.Counter("notification_skipped_by_repeat_interval", "id", n.ID.String(), "resource", payload.ResourceID.String(), "source_event", sourceEvent).Add(1) result := &validateResult{ BlockedWithStatus: models.NotificationStatusRepeatInterval, @@ -452,7 +458,7 @@ func processNotificationConstraints(ctx context.Context, if silencedBy := getFirstSilencer(ctx, celEnv, matchingSilences); silencedBy != nil { ctx.Logger.V(6).Infof("silencing notification for event %s due to %d matching silences", sourceEvent, matchingSilences) - ctx.Counter("notification_silenced", "id", n.ID.String(), "resource", payload.ID.String()).Add(1) + ctx.Counter("notification_silenced", "id", n.ID.String(), "resource", payload.ResourceID.String()).Add(1) return &validateResult{ BlockedWithStatus: models.NotificationStatusSilenced, SilencedBy: &silencedBy.ID, @@ -467,7 +473,7 @@ func processNotificationConstraints(ctx context.Context, if inhibitor != nil { ctx.Logger.V(6).Infof("skipping notification[%s] due to inhibition", n.ID) - ctx.Counter("notification_inhibited", "id", n.ID.String(), "resource", payload.ID.String(), "source_event", sourceEvent).Add(1) + ctx.Counter("notification_inhibited", "id", n.ID.String(), "resource", payload.ResourceID.String(), "source_event", sourceEvent).Add(1) return &validateResult{ BlockedWithStatus: models.NotificationStatusInhibited, @@ -598,7 +604,7 @@ func sendFallbackNotification(ctx context.Context, sendHistory models.Notificati func sendPendingNotification(ctx context.Context, history models.NotificationSendHistory, payload NotificationEventPayload) error { notificationContext := NewContext(ctx.WithSubject(payload.NotificationID.String()), payload.NotificationID).WithHistory(history) ctx.Debugf("[notification.send] %s ", payload.EventName) - notificationContext.WithSource(payload.EventName, payload.ID) + notificationContext.WithSource(payload.EventName, payload.ResourceID) notificationContext.WithGroupID(payload.GroupID) err := _sendNotification(notificationContext, payload) @@ -675,7 +681,7 @@ func calculateGroupByHash(ctx context.Context, groupBy []string, resourceID, eve func sendNotification(ctx context.Context, payload NotificationEventPayload) error { notificationContext := NewContext(ctx.WithSubject(payload.NotificationID.String()), payload.NotificationID) ctx.Debugf("[notification.send] %s ", payload.EventName) - notificationContext.WithSource(payload.EventName, payload.ID) + notificationContext.WithSource(payload.EventName, payload.ResourceID) notificationContext.WithGroupID(payload.GroupID) logs.IfError(notificationContext.StartLog(), "error persisting start of notification send history") @@ -690,7 +696,7 @@ func sendNotification(ctx context.Context, payload NotificationEventPayload) err } func _sendNotification(ctx *Context, payload NotificationEventPayload) error { - originalEvent := models.Event{Name: payload.EventName, CreatedAt: payload.EventCreatedAt} + originalEvent := payload.ParentEvent() if len(payload.Properties) > 0 { if err := json.Unmarshal(payload.Properties, &originalEvent.Properties); err != nil { return fmt.Errorf("failed to unmarshal properties: %w", err) @@ -703,7 +709,7 @@ func _sendNotification(ctx *Context, payload NotificationEventPayload) error { } if payload.GroupID != nil { - celEnv.GroupedResources, err = db.GetGroupedResources(ctx.Context, *payload.GroupID, payload.ID.String()) + celEnv.GroupedResources, err = db.GetGroupedResources(ctx.Context, *payload.GroupID, payload.ResourceID.String()) if err != nil { return ctx.Oops().Wrapf(err, "failed to get grouped resources for notification[%s]", payload.NotificationID) } @@ -723,7 +729,7 @@ func _sendNotification(ctx *Context, payload NotificationEventPayload) error { ctx.log.PendingPlaybookRun() } else { - traceLog("NotificationID=%s Resource=[%s/%s] Sending ...", nn.ID, payload.EventName, payload.ID) + traceLog("NotificationID=%s Resource=[%s/%s] Sending ...", nn.ID, payload.EventName, payload.ResourceID) if err := PrepareAndSendEventNotification(ctx, payload, celEnv); err != nil { return fmt.Errorf("failed to send notification for event: %w", err) } @@ -754,7 +760,7 @@ func GetEnvForEvent(ctx context.Context, event models.Event) (*celVariables, err var env celVariables if strings.HasPrefix(event.Name, "check.") { - checkID := event.Properties["id"] + checkID := event.EventID.String() lastRuntime := event.Properties["last_runtime"] check, err := query.FindCachedCheck(ctx, checkID) @@ -804,7 +810,7 @@ func GetEnvForEvent(ctx context.Context, event models.Event) (*celVariables, err } if event.Name == "incident.created" || strings.HasPrefix(event.Name, "incident.status.") { - incidentID := event.Properties["id"] + incidentID := event.EventID.String() incident, err := query.GetCachedIncident(ctx, incidentID) if err != nil { @@ -818,7 +824,7 @@ func GetEnvForEvent(ctx context.Context, event models.Event) (*celVariables, err } if strings.HasPrefix(event.Name, "incident.responder.") { - responderID := event.Properties["id"] + responderID := event.EventID.String() responder, err := responder.FindResponderByID(ctx, responderID) if err != nil { return nil, fmt.Errorf("error finding responder(id=%s): %v", responderID, err) @@ -840,8 +846,8 @@ func GetEnvForEvent(ctx context.Context, event models.Event) (*celVariables, err if strings.HasPrefix(event.Name, "incident.comment.") { var comment models.Comment - if err := ctx.DB().Where("id = ?", event.Properties["id"]).Find(&comment).Error; err != nil { - return nil, fmt.Errorf("error getting comment (id=%s)", event.Properties["id"]) + if err := ctx.DB().Where("id = ?", event.EventID).Find(&comment).Error; err != nil { + return nil, fmt.Errorf("error getting comment (id=%s): %w", event.EventID, err) } incident, err := query.GetCachedIncident(ctx, comment.IncidentID.String()) @@ -868,7 +874,7 @@ func GetEnvForEvent(ctx context.Context, event models.Event) (*celVariables, err if strings.HasPrefix(event.Name, "incident.dod.") { var evidence models.Evidence - if err := ctx.DB().Where("id = ?", event.Properties["id"]).Find(&evidence).Error; err != nil { + if err := ctx.DB().Where("id = ?", event.EventID).Find(&evidence).Error; err != nil { return nil, err } @@ -891,7 +897,7 @@ func GetEnvForEvent(ctx context.Context, event models.Event) (*celVariables, err } if strings.HasPrefix(event.Name, "component.") { - componentID := event.Properties["id"] + componentID := event.EventID.String() component, err := query.GetCachedComponent(ctx, componentID) if err != nil { @@ -917,7 +923,10 @@ func GetEnvForEvent(ctx context.Context, event models.Event) (*celVariables, err } if strings.HasPrefix(event.Name, "config.") { - configID := event.Properties["id"] + configID := event.EventID.String() + if event.Name == api.EventConfigChanged || event.Name == api.EventConfigUpdated { + configID = event.Properties["config_id"] + } config, err := query.GetCachedConfig(ctx, configID) if err != nil { diff --git a/notification/job.go b/notification/job.go index 4c2feb01a..4f47b5129 100644 --- a/notification/job.go +++ b/notification/job.go @@ -13,9 +13,6 @@ import ( "github.com/flanksource/duty/job" "github.com/flanksource/duty/models" "github.com/flanksource/duty/query" - "github.com/flanksource/incident-commander/api" - v1 "github.com/flanksource/incident-commander/api/v1" - "github.com/flanksource/incident-commander/db" "github.com/google/uuid" "github.com/samber/lo" "gorm.io/gorm" @@ -24,6 +21,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/flanksource/incident-commander/api" + v1 "github.com/flanksource/incident-commander/api/v1" + "github.com/flanksource/incident-commander/db" + "github.com/flanksource/incident-commander/events" ) var CRDStatusUpdateQueue *collections.Queue[string] @@ -316,7 +318,7 @@ func shouldSkipNotificationDueToHealth(ctx context.Context, notif NotificationWi var payload NotificationEventPayload payload.FromMap(currentHistory.Payload) - originalEvent := models.Event{Name: payload.EventName, CreatedAt: payload.EventCreatedAt} + originalEvent := models.Event{Name: payload.EventName, EventID: payload.EventID, CreatedAt: payload.EventCreatedAt} if len(payload.Properties) > 0 { if err := json.Unmarshal(payload.Properties, &originalEvent.Properties); err != nil { return false, fmt.Errorf("failed to unmarshal properties: %w", err) @@ -363,10 +365,10 @@ func shouldSkipNotificationDueToHealth(ctx context.Context, notif NotificationWi deleted := resourceHealth.DeletedAt != nil relativeUpdatedAt := time.Since(lo.FromPtr(resourceHealth.UpdatedAt)) - traceLog("NotificationID=%s HistoryID=%s Resource=[%s/%s] PreviousHealth=%s CurrentHealth=%s UpdatedAt=%s RelativeUpdatedAtAgo=%s Checking if reportable", notif.ID, currentHistory.ID, payload.EventName, payload.ID, previousHealth, currentHealth, lo.FromPtr(resourceHealth.UpdatedAt), relativeUpdatedAt) + traceLog("NotificationID=%s HistoryID=%s Resource=[%s/%s] PreviousHealth=%s CurrentHealth=%s UpdatedAt=%s RelativeUpdatedAtAgo=%s Checking if reportable", notif.ID, currentHistory.ID, payload.EventName, payload.ResourceID, previousHealth, currentHealth, lo.FromPtr(resourceHealth.UpdatedAt), relativeUpdatedAt) if !isHealthReportable(notif.Events, previousHealth, currentHealth) || deleted { ctx.Logger.V(6).Infof("skipping notification[%s] as health change is not reportable", notif.ID) - traceLog("NotificationID=%s HistoryID=%s Resource=[%s/%s] PreviousHealth=%s CurrentHealth=%s ResourceDeleted=%v Skipping", notif.ID, currentHistory.ID, payload.EventName, payload.ID, previousHealth, currentHealth, deleted) + traceLog("NotificationID=%s HistoryID=%s Resource=[%s/%s] PreviousHealth=%s CurrentHealth=%s ResourceDeleted=%v Skipping", notif.ID, currentHistory.ID, payload.EventName, payload.ResourceID, previousHealth, currentHealth, deleted) if err := db.SkipNotificationSendHistory(ctx, currentHistory.ID); err != nil { return false, fmt.Errorf("failed to skip notification send history (%s): %w", currentHistory.ID, err) @@ -374,7 +376,7 @@ func shouldSkipNotificationDueToHealth(ctx context.Context, notif NotificationWi return true, nil } - traceLog("NotificationID=%s HistoryID=%s Resource=[%s/%s] PreviousHealth=%s CurrentHealth=%s Reporting ...", notif.ID, currentHistory.ID, payload.EventName, payload.ID, previousHealth, currentHealth) + traceLog("NotificationID=%s HistoryID=%s Resource=[%s/%s] PreviousHealth=%s CurrentHealth=%s Reporting ...", notif.ID, currentHistory.ID, payload.EventName, payload.ResourceID, previousHealth, currentHealth) return false, nil } @@ -406,14 +408,7 @@ func processPendingNotification(ctx context.Context, currentHistory models.Notif var payload NotificationEventPayload payload.FromMap(currentHistory.Payload) - event := models.Event{ - Name: payload.EventName, - CreatedAt: payload.EventCreatedAt, - Properties: map[string]string{ - "id": payload.ID.String(), - }, - } - celEnv, err := GetEnvForEvent(ctx, event) + celEnv, err := GetEnvForEvent(ctx, payload.ParentEvent()) if err != nil { return fmt.Errorf("failed to get cel env: %w", err) } @@ -457,20 +452,20 @@ func processPendingNotification(ctx context.Context, currentHistory models.Notif } func triggerIncrementalScrape(ctx context.Context, configID string) error { + parsedConfigID, err := uuid.Parse(configID) + if err != nil { + return fmt.Errorf("invalid config id(%s): %w", configID, err) + } + event := models.Event{ - Name: "config-db.incremental-scrape", + Name: "config-db.incremental-scrape", + EventID: parsedConfigID, Properties: map[string]string{ "config_id": configID, }, } - onConflictClause := clause.OnConflict{ - Columns: []clause.Column{{Name: "name"}, {Name: "properties"}}, - DoUpdates: clause.Assignments(map[string]any{ - "created_at": gorm.Expr("CURRENT_TIMESTAMP"), - }), - } - return ctx.DB().Clauses(onConflictClause).Create(&event).Error + return ctx.DB().Clauses(events.EventQueueOnConflictClause).Create(&event).Error } func isKubernetesConfigItem(ctx context.Context, configID string) (bool, error) { diff --git a/notification/send.go b/notification/send.go index 278bdd3e5..d6c8fcd05 100644 --- a/notification/send.go +++ b/notification/send.go @@ -8,6 +8,7 @@ import ( "time" "github.com/flanksource/commons/collections" + "github.com/flanksource/commons/hash" "github.com/flanksource/commons/utils" pkgConnection "github.com/flanksource/duty/connection" "github.com/flanksource/duty/context" @@ -19,6 +20,7 @@ import ( "github.com/flanksource/incident-commander/api" "github.com/flanksource/incident-commander/db" + "github.com/flanksource/incident-commander/events" "github.com/flanksource/incident-commander/logs" "github.com/flanksource/incident-commander/teams" ) @@ -55,11 +57,12 @@ func DefaultTitleAndBody(payload NotificationEventPayload, celEnv *celVariables) // NotificationEventPayload holds data to create a notification. type NotificationEventPayload struct { - ID uuid.UUID `json:"id"` // Resource id. depends what it is based on the original event. + ResourceID uuid.UUID `json:"resource_id"` // Resource id. depends what it is based on the original event. ResourceHealth models.Health `json:"resource_health"` ResourceStatus string `json:"resource_status"` ResourceHealthDescription string `json:"resource_health_description"` + EventID uuid.UUID `json:"event_id"` // The id of the original event this notification is for. EventName string `json:"event_name"` // The name of the original event this notification is for. NotificationID uuid.UUID `json:"notification_id,omitempty"` // ID of the notification. EventCreatedAt time.Time `json:"event_created_at"` // Timestamp at which the original event was created @@ -75,7 +78,37 @@ type NotificationEventPayload struct { NotificationName string `json:"notification_name,omitempty"` // Name of the notification of a team } -func (t *NotificationEventPayload) AsMap() map[string]string { +// Generates an idempotent event id for this notification send. +func (t NotificationEventPayload) GenerateEventID() uuid.UUID { + var recipientSig string + switch { + case t.Connection != nil: + recipientSig = "connection:" + t.Connection.String() + case t.PersonID != nil: + recipientSig = "person:" + t.PersonID.String() + case t.TeamID != nil: + recipientSig = "team:" + t.TeamID.String() + if t.NotificationName != "" { + recipientSig += ":" + t.NotificationName + } + case t.PlaybookID != nil: + recipientSig = "playbook:" + t.PlaybookID.String() + case t.CustomService != nil: + customServiceJSON, _ := json.Marshal(t.CustomService) + recipientSig = "custom:" + string(customServiceJSON) + } + + groupID := "" + if t.GroupID != nil { + groupID = t.GroupID.String() + } + + sig := fmt.Sprintf("%s-%s-%s-%s-group-%s-recipient-%s", t.NotificationID.String(), t.ResourceID.String(), t.EventName, t.EventID.String(), groupID, recipientSig) + generated, _ := hash.DeterministicUUID(sig) + return generated +} + +func (t NotificationEventPayload) AsMap() map[string]string { // NOTE: Because the payload is marshalled to map[string]string instead of map[string]any // the custom_service field cannot be marshalled. // So, we marshal it separately and add it to the map. @@ -93,6 +126,22 @@ func (t *NotificationEventPayload) AsMap() map[string]string { return m } +func (t NotificationEventPayload) ParentEvent() models.Event { + event := models.Event{ + Name: t.EventName, + CreatedAt: t.EventCreatedAt, + EventID: t.EventID, + } + + if t.EventName == api.EventConfigChanged || t.EventName == api.EventConfigUpdated { + event.Properties = map[string]string{ + "config_id": t.ResourceID.String(), + } + } + + return event +} + func (t *NotificationEventPayload) FromMap(m map[string]string) { b, _ := json.Marshal(m) _ = json.Unmarshal(b, &t) @@ -199,6 +248,7 @@ func triggerPlaybookRun(ctx *Context, celEnv *celVariables, playbookID uuid.UUID err := ctx.Transaction(func(txCtx context.Context, _ trace.Span) error { eventProp := types.JSONStringMap{ "id": playbookID.String(), + "playbook_id": playbookID.String(), "notification_id": ctx.notificationID.String(), "notification_dispatch_id": ctx.log.ID.String(), } @@ -214,9 +264,10 @@ func triggerPlaybookRun(ctx *Context, celEnv *celVariables, playbookID uuid.UUID event := models.Event{ Name: api.EventPlaybookRun, + EventID: ctx.log.ID, Properties: eventProp, } - if err := txCtx.DB().Create(&event).Error; err != nil { + if err := txCtx.DB().Clauses(events.EventQueueOnConflictClause).Create(&event).Error; err != nil { return fmt.Errorf("failed to create run: %w", err) } @@ -412,13 +463,19 @@ func CreateNotificationSendPayloads(ctx context.Context, event models.Event, n * var payloads []NotificationEventPayload - resourceID, err := uuid.Parse(event.Properties["id"]) - if err != nil { - return nil, fmt.Errorf("failed to parse resource id: %v", err) + resource := celEnv.SelectableResource() + resourceID := event.EventID + if resource != nil { + parsedResourceID, err := uuid.Parse(resource.GetID()) + if err != nil { + return nil, fmt.Errorf("failed to parse resource id(%s): %w", resource.GetID(), err) + } + resourceID = parsedResourceID } var eventProperties []byte if len(event.Properties) > 0 { + var err error eventProperties, err = json.Marshal(event.Properties) if err != nil { return nil, fmt.Errorf("failed to marshal event properties: %v", err) @@ -445,7 +502,6 @@ func CreateNotificationSendPayloads(ctx context.Context, event models.Event, n * } } - resource := celEnv.SelectableResource() var resourceHealth, resourceStatus, resourceHealthDescription string if resource != nil { var err error @@ -466,12 +522,13 @@ func CreateNotificationSendPayloads(ctx context.Context, event models.Event, n * if n.PlaybookID != nil { payload := NotificationEventPayload{ + EventID: event.EventID, EventName: event.Name, NotificationID: n.ID, ResourceHealth: models.Health(resourceHealth), ResourceStatus: resourceStatus, ResourceHealthDescription: resourceHealthDescription, - ID: resourceID, + ResourceID: resourceID, PlaybookID: n.PlaybookID, EventCreatedAt: event.CreatedAt, Properties: eventProperties, @@ -483,12 +540,13 @@ func CreateNotificationSendPayloads(ctx context.Context, event models.Event, n * if n.PersonID != nil { payload := NotificationEventPayload{ + EventID: event.EventID, EventName: event.Name, NotificationID: n.ID, ResourceHealth: models.Health(resourceHealth), ResourceHealthDescription: resourceHealthDescription, ResourceStatus: resourceStatus, - ID: resourceID, + ResourceID: resourceID, PersonID: n.PersonID, EventCreatedAt: event.CreatedAt, Properties: eventProperties, @@ -515,12 +573,13 @@ func CreateNotificationSendPayloads(ctx context.Context, event models.Event, n * } payload := NotificationEventPayload{ + EventID: event.EventID, EventName: event.Name, NotificationID: n.ID, ResourceHealth: models.Health(resourceHealth), ResourceHealthDescription: resourceHealthDescription, ResourceStatus: resourceStatus, - ID: resourceID, + ResourceID: resourceID, TeamID: n.TeamID, NotificationName: cn.Name, EventCreatedAt: event.CreatedAt, @@ -543,13 +602,14 @@ func CreateNotificationSendPayloads(ctx context.Context, event models.Event, n * } payload := NotificationEventPayload{ + EventID: event.EventID, EventName: event.Name, NotificationID: n.ID, ResourceHealth: models.Health(resourceHealth), ResourceHealthDescription: resourceHealthDescription, ResourceStatus: resourceStatus, CustomService: cn.DeepCopy(), - ID: resourceID, + ResourceID: resourceID, EventCreatedAt: event.CreatedAt, Properties: eventProperties, GroupID: groupID, diff --git a/notification/silence.go b/notification/silence.go index a8d767deb..373277789 100644 --- a/notification/silence.go +++ b/notification/silence.go @@ -310,6 +310,7 @@ func CanSilenceViaFilter(ctx context.Context, n []models.NotificationSendHistory event := models.Event{ Name: notif.SourceEvent, + EventID: notif.ResourceID, Properties: properties, } celEnv, err := GetEnvForEvent(ctx, event) diff --git a/playbook/actions/ai.go b/playbook/actions/ai.go index 7693a0b58..cabb66aba 100644 --- a/playbook/actions/ai.go +++ b/playbook/actions/ai.go @@ -23,6 +23,7 @@ import ( v1 "github.com/flanksource/incident-commander/api/v1" pkgArtifacts "github.com/flanksource/incident-commander/artifacts" "github.com/flanksource/incident-commander/db" + "github.com/flanksource/incident-commander/events" "github.com/flanksource/incident-commander/llm" llmContext "github.com/flanksource/incident-commander/llm/context" "github.com/flanksource/incident-commander/utils" @@ -57,13 +58,15 @@ func init() { type aiAction struct { PlaybookID uuid.UUID // ID of the playbook that is executing this action RunID uuid.UUID // ID of the run that is executing this action + ActionID uuid.UUID // ID of the action that is executing this action TemplateEnv TemplateEnv } -func NewAIAction(playbookID, runID uuid.UUID, templateEnv TemplateEnv) *aiAction { +func NewAIAction(playbookID, runID, actionID uuid.UUID, templateEnv TemplateEnv) *aiAction { return &aiAction{ PlaybookID: playbookID, RunID: runID, + ActionID: actionID, TemplateEnv: templateEnv, } } @@ -288,6 +291,7 @@ func (t *aiAction) triggerPlaybookRun(ctx context.Context, contextProvider api.L eventProp := types.JSONStringMap{ "id": playbook.ID.String(), + "playbook_id": playbook.ID.String(), "parent_run_id": t.RunID.String(), "parameters": string(parametersJSON), } @@ -302,9 +306,10 @@ func (t *aiAction) triggerPlaybookRun(ctx context.Context, contextProvider api.L event := models.Event{ Name: api.EventPlaybookRun, + EventID: uuid.NewSHA1(t.ActionID, []byte(playbook.ID.String())), Properties: eventProp, } - if err := ctx.DB().Create(&event).Error; err != nil { + if err := ctx.DB().Clauses(events.EventQueueOnConflictClause).Create(&event).Error; err != nil { return fmt.Errorf("failed to create run: %w", err) } diff --git a/playbook/events.go b/playbook/events.go index 19d7e0ae9..9883d1dc1 100644 --- a/playbook/events.go +++ b/playbook/events.go @@ -9,7 +9,6 @@ import ( "github.com/flanksource/commons/collections" "github.com/flanksource/commons/logger" - "github.com/flanksource/duty" dutyAPI "github.com/flanksource/duty/api" "github.com/flanksource/duty/context" "github.com/flanksource/duty/models" @@ -69,8 +68,8 @@ func init() { func RegisterEvents(ctx context.Context) { EventRing = events.NewEventRing(ctx.Properties().Int("events.audit.size", events.DefaultEventLogSize)) - nh := playbookScheduler{Ring: EventRing} - events.RegisterSyncHandler(nh.Handle, api.EventStatusGroup...) + ps := playbookScheduler{Ring: EventRing} + events.RegisterSyncHandler(ps.Handle, api.EventStatusGroup...) events.RegisterSyncHandler(onNewRun, api.EventPlaybookRun) events.RegisterSyncHandler(onApprovalUpdated, api.EventPlaybookSpecApprovalUpdated) @@ -81,36 +80,6 @@ func RegisterEvents(ctx context.Context) { }() } -type EventResource struct { - Component *models.Component `json:"component,omitempty"` - Config *models.ConfigItem `json:"config,omitempty"` - Check *models.Check `json:"check,omitempty"` - CheckSummary *models.CheckSummary `json:"check_summary,omitempty"` - Canary *models.Canary `json:"canary,omitempty"` -} - -func (t *EventResource) AsMap() map[string]any { - output := map[string]any{} - - if t.Component != nil { - output["component"] = t.Component.AsMap() - } - if t.Config != nil { - output["config"] = t.Config.AsMap() - } - if t.Check != nil { - output["check"] = t.Check.AsMap() - } - if t.Canary != nil { - output["canary"] = t.Canary.AsMap() - } - if t.CheckSummary != nil { - output["check_summary"] = t.CheckSummary.AsMap() - } - - return output -} - type playbookScheduler struct { Ring *events.EventRing } @@ -130,38 +99,9 @@ func (t *playbookScheduler) Handle(ctx context.Context, event models.Event) erro return nil } - var eventResource EventResource - switch event.Name { - case api.EventCheckFailed, api.EventCheckPassed: - checkID := event.Properties["id"] - if err := ctx.DB().Where("id = ?", checkID).First(&eventResource.Check).Error; err != nil { - return dutyAPI.Errorf(dutyAPI.ENOTFOUND, "check(id=%s) not found", checkID) - } - - if summary, err := duty.CheckSummary(ctx, checkID); err != nil { - return err - } else if summary != nil { - eventResource.CheckSummary = summary - } - - if err := ctx.DB().Where("id = ?", eventResource.Check.CanaryID).First(&eventResource.Canary).Error; err != nil { - return dutyAPI.Errorf(dutyAPI.ENOTFOUND, "canary(id=%s) not found", eventResource.Check.CanaryID) - } - - case api.EventComponentHealthy, api.EventComponentUnhealthy, api.EventComponentWarning, api.EventComponentUnknown: - if err := ctx.DB().Model(&models.Component{}).Where("id = ?", event.Properties["id"]).First(&eventResource.Component).Error; err != nil { - return dutyAPI.Errorf(dutyAPI.ENOTFOUND, "component(id=%s) not found", event.Properties["id"]) - } - - case api.EventConfigHealthy, api.EventConfigUnhealthy, api.EventConfigWarning, api.EventConfigUnknown, api.EventConfigDegraded: - if err := ctx.DB().Model(&models.ConfigItem{}).Where("id = ?", event.Properties["id"]).First(&eventResource.Config).Error; err != nil { - return dutyAPI.Errorf(dutyAPI.ENOTFOUND, "config(id=%s) not found", event.Properties["id"]) - } - - case api.EventConfigCreated, api.EventConfigUpdated, api.EventConfigDeleted, api.EventConfigChanged: - if err := ctx.DB().Model(&models.ConfigItem{}).Where("id = ?", event.Properties["id"]).First(&eventResource.Config).Error; err != nil { - return dutyAPI.Errorf(dutyAPI.ENOTFOUND, "config(id=%s) not found", event.Properties["id"]) - } + eventResource, err := events.BuildEventResource(ctx, event) + if err != nil { + return err } for _, p := range playbooks { @@ -277,7 +217,13 @@ const ( ) func onNewRun(ctx context.Context, event models.Event) error { - var playbookID = event.Properties["id"] + playbookID := event.Properties["playbook_id"] + if playbookID == "" { + playbookID = event.Properties["id"] + } + if playbookID == "" { + playbookID = event.EventID.String() + } // What triggered the run? // Must be either a notification, or a playbook run. @@ -428,7 +374,7 @@ func onNewRun(ctx context.Context, event models.Event) error { } func onApprovalUpdated(ctx context.Context, event models.Event) error { - playbookID := event.Properties["id"] + playbookID := event.EventID.String() var playbook models.Playbook if err := ctx.DB().Where("id = ?", playbookID).First(&playbook).Error; err != nil { diff --git a/playbook/events_test.go b/playbook/events_test.go index a98750eb1..8e6b4e060 100644 --- a/playbook/events_test.go +++ b/playbook/events_test.go @@ -7,13 +7,14 @@ import ( "github.com/flanksource/duty/models" "github.com/flanksource/duty/tests/fixtures/dummy" "github.com/flanksource/duty/types" - v1 "github.com/flanksource/incident-commander/api/v1" - "github.com/flanksource/incident-commander/events" "github.com/google/uuid" ginkgo "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/samber/lo" "gorm.io/gorm/clause" + + v1 "github.com/flanksource/incident-commander/api/v1" + "github.com/flanksource/incident-commander/events" ) var _ = ginkgo.Describe("Playbook Events", ginkgo.Ordered, func() { @@ -198,7 +199,7 @@ var _ = ginkgo.Describe("Match Resource", func() { ginkgo.It("should match resources", func() { type args struct { labels map[string]string - eventResource EventResource + eventResource events.EventResource matchFilters []v1.PlaybookTriggerEvent } tests := []struct { @@ -213,7 +214,7 @@ var _ = ginkgo.Describe("Match Resource", func() { labels: map[string]string{ "telemetry": "enabled", }, - eventResource: EventResource{ + eventResource: events.EventResource{ Component: &models.Component{ Type: "Entity", }, @@ -226,7 +227,7 @@ var _ = ginkgo.Describe("Match Resource", func() { { name: "With Filter | Without Labels | No match", args: args{ - eventResource: EventResource{ + eventResource: events.EventResource{ Component: &models.Component{ Type: "Database", }, @@ -242,7 +243,7 @@ var _ = ginkgo.Describe("Match Resource", func() { labels: map[string]string{ "telemetry": "enabled", }, - eventResource: EventResource{}, + eventResource: events.EventResource{}, matchFilters: []v1.PlaybookTriggerEvent{ { Labels: map[string]string{ @@ -260,7 +261,7 @@ var _ = ginkgo.Describe("Match Resource", func() { labels: map[string]string{ "telemetry": "enabled", }, - eventResource: EventResource{}, + eventResource: events.EventResource{}, matchFilters: []v1.PlaybookTriggerEvent{ { Labels: map[string]string{ @@ -279,7 +280,7 @@ var _ = ginkgo.Describe("Match Resource", func() { labels: map[string]string{ "telemetry": "enabled", }, - eventResource: EventResource{ + eventResource: events.EventResource{ Check: &models.Check{ Type: "http", }, @@ -302,7 +303,7 @@ var _ = ginkgo.Describe("Match Resource", func() { labels: map[string]string{ "telemetry": "enabled", }, - eventResource: EventResource{ + eventResource: events.EventResource{ Check: &models.Check{ Type: "http", }, @@ -325,7 +326,7 @@ var _ = ginkgo.Describe("Match Resource", func() { labels: map[string]string{ "telemetry": "enabled", }, - eventResource: EventResource{ + eventResource: events.EventResource{ Check: &models.Check{ Type: "http", }, @@ -352,7 +353,7 @@ var _ = ginkgo.Describe("Match Resource", func() { { name: "Invalid filter expression", args: args{ - eventResource: EventResource{ + eventResource: events.EventResource{ Check: &models.Check{ Type: "http", }, @@ -372,7 +373,7 @@ var _ = ginkgo.Describe("Match Resource", func() { { name: "Expression not returning boolean", args: args{ - eventResource: EventResource{ + eventResource: events.EventResource{ Check: &models.Check{ Type: "http", }, diff --git a/playbook/runner/exec.go b/playbook/runner/exec.go index 5dbf035f1..0979c7aab 100644 --- a/playbook/runner/exec.go +++ b/playbook/runner/exec.go @@ -67,7 +67,7 @@ func executeAction(ctx context.Context, playbookID any, runID uuid.UUID, runActi var e actions.ExecAction result, err = e.Run(ctx, *actionSpec.Exec) } else if actionSpec.AI != nil { - e := actions.NewAIAction(stringOrUuid(playbookID), runID, templateEnv) + e := actions.NewAIAction(stringOrUuid(playbookID), runID, runAction.ID, templateEnv) result, err = e.Run(ctx, *actionSpec.AI) } else if actionSpec.HTTP != nil { var e actions.HTTP diff --git a/playbook/scheduler_test.go b/playbook/scheduler_test.go new file mode 100644 index 000000000..ca626874a --- /dev/null +++ b/playbook/scheduler_test.go @@ -0,0 +1,498 @@ +package playbook + +import ( + "time" + + "github.com/flanksource/duty/context" + "github.com/flanksource/duty/models" + "github.com/flanksource/duty/tests/setup" + "github.com/google/uuid" + ginkgo "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/gstruct" + "github.com/samber/lo" + + "github.com/flanksource/incident-commander/api" + pkgEvents "github.com/flanksource/incident-commander/events" +) + +type TestResource struct { + Check *models.Check + Canary *models.Canary + Config *models.ConfigItem + Component *models.Component +} + +type TestCase struct { + Name string + Resources TestResource + DatabaseChange func(context.Context, TestResource) error + IgnoreEvents []string + ExpectedEvents []string + ExpectedEventResource func(TestResource) pkgEvents.EventResource +} + +var _ = ginkgo.Describe("Playbook Scheduler EventResource Generation", ginkgo.Ordered, func() { + var ctx context.Context + var cleanup func() + + ginkgo.BeforeAll(func() { + // Create an isolated database so that other tests do not interfere with this one + var err error + isolatedContextPtr, cleanupFunc, err := setup.NewDB(DefaultContext, "playbook_scheduler_test") + Expect(err).NotTo(HaveOccurred()) + + ctx = *isolatedContextPtr + cleanup = cleanupFunc + }) + + ginkgo.AfterAll(func() { + if cleanup != nil { + cleanup() + } + }) + + ginkgo.AfterEach(func() { + // Clear event queue after each test to ensure isolation + err := ctx.DB().Exec("DELETE FROM event_queue").Error + Expect(err).NotTo(HaveOccurred()) + }) + + testEvent := func(tc TestCase) { + ginkgo.It(tc.Name, func() { + // Create test resources + if tc.Resources.Canary != nil { + err := ctx.DB().Create(tc.Resources.Canary).Error + Expect(err).NotTo(HaveOccurred()) + ginkgo.DeferCleanup(func() { + ctx.DB().Delete(tc.Resources.Canary) + }) + } + + if tc.Resources.Check != nil { + tc.Resources.Check.CanaryID = tc.Resources.Canary.ID + err := ctx.DB().Create(tc.Resources.Check).Error + Expect(err).NotTo(HaveOccurred()) + ginkgo.DeferCleanup(func() { + ctx.DB().Delete(tc.Resources.Check) + }) + } + + if tc.Resources.Config != nil { + err := ctx.DB().Create(tc.Resources.Config).Error + Expect(err).NotTo(HaveOccurred()) + ginkgo.DeferCleanup(func() { + ctx.DB().Delete(tc.Resources.Config) + }) + } + + if tc.Resources.Component != nil { + err := ctx.DB().Create(tc.Resources.Component).Error + Expect(err).NotTo(HaveOccurred()) + ginkgo.DeferCleanup(func() { + ctx.DB().Delete(tc.Resources.Component) + }) + } + + // Make database change to trigger event + err := tc.DatabaseChange(ctx, tc.Resources) + Expect(err).NotTo(HaveOccurred()) + + var events []models.Event + query := ctx.DB().Order("created_at ASC") + if len(tc.IgnoreEvents) > 0 { + query = query.Where("name NOT IN ?", tc.IgnoreEvents) + } + Expect(query.Find(&events).Error).NotTo(HaveOccurred()) + Expect(events).To(HaveLen(len(tc.ExpectedEvents)), "Should have events %v", tc.ExpectedEvents) + + eventNames := lo.Map(events, func(event models.Event, _ int) string { + return event.Name + }) + Expect(eventNames).To(ConsistOf(tc.ExpectedEvents)) + + for _, event := range events { + actualEventResource, err := pkgEvents.BuildEventResource(ctx, event) + Expect(err).NotTo(HaveOccurred()) + + expectedEventResource := tc.ExpectedEventResource(tc.Resources) + if expectedEventResource.Check != nil { + Expect(actualEventResource.Check).To(gstruct.PointTo(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "ID": Equal(expectedEventResource.Check.ID), + "Name": Equal(expectedEventResource.Check.Name), + "Type": Equal(expectedEventResource.Check.Type), + "CanaryID": Equal(expectedEventResource.Check.CanaryID), + }))) + Expect(actualEventResource.CheckSummary).NotTo(BeNil()) + } + if expectedEventResource.Config != nil { + Expect(actualEventResource.Config).To(gstruct.PointTo(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "ID": Equal(expectedEventResource.Config.ID), + "Name": Equal(expectedEventResource.Config.Name), + "ConfigClass": Equal(expectedEventResource.Config.ConfigClass), + }))) + } + if expectedEventResource.Component != nil { + Expect(actualEventResource.Component).To(gstruct.PointTo(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "ID": Equal(expectedEventResource.Component.ID), + "Name": Equal(expectedEventResource.Component.Name), + "Type": Equal(expectedEventResource.Component.Type), + }))) + } + if expectedEventResource.Canary != nil { + Expect(actualEventResource.Canary).To(gstruct.PointTo(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "ID": Equal(expectedEventResource.Canary.ID), + "Name": Equal(expectedEventResource.Canary.Name), + }))) + } + } + }) + } + + var _ = ginkgo.Describe("Check Events", ginkgo.Ordered, func() { + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered check.passed event", + Resources: TestResource{ + Canary: &models.Canary{ + ID: uuid.New(), + Name: "test-canary-passed", + Spec: []byte(`{"http": [{"name": "test", "url": "http://example.com"}]}`), + }, + Check: &models.Check{ + ID: uuid.New(), + CanaryID: uuid.New(), + Name: "test-check-passed", + Type: "http", + Status: models.CheckStatusUnhealthy, // Start as unhealthy + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + res.Check.CanaryID = res.Canary.ID + Expect(ctx.DB().Save(res.Check).Error).NotTo(HaveOccurred()) + return ctx.DB().Model(res.Check).UpdateColumn("status", models.CheckStatusHealthy).Error + }, + ExpectedEvents: []string{api.EventCheckPassed}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Check: res.Check, + Canary: res.Canary, + } + }, + }) + + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered check.failed event", + Resources: TestResource{ + Canary: &models.Canary{ + ID: uuid.New(), + Name: "test-canary-failed", + Spec: []byte(`{"http": [{"name": "test", "url": "http://example.com"}]}`), + }, + Check: &models.Check{ + ID: uuid.New(), + Name: "test-check-failed", + Type: "http", + Status: models.CheckStatusHealthy, // Start as healthy + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + return ctx.DB().Model(res.Check).UpdateColumn("status", models.CheckStatusUnhealthy).Error + }, + ExpectedEvents: []string{api.EventCheckFailed}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Check: res.Check, + Canary: res.Canary, + } + }, + }) + }) + + var _ = ginkgo.Describe("Component Events", func() { + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered component.unhealthy event", + Resources: TestResource{ + Component: &models.Component{ + ID: uuid.New(), + Name: "test-component-unhealthy", + Type: "Entity", + Health: lo.ToPtr(models.HealthHealthy), // Start as healthy + Labels: map[string]string{"telemetry": "enabled"}, + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + return ctx.DB().Model(res.Component).UpdateColumn("health", models.HealthUnhealthy).Error + }, + ExpectedEvents: []string{api.EventComponentUnhealthy}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Component: res.Component, + } + }, + }) + + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered component.healthy event", + Resources: TestResource{ + Component: &models.Component{ + ID: uuid.New(), + Name: "test-component-healthy", + Type: "Entity", + Health: lo.ToPtr(models.HealthUnhealthy), // Start as unhealthy + Labels: map[string]string{"telemetry": "enabled"}, + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + return ctx.DB().Model(res.Component).UpdateColumn("health", models.HealthHealthy).Error + }, + ExpectedEvents: []string{api.EventComponentHealthy}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Component: res.Component, + } + }, + }) + + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered component.warning event", + Resources: TestResource{ + Component: &models.Component{ + ID: uuid.New(), + Name: "test-component-warning", + Type: "Entity", + Health: lo.ToPtr(models.HealthHealthy), // Start as healthy + Labels: map[string]string{"telemetry": "enabled"}, + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + return ctx.DB().Model(res.Component).UpdateColumn("health", models.HealthWarning).Error + }, + ExpectedEvents: []string{api.EventComponentWarning}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Component: res.Component, + } + }, + }) + + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered component.unknown event", + Resources: TestResource{ + Component: &models.Component{ + ID: uuid.New(), + Name: "test-component-unknown", + Type: "Entity", + Health: lo.ToPtr(models.HealthHealthy), // Start as healthy + Labels: map[string]string{"telemetry": "enabled"}, + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + return ctx.DB().Model(res.Component).UpdateColumn("health", "").Error + }, + ExpectedEvents: []string{api.EventComponentUnknown}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Component: res.Component, + } + }, + }) + }) + + var _ = ginkgo.Describe("Config Health Events", func() { + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered config.unhealthy event", + Resources: TestResource{ + Config: &models.ConfigItem{ + ID: uuid.New(), + Name: lo.ToPtr("test-config-unhealthy"), + ConfigClass: models.ConfigClassDeployment, + Type: lo.ToPtr("Kubernetes::Deployment"), + Health: lo.ToPtr(models.HealthHealthy), // Start as healthy + Tags: map[string]string{"env": "test"}, + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + return ctx.DB().Model(res.Config).UpdateColumn("health", models.HealthUnhealthy).Error + }, + IgnoreEvents: []string{api.EventConfigChanged, api.EventConfigCreated}, + ExpectedEvents: []string{api.EventConfigUnhealthy}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Config: res.Config, + } + }, + }) + + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered config.healthy event", + Resources: TestResource{ + Config: &models.ConfigItem{ + ID: uuid.New(), + Name: lo.ToPtr("test-config-healthy"), + ConfigClass: models.ConfigClassDeployment, + Type: lo.ToPtr("Kubernetes::Deployment"), + Health: lo.ToPtr(models.HealthUnhealthy), // Start as unhealthy + Tags: map[string]string{"env": "test"}, + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + return ctx.DB().Model(res.Config).UpdateColumn("health", models.HealthHealthy).Error + }, + IgnoreEvents: []string{api.EventConfigChanged, api.EventConfigCreated, api.EventConfigUnhealthy}, + ExpectedEvents: []string{api.EventConfigHealthy}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Config: res.Config, + } + }, + }) + + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered config.warning event", + Resources: TestResource{ + Config: &models.ConfigItem{ + ID: uuid.New(), + Name: lo.ToPtr("test-config-warning"), + ConfigClass: models.ConfigClassDeployment, + Type: lo.ToPtr("Kubernetes::Deployment"), + Health: lo.ToPtr(models.HealthHealthy), // Start as healthy + Tags: map[string]string{"env": "test"}, + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + return ctx.DB().Model(res.Config).UpdateColumn("health", models.HealthWarning).Error + }, + IgnoreEvents: []string{api.EventConfigCreated, api.EventConfigChanged}, + ExpectedEvents: []string{api.EventConfigWarning}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Config: res.Config, + } + }, + }) + + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered config.degraded event", + Resources: TestResource{ + Config: &models.ConfigItem{ + ID: uuid.New(), + Name: lo.ToPtr("test-config-degraded"), + ConfigClass: models.ConfigClassDeployment, + Type: lo.ToPtr("Kubernetes::Deployment"), + Health: lo.ToPtr(models.HealthUnhealthy), // Start as unhealthy + Tags: map[string]string{"env": "test"}, + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + // unhealthy -> warning triggers degraded event + return ctx.DB().Model(res.Config).UpdateColumn("health", models.HealthWarning).Error + }, + IgnoreEvents: []string{api.EventConfigCreated, api.EventConfigChanged, api.EventConfigUnhealthy}, + ExpectedEvents: []string{api.EventConfigDegraded}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Config: res.Config, + } + }, + }) + + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered config.unknown event", + Resources: TestResource{ + Config: &models.ConfigItem{ + ID: uuid.New(), + Name: lo.ToPtr("test-config-unknown"), + ConfigClass: models.ConfigClassDeployment, + Type: lo.ToPtr("Kubernetes::Deployment"), + Health: lo.ToPtr(models.HealthHealthy), // Start as healthy + Tags: map[string]string{"env": "test"}, + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + return ctx.DB().Model(res.Config).UpdateColumn("health", "").Error + }, + IgnoreEvents: []string{api.EventConfigCreated, api.EventConfigChanged}, + ExpectedEvents: []string{api.EventConfigUnknown}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Config: res.Config, + } + }, + }) + }) + + var _ = ginkgo.Describe("Config Lifecycle Events", func() { + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered config.created event", + Resources: TestResource{ + Config: &models.ConfigItem{ + ID: uuid.New(), + Name: lo.ToPtr("test-config-created"), + ConfigClass: models.ConfigClassPod, + Type: lo.ToPtr("Kubernetes::Pod"), + Tags: map[string]string{"test": "created"}, + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + return nil // do nothing + }, + ExpectedEvents: []string{api.EventConfigCreated}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Config: res.Config, + } + }, + }) + + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered config.changed event", + Resources: TestResource{ + Config: &models.ConfigItem{ + ID: uuid.New(), + Name: lo.ToPtr("test-config-updated"), + ConfigClass: models.ConfigClassDeployment, + Type: lo.ToPtr("Kubernetes::Deployment"), + Tags: map[string]string{"test": "updated"}, + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + change := models.ConfigChange{ + ID: uuid.New().String(), + ConfigID: res.Config.ID.String(), + ChangeType: "Scaling Up", + } + return ctx.DB().Create(&change).Error + }, + IgnoreEvents: []string{api.EventConfigCreated}, + ExpectedEvents: []string{api.EventConfigChanged}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Config: res.Config, + } + }, + }) + + testEvent(TestCase{ + Name: "should generate correct EventResource for organically triggered config.deleted event", + Resources: TestResource{ + Config: &models.ConfigItem{ + ID: uuid.New(), + Name: lo.ToPtr("test-config-deleted"), + ConfigClass: models.ConfigClassDeployment, + Type: lo.ToPtr("Kubernetes::Deployment"), + Tags: map[string]string{"test": "deleted"}, + }, + }, + DatabaseChange: func(ctx context.Context, res TestResource) error { + return ctx.DB().Model(res.Config).UpdateColumn("deleted_at", time.Now()).Error + }, + IgnoreEvents: []string{api.EventConfigCreated}, + ExpectedEvents: []string{api.EventConfigDeleted}, + ExpectedEventResource: func(res TestResource) pkgEvents.EventResource { + return pkgEvents.EventResource{ + Config: res.Config, + } + }, + }) + }) +})