net.http: implement http.download_file_with_progress/2, saving each chunk, as it is received, without growing the memory usage #21633

Merged: 4 commits, on Jun 2, 2024

39 changes: 28 additions & 11 deletions cmd/tools/vdownload.v
@@ -9,6 +9,8 @@ import crypto.sha256
struct Context {
mut:
show_help bool
show_sha1 bool
show_sha256 bool
target_folder string
continue_on_failure bool
retries int
@@ -27,6 +29,8 @@ fn main() {
fp.limit_free_args_to_at_least(1)!
ctx.show_help = fp.bool('help', `h`, false, 'Show this help screen.')
ctx.target_folder = fp.string('target-folder', `t`, '.', 'The target folder, where the file will be stored. It will be created, if it does not exist. Default is current folder.')
ctx.show_sha1 = fp.bool('sha1', `1`, false, 'Show the SHA1 hash of the downloaded file.')
ctx.show_sha256 = fp.bool('sha256', `2`, false, 'Show the SHA256 hash of the downloaded file.')
ctx.continue_on_failure = fp.bool('continue', `c`, false, 'Continue on download failures. If you download 5 URLs, and several of them fail, continue without error. False by default.')
ctx.retries = fp.int('retries', `r`, 10, 'Number of retries, when an URL fails to download.')
ctx.delay = time.Duration(u64(fp.float('delay', `d`, 1.0, 'Delay in seconds, after each retry.') * time.second))
@@ -49,13 +53,18 @@ fn main() {
sw := time.new_stopwatch()
mut errors := 0
mut downloaded := 0
downloader := if os.is_atty(1) > 0 {
&http.Downloader(http.TerminalStreamingDownloader{})
} else {
&http.Downloader(http.SilentStreamingDownloader{})
}
for idx, url in ctx.urls {
fname := url.all_after_last('/')
fpath := '${ctx.target_folder}/${fname}'
mut file_errors := 0
log.info('Downloading [${idx + 1}/${ctx.urls.len}] from url: ${url} to ${fpath} ...')
for retry in 0 .. ctx.retries {
http.download_file(url, fname) or {
http.download_file_with_progress(url, fname, downloader: downloader) or {
log.error(' retry ${retry + 1}/${ctx.retries}, failed downloading from url: ${url}. Error: ${err}.')
file_errors++
time.sleep(ctx.delay)
@@ -77,18 +86,26 @@ fn main()
log.info(' Finished downloading file: ${fpath} .')
log.info(' size: ${fstat.size} bytes')

if !ctx.show_sha256 && !ctx.show_sha1 {
continue
}

fbytes := os.read_bytes(fname)!
mut digest256 := sha256.new()
digest256.write(fbytes)!
mut sum256 := digest256.sum([])
hash256 := sum256.hex()
if ctx.show_sha1 {
mut digest1 := sha1.new()
digest1.write(fbytes)!
mut sum1 := digest1.sum([])
hash1 := sum1.hex()
log.info(' SHA1: ${hash1}')
}

mut digest1 := sha1.new()
digest1.write(fbytes)!
mut sum1 := digest1.sum([])
hash1 := sum1.hex()
log.info(' SHA1: ${hash1}')
log.info(' SHA256: ${hash256}')
if ctx.show_sha256 {
mut digest256 := sha256.new()
digest256.write(fbytes)!
mut sum256 := digest256.sum([])
hash256 := sum256.hex()
log.info(' SHA256: ${hash256}')
}
}
println('Downloaded: ${downloaded} file(s) . Elapsed time: ${sw.elapsed()} . Errors: ${errors} .')
if !ctx.continue_on_failure && errors > 0 {
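With the new --sha1/--sha256 flags, checksum printing is now opt-in. A hypothetical invocation of the tool (assuming the usual mapping of cmd/tools/vdownload.v to the `v download` command; the URL is a placeholder):

v download --sha256 -t downloads https://example.com/file.zip
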
37 changes: 13 additions & 24 deletions vlib/net/http/backend.c.v
@@ -35,40 +35,29 @@ fn net_ssl_do(req &Request, port int, method Method, host_name string, path stri

req_headers := req.build_request_headers(method, host_name, path)
$if trace_http_request ? {
eprintln('> ${req_headers}')
eprint('> ')
eprint(req_headers)
eprintln('')
}

return req.do_request(req_headers, mut ssl_conn)!
}

fn read_from_ssl_connection_cb(con voidptr, buf &u8, bufsize int) !int {
mut ssl_conn := unsafe { &ssl.SSLConn(con) }
return ssl_conn.socket_read_into_ptr(buf, bufsize)
}

fn (req &Request) do_request(req_headers string, mut ssl_conn ssl.SSLConn) !Response {
ssl_conn.write_string(req_headers) or { return err }

mut content := strings.new_builder(100)
mut buff := [bufsize]u8{}
bp := unsafe { &buff[0] }
mut readcounter := 0
for {
readcounter++
len := ssl_conn.socket_read_into_ptr(bp, bufsize) or { break }
$if debug_http ? {
eprintln('ssl_do, read ${readcounter:4d} | len: ${len}')
eprintln('-'.repeat(20))
eprintln(unsafe { tos(bp, len) })
eprintln('-'.repeat(20))
}
if len <= 0 {
break
}
unsafe { content.write_ptr(bp, len) }
if req.on_progress != unsafe { nil } {
req.on_progress(req, content[content.len - len..], u64(content.len))!
}
}
mut content := strings.new_builder(4096)
req.receive_all_data_from_cb_in_builder(mut content, voidptr(ssl_conn), read_from_ssl_connection_cb)!
ssl_conn.shutdown()!
response_text := content.str()
$if trace_http_response ? {
eprintln('< ${response_text}')
eprint('< ')
eprint(response_text)
eprintln('')
}
if req.on_finish != unsafe { nil } {
req.on_finish(req, u64(response_text.len))!
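For context, the read loop that used to live in do_request has moved into receive_all_data_from_cb_in_builder (its diff is not shown on this page). A minimal sketch of such a loop, assuming the callback signature of read_from_ssl_connection_cb above and the module-level bufsize constant (the function name here is hypothetical):

fn receive_all_data_sketch(mut content strings.Builder, con voidptr, cb fn (voidptr, &u8, int) !int) ! {
	// keep asking the callback for more data, appending it to the builder,
	// until it errors out or returns 0 bytes
	mut buff := [bufsize]u8{}
	bp := unsafe { &buff[0] }
	for {
		len := cb(con, bp, bufsize) or { break }
		if len <= 0 {
			break
		}
		unsafe { content.write_ptr(bp, len) }
	}
}
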
5 changes: 0 additions & 5 deletions vlib/net/http/download.v
@@ -21,11 +21,6 @@ pub fn download_file(url string, out_file_path string) ! {
os.write_file(out_file_path, s.body)!
}

// TODO: implement download_file_with_progress
// type DownloadChunkFn = fn (written int)
// type DownloadFinishedFn = fn ()
// pub fn download_file_with_progress(url string, out_file_path string, cb_chunk DownloadChunkFn, cb_finished DownloadFinishedFn)

pub fn download_file_with_cookies(url string, out_file_path string, cookies map[string]string) ! {
$if debug_http ? {
println('http.download_file url=${url} out_file_path=${out_file_path}')
77 changes: 77 additions & 0 deletions vlib/net/http/download_progress.v
@@ -0,0 +1,77 @@
module http

// Downloader is the interface that you have to implement, if you need to customise
// how download_file_with_progress works, and what output it produces while a file
// is downloaded.
pub interface Downloader {
mut:
// Called once, at the start of the streaming download. You can do setup here,
// like opening a target file, or changing request.stop_copying_limit to a different
// value, if you need to.
on_start(mut request Request, path string) !
// Called many times, once per received chunk of data
on_chunk(request &Request, chunk []u8, already_received u64, expected u64) !
// Called once, at the end of the streaming download. Do cleanup here,
// like closing a file (opened in on_start), reporting stats etc.
on_finish(request &Request, response &Response) !
}

// DownloaderParams is similar to FetchConfig, but it also allows you to pass
// a `downloader: your_downloader_instance` parameter.
// See also http.SilentStreamingDownloader, and http.TerminalStreamingDownloader.
@[params]
pub struct DownloaderParams {
FetchConfig
pub mut:
downloader &Downloader = TerminalStreamingDownloader{}
}

// download_file_with_progress will save the URL `url` to the filepath `path`.
// Unlike download_file/2, it *does not* load the whole content in memory, but
// instead streams it chunk by chunk to the target `path`, as the chunks are received
// from the network. This makes it suitable for downloading big files, *without* increasing
// the memory consumption of your application.
//
// By default, it will also show a progress line, while the download happens.
// If you do not want a status line, you can call it like this:
// `http.download_file_with_progress(url, path, downloader: http.SilentStreamingDownloader{})`,
// or you can implement your own http.Downloader and pass that instead.
//
// Note: the response returned by this function will have a truncated .body after the first
// few KBs, because it does not accumulate all of its data in memory, relying instead on the
// downloader to save the received data chunk by chunk. You can tune this by passing
// `stop_copying_limit:`, but the number has to be big enough to fit at least all of the
// headers in the response, otherwise parsing the response at the end will fail, even though
// all the data was already saved to the file before that. The default is 65536 bytes.
pub fn download_file_with_progress(url string, path string, params DownloaderParams) !Response {
mut d := unsafe { params.downloader }
mut config := params.FetchConfig
config.url = url
config.user_ptr = voidptr(d)
config.on_progress_body = download_progres_cb
if config.stop_copying_limit == -1 {
// leave more than enough space for potential redirect headers
config.stop_copying_limit = 65536
}
mut req := prepare(config)!
d.on_start(mut req, path)!
response := req.do()!
d.on_finish(req, response)!
Review comment from a Contributor:

@spytheman great work, thanks!

Do you think it would be wise to guarantee that on_finish() is always called, if on_start() finished successfully, by deferring it?

return response
}

const zz = &Downloader(unsafe { nil })

fn download_progres_cb(request &Request, chunk []u8, body_so_far u64, expected_size u64, status_code int) ! {
// TODO: remove this hack, when `unsafe { &Downloader( request.user_ptr ) }` works reliably,
// by just casting, without trying to promote the argument to the heap at all.
mut d := unsafe { http.zz }
pd := unsafe { &voidptr(&d) }
unsafe {
*pd = request.user_ptr
}
if status_code == 200 {
// ignore redirects, we are interested in the chunks of the final file:
d.on_chunk(request, chunk, body_so_far, expected_size)!
}
}
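
For illustration, any struct with these three methods satisfies the Downloader interface above. A hypothetical example (not part of this PR), which just logs every chunk:

struct ChunkLogger {}

fn (mut d ChunkLogger) on_start(mut request http.Request, path string) ! {
	println('starting download to: ${path}')
}

fn (mut d ChunkLogger) on_chunk(request &http.Request, chunk []u8, already_received u64, expected u64) ! {
	println('chunk of ${chunk.len} bytes, ${already_received}/${expected} received')
}

fn (mut d ChunkLogger) on_finish(request &http.Request, response &http.Response) ! {
	println('done, status code: ${response.status_code}')
}

// usage, with url and path as placeholders:
http.download_file_with_progress(url, path, downloader: &http.Downloader(ChunkLogger{}))!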
28 changes: 28 additions & 0 deletions vlib/net/http/download_silent_downloader.v
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
module http

import os

// SilentStreamingDownloader just saves the downloaded file chunks to the given path.
// It does *no reporting at all*.
// Note: the folder part of the path should already exist, and has to be writable.
pub struct SilentStreamingDownloader {
pub mut:
path string
f os.File
}

// on_start is called once at the start of the download.
pub fn (mut d SilentStreamingDownloader) on_start(mut request Request, path string) ! {
d.path = path
d.f = os.create(path)!
}

// on_chunk is called multiple times, once per chunk of received content.
pub fn (mut d SilentStreamingDownloader) on_chunk(request &Request, chunk []u8, already_received u64, expected u64) ! {
d.f.write(chunk)!
}

// on_finish is called once at the end of the download.
pub fn (mut d SilentStreamingDownloader) on_finish(request &Request, response &Response) ! {
d.f.close()
}
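
Note that on_start calls os.create directly, so the folder part of the path must already exist, as the comment above says. A defensive caller can ensure that first (a sketch; url and path are placeholders):

os.mkdir_all(os.dir(path))!
http.download_file_with_progress(url, path, downloader: http.SilentStreamingDownloader{})!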
46 changes: 46 additions & 0 deletions vlib/net/http/download_terminal_downloader.v
@@ -0,0 +1,46 @@
module http

import time

// TerminalStreamingDownloader is the same as http.SilentStreamingDownloader, but produces a progress line on stdout.
pub struct TerminalStreamingDownloader {
SilentStreamingDownloader
mut:
start_time time.Time
past_time time.Time
past_received u64
}

// on_start is called once at the start of the download.
pub fn (mut d TerminalStreamingDownloader) on_start(mut request Request, path string) ! {
d.SilentStreamingDownloader.on_start(mut request, path)!
d.start_time = time.now()
d.past_time = time.now()
}

// on_chunk is called multiple times, once per chunk of received content.
pub fn (mut d TerminalStreamingDownloader) on_chunk(request &Request, chunk []u8, already_received u64, expected u64) ! {
now := time.now()
elapsed := now - d.start_time
// delta_elapsed := now - d.past_time
// delta_bytes := already_received - d.past_received
d.past_time = now
d.past_received = already_received
ratio := f64(already_received) / f64(expected)
estimated := time.Duration(i64(f64(elapsed) / ratio))
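// Note: elapsed is in ns, and f64(time.millisecond) is 1e6, so `speed` below is in bytes/ms, which is numerically the same as (decimal) KB/s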
speed := f64(time.millisecond) * f64(already_received) / f64(elapsed)
elapsed_s := elapsed.seconds()
estimated_s := estimated.seconds()
eta_s := f64_max(estimated_s - elapsed_s, 0.0)

d.SilentStreamingDownloader.on_chunk(request, chunk, already_received, expected)!
print('\rDownloading to `${d.path}` ${100.0 * ratio:6.2f}%, ${f64(already_received) / (1024 * 1024):7.3f}/${f64(expected) / (1024 * 1024):-7.3f}MB, ${speed:6.0f}KB/s, elapsed: ${elapsed_s:6.0f}s, eta: ${eta_s:6.0f}s')
flush_stdout()
}

// on_finish is called once at the end of the download.
pub fn (mut d TerminalStreamingDownloader) on_finish(request &Request, response &Response) ! {
d.SilentStreamingDownloader.on_finish(request, response)!
println('')
flush_stdout()
}
37 changes: 26 additions & 11 deletions vlib/net/http/http.v
@@ -8,7 +8,8 @@ import net.urllib
const max_redirects = 16 // safari max - other browsers allow up to 20

const content_type_default = 'text/plain'
const bufsize = 1536

const bufsize = 64 * 1024

// FetchConfig holds configuration data for the fetch function.
pub struct FetchConfig {
@@ -31,10 +32,14 @@ pub mut:
in_memory_verification bool // if true, verify, cert, and cert_key are read from memory, not from a file
allow_redirect bool = true // whether to allow redirect
max_retries int = 5 // maximum number of retries required when an underlying socket error occurs
// callbacks to allow custom reporting code to run, while the request is running
on_redirect RequestRedirectFn = unsafe { nil }
on_progress RequestProgressFn = unsafe { nil }
on_finish RequestFinishFn = unsafe { nil }
// callbacks to allow custom reporting code to run, while the request is running, and to implement streaming
on_redirect RequestRedirectFn = unsafe { nil }
on_progress RequestProgressFn = unsafe { nil }
on_progress_body RequestProgressBodyFn = unsafe { nil }
on_finish RequestFinishFn = unsafe { nil }
//
stop_copying_limit i64 = -1 // after this many bytes are received, stop copying them to the response. Note that the on_progress and on_progress_body callbacks will continue to fire normally, until the full response is read, which allows you to implement streaming downloads, without keeping the whole big response in memory
stop_receiving_limit i64 = -1 // after this many bytes are received, break out of the loop that reads the response, effectively stopping the request early. No more on_progress callbacks will be fired. The on_finish callback will fire.
}
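
The two limits above are what enable streaming through plain http.fetch as well, without a Downloader. A minimal sketch (the URL and the callback name are placeholders, not part of this diff):

fn print_progress(request &http.Request, chunk []u8, body_so_far u64, expected_size u64, status_code int) ! {
	eprintln('got ${chunk.len} bytes, ${body_so_far}/${expected_size}')
}

resp := http.fetch(
	url: 'https://example.com/big.iso'
	on_progress_body: print_progress
	stop_copying_limit: 65536 // keep only the first 64KiB in resp.body
)!
println(resp.status_code)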

// new_request creates a new Request given the request `method`, `url_`, and
@@ -153,10 +158,10 @@ pub fn delete(url string) !Response {
return fetch(method: .delete, url: url)
}

// TODO: the @[noinline] attribute is used as a temporary fix for the 'get_text()' intermittent segfault / nil value when compiling with GCC 13.2.x and the -prod option ( Issue #20506 )
// fetch sends an HTTP request to the `url` with the given method and configuration.
@[noinline]
pub fn fetch(config FetchConfig) !Response {
// prepare prepares a new request for fetching, but does not call its .do() method.
// It is useful if you want to reuse a request object for several requests in a row,
// modifying it each time, and then calling .do() to get each new response.
pub fn prepare(config FetchConfig) !Request {
if config.url == '' {
return error('http.fetch: empty url')
}
@@ -179,11 +184,21 @@ pub fn fetch(config FetchConfig) !Response {
allow_redirect: config.allow_redirect
max_retries: config.max_retries
on_progress: config.on_progress
on_progress_body: config.on_progress_body
on_redirect: config.on_redirect
on_finish: config.on_finish
stop_copying_limit: config.stop_copying_limit
stop_receiving_limit: config.stop_receiving_limit
}
res := req.do()!
return res
return req
}

// TODO: the @[noinline] attribute is used as a temporary fix for the 'get_text()' intermittent segfault / nil value when compiling with GCC 13.2.x and the -prod option ( Issue #20506 )
// fetch sends an HTTP request to the `url` with the given method and configuration.
@[noinline]
pub fn fetch(config FetchConfig) !Response {
req := prepare(config)!
return req.do()!
}
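
Because prepare/1 returns the Request itself, a caller can tweak it between calls; a minimal sketch (the URLs are placeholders):

mut req := http.prepare(url: 'https://example.com/a.json')!
first := req.do()!
req.url = 'https://example.com/b.json'
second := req.do()!
println('${first.status_code} ${second.status_code}')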

// get_text sends an HTTP GET request to the given `url` and returns the text content of the response.