Skip to content

Commit ef2b01d

Browse files
committed
refactor stream, preload and add null reader checks
1 parent a77df20 commit ef2b01d

File tree

3 files changed

+348
-100
lines changed

3 files changed

+348
-100
lines changed

server/tgbot/upload/torrfile.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ func NewTorrFile(wrk *Worker, stFile *state.TorrentFileStat) (*TorrFile, error)
6262
}
6363

6464
reader := t.NewReader(file)
65+
if reader == nil {
66+
return nil, errors.New("cannot create torrent reader")
67+
}
6568
if sets.BTsets.ResponsiveMode {
6669
reader.SetResponsive()
6770
}

server/torr/preload.go

Lines changed: 195 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ import (
99

1010
"server/ffprobe"
1111

12-
"github.com/anacrolix/torrent"
13-
1412
"server/log"
1513
"server/settings"
1614
"server/torr/state"
1715
utils2 "server/utils"
16+
17+
"github.com/anacrolix/torrent"
1818
)
1919

2020
func (t *Torrent) Preload(index int, size int64) {
@@ -41,12 +41,14 @@ func (t *Torrent) Preload(index int, size int64) {
4141
t.muTorrent.Unlock()
4242

4343
defer func() {
44+
t.muTorrent.Lock()
4445
if t.Stat == state.TorrentPreload {
4546
t.Stat = state.TorrentWorking
46-
// Очистка по окончании прелоада
47-
t.BitRate = ""
48-
t.DurationSeconds = 0
4947
}
48+
t.muTorrent.Unlock()
49+
// Очистка по окончании прелоада
50+
t.BitRate = ""
51+
t.DurationSeconds = 0
5052
}()
5153

5254
file := t.findFileIndex(index)
@@ -58,111 +60,216 @@ func (t *Torrent) Preload(index int, size int64) {
5860
size = file.Length()
5961
}
6062

61-
if t.Info() != nil {
62-
timeout := time.Second * time.Duration(settings.BTsets.TorrentDisconnectTimeout)
63-
if timeout > time.Minute {
64-
timeout = time.Minute
65-
}
66-
// Запуск лога в отдельном потоке
67-
go func() {
68-
for t.Stat == state.TorrentPreload {
69-
stat := fmt.Sprint(file.Torrent().InfoHash().HexString(), " ", utils2.Format(float64(t.PreloadedBytes)), "/", utils2.Format(float64(t.PreloadSize)), " Speed:", utils2.Format(t.DownloadSpeed), " Peers:", t.Torrent.Stats().ActivePeers, "/", t.Torrent.Stats().TotalPeers, " [Seeds:", t.Torrent.Stats().ConnectedSeeders, "]")
70-
log.TLogln("Preload:", stat)
71-
t.AddExpiredTime(timeout)
72-
time.Sleep(time.Second)
73-
}
74-
}()
63+
if t.Info() == nil {
64+
return
65+
}
7566

76-
if ffprobe.Exists() {
77-
link := "http://127.0.0.1:" + settings.Port + "/play/" + t.Hash().HexString() + "/" + strconv.Itoa(index)
78-
if settings.Ssl {
79-
link = "https://127.0.0.1:" + settings.SslPort + "/play/" + t.Hash().HexString() + "/" + strconv.Itoa(index)
80-
}
81-
if data, err := ffprobe.ProbeUrl(link); err == nil {
82-
t.BitRate = data.Format.BitRate
83-
t.DurationSeconds = data.Format.DurationSeconds
67+
timeout := time.Second * time.Duration(settings.BTsets.TorrentDisconnectTimeout)
68+
if timeout > time.Minute {
69+
timeout = time.Minute
70+
}
71+
72+
// Create a stop channel for the logging goroutine
73+
logStopChan := make(chan struct{})
74+
defer close(logStopChan) // Ensure logging stops when function returns
75+
76+
// Запуск лога в отдельном потоке
77+
go func(stopChan <-chan struct{}) {
78+
ticker := time.NewTicker(time.Second)
79+
defer ticker.Stop()
80+
81+
for {
82+
select {
83+
case <-ticker.C:
84+
t.muTorrent.Lock()
85+
stat := t.Stat
86+
t.muTorrent.Unlock()
87+
88+
if stat != state.TorrentPreload {
89+
return
90+
}
91+
92+
statStr := fmt.Sprint(file.Torrent().InfoHash().HexString(), " ",
93+
utils2.Format(float64(t.PreloadedBytes)), "/",
94+
utils2.Format(float64(t.PreloadSize)), " Speed:",
95+
utils2.Format(t.DownloadSpeed), " Peers:",
96+
t.Torrent.Stats().ActivePeers, "/",
97+
t.Torrent.Stats().TotalPeers, " [Seeds:",
98+
t.Torrent.Stats().ConnectedSeeders, "]")
99+
log.TLogln("Preload:", statStr)
100+
t.AddExpiredTime(timeout)
101+
case <-stopChan:
102+
return
84103
}
85104
}
105+
}(logStopChan)
86106

87-
if t.Stat == state.TorrentClosed {
88-
log.TLogln("End preload: torrent closed")
89-
return
107+
if ffprobe.Exists() {
108+
link := "http://127.0.0.1:" + settings.Port + "/play/" + t.Hash().HexString() + "/" + strconv.Itoa(index)
109+
if settings.Ssl {
110+
link = "https://127.0.0.1:" + settings.SslPort + "/play/" + t.Hash().HexString() + "/" + strconv.Itoa(index)
90111
}
91-
92-
// startend -> 8/16 MB
93-
startend := t.Info().PieceLength
94-
if startend < 8<<20 {
95-
startend = 8 << 20
112+
if data, err := ffprobe.ProbeUrl(link); err == nil {
113+
t.BitRate = data.Format.BitRate
114+
t.DurationSeconds = data.Format.DurationSeconds
96115
}
116+
}
97117

98-
readerStart := file.NewReader()
99-
defer readerStart.Close()
100-
readerStart.SetResponsive()
101-
readerStart.SetReadahead(0)
102-
readerStartEnd := size - startend
118+
// Check if torrent was closed
119+
t.muTorrent.Lock()
120+
isClosed := t.Stat == state.TorrentClosed
121+
t.muTorrent.Unlock()
103122

104-
if readerStartEnd < 0 {
105-
// Если конец начального ридера оказался за началом
106-
readerStartEnd = size
107-
}
108-
if readerStartEnd > file.Length() {
109-
// Если конец начального ридера оказался после конца файла
110-
readerStartEnd = file.Length()
111-
}
123+
if isClosed {
124+
log.TLogln("End preload: torrent closed")
125+
return
126+
}
127+
128+
// startend -> 8/16 MB
129+
startend := t.Info().PieceLength
130+
if startend < 8<<20 {
131+
startend = 8 << 20
132+
}
133+
134+
readerStart := file.NewReader()
135+
if readerStart == nil {
136+
log.TLogln("End preload: null reader")
137+
return
138+
}
139+
defer readerStart.Close()
140+
141+
readerStart.SetResponsive()
142+
readerStart.SetReadahead(0)
143+
readerStartEnd := size - startend
144+
145+
if readerStartEnd < 0 {
146+
// Если конец начального ридера оказался за началом
147+
readerStartEnd = size
148+
}
149+
if readerStartEnd > file.Length() {
150+
// Если конец начального ридера оказался после конца файла
151+
readerStartEnd = file.Length()
152+
}
112153

113-
readerEndStart := file.Length() - startend
114-
readerEndEnd := file.Length()
154+
readerEndStart := file.Length() - startend
155+
readerEndEnd := file.Length()
115156

116-
var wg sync.WaitGroup
157+
var wg sync.WaitGroup
158+
var preloadErr error
159+
160+
// Start end range preload if needed
161+
if readerEndStart > readerStartEnd {
162+
wg.Add(1)
117163
go func() {
118-
offset := int64(0)
119-
if readerEndStart > readerStartEnd {
120-
// Если конечный ридер не входит в диапозон начального
121-
wg.Add(1)
122-
defer wg.Done()
123-
if t.Stat == state.TorrentPreload {
124-
readerEnd := file.NewReader()
125-
readerEnd.SetResponsive()
126-
readerEnd.SetReadahead(0)
127-
readerEnd.Seek(readerEndStart, io.SeekStart)
128-
offset = readerEndStart
129-
tmp := make([]byte, 32768)
130-
for offset+int64(len(tmp)) < readerEndEnd {
131-
n, err := readerEnd.Read(tmp)
132-
if err != nil {
133-
break
134-
}
135-
offset += int64(n)
164+
defer wg.Done()
165+
166+
// Check if we should still preload
167+
t.muTorrent.Lock()
168+
shouldPreload := t.Stat == state.TorrentPreload
169+
t.muTorrent.Unlock()
170+
171+
if !shouldPreload {
172+
return
173+
}
174+
175+
readerEnd := file.NewReader()
176+
if readerEnd == nil {
177+
log.TLogln("Err preload: null reader")
178+
preloadErr = fmt.Errorf("null reader for end range")
179+
return
180+
}
181+
defer readerEnd.Close() // Ensure reader is always closed
182+
183+
readerEnd.SetResponsive()
184+
readerEnd.SetReadahead(0)
185+
186+
_, err := readerEnd.Seek(readerEndStart, io.SeekStart)
187+
if err != nil {
188+
log.TLogln("Err preload seek:", err)
189+
preloadErr = err
190+
return
191+
}
192+
193+
offset := readerEndStart
194+
tmp := make([]byte, 32768)
195+
for offset+int64(len(tmp)) < readerEndEnd {
196+
n, err := readerEnd.Read(tmp)
197+
if err != nil {
198+
if err != io.EOF {
199+
log.TLogln("Err preload read:", err)
200+
preloadErr = err
136201
}
137-
readerEnd.Close()
202+
break
203+
}
204+
offset += int64(n)
205+
206+
// Check if we should continue
207+
t.muTorrent.Lock()
208+
shouldContinue := t.Stat == state.TorrentPreload
209+
t.muTorrent.Unlock()
210+
211+
if !shouldContinue {
212+
break
138213
}
139214
}
140215
}()
216+
}
141217

142-
pieceLength := t.Info().PieceLength
143-
readahead := pieceLength * 4
144-
if readerStartEnd < readahead {
145-
readahead = 0
218+
// Main preload section
219+
pieceLength := t.Info().PieceLength
220+
readahead := pieceLength * 4
221+
if readerStartEnd < readahead {
222+
readahead = 0
223+
}
224+
readerStart.SetReadahead(readahead)
225+
226+
offset := int64(0)
227+
tmp := make([]byte, 32768)
228+
for offset+int64(len(tmp)) < readerStartEnd {
229+
// Check if we should continue
230+
t.muTorrent.Lock()
231+
shouldContinue := t.Stat == state.TorrentPreload
232+
t.muTorrent.Unlock()
233+
234+
if !shouldContinue {
235+
log.TLogln("Preload cancelled")
236+
break
146237
}
147-
readerStart.SetReadahead(readahead)
148-
offset := int64(0)
149-
tmp := make([]byte, 32768)
150-
for offset+int64(len(tmp)) < readerStartEnd {
151-
n, err := readerStart.Read(tmp)
152-
if err != nil {
238+
239+
n, err := readerStart.Read(tmp)
240+
if err != nil {
241+
if err != io.EOF {
153242
log.TLogln("Error preload:", err)
154-
return
155-
}
156-
offset += int64(n)
157-
if readahead > 0 && readerStartEnd-(offset+int64(len(tmp))) < readahead {
158-
readahead = 0
159-
readerStart.SetReadahead(0)
160243
}
244+
break
161245
}
246+
offset += int64(n)
247+
248+
if readahead > 0 && readerStartEnd-(offset+int64(len(tmp))) < readahead {
249+
readahead = 0
250+
readerStart.SetReadahead(0)
251+
}
252+
}
253+
254+
// Wait for end range preload to complete
255+
wg.Wait()
256+
257+
// Check if end range preload failed
258+
if preloadErr != nil {
259+
log.TLogln("End range preload failed:", preloadErr)
260+
}
261+
262+
// Final log
263+
t.muTorrent.Lock()
264+
finalStat := t.Stat
265+
t.muTorrent.Unlock()
162266

163-
wg.Wait()
267+
if finalStat == state.TorrentPreload {
268+
log.TLogln("End preload:", file.Torrent().InfoHash().HexString(),
269+
"Peers:", t.Torrent.Stats().ActivePeers, "/",
270+
t.Torrent.Stats().TotalPeers, "[ Seeds:",
271+
t.Torrent.Stats().ConnectedSeeders, "]")
164272
}
165-
log.TLogln("End preload:", file.Torrent().InfoHash().HexString(), "Peers:", t.Torrent.Stats().ActivePeers, "/", t.Torrent.Stats().TotalPeers, "[ Seeds:", t.Torrent.Stats().ConnectedSeeders, "]")
166273
}
167274

168275
func (t *Torrent) findFileIndex(index int) *torrent.File {

0 commit comments

Comments
 (0)