diff --git a/media/rtmp2segment.go b/media/rtmp2segment.go index 4cb3310cf..c2770dbf9 100644 --- a/media/rtmp2segment.go +++ b/media/rtmp2segment.go @@ -32,10 +32,12 @@ type MediaSegmenter struct { func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmentHandler SegmentHandler) { outFilePattern := filepath.Join(ms.Workdir, randomString()+"-%d.ts") completionSignal := make(chan bool, 1) + procCtx, procCancel := context.WithCancel(context.Background()) // parent ctx is a short lived http request wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() + defer procCancel() processSegments(ctx, segmentHandler, outFilePattern, completionSignal) }() @@ -49,7 +51,7 @@ func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmen clog.Errorf(ctx, "Stopping segmentation, input stream does not exist. in=%s err=%s", in, err) break } - cmd := exec.Command("ffmpeg", + cmd := exec.CommandContext(procCtx, "ffmpeg", "-i", in, "-c:a", "copy", "-c:v", "copy", @@ -58,7 +60,7 @@ func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmen ) output, err := cmd.CombinedOutput() if err != nil { - clog.Errorf(ctx, "Error sending RTMP out process: %v", err) + clog.Errorf(ctx, "Error receiving RTMP: %v", err) clog.Infof(ctx, "Process output: %s", output) return } @@ -185,12 +187,12 @@ func processSegments(ctx context.Context, segmentHandler SegmentHandler, outFile defer mu.Unlock() if currentSegment != nil { // Trigger EOF on the current segment by closing the file - slog.Info("Completion signal received. Closing current segment to trigger EOF.") + clog.Infof(ctx, "Completion signal received. Closing current segment to trigger EOF.") currentSegment.Close() } isComplete = true pipeCompletion <- true - slog.Info("Got completion signal") + clog.Infof(ctx, "Got completion signal") }() pipeNum := 0 @@ -207,7 +209,7 @@ func processSegments(ctx context.Context, segmentHandler SegmentHandler, outFile // Blocks if no writer is available so do some tricks to it file, err := openNonBlockingWithRetry(pipeName, waitTimeout, pipeCompletion) if err != nil { - slog.Error("Error opening pipe", "pipeName", pipeName, "err", err) + clog.Errorf(ctx, "Error opening pipe pipeName=%s err=%s", pipeName, err) cleanUpPipe(pipeName) cleanUpPipe(nextPipeName) break