Skip to content

Commit

Permalink
[ENG-2343] Stream attempts metric
Browse files Browse the repository at this point in the history
  • Loading branch information
pwilczynskiclearcode committed Jan 10, 2025
1 parent b026313 commit 8afa4f1
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 2 deletions.
13 changes: 13 additions & 0 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ type (
mAIResultSaveFailed *stats.Int64Measure
mAICurrentLivePipelines *stats.Int64Measure
mAIFirstSegmentDelay *stats.Int64Measure
mAILiveAttempts *stats.Int64Measure

lock sync.Mutex
emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo
Expand Down Expand Up @@ -377,6 +378,7 @@ func InitCensus(nodeType NodeType, version string) {
census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot")
census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot")
census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay_ms", "Delay of the first live AI segment being processed", "ms")
census.mAILiveAttempts = stats.Int64("ai_live_attempts", "AI Live stream attempted", "tot")

glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
glog.Infof("Livepeer version: %s", version)
Expand Down Expand Up @@ -991,6 +993,13 @@ func InitCensus(nodeType NodeType, version string) {
TagKeys: baseTagsWithOrchInfo,
Aggregation: view.Distribution(0, .10, .20, .50, .100, .150, .200, .500, .1000, .5000, 10.000),
},
{
Name: "ai_live_attempt",
Measure: census.mAILiveAttempts,
Description: "AI Live stream attempted",
TagKeys: baseTags,
Aggregation: view.Count(),
},
}

// Register the views
Expand Down Expand Up @@ -1992,6 +2001,10 @@ func AIFirstSegmentDelay(delayMs int64, orchInfo *lpnet.OrchestratorInfo) {
}
}

func AILiveVideoAttempt() {
stats.Record(census.ctx, census.mAILiveAttempts.M(1))
}

// Convert wei to gwei
func wei2gwei(wei *big.Int) float64 {
gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
Expand Down
4 changes: 3 additions & 1 deletion server/ai_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func aiHttpHandle[I any](h *lphttp, decoderFunc func(*I, *http.Request) error) h

func (h *lphttp) StartLiveVideoToVideo() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

remoteAddr := getRemoteAddr(r)
ctx := clog.AddVal(r.Context(), clog.ClientIP, remoteAddr)
requestID := string(core.RandomManifestID())
Expand All @@ -111,6 +110,9 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
cap := core.Capability_LiveVideoToVideo
modelID := *req.ModelId
clog.V(common.VERBOSE).Infof(ctx, "Received request id=%v cap=%v modelID=%v", requestID, cap, modelID)
if monitor.Enabled {
monitor.AILiveVideoAttempt()
}

// Create storage for the request (for AI Workers, must run before CheckAICapacity)
err := orch.CreateStorageForRequest(requestID)
Expand Down
5 changes: 4 additions & 1 deletion server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,9 +1079,12 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A
startControlPublish(control, params)
startTricklePublish(ctx, pub, params, sess)
startTrickleSubscribe(ctx, sub, params, func() {
delayMs := time.Since(startTime).Milliseconds()
if monitor.Enabled {
monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds(), sess.OrchestratorInfo)
monitor.AIFirstSegmentDelay(delayMs, sess.OrchestratorInfo)
}
clog.V(common.VERBOSE).Infof(ctx, "First Segment delay=%dms streamID=%s", delayMs, params.liveParams.streamID)

})
startEventsSubscribe(ctx, events, params, sess)
return resp, nil
Expand Down

0 comments on commit 8afa4f1

Please sign in to comment.