
Commit

net.http: implement http.download_file_with_progress/2, saving each chunk, as it is received, without growing the memory usage (#21633)
spytheman authored Jun 2, 2024
1 parent 2e567ff commit 8504bea
Showing 9 changed files with 354 additions and 85 deletions.
39 changes: 28 additions & 11 deletions cmd/tools/vdownload.v
@@ -9,6 +9,8 @@ import crypto.sha256
struct Context {
mut:
show_help bool
show_sha1 bool
show_sha256 bool
target_folder string
continue_on_failure bool
retries int
@@ -27,6 +29,8 @@ fn main() {
fp.limit_free_args_to_at_least(1)!
ctx.show_help = fp.bool('help', `h`, false, 'Show this help screen.')
ctx.target_folder = fp.string('target-folder', `t`, '.', 'The target folder, where the file will be stored. It will be created, if it does not exist. Default is current folder.')
ctx.show_sha1 = fp.bool('sha1', `1`, false, 'Show the SHA1 hash of the downloaded file.')
ctx.show_sha256 = fp.bool('sha256', `2`, false, 'Show the SHA256 hash of the downloaded file.')
ctx.continue_on_failure = fp.bool('continue', `c`, false, 'Continue on download failures. If you download 5 URLs, and several of them fail, continue without error. False by default.')
ctx.retries = fp.int('retries', `r`, 10, 'Number of retries, when a URL fails to download.')
ctx.delay = time.Duration(u64(fp.float('delay', `d`, 1.0, 'Delay in seconds, after each retry.') * time.second))
@@ -49,13 +53,18 @@ fn main() {
sw := time.new_stopwatch()
mut errors := 0
mut downloaded := 0
downloader := if os.is_atty(1) > 0 {
&http.Downloader(http.TerminalStreamingDownloader{})
} else {
&http.Downloader(http.SilentStreamingDownloader{})
}
for idx, url in ctx.urls {
fname := url.all_after_last('/')
fpath := '${ctx.target_folder}/${fname}'
mut file_errors := 0
log.info('Downloading [${idx + 1}/${ctx.urls.len}] from url: ${url} to ${fpath} ...')
for retry in 0 .. ctx.retries {
http.download_file(url, fname) or {
http.download_file_with_progress(url, fname, downloader: downloader) or {
log.error(' retry ${retry + 1}/${ctx.retries}, failed downloading from url: ${url}. Error: ${err}.')
file_errors++
time.sleep(ctx.delay)
@@ -77,18 +86,26 @@ fn main() {
log.info(' Finished downloading file: ${fpath} .')
log.info(' size: ${fstat.size} bytes')

if !ctx.show_sha256 && !ctx.show_sha1 {
continue
}

fbytes := os.read_bytes(fname)!
mut digest256 := sha256.new()
digest256.write(fbytes)!
mut sum256 := digest256.sum([])
hash256 := sum256.hex()
if ctx.show_sha1 {
mut digest1 := sha1.new()
digest1.write(fbytes)!
mut sum1 := digest1.sum([])
hash1 := sum1.hex()
log.info(' SHA1: ${hash1}')
}

mut digest1 := sha1.new()
digest1.write(fbytes)!
mut sum1 := digest1.sum([])
hash1 := sum1.hex()
log.info(' SHA1: ${hash1}')
log.info(' SHA256: ${hash256}')
if ctx.show_sha256 {
mut digest256 := sha256.new()
digest256.write(fbytes)!
mut sum256 := digest256.sum([])
hash256 := sum256.hex()
log.info(' SHA256: ${hash256}')
}
}
println('Downloaded: ${downloaded} file(s) . Elapsed time: ${sw.elapsed()} . Errors: ${errors} .')
if !ctx.continue_on_failure && errors > 0 {
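For reference, a minimal standalone sketch of the same selection pattern the tool now uses: the terminal downloader when stdout is a tty, the silent one otherwise (the URL and file name here are hypothetical):

import os
import net.http

fn main() {
	url := 'https://example.com/big.iso' // hypothetical URL
	downloader := if os.is_atty(1) > 0 {
		// stdout is a terminal, so show a live progress line
		&http.Downloader(http.TerminalStreamingDownloader{})
	} else {
		// output is piped or redirected, so stay silent
		&http.Downloader(http.SilentStreamingDownloader{})
	}
	http.download_file_with_progress(url, 'big.iso', downloader: downloader) or {
		eprintln('download failed: ${err}')
		exit(1)
	}
}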
37 changes: 13 additions & 24 deletions vlib/net/http/backend.c.v
@@ -35,40 +35,29 @@ fn net_ssl_do(req &Request, port int, method Method, host_name string, path stri

req_headers := req.build_request_headers(method, host_name, path)
$if trace_http_request ? {
eprintln('> ${req_headers}')
eprint('> ')
eprint(req_headers)
eprintln('')
}

return req.do_request(req_headers, mut ssl_conn)!
}

fn read_from_ssl_connection_cb(con voidptr, buf &u8, bufsize int) !int {
mut ssl_conn := unsafe { &ssl.SSLConn(con) }
return ssl_conn.socket_read_into_ptr(buf, bufsize)
}

fn (req &Request) do_request(req_headers string, mut ssl_conn ssl.SSLConn) !Response {
ssl_conn.write_string(req_headers) or { return err }

mut content := strings.new_builder(100)
mut buff := [bufsize]u8{}
bp := unsafe { &buff[0] }
mut readcounter := 0
for {
readcounter++
len := ssl_conn.socket_read_into_ptr(bp, bufsize) or { break }
$if debug_http ? {
eprintln('ssl_do, read ${readcounter:4d} | len: ${len}')
eprintln('-'.repeat(20))
eprintln(unsafe { tos(bp, len) })
eprintln('-'.repeat(20))
}
if len <= 0 {
break
}
unsafe { content.write_ptr(bp, len) }
if req.on_progress != unsafe { nil } {
req.on_progress(req, content[content.len - len..], u64(content.len))!
}
}
mut content := strings.new_builder(4096)
req.receive_all_data_from_cb_in_builder(mut content, voidptr(ssl_conn), read_from_ssl_connection_cb)!
ssl_conn.shutdown()!
response_text := content.str()
$if trace_http_response ? {
eprintln('< ${response_text}')
eprint('< ')
eprint(response_text)
eprintln('')
}
if req.on_finish != unsafe { nil } {
req.on_finish(req, u64(response_text.len))!
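The rewritten do_request above no longer reads, accumulates, and reports chunks inline; it passes read_from_ssl_connection_cb to a shared helper instead. A hedged sketch of that callback-driven shape (receive_all below is a hypothetical stand-in for receive_all_data_from_cb_in_builder, whose definition is outside this diff):

import strings

// Reader callback contract: fill `buf` with up to `bufsize` bytes and
// return how many bytes were actually read; an error ends the read loop.
type ReaderCb = fn (con voidptr, buf &u8, bufsize int) !int

// hypothetical stand-in for the shared receive_all_data_from_cb_in_builder helper
fn receive_all(mut content strings.Builder, con voidptr, cb ReaderCb) ! {
	mut buf := [4096]u8{}
	bp := unsafe { &buf[0] }
	for {
		len := cb(con, bp, 4096) or { break }
		if len <= 0 {
			break
		}
		// append only the bytes received in this chunk
		unsafe { content.write_ptr(bp, len) }
	}
}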
5 changes: 0 additions & 5 deletions vlib/net/http/download.v
@@ -21,11 +21,6 @@ pub fn download_file(url string, out_file_path string) ! {
os.write_file(out_file_path, s.body)!
}

// TODO: implement download_file_with_progress
// type DownloadChunkFn = fn (written int)
// type DownloadFinishedFn = fn ()
// pub fn download_file_with_progress(url string, out_file_path string, cb_chunk DownloadChunkFn, cb_finished DownloadFinishedFn)

pub fn download_file_with_cookies(url string, out_file_path string, cookies map[string]string) ! {
$if debug_http ? {
println('http.download_file url=${url} out_file_path=${out_file_path}')
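With the old TODO gone, the difference between the two entry points comes down to memory behaviour. A hedged sketch (both calls assume the target folder already exists):

import net.http

fn save_both_ways(url string, path string) ! {
	// download_file buffers the entire body in memory, then writes it out:
	http.download_file(url, path)!
	// download_file_with_progress streams each chunk straight to disk:
	http.download_file_with_progress(url, path + '.streamed')!
}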
77 changes: 77 additions & 0 deletions vlib/net/http/download_progress.v
@@ -0,0 +1,77 @@
module http

// Downloader is the interface that you have to implement, if you need to customise
// how download_file_with_progress works, and what output it produces while a file
// is downloaded.
pub interface Downloader {
mut:
// Called once, at the start of the streaming download. You can do setup here,
// like opening a target file, or changing request.stop_copying_limit to a different
// value, if you need that.
on_start(mut request Request, path string) !
// Called many times, once for each received chunk of data
on_chunk(request &Request, chunk []u8, already_received u64, expected u64) !
// Called once, at the end of the streaming download. Do cleanup here,
// like closing a file (opened in on_start), reporting stats etc.
on_finish(request &Request, response &Response) !
}

// DownloaderParams is similar to FetchConfig, but it also allows you to pass
// a `downloader: your_downloader_instance` parameter.
// See also http.SilentStreamingDownloader and http.TerminalStreamingDownloader.
@[params]
pub struct DownloaderParams {
FetchConfig
pub mut:
downloader &Downloader = TerminalStreamingDownloader{}
}

// download_file_with_progress will save the URL `url` to the file path `path`.
// Unlike download_file/2, it *does not* load the whole content in memory, but
// instead streams it chunk by chunk to the target `path`, as the chunks are received
// from the network. This makes it suitable for downloading big files, *without* increasing
// the memory consumption of your application.
//
// By default, it will also show a progress line while the download happens.
// If you do not want a status line, you can call it like this:
// `http.download_file_with_progress(url, path, downloader: http.SilentStreamingDownloader{})`,
// or you can implement your own http.Downloader and pass that instead.
//
// Note: the response returned by this function will have a truncated .body after the
// first few KiB, because it does not accumulate all the data in memory, relying instead
// on the downloader to save the received chunks one by one. You can tune the cutoff with
// `stop_copying_limit:`, but the value has to be big enough to fit at least all of the
// headers in the response, otherwise parsing the response at the end will fail, even
// though all the data was already saved to the file. The default is 65536 bytes.
pub fn download_file_with_progress(url string, path string, params DownloaderParams) !Response {
mut d := unsafe { params.downloader }
mut config := params.FetchConfig
config.url = url
config.user_ptr = voidptr(d)
config.on_progress_body = download_progres_cb
if config.stop_copying_limit == -1 {
// leave more than enough space for potential redirect headers
config.stop_copying_limit = 65536
}
mut req := prepare(config)!
d.on_start(mut req, path)!
response := req.do()!
d.on_finish(req, response)!
return response
}

const zz = &Downloader(unsafe { nil })

fn download_progres_cb(request &Request, chunk []u8, body_so_far u64, expected_size u64, status_code int) ! {
// TODO: remove this hack, when `unsafe { &Downloader( request.user_ptr ) }` works reliably,
// by just casting, without trying to promote the argument to the heap at all.
mut d := unsafe { http.zz }
pd := unsafe { &voidptr(&d) }
unsafe {
*pd = request.user_ptr
}
if status_code == 200 {
// ignore redirects, we are interested in the chunks of the final file:
d.on_chunk(request, chunk, body_so_far, expected_size)!
}
}
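To illustrate the Downloader interface above, here is a hedged sketch of a custom implementation that checksums the stream while saving it; HashingDownloader is hypothetical and not part of this commit:

import crypto.sha256
import net.http
import os

struct HashingDownloader {
mut:
	path   string
	f      os.File
	digest &sha256.Digest = sha256.new()
}

fn (mut d HashingDownloader) on_start(mut request http.Request, path string) ! {
	d.path = path
	d.f = os.create(path)!
}

fn (mut d HashingDownloader) on_chunk(request &http.Request, chunk []u8, already_received u64, expected u64) ! {
	d.digest.write(chunk)! // hash the chunk ...
	d.f.write(chunk)! // ... and save it, without keeping it all in memory
}

fn (mut d HashingDownloader) on_finish(request &http.Request, response &http.Response) ! {
	d.f.close()
	sum := d.digest.sum([])
	println('SHA256: ${sum.hex()}')
}

It can then be passed like the built-in downloaders: `http.download_file_with_progress(url, path, downloader: HashingDownloader{})!` .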
28 changes: 28 additions & 0 deletions vlib/net/http/download_silent_downloader.v
@@ -0,0 +1,28 @@
module http

import os

// SilentStreamingDownloader just saves the downloaded file chunks to the given path.
// It does *no reporting at all*.
// Note: the folder part of the path should already exist, and has to be writable.
pub struct SilentStreamingDownloader {
pub mut:
path string
f os.File
}

// on_start is called once at the start of the download.
pub fn (mut d SilentStreamingDownloader) on_start(mut request Request, path string) ! {
d.path = path
d.f = os.create(path)!
}

// on_chunk is called multiple times, once per chunk of received content.
pub fn (mut d SilentStreamingDownloader) on_chunk(request &Request, chunk []u8, already_received u64, expected u64) ! {
d.f.write(chunk)!
}

// on_finish is called once at the end of the download.
pub fn (mut d SilentStreamingDownloader) on_finish(request &Request, response &Response) ! {
d.f.close()
}
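A hedged usage sketch, respecting the note above that the folder part of the path must already exist and be writable (the URL is a placeholder):

import net.http
import os

fn main() {
	os.mkdir_all('downloads') or { panic(err) }
	http.download_file_with_progress('https://example.com/data.bin', 'downloads/data.bin',
		downloader: http.SilentStreamingDownloader{}
	) or { panic(err) }
}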
46 changes: 46 additions & 0 deletions vlib/net/http/download_terminal_downloader.v
@@ -0,0 +1,46 @@
module http

import time

// TerminalStreamingDownloader is the same as http.SilentStreamingDownloader, but produces a progress line on stdout.
pub struct TerminalStreamingDownloader {
SilentStreamingDownloader
mut:
start_time time.Time
past_time time.Time
past_received u64
}

// on_start is called once at the start of the download.
pub fn (mut d TerminalStreamingDownloader) on_start(mut request Request, path string) ! {
d.SilentStreamingDownloader.on_start(mut request, path)!
d.start_time = time.now()
d.past_time = time.now()
}

// on_chunk is called multiple times, once per chunk of received content.
pub fn (mut d TerminalStreamingDownloader) on_chunk(request &Request, chunk []u8, already_received u64, expected u64) ! {
now := time.now()
elapsed := now - d.start_time
// delta_elapsed := now - d.past_time
// delta_bytes := already_received - d.past_received
d.past_time = now
d.past_received = already_received
ratio := f64(already_received) / f64(expected)
estimated := time.Duration(i64(f64(elapsed) / ratio))
speed := f64(time.millisecond) * f64(already_received) / f64(elapsed)
elapsed_s := elapsed.seconds()
estimated_s := estimated.seconds()
eta_s := f64_max(estimated_s - elapsed_s, 0.0)

d.SilentStreamingDownloader.on_chunk(request, chunk, already_received, expected)!
print('\rDownloading to `${d.path}` ${100.0 * ratio:6.2f}%, ${f64(already_received) / (1024 * 1024):7.3f}/${f64(expected) / (1024 * 1024):-7.3f}MB, ${speed:6.0f}KB/s, elapsed: ${elapsed_s:6.0f}s, eta: ${eta_s:6.0f}s')
flush_stdout()
}

// on_finish is called once at the end of the download.
pub fn (mut d TerminalStreamingDownloader) on_finish(request &Request, response &Response) ! {
d.SilentStreamingDownloader.on_finish(request, response)!
println('')
flush_stdout()
}
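For the arithmetic in on_chunk above: with ratio = already_received / expected, the estimated total duration is elapsed / ratio, and the eta is estimated - elapsed, clamped at zero. The speed term multiplies by f64(time.millisecond) because elapsed is a time.Duration in nanoseconds, so bytes per nanosecond times 10^6 gives bytes per millisecond, which prints as roughly KB/s. A worked example, assuming a steady connection: after 10s with 25 of 100 MB received, ratio = 0.25, estimated = 40s, eta = 30s, and speed = 10^6 * 25*10^6 / 10^10 = 2500 KB/s.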
37 changes: 26 additions & 11 deletions vlib/net/http/http.v
@@ -8,7 +8,8 @@ import net.urllib
const max_redirects = 16 // safari max - other browsers allow up to 20

const content_type_default = 'text/plain'
const bufsize = 1536

const bufsize = 64 * 1024

// FetchConfig holds configuration data for the fetch function.
pub struct FetchConfig {
@@ -31,10 +32,14 @@
in_memory_verification bool // if true, verify, cert, and cert_key are read from memory, not from a file
allow_redirect bool = true // whether to allow redirect
max_retries int = 5 // maximum number of retries required when an underlying socket error occurs
// callbacks to allow custom reporting code to run, while the request is running
on_redirect RequestRedirectFn = unsafe { nil }
on_progress RequestProgressFn = unsafe { nil }
on_finish RequestFinishFn = unsafe { nil }
// callbacks to allow custom reporting code to run, while the request is running, and to implement streaming
on_redirect RequestRedirectFn = unsafe { nil }
on_progress RequestProgressFn = unsafe { nil }
on_progress_body RequestProgressBodyFn = unsafe { nil }
on_finish RequestFinishFn = unsafe { nil }
//
stop_copying_limit i64 = -1 // after this many bytes are received, stop copying them to the response body. Note that the on_progress and on_progress_body callbacks will continue to fire normally until the full response is read, which allows you to implement streaming downloads without keeping the whole big response in memory
stop_receiving_limit i64 = -1 // after this many bytes are received, break out of the loop that reads the response, effectively stopping the request early. No more on_progress callbacks will be fired. The on_finish callback will fire.
}

// new_request creates a new Request given the request `method`, `url_`, and
@@ -153,10 +158,10 @@ pub fn delete(url string) !Response {
return fetch(method: .delete, url: url)
}

// TODO: the @[noinline] attribute is used as a temporary fix for the 'get_text()' intermittent segfault / nil value when compiling with GCC 13.2.x and the -prod option (Issue #20506)
// fetch sends an HTTP request to the `url` with the given method and configuration.
@[noinline]
pub fn fetch(config FetchConfig) !Response {
// prepare prepares a new request for fetching, but does not call its .do() method.
// It is useful if you want to reuse a request object for several requests in a row,
// modifying it each time, then calling .do() to get each new response.
pub fn prepare(config FetchConfig) !Request {
if config.url == '' {
return error('http.fetch: empty url')
}
@@ -179,11 +184,21 @@
allow_redirect: config.allow_redirect
max_retries: config.max_retries
on_progress: config.on_progress
on_progress_body: config.on_progress_body
on_redirect: config.on_redirect
on_finish: config.on_finish
stop_copying_limit: config.stop_copying_limit
stop_receiving_limit: config.stop_receiving_limit
}
res := req.do()!
return res
return req
}

// TODO: the @[noinline] attribute is used as a temporary fix for the 'get_text()' intermittent segfault / nil value when compiling with GCC 13.2.x and the -prod option (Issue #20506)
// fetch sends an HTTP request to the `url` with the given method and configuration.
@[noinline]
pub fn fetch(config FetchConfig) !Response {
req := prepare(config)!
return req.do()!
}

// get_text sends an HTTP GET request to the given `url` and returns the text content of the response.
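Taken together, the prepare/fetch split and the new stop_* limits allow reusing one request object across downloads, while bounding how much of each body is kept in memory. A hedged sketch (the URLs are placeholders):

import net.http

fn main() {
	mut req := http.prepare(
		url: 'https://example.com/one.bin'
		stop_copying_limit: 65536 // keep at most 64KiB of the body in resp.body
	) or { panic(err) }
	mut resp := req.do() or { panic(err) }
	println(resp.status_code)
	// reuse the same request object for a second download:
	req.url = 'https://example.com/two.bin'
	resp = req.do() or { panic(err) }
	println(resp.status_code)
}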
