Skip to content

Commit

Permalink
ai/live: Signal "stream exists but segment doesn't" with status 470 (#…
Browse files Browse the repository at this point in the history
…3327)

Clients can use this information to jump to the leading edge if
there are gaps in the sequence or they have fallen behind the
window of available segments.
  • Loading branch information
j0sh authored Dec 19, 2024
1 parent fa50112 commit e781cac
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
7 changes: 4 additions & 3 deletions trickle/local_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (c *TrickleLocalSubscriber) Read() (*TrickleData, error) {
}
c.mu.Lock()
defer c.mu.Unlock()
segment, exists := stream.getForRead(c.seq)
segment, latestSeq, exists := stream.getForRead(c.seq)
if !exists {
return nil, errors.New("seq not found")
}
Expand Down Expand Up @@ -70,8 +70,9 @@ func (c *TrickleLocalSubscriber) Read() (*TrickleData, error) {
return &TrickleData{
Reader: r,
Metadata: map[string]string{
"Lp-Trickle-Seq": strconv.Itoa(segment.idx),
"Content-Type": stream.mimeType,
"Lp-Trickle-Latest": strconv.Itoa(latestSeq),
"Lp-Trickle-Seq": strconv.Itoa(segment.idx),
"Content-Type": stream.mimeType,
}, // TODO take more metadata from http headers
}, nil
}
15 changes: 11 additions & 4 deletions trickle/trickle_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func (s *Stream) getForWrite(idx int) (*Segment, bool) {
return segment, false
}

func (s *Stream) getForRead(idx int) (*Segment, bool) {
func (s *Stream) getForRead(idx int) (*Segment, int, bool) {
s.mutex.RLock()
defer s.mutex.RUnlock()
exists := func(seg *Segment, i int) bool {
Expand All @@ -421,7 +421,7 @@ func (s *Stream) getForRead(idx int) (*Segment, bool) {
slog.Info("GET precreating", "stream", s.name, "idx", idx, "latest", s.latestWrite)
}
slog.Info("GET segment", "stream", s.name, "idx", idx, "latest", s.latestWrite, "exists?", exists(segment, idx))
return segment, exists(segment, idx)
return segment, s.latestWrite, exists(segment, idx)
}

func (sm *Server) handleGet(w http.ResponseWriter, r *http.Request) {
Expand All @@ -439,9 +439,13 @@ func (sm *Server) handleGet(w http.ResponseWriter, r *http.Request) {
}

func (s *Stream) handleGet(w http.ResponseWriter, r *http.Request, idx int) {
segment, exists := s.getForRead(idx)
segment, latestSeq, exists := s.getForRead(idx)
if !exists {
http.Error(w, "Entry not found", http.StatusNotFound)
// Special status to indicate "stream exists but segment doesn't"
w.Header().Set("Lp-Trickle-Latest", strconv.Itoa(latestSeq))
w.Header().Set("Lp-Trickle-Seq", strconv.Itoa(idx))
w.WriteHeader(470)
w.Write([]byte("Entry not found"))
return
}

Expand Down Expand Up @@ -469,6 +473,9 @@ func (s *Stream) handleGet(w http.ResponseWriter, r *http.Request, idx int) {
data, eof := subscriber.readData()
if len(data) > 0 {
if totalWrites <= 0 {
if segment.idx != latestSeq {
w.Header().Set("Lp-Trickle-Latest", strconv.Itoa(latestSeq))
}
w.Header().Set("Lp-Trickle-Seq", strconv.Itoa(segment.idx))
w.Header().Set("Content-Type", s.mimeType)
}
Expand Down

0 comments on commit e781cac

Please sign in to comment.