Skip to content

Commit

Permalink
ai/live: Cancel ffmpeg exec when segmenter exits. (#3367)
Browse files Browse the repository at this point in the history
This prevents ffmpeg from reading indefinitely when there is no
segment handling attached. That in turn can lead to disk filling up
since ffmpeg writes segments to disk without removing them.

Also use context logging for some segmenter error messages.
  • Loading branch information
j0sh authored Jan 23, 2025
1 parent 3e94431 commit a1800dc
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions media/rtmp2segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ type MediaSegmenter struct {
func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmentHandler SegmentHandler) {
outFilePattern := filepath.Join(ms.Workdir, randomString()+"-%d.ts")
completionSignal := make(chan bool, 1)
procCtx, procCancel := context.WithCancel(context.Background()) // parent ctx is a short lived http request
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
defer procCancel()
processSegments(ctx, segmentHandler, outFilePattern, completionSignal)
}()

Expand All @@ -49,7 +51,7 @@ func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmen
clog.Errorf(ctx, "Stopping segmentation, input stream does not exist. in=%s err=%s", in, err)
break
}
cmd := exec.Command("ffmpeg",
cmd := exec.CommandContext(procCtx, "ffmpeg",
"-i", in,
"-c:a", "copy",
"-c:v", "copy",
Expand All @@ -58,7 +60,7 @@ func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmen
)
output, err := cmd.CombinedOutput()
if err != nil {
clog.Errorf(ctx, "Error sending RTMP out process: %v", err)
clog.Errorf(ctx, "Error receiving RTMP: %v", err)
clog.Infof(ctx, "Process output: %s", output)
return
}
Expand Down Expand Up @@ -185,12 +187,12 @@ func processSegments(ctx context.Context, segmentHandler SegmentHandler, outFile
defer mu.Unlock()
if currentSegment != nil {
// Trigger EOF on the current segment by closing the file
slog.Info("Completion signal received. Closing current segment to trigger EOF.")
clog.Infof(ctx, "Completion signal received. Closing current segment to trigger EOF.")
currentSegment.Close()
}
isComplete = true
pipeCompletion <- true
slog.Info("Got completion signal")
clog.Infof(ctx, "Got completion signal")
}()

pipeNum := 0
Expand All @@ -207,7 +209,7 @@ func processSegments(ctx context.Context, segmentHandler SegmentHandler, outFile
// Blocks if no writer is available so do some tricks to it
file, err := openNonBlockingWithRetry(pipeName, waitTimeout, pipeCompletion)
if err != nil {
slog.Error("Error opening pipe", "pipeName", pipeName, "err", err)
clog.Errorf(ctx, "Error opening pipe pipeName=%s err=%s", pipeName, err)
cleanUpPipe(pipeName)
cleanUpPipe(nextPipeName)
break
Expand Down

0 comments on commit a1800dc

Please sign in to comment.