diff --git a/server/ai_live_video.go b/server/ai_live_video.go index 3f1a59aac..c37eddb5f 100644 --- a/server/ai_live_video.go +++ b/server/ai_live_video.go @@ -54,31 +54,56 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara // check for end of stream if _, eos := reader.(*media.EOSReader); eos { if err := publisher.Close(); err != nil { - clog.Infof(ctx, "Error closing trickle publisher. err=%s", err) + clog.Infof(ctx, "Error closing trickle publisher. err=%v", err) } cancel() return } - if _, atMax := slowOrchChecker.BeginSegment(); atMax { + thisSeq, atMax := slowOrchChecker.BeginSegment() + if atMax { clog.Infof(ctx, "Orchestrator is slow - terminating") cancel() return // TODO kill the rest of the processing, including ingest // TODO switch orchestrators } - go func() { + go func(seq int) { defer slowOrchChecker.EndSegment() var r io.Reader = reader if paymentProcessor != nil { r = paymentProcessor.process(reader) } - clog.V(8).Infof(ctx, "trickle publish writing data") - // TODO this blocks! very bad! - if err := publisher.Write(r); err != nil { - clog.Infof(ctx, "Error writing to trickle publisher. err=%s", err) + clog.V(8).Infof(ctx, "trickle publish writing data seq=%d", seq) + segment, err := publisher.Next() + if err != nil { + clog.Infof(ctx, "error getting next publish handle; dropping segment err=%v", err) + return } - }() + for { + currentSeq := slowOrchChecker.GetCount() + if seq != currentSeq { + clog.Infof(ctx, "Next segment has already started; skipping this one seq=%d currentSeq=%d", seq, currentSeq) + return + } + n, err := segment.Write(r) + if err == nil { + // no error, all done, let's leave + return + } + // Retry segment only if nothing has been sent yet + // and the next segment has not yet started + // otherwise drop + if n > 0 { + clog.Infof(ctx, "Error publishing segment; dropping remainder wrote=%d err=%v", n, err) + return + } + clog.Infof(ctx, "Error publishing segment before writing; retrying err=%v", err) + // Clone in case read head was incremented somewhere, which cloning ressets + r = reader.Clone() + time.Sleep(250 * time.Millisecond) + } + }(thisSeq) }) clog.Infof(ctx, "trickle pub") } @@ -251,7 +276,7 @@ func startEventsSubscribe(ctx context.Context, url *url.URL, params aiRequestPar } } - clog.Infof(ctx, "Received event for stream=%s event=%+v", stream, event) + clog.V(8).Infof(ctx, "Received event for stream=%s event=%+v", stream, event) eventType, ok := event["type"].(string) if !ok { @@ -328,3 +353,9 @@ func (s *SlowOrchChecker) EndSegment() { defer s.mu.Unlock() s.completeCount += 1 } + +func (s *SlowOrchChecker) GetCount() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.segmentCount +} diff --git a/trickle/trickle_publisher.go b/trickle/trickle_publisher.go index a6ca937ff..2caae4459 100644 --- a/trickle/trickle_publisher.go +++ b/trickle/trickle_publisher.go @@ -36,6 +36,10 @@ type pendingPost struct { index int writer *io.PipeWriter errCh chan error + + // needed to help with reconnects + written bool + client *TricklePublisher } // NewTricklePublisher creates a new trickle stream client @@ -53,7 +57,6 @@ func NewTricklePublisher(url string) (*TricklePublisher, error) { return c, nil } -// Acquire lock to manage access to pendingPost and index // NB expects to have the lock already since we mutate the index func (c *TricklePublisher) preconnect() (*pendingPost, error) { @@ -113,6 +116,7 @@ func (c *TricklePublisher) preconnect() (*pendingPost, error) { writer: pw, index: index, errCh: errCh, + client: c, }, nil } @@ -136,11 +140,10 @@ func (c *TricklePublisher) Close() error { return nil } -// Write sends data to the current segment, sets up the next segment concurrently, and blocks until completion -func (c *TricklePublisher) Write(data io.Reader) error { - +func (c *TricklePublisher) Next() (*pendingPost, error) { // Acquire lock to manage access to pendingPost and index c.writeLock.Lock() + defer c.writeLock.Unlock() // Get the writer to use pp := c.pendingPost @@ -148,29 +151,61 @@ func (c *TricklePublisher) Write(data io.Reader) error { p, err := c.preconnect() if err != nil { c.writeLock.Unlock() - return err + return nil, err } pp = p } - writer := pp.writer - index := pp.index - errCh := pp.errCh // Set up the next connection nextPost, err := c.preconnect() if err != nil { c.writeLock.Unlock() - return err + return nil, err } c.pendingPost = nextPost - // Now unlock so the copy does not block - c.writeLock.Unlock() + return pp, nil +} + +func (p *pendingPost) reconnect() (*pendingPost, error) { + // This is a little gnarly but works for now: + // Set the publisher's sequence sequence to the intended reconnect + // Call publisher's preconnect (which increments its sequence) + // then reset publisher's sequence back to the original + //slog.Info("Re-connecting", "url", p.client.baseURL, "seq", p.client.index) + p.client.writeLock.Lock() + defer p.client.writeLock.Unlock() + currentSeq := p.client.index + p.client.index = p.index + pp, err := p.client.preconnect() + p.client.index = currentSeq + return pp, err +} + +func (p *pendingPost) Write(data io.Reader) (int64, error) { + + // If writing multiple times, reconnect + if p.written { + pp, err := p.reconnect() + if err != nil { + return 0, err + } + p = pp + } + + var ( + writer = p.writer + index = p.index + errCh = p.errCh + ) + + // Mark as written + p.written = true // before writing, check for error from preconnects select { case err := <-errCh: - return err + return 0, err default: // no error, continue } @@ -192,18 +227,28 @@ func (c *TricklePublisher) Write(data io.Reader) error { // also prioritize errors over this channel compared to io errors // such as "read/write on closed pipe" if err := <-errCh; err != nil { - return err + return n, err } if ioError != nil { - return fmt.Errorf("error streaming data to segment %d: %w", index, err) + return n, fmt.Errorf("error streaming data to segment %d: %w", index, ioError) } if closeErr != nil { - return fmt.Errorf("error closing writer for segment %d: %w", index, err) + return n, fmt.Errorf("error closing writer for segment %d: %w", index, closeErr) } - return nil + return n, nil +} + +// Write sends data to the current segment, sets up the next segment concurrently, and blocks until completion +func (c *TricklePublisher) Write(data io.Reader) error { + pp, err := c.Next() + if err != nil { + return err + } + _, err = pp.Write(data) + return err } func humanBytes(bytes int64) string { diff --git a/trickle/trickle_server.go b/trickle/trickle_server.go index ddeb6679f..785cd3859 100644 --- a/trickle/trickle_server.go +++ b/trickle/trickle_server.go @@ -333,7 +333,9 @@ func (s *Stream) handlePost(w http.ResponseWriter, r *http.Request, idx int) { if exists { slog.Warn("Overwriting existing entry", "idx", idx) // Overwrite anything that exists now. TODO figure out a safer behavior? - return + // TODO fix concurrent writes to the same segment; would be very bad + segment.buffer.Reset() + segment.closed = false } // Wrap the request body with the custom timeoutReader so we can send @@ -527,6 +529,8 @@ func (s *Segment) readData(startPos int) ([]byte, bool) { } if startPos > totalLen { slog.Info("Invalid start pos, invoking eof") + // This might happen if the buffer was reset + // eg because of a repeated POST return nil, true } if s.closed {