Skip to content

Commit

Permalink
Merge pull request #1404 from openmeterio/fix/fix-infinite-loop-proce…
Browse files Browse the repository at this point in the history
…ssing-events

fix: infinite loop processing events
  • Loading branch information
turip authored Aug 21, 2024
2 parents 3be872d + bceb056 commit 09c5c34
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 84 deletions.
7 changes: 4 additions & 3 deletions cmd/balance-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,10 @@ func main() {
IngestEventsTopic: conf.Events.IngestEvents.Topic,

Router: router.Options{
Subscriber: wmSubscriber,
Publisher: eventPublisherDriver,
Logger: logger,
Subscriber: wmSubscriber,
Publisher: eventPublisherDriver,
Logger: logger,
MetricMeter: metricMeter,

Config: conf.BalanceWorker.ConsumerConfiguration,
},
Expand Down
7 changes: 4 additions & 3 deletions cmd/notification-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,10 @@ func main() {
consumerOptions := consumer.Options{
SystemEventsTopic: conf.Events.SystemEvents.Topic,
Router: router.Options{
Subscriber: wmSubscriber,
Publisher: eventPublisherDriver,
Logger: logger,
Subscriber: wmSubscriber,
Publisher: eventPublisherDriver,
Logger: logger,
MetricMeter: metricMeter,

Config: conf.Notification.Consumer,
},
Expand Down
143 changes: 75 additions & 68 deletions internal/entitlement/balanceworker/entitlementhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,7 @@ import (
"github.com/openmeterio/openmeter/pkg/convert"
)

func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent entitlement.EntitlementDeletedEvent) (marshaler.Event, error) {
namespace := delEvent.Namespace.ID

feature, err := w.entitlement.Feature.GetFeature(ctx, namespace, delEvent.FeatureID, productcatalog.IncludeArchivedFeatureTrue)
if err != nil {
return nil, fmt.Errorf("failed to get feature: %w", err)
}

subject := models.Subject{
Key: delEvent.SubjectKey,
}

if w.opts.SubjectResolver != nil {
subject, err = w.opts.SubjectResolver.GetSubjectByKey(ctx, namespace, delEvent.SubjectKey)
if err != nil {
return nil, fmt.Errorf("failed to get subject: %w", err)
}
}

calculationTime := time.Now()

event := marshaler.WithSource(
metadata.ComposeResourcePath(namespace, metadata.EntityEntitlement, delEvent.ID),
snapshot.SnapshotEvent{
Entitlement: delEvent.Entitlement,
Namespace: models.NamespaceID{
ID: namespace,
},
Subject: subject,
Feature: *feature,
Operation: snapshot.ValueOperationDelete,

CalculatedAt: convert.ToPointer(calculationTime),

CurrentUsagePeriod: delEvent.CurrentUsagePeriod,
},
)

_ = w.highWatermarkCache.Add(delEvent.ID, highWatermarkCacheEntry{
HighWatermark: calculationTime.Add(-defaultClockDrift),
IsDeleted: true,
})

return event, nil
}

func (w *Worker) handleEntitlementUpdateEvent(ctx context.Context, entitlementID NamespacedID, source string) (marshaler.Event, error) {
func (w *Worker) handleEntitlementEvent(ctx context.Context, entitlementID NamespacedID, source string) (marshaler.Event, error) {
calculatedAt := time.Now()

if entry, ok := w.highWatermarkCache.Get(entitlementID.ID); ok {
Expand All @@ -70,19 +24,6 @@ func (w *Worker) handleEntitlementUpdateEvent(ctx context.Context, entitlementID
}
}

snapshot, err := w.createSnapshotEvent(ctx, entitlementID, source, calculatedAt)
if err != nil {
return nil, fmt.Errorf("failed to create entitlement update snapshot event: %w", err)
}

_ = w.highWatermarkCache.Add(entitlementID.ID, highWatermarkCacheEntry{
HighWatermark: calculatedAt.Add(-defaultClockDrift),
})

return snapshot, nil
}

func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID NamespacedID, source string, calculatedAt time.Time) (marshaler.Event, error) {
entitlements, err := w.entitlement.Entitlement.ListEntitlements(ctx, entitlement.ListEntitlementsParams{
Namespaces: []string{entitlementID.Namespace},
IDs: []string{entitlementID.ID},
Expand All @@ -101,21 +42,48 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac
}

entitlementEntity := &entitlements.Items[0]

if entitlementEntity.DeletedAt != nil {
// entitlement got deleted while processing changes => let's create a delete event so that we are not working
// on entitlement updates that are not relevant anymore
return w.handleEntitlementDeleteEvent(ctx, entitlement.EntitlementDeletedEvent{
Entitlement: *entitlementEntity,
Namespace: models.NamespaceID{ID: entitlementID.Namespace},

snapshot, err := w.createDeletedSnapshotEvent(ctx,
entitlement.EntitlementDeletedEvent{
Entitlement: *entitlementEntity,
Namespace: models.NamespaceID{
ID: entitlementEntity.Namespace,
},
}, calculatedAt)
if err != nil {
return nil, fmt.Errorf("failed to create entitlement delete snapshot event: %w", err)
}

_ = w.highWatermarkCache.Add(entitlementID.ID, highWatermarkCacheEntry{
HighWatermark: calculatedAt.Add(-defaultClockDrift),
IsDeleted: true,
})

return snapshot, nil
}

feature, err := w.entitlement.Feature.GetFeature(ctx, entitlementID.Namespace, entitlementEntity.FeatureID, productcatalog.IncludeArchivedFeatureTrue)
snapshot, err := w.createSnapshotEvent(ctx, entitlementEntity, source, calculatedAt)
if err != nil {
return nil, fmt.Errorf("failed to create entitlement update snapshot event: %w", err)
}

_ = w.highWatermarkCache.Add(entitlementID.ID, highWatermarkCacheEntry{
HighWatermark: calculatedAt.Add(-defaultClockDrift),
})

return snapshot, nil
}

func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementEntity *entitlement.Entitlement, source string, calculatedAt time.Time) (marshaler.Event, error) {
feature, err := w.entitlement.Feature.GetFeature(ctx, entitlementEntity.Namespace, entitlementEntity.FeatureID, productcatalog.IncludeArchivedFeatureTrue)
if err != nil {
return nil, fmt.Errorf("failed to get feature: %w", err)
}

value, err := w.entitlement.Entitlement.GetEntitlementValue(ctx, entitlementID.Namespace, entitlementEntity.SubjectKey, entitlementEntity.ID, calculatedAt)
value, err := w.entitlement.Entitlement.GetEntitlementValue(ctx, entitlementEntity.Namespace, entitlementEntity.SubjectKey, entitlementEntity.ID, calculatedAt)
if err != nil {
return nil, fmt.Errorf("failed to get entitlement value: %w", err)
}
Expand All @@ -129,7 +97,7 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac
Key: entitlementEntity.SubjectKey,
}
if w.opts.SubjectResolver != nil {
subject, err = w.opts.SubjectResolver.GetSubjectByKey(ctx, entitlementID.Namespace, entitlementEntity.SubjectKey)
subject, err = w.opts.SubjectResolver.GetSubjectByKey(ctx, entitlementEntity.Namespace, entitlementEntity.SubjectKey)
if err != nil {
return nil, fmt.Errorf("failed to get subject ID: %w", err)
}
Expand All @@ -140,7 +108,7 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac
snapshot.SnapshotEvent{
Entitlement: *entitlementEntity,
Namespace: models.NamespaceID{
ID: entitlementID.Namespace,
ID: entitlementEntity.Namespace,
},
Subject: subject,
Feature: *feature,
Expand All @@ -155,3 +123,42 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac

return event, nil
}

func (w *Worker) createDeletedSnapshotEvent(ctx context.Context, delEvent entitlement.EntitlementDeletedEvent, calculationTime time.Time) (marshaler.Event, error) {
namespace := delEvent.Namespace.ID

feature, err := w.entitlement.Feature.GetFeature(ctx, namespace, delEvent.FeatureID, productcatalog.IncludeArchivedFeatureTrue)
if err != nil {
return nil, fmt.Errorf("failed to get feature: %w", err)
}

subject := models.Subject{
Key: delEvent.SubjectKey,
}

if w.opts.SubjectResolver != nil {
subject, err = w.opts.SubjectResolver.GetSubjectByKey(ctx, namespace, delEvent.SubjectKey)
if err != nil {
return nil, fmt.Errorf("failed to get subject: %w", err)
}
}

event := marshaler.WithSource(
metadata.ComposeResourcePath(namespace, metadata.EntityEntitlement, delEvent.ID),
snapshot.SnapshotEvent{
Entitlement: delEvent.Entitlement,
Namespace: models.NamespaceID{
ID: namespace,
},
Subject: subject,
Feature: *feature,
Operation: snapshot.ValueOperationDelete,

CalculatedAt: convert.ToPointer(calculationTime),

CurrentUsagePeriod: delEvent.CurrentUsagePeriod,
},
)

return event, nil
}
9 changes: 7 additions & 2 deletions internal/entitlement/balanceworker/ingesthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package balanceworker
import (
"context"
"errors"
"fmt"

"github.com/openmeterio/openmeter/internal/entitlement"
"github.com/openmeterio/openmeter/internal/event/metadata"
Expand All @@ -27,7 +28,7 @@ func (w *Worker) handleBatchedIngestEvent(ctx context.Context, event ingestevent
var handlingError error

for _, entitlement := range affectedEntitlements {
event, err := w.handleEntitlementUpdateEvent(
event, err := w.handleEntitlementEvent(
ctx,
NamespacedID{Namespace: entitlement.Namespace, ID: entitlement.EntitlementID},
metadata.ComposeResourcePath(entitlement.Namespace, metadata.EntityEvent),
Expand All @@ -39,10 +40,14 @@ func (w *Worker) handleBatchedIngestEvent(ctx context.Context, event ingestevent
}

if err := w.opts.EventBus.Publish(ctx, event); err != nil {
handlingError = errors.Join(handlingError, err)
handlingError = errors.Join(handlingError, fmt.Errorf("handling entitlement event for %s: %w", entitlement.EntitlementID, err))
}
}

if handlingError != nil {
w.opts.Logger.Error("error handling batched ingest event", "error", handlingError)
}

return handlingError
}

Expand Down
13 changes: 8 additions & 5 deletions internal/entitlement/balanceworker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (w *Worker) eventHandler() message.NoPublishHandlerFunc {
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *entitlement.EntitlementCreatedEvent) error {
return w.opts.EventBus.
WithContext(ctx).
PublishIfNoError(w.handleEntitlementUpdateEvent(
PublishIfNoError(w.handleEntitlementEvent(
ctx,
NamespacedID{Namespace: event.Namespace.ID, ID: event.ID},
metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, event.ID),
Expand All @@ -124,14 +124,17 @@ func (w *Worker) eventHandler() message.NoPublishHandlerFunc {
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *entitlement.EntitlementDeletedEvent) error {
return w.opts.EventBus.
WithContext(ctx).
PublishIfNoError(w.handleEntitlementDeleteEvent(ctx, *event))
PublishIfNoError(w.handleEntitlementEvent(ctx,
NamespacedID{Namespace: event.Namespace.ID, ID: event.ID},
metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, event.ID),
))
}),

// Grant created event
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *grant.CreatedEvent) error {
return w.opts.EventBus.
WithContext(ctx).
PublishIfNoError(w.handleEntitlementUpdateEvent(
PublishIfNoError(w.handleEntitlementEvent(
ctx,
NamespacedID{Namespace: event.Namespace.ID, ID: string(event.OwnerID)},
metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, string(event.OwnerID), metadata.EntityGrant, event.ID),
Expand All @@ -142,7 +145,7 @@ func (w *Worker) eventHandler() message.NoPublishHandlerFunc {
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *grant.VoidedEvent) error {
return w.opts.EventBus.
WithContext(ctx).
PublishIfNoError(w.handleEntitlementUpdateEvent(
PublishIfNoError(w.handleEntitlementEvent(
ctx,
NamespacedID{Namespace: event.Namespace.ID, ID: string(event.OwnerID)},
metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, string(event.OwnerID), metadata.EntityGrant, event.ID),
Expand All @@ -153,7 +156,7 @@ func (w *Worker) eventHandler() message.NoPublishHandlerFunc {
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *meteredentitlement.EntitlementResetEvent) error {
return w.opts.EventBus.
WithContext(ctx).
PublishIfNoError(w.handleEntitlementUpdateEvent(
PublishIfNoError(w.handleEntitlementEvent(
ctx,
NamespacedID{Namespace: event.Namespace.ID, ID: event.EntitlementID},
metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, event.EntitlementID),
Expand Down
5 changes: 5 additions & 0 deletions internal/watermill/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ type publisher struct {
}

func (p publisher) Publish(ctx context.Context, event marshaler.Event) error {
if event == nil {
// nil events are always ignored as the handler signifies that it doesn't want to publish anything
return nil
}

return p.eventBus.Publish(ctx, event)
}

Expand Down
59 changes: 59 additions & 0 deletions internal/watermill/router/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package router

import (
"fmt"
"log/slog"
"time"

"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel/metric"
)

const (
messageProcessingTimeMetricName = "message_processing_time"
messageProcessedCount = "message_processed_count"
messageProcessingErrorCount = "message_processing_error_count"
)

func Metrics(metricMeter metric.Meter, prefix string, log *slog.Logger) (func(message.HandlerFunc) message.HandlerFunc, error) {
messageProcessingTime, err := metricMeter.Float64Histogram(
fmt.Sprintf("%s.%s", prefix, messageProcessingTimeMetricName),
metric.WithDescription("Time spent processing a message"),
)
if err != nil {
return nil, err
}

messageProcessed, err := metricMeter.Int64Counter(
fmt.Sprintf("%s.%s", prefix, messageProcessedCount),
metric.WithDescription("Number of messages processed"),
)
if err != nil {
return nil, err
}

messageProcessingError, err := metricMeter.Int64Counter(
fmt.Sprintf("%s.%s", prefix, messageProcessingErrorCount),
metric.WithDescription("Number of messages that failed to process"),
)
if err != nil {
return nil, err
}

return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
start := time.Now()

resMsg, err := h(msg)
if err != nil {
log.Error("Failed to process message", "error", err, "message_metadata", msg.Metadata, "message_payload", string(msg.Payload))
messageProcessingError.Add(msg.Context(), 1)
return resMsg, err
}

messageProcessingTime.Record(msg.Context(), time.Since(start).Seconds())
messageProcessed.Add(msg.Context(), 1)
return resMsg, nil
}
}, nil
}
Loading

0 comments on commit 09c5c34

Please sign in to comment.