Skip to content

Commit

Permalink
Merge branch 'master' into feat.fetchRemoteSchemaAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Dec 6, 2024
2 parents 5a45e90 + e46ce86 commit 59ae13c
Show file tree
Hide file tree
Showing 39 changed files with 806 additions and 411 deletions.
2 changes: 1 addition & 1 deletion app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
if err != nil {
return fmt.Errorf("could not run tracked users database migration: %w", err)
}
reporting := a.app.Features().Reporting.Setup(ctx, backendconfig.DefaultBackendConfig)
reporting := a.app.Features().Reporting.Setup(ctx, config, backendconfig.DefaultBackendConfig)
defer reporting.Stop()
syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config, "reporting")})
g.Go(func() error {
Expand Down
2 changes: 1 addition & 1 deletion app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
return fmt.Errorf("could not run tracked users database migration: %w", err)
}

reporting := a.app.Features().Reporting.Setup(ctx, backendconfig.DefaultBackendConfig)
reporting := a.app.Features().Reporting.Setup(ctx, config, backendconfig.DefaultBackendConfig)
defer reporting.Stop()
syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config, "reporting")})
g.Go(crash.Wrapper(func() error {
Expand Down
2 changes: 1 addition & 1 deletion app/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Reporting Feature

// ReportingFeature handles reporting statuses / errors to reporting service
type ReportingFeature interface {
Setup(cxt context.Context, backendConfig backendconfig.BackendConfig) types.Reporting
Setup(cxt context.Context, conf *config.Config, backendConfig backendconfig.BackendConfig) types.Reporting
}

// Features contains optional implementations of Enterprise only features.
Expand Down
4 changes: 2 additions & 2 deletions enterprise/reporting/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Mediator struct {
cronRunners []flusher.Runner
}

func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToken string, backendConfig backendconfig.BackendConfig) *Mediator {
func NewReportingMediator(ctx context.Context, conf *config.Config, log logger.Logger, enterpriseToken string, backendConfig backendconfig.BackendConfig) *Mediator {
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)

Expand All @@ -58,7 +58,7 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke
})

// default reporting implementation
defaultReporter := NewDefaultReporter(rm.ctx, rm.log, configSubscriber, rm.stats)
defaultReporter := NewDefaultReporter(rm.ctx, conf, rm.log, configSubscriber, rm.stats)
rm.reporters = append(rm.reporters, defaultReporter)

// error reporting implementation
Expand Down
13 changes: 7 additions & 6 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ type DefaultReporter struct {
getReportsQueryTime stats.Measurement
requestLatency stats.Measurement
stats stats.Stats
maxReportsCountInARequest config.ValueLoader[int]
}

func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber *configSubscriber, stats stats.Stats) *DefaultReporter {
func NewDefaultReporter(ctx context.Context, conf *config.Config, log logger.Logger, configSubscriber *configSubscriber, stats stats.Stats) *DefaultReporter {
var dbQueryTimeout *config.Reloadable[time.Duration]

reportingServiceURL := config.GetString("REPORTING_URL", "https://reporting.rudderstack.com/")
Expand All @@ -88,6 +89,7 @@ func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber
maxConcurrentRequests := config.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests")
maxOpenConnections := config.GetIntVar(32, 1, "Reporting.maxOpenConnections")
dbQueryTimeout = config.GetReloadableDurationVar(60, time.Second, "Reporting.dbQueryTimeout")
maxReportsCountInARequest := conf.GetReloadableIntVar(10, 1, "Reporting.maxReportsCountInARequest")
// only send reports for wh actions sources if whActionsOnly is configured
whActionsOnly := config.GetBool("REPORTING_WH_ACTIONS_ONLY", false)
if whActionsOnly {
Expand All @@ -114,6 +116,7 @@ func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber
maxOpenConnections: maxOpenConnections,
maxConcurrentRequests: maxConcurrentRequests,
dbQueryTimeout: dbQueryTimeout,
maxReportsCountInARequest: maxReportsCountInARequest,
stats: stats,
}
}
Expand Down Expand Up @@ -265,8 +268,9 @@ func (r *DefaultReporter) getReports(currentMs int64, syncerKey string) (reports
return metricReports, queryMin.Int64, err
}

func (*DefaultReporter) getAggregatedReports(reports []*types.ReportByStatus) []*types.Metric {
func (r *DefaultReporter) getAggregatedReports(reports []*types.ReportByStatus) []*types.Metric {
metricsByGroup := map[string]*types.Metric{}
maxReportsCountInARequest := r.maxReportsCountInARequest.Load()
var values []*types.Metric

reportIdentifier := func(report *types.ReportByStatus) string {
Expand All @@ -282,16 +286,13 @@ func (*DefaultReporter) getAggregatedReports(reports []*types.ReportByStatus) []
report.ConnectionDetails.TrackingPlanID,
strconv.Itoa(report.ConnectionDetails.TrackingPlanVersion),
report.PUDetails.InPU, report.PUDetails.PU,
report.StatusDetail.Status,
strconv.Itoa(report.StatusDetail.StatusCode),
report.StatusDetail.EventName, report.StatusDetail.EventType,
}
return strings.Join(groupingIdentifiers, `::`)
}

for _, report := range reports {
identifier := reportIdentifier(report)
if _, ok := metricsByGroup[identifier]; !ok {
if _, ok := metricsByGroup[identifier]; !ok || len(metricsByGroup[identifier].StatusDetails) >= maxReportsCountInARequest {
metricsByGroup[identifier] = &types.Metric{
InstanceDetails: types.InstanceDetails{
WorkspaceID: report.WorkspaceID,
Expand Down
Loading

0 comments on commit 59ae13c

Please sign in to comment.