From 02ef2f182a72f68a3546ac0f30900b7f3de1024c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Wilczy=C5=84ski?= Date: Fri, 10 Jan 2025 13:53:55 +0100 Subject: [PATCH] [ENG-2343] Stream attempts metric --- monitor/census.go | 13 +++++++++++++ server/ai_process.go | 3 +++ 2 files changed, 16 insertions(+) diff --git a/monitor/census.go b/monitor/census.go index e5587c0e3..5c8fd9bdc 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -206,6 +206,7 @@ type ( mAIResultSaveFailed *stats.Int64Measure mAICurrentLivePipelines *stats.Int64Measure mAIFirstSegmentDelay *stats.Int64Measure + mAILiveAttempts *stats.Int64Measure lock sync.Mutex emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo @@ -377,6 +378,7 @@ func InitCensus(nodeType NodeType, version string) { census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot") census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot") census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay_ms", "Delay of the first live AI segment being processed", "ms") + census.mAILiveAttempts = stats.Int64("ai_live_attempts", "AI Live stream attempted", "tot") glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version()) glog.Infof("Livepeer version: %s", version) @@ -991,6 +993,13 @@ func InitCensus(nodeType NodeType, version string) { TagKeys: baseTagsWithOrchInfo, Aggregation: view.Distribution(0, .10, .20, .50, .100, .150, .200, .500, .1000, .5000, 10.000), }, + { + Name: "ai_live_attempt", + Measure: census.mAILiveAttempts, + Description: "AI Live stream attempted", + TagKeys: baseTags, + Aggregation: view.Count(), + }, } // Register the views @@ -1992,6 +2001,10 @@ func AIFirstSegmentDelay(delayMs int64, orchInfo *lpnet.OrchestratorInfo) { } } +func AILiveVideoAttempt() { + stats.Record(census.ctx, census.mAILiveAttempts.M(1)) +} + // Convert wei to gwei func wei2gwei(wei *big.Int) float64 { gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64() diff --git a/server/ai_process.go b/server/ai_process.go index 384cadf20..d7197b101 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -1027,6 +1027,9 @@ const initPixelsToPay = 30 * 30 * 3200 * 1800 // 30 seconds, 30fps, 1800p func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *AISession, req worker.GenLiveVideoToVideoJSONRequestBody) (any, error) { startTime := time.Now() + if monitor.Enabled { + monitor.AILiveVideoAttempt() + } // Live Video should not reuse the existing session balance, because it could lead to not sending the init // payment, which in turns may cause "Insufficient Balance" on the Orchestrator's side. // It works differently than other AI Jobs, because Live Video is accounted by mid on the Orchestrator's side.