From a1800dce8adbd05d7d7f74c8f772c25c350c5082 Mon Sep 17 00:00:00 2001 From: Josh Allmann Date: Thu, 23 Jan 2025 02:23:26 -0800 Subject: [PATCH] ai/live: Cancel ffmpeg exec when segmenter exits. (#3367) This prevents ffmpeg from reading indefinitely when there is no segment handling attached. That in turn can lead to disk filling up since ffmpeg writes segments to disk without removing them. Also use context logging for some segmenter error messages. --- media/rtmp2segment.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/media/rtmp2segment.go b/media/rtmp2segment.go index 4cb3310cf..c2770dbf9 100644 --- a/media/rtmp2segment.go +++ b/media/rtmp2segment.go @@ -32,10 +32,12 @@ type MediaSegmenter struct { func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmentHandler SegmentHandler) { outFilePattern := filepath.Join(ms.Workdir, randomString()+"-%d.ts") completionSignal := make(chan bool, 1) + procCtx, procCancel := context.WithCancel(context.Background()) // parent ctx is a short lived http request wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() + defer procCancel() processSegments(ctx, segmentHandler, outFilePattern, completionSignal) }() @@ -49,7 +51,7 @@ func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmen clog.Errorf(ctx, "Stopping segmentation, input stream does not exist. in=%s err=%s", in, err) break } - cmd := exec.Command("ffmpeg", + cmd := exec.CommandContext(procCtx, "ffmpeg", "-i", in, "-c:a", "copy", "-c:v", "copy", @@ -58,7 +60,7 @@ func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmen ) output, err := cmd.CombinedOutput() if err != nil { - clog.Errorf(ctx, "Error sending RTMP out process: %v", err) + clog.Errorf(ctx, "Error receiving RTMP: %v", err) clog.Infof(ctx, "Process output: %s", output) return } @@ -185,12 +187,12 @@ func processSegments(ctx context.Context, segmentHandler SegmentHandler, outFile defer mu.Unlock() if currentSegment != nil { // Trigger EOF on the current segment by closing the file - slog.Info("Completion signal received. Closing current segment to trigger EOF.") + clog.Infof(ctx, "Completion signal received. Closing current segment to trigger EOF.") currentSegment.Close() } isComplete = true pipeCompletion <- true - slog.Info("Got completion signal") + clog.Infof(ctx, "Got completion signal") }() pipeNum := 0 @@ -207,7 +209,7 @@ func processSegments(ctx context.Context, segmentHandler SegmentHandler, outFile // Blocks if no writer is available so do some tricks to it file, err := openNonBlockingWithRetry(pipeName, waitTimeout, pipeCompletion) if err != nil { - slog.Error("Error opening pipe", "pipeName", pipeName, "err", err) + clog.Errorf(ctx, "Error opening pipe pipeName=%s err=%s", pipeName, err) cleanUpPipe(pipeName) cleanUpPipe(nextPipeName) break