Skip to content

Commit 28d8c5a

Browse files
committed
fix stream headers
1 parent 4e2549f commit 28d8c5a

File tree

2 files changed

+23
-107
lines changed

2 files changed

+23
-107
lines changed

server/torr/storage/torrstor/cache.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,11 @@ func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
8787
}
8888

8989
func (c *Cache) Close() error {
90-
log.TLogln("Close cache for:", c.hash)
90+
if c.torrent != nil {
91+
log.TLogln("Close cache for:", c.torrent.Name(), c.hash)
92+
} else {
93+
log.TLogln("Close cache for:", c.hash)
94+
}
9195
c.isClosed = true
9296

9397
delete(c.storage.caches, c.hash)

server/torr/stream.go

Lines changed: 18 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/hex"
66
"errors"
77
"fmt"
8-
"io"
98
"log"
109
"net"
1110
"net/http"
@@ -28,15 +27,13 @@ func (t *Torrent) Stream(fileID int, req *http.Request, resp http.ResponseWriter
2827
// Increment active streams counter
2928
streamID := atomic.AddInt32(&activeStreams, 1)
3029
defer atomic.AddInt32(&activeStreams, -1)
31-
32-
// Stream disconnect timeout
33-
StreamTimeout := sets.BTsets.TorrentDisconnectTimeout
30+
// Stream disconnect timeout (same as torrent)
31+
streamTimeout := sets.BTsets.TorrentDisconnectTimeout
3432

3533
if !t.GotInfo() {
3634
http.NotFound(resp, req)
3735
return errors.New("torrent doesn't have info yet")
3836
}
39-
4037
// Get file information
4138
st := t.Status()
4239
var stFile *state.TorrentFileStat
@@ -49,7 +46,6 @@ func (t *Torrent) Stream(fileID int, req *http.Request, resp http.ResponseWriter
4946
if stFile == nil {
5047
return fmt.Errorf("file with id %v not found", fileID)
5148
}
52-
5349
// Find the actual torrent file
5450
files := t.Files()
5551
var file *torrent.File
@@ -62,15 +58,13 @@ func (t *Torrent) Stream(fileID int, req *http.Request, resp http.ResponseWriter
6258
if file == nil {
6359
return fmt.Errorf("file with id %v not found", fileID)
6460
}
65-
6661
// Check file size limit
6762
if int64(sets.MaxSize) > 0 && file.Length() > int64(sets.MaxSize) {
6863
err := fmt.Errorf("file size exceeded max allowed %d bytes", sets.MaxSize)
6964
log.Printf("File %s size (%d) exceeded max allowed %d bytes", file.DisplayPath(), file.Length(), sets.MaxSize)
7065
http.Error(resp, err.Error(), http.StatusForbidden)
7166
return err
7267
}
73-
7468
// Create reader with context for timeout
7569
reader := t.NewReader(file)
7670
if reader == nil {
@@ -83,12 +77,11 @@ func (t *Torrent) Stream(fileID int, req *http.Request, resp http.ResponseWriter
8377
if sets.BTsets.ResponsiveMode {
8478
reader.SetResponsive()
8579
}
86-
87-
host, port, hperr := net.SplitHostPort(req.RemoteAddr)
88-
8980
// Log connection
81+
host, port, clerr := net.SplitHostPort(req.RemoteAddr)
82+
9083
if sets.BTsets.EnableDebug {
91-
if hperr != nil {
84+
if clerr != nil {
9285
log.Printf("[Stream:%d] Connect client (Active streams: %d)", streamID, atomic.LoadInt32(&activeStreams))
9386
} else {
9487
log.Printf("[Stream:%d] Connect client %s:%s (Active streams: %d)",
@@ -104,135 +97,54 @@ func (t *Torrent) Stream(fileID int, req *http.Request, resp http.ResponseWriter
10497

10598
// Set response headers
10699
resp.Header().Set("Connection", "close")
107-
108100
// Add timeout header if configured
109-
if StreamTimeout > 0 {
110-
resp.Header().Set("X-Stream-Timeout", fmt.Sprintf("%d", StreamTimeout))
101+
if streamTimeout > 0 {
102+
resp.Header().Set("X-Stream-Timeout", fmt.Sprintf("%d", streamTimeout))
111103
}
112-
104+
// Add ETag
113105
etag := hex.EncodeToString([]byte(fmt.Sprintf("%s/%s", t.Hash().HexString(), file.Path())))
114106
resp.Header().Set("ETag", httptoo.EncodeQuotedString(etag))
115-
116107
// DLNA headers
117108
resp.Header().Set("transferMode.dlna.org", "Streaming")
109+
// add MimeType
118110
mime, err := mt.MimeTypeByPath(file.Path())
119111
if err == nil && mime.IsMedia() {
120112
resp.Header().Set("content-type", mime.String())
121113
}
122-
114+
// DLNA Seek
123115
if req.Header.Get("getContentFeatures.dlna.org") != "" {
124116
resp.Header().Set("contentFeatures.dlna.org", dlna.ContentFeatures{
125117
SupportRange: true,
126118
SupportTimeSeek: true,
127119
}.String())
128120
}
129-
130121
// Add support for range requests
131122
if req.Header.Get("Range") != "" {
132123
resp.Header().Set("Accept-Ranges", "bytes")
133124
}
134-
135125
// Create a context with timeout if configured
136126
ctx := req.Context()
137-
if StreamTimeout > 0 {
127+
if streamTimeout > 0 {
138128
var cancel context.CancelFunc
139-
ctx, cancel = context.WithTimeout(ctx, time.Duration(StreamTimeout)*time.Second)
129+
ctx, cancel = context.WithTimeout(ctx, time.Duration(streamTimeout)*time.Second)
140130
defer cancel()
141-
142-
// Update request with new context
143-
req = req.WithContext(ctx)
144131
}
132+
// Update request with new context
133+
req = req.WithContext(ctx)
145134

146-
// Create a trackingReadSeeker that implements both io.Reader and io.Seeker
147-
tracker := &trackingReadSeeker{
148-
ReadSeeker: reader,
149-
StreamID: streamID,
150-
FileName: file.DisplayPath(),
151-
}
152-
153-
// Use a custom ResponseWriter to handle disconnections
154-
wrappedWriter := &responseWriter{
155-
ResponseWriter: resp,
156-
OnWrite: func(n int) {
157-
// Track bytes sent if needed
158-
atomic.AddInt64(&tracker.BytesSent, int64(n))
159-
},
160-
}
135+
http.ServeContent(resp, req, file.Path(), time.Unix(t.Timestamp, 0), reader)
161136

162-
// Serve content with our tracking read seeker
163-
http.ServeContent(wrappedWriter, req, file.Path(), time.Unix(t.Timestamp, 0), tracker)
164-
165-
// Log disconnection
166137
if sets.BTsets.EnableDebug {
167-
if hperr != nil {
168-
log.Printf("[Stream:%d] Disconnect client (Read: %d, Sent: %d bytes, Duration: %v)",
169-
streamID, tracker.BytesRead, tracker.BytesSent, time.Since(tracker.StartTime))
138+
if clerr != nil {
139+
log.Printf("[Stream:%d] Disconnect client", streamID)
170140
} else {
171-
log.Printf("[Stream:%d] Disconnect client %s:%s (Read: %d, Sent: %d bytes, Duration: %v)",
172-
streamID, host, port, tracker.BytesRead, tracker.BytesSent, time.Since(tracker.StartTime))
141+
log.Printf("[Stream:%d] Disconnect client %s:%s", streamID, host, port)
173142
}
174143
}
175-
176144
return nil
177145
}
178146

179147
// GetActiveStreams returns number of currently active streams
180148
func GetActiveStreams() int32 {
181149
return atomic.LoadInt32(&activeStreams)
182150
}
183-
184-
// trackingReadSeeker wraps an io.ReadSeeker to track bytes read
185-
type trackingReadSeeker struct {
186-
io.ReadSeeker
187-
StreamID int32
188-
FileName string
189-
BytesRead int64
190-
BytesSent int64
191-
StartTime time.Time
192-
}
193-
194-
func (trs *trackingReadSeeker) Read(p []byte) (n int, err error) {
195-
if trs.StartTime.IsZero() {
196-
trs.StartTime = time.Now()
197-
}
198-
199-
n, err = trs.ReadSeeker.Read(p)
200-
if n > 0 {
201-
atomic.AddInt64(&trs.BytesRead, int64(n))
202-
203-
// Log progress for large reads
204-
if sets.BTsets.EnableDebug && trs.BytesRead%(10*1024*1024) == 0 {
205-
log.Printf("[Stream:%d] %s: Read %d MB",
206-
trs.StreamID, trs.FileName, trs.BytesRead/(1024*1024))
207-
}
208-
}
209-
return
210-
}
211-
212-
func (trs *trackingReadSeeker) Seek(offset int64, whence int) (int64, error) {
213-
newPos, err := trs.ReadSeeker.Seek(offset, whence)
214-
if sets.BTsets.EnableDebug && err == nil {
215-
log.Printf("[Stream:%d] %s: Seek to %d (whence: %d)",
216-
trs.StreamID, trs.FileName, newPos, whence)
217-
}
218-
return newPos, err
219-
}
220-
221-
// Helper struct to handle response writing with callbacks
222-
type responseWriter struct {
223-
http.ResponseWriter
224-
OnWrite func(int)
225-
}
226-
227-
func (rw *responseWriter) Write(p []byte) (n int, err error) {
228-
n, err = rw.ResponseWriter.Write(p)
229-
if rw.OnWrite != nil && n > 0 {
230-
rw.OnWrite(n)
231-
}
232-
return
233-
}
234-
235-
func (rw *responseWriter) WriteHeader(statusCode int) {
236-
// You can track status codes here if needed
237-
rw.ResponseWriter.WriteHeader(statusCode)
238-
}

0 commit comments

Comments
 (0)