Skip to content

Commit

Permalink
[ENG-2342] use function callback
Browse files Browse the repository at this point in the history
  • Loading branch information
pwilczynskiclearcode committed Jan 8, 2025
1 parent ee71e3b commit 48540cf
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
7 changes: 3 additions & 4 deletions server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/media"
"github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/go-livepeer/trickle"

"github.com/livepeer/lpms/ffmpeg"
Expand Down Expand Up @@ -125,7 +124,7 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
clog.Infof(ctx, "trickle pub")
}

func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, startTime time.Time, orchInfo *net.OrchestratorInfo) {
func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, onFistSegment func()) {
// subscribe to the outputs and send them into LPMS
subscriber := trickle.NewTrickleSubscriber(url.String())
r, w, err := os.Pipe()
Expand Down Expand Up @@ -185,9 +184,9 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
params.liveParams.stopPipeline(fmt.Errorf("trickle subscribe error copying: %w", err))
return
}
if monitor.Enabled && firstSegment {
if firstSegment {
firstSegment = false
monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds(), orchInfo)
onFistSegment()
}
clog.V(8).Infof(ctx, "trickle subscribe read data completed seq=%d bytes=%s", seq, humanize.Bytes(uint64(n)))
}
Expand Down
6 changes: 5 additions & 1 deletion server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,11 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A

startControlPublish(control, params)
startTricklePublish(ctx, pub, params, sess)
startTrickleSubscribe(ctx, sub, params, startTime, sess.OrchestratorInfo)
startTrickleSubscribe(ctx, sub, params, func() {
if monitor.Enabled {
monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds(), sess.OrchestratorInfo)
}
})
startEventsSubscribe(ctx, events, params, sess)
return resp, nil
}
Expand Down

0 comments on commit 48540cf

Please sign in to comment.