Skip to content
14 changes: 14 additions & 0 deletions notes.org
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
* New Architecture
Based around how ~http_open/3~ works with keep-alive.

Keep a pool of connections for the given host & port; each call to put/post/whatever checks if there's already a connection & re-uses that state.

I guess still allow setting headers to control caching?

Do we still have a worker thread?

Maybe have an option to either synchronously wait for the given request to finish or give a hook to run async?

Have the request thing return a fake stream & the client worker thread keeps a map of HTTP2 stream id to Prolog stream & can just write the decoded data as it goes?

I think compatibility with the ~http_open~ API would require it to be blocking per request though, to fill out the headers at least.
4 changes: 2 additions & 2 deletions prolog/http2_client.pl
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
http2_open(URL, Http2Ctx, Options) :-
% Open TLS connection
parse_url(URL, [protocol(https),host(Host)|Attrs]),
(memberchk(port(Port), Attrs) ; Port = 443), !,
( memberchk(port(Port), Attrs) ; Port = 443 ), !,
debug(http2_client(open), "URL ~w -> Host ~w:~w", [URL, Host, Port]),
ssl_context(client, Ctx, [host(Host),
close_parent(true),
Expand All @@ -95,7 +95,7 @@
send_frame(Stream, settings_frame([enable_push-0])),
flush_output(Stream),
% XXX: ...then we read a SETTINGS from from server & ACK it
(memberchk(close_cb(CloseCb), Options), ! ; CloseCb = default_close_cb),
( memberchk(close_cb(CloseCb), Options), ! ; CloseCb = default_close_cb ),
make_http2_state([authority(Host),
stream(Stream),
close_cb(CloseCb)],
Expand Down
141 changes: 141 additions & 0 deletions prolog/simple_client.pl
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
:- module(simple_client, [http2_simple_open/3]).

:- use_module(library(apply_macros)).
:- use_module(library(apply), [convlist/3,
maplist/3]).
:- use_module(http2_client, [http2_close/1,
http2_open/3,
http2_request/4]).
:- use_module(library(unix), [pipe/2]).
:- use_module(library(url), [parse_url/2]).
:- use_module(library(pcre), [re_replace/4]).

:- dynamic existing_url_context/2.

unwrap_header(Wrapped, Unwrapped) :-
Wrapped =.. [_, Unwrapped].

simple_complete_cb(ThreadId, OutStream, WrappedHeaders, Body) :-
format(OutStream, "~s", [Body]),
close(OutStream),
maplist(unwrap_header, WrappedHeaders, Headers),
thread_send_message(ThreadId, finished(Headers)).

close_cb(BaseURL, _Ctx, _Data) :-
debug(xxx, "Closing ~w", [BaseURL]),
retractall(existing_url_context(BaseURL, _)).

url_base_path(URL, Base, Path) :-
parse_url(URL, [protocol(Proto), host(Host)|URLAttrs]),
( memberchk(port(PortN), URLAttrs)
-> format(string(Port), ":~w", [PortN])
; Port = ""
),
format(string(Base), "~w://~w~w", [Proto, Host, Port]),
memberchk(path(Path_), URLAttrs),
( memberchk(search(Search), URLAttrs)
-> ( parse_url_search(Qcs, Search),
format(string(Query), "?~s", [Qcs]) )
; Query = "" ),
( memberchk(fragment(Frag), URLAttrs)
-> format(string(Fragment), "#~w", [Frag])
; Fragment = "" ),
atomic_list_concat([Path_, Query, Fragment], '', Path).

canonical_header(Header, CanonicalHeader) :-
string_lower(Header, HeaderLower),
re_replace("-"/g, "_", HeaderLower, CanonicalHeader).

extract_headers(Options, Headers) :-
convlist(extract_header(Headers), Options, _).

extract_header(Headers, header(Key, Value), _) :-
canonical_header(Key, CKey),
member(Header-V, Headers),
canonical_header(Header, CKey), !,
Value = V.
extract_header(_Headers, header(_, Value), _) :-
Value = ''.
extract_header(Headers, status_code(Code), _) :-
memberchk(':status'-Code, Headers).
extract_header(Headers, size(Size), _) :-
member(Header-V, Headers),
canonical_header(Header, "content_length"),
V = Size.
extract_header(_, version(2), _).

options_headers(OpenOptions, Headers) :-
convlist(option_header, OpenOptions, Headers).

option_header(user_agent(Agent), 'user-agent'-Agent).
option_header(request_header(Name=Value), Name-Value).

url_context(BaseURL, Ctx) :-
existing_url_context(BaseURL, Ctx), !.
url_context(BaseURL, Ctx) :-
debug(xxx, "Opening new connection ~w", [BaseURL]),
http2_open(BaseURL, Ctx, [close_cb(simple_client:close_cb(BaseURL, Ctx))]),
assertz(existing_url_context(BaseURL, Ctx)).

http2_simple_open(URL, Read, Options) :-
url_base_path(URL, BaseURL, Path),

url_context(BaseURL, Ctx),

pipe(Read, Write),

( memberchk(method(Meth), Options) ; Meth = get ),
( memberchk(headers(RespHeaders), Options) ; RespHeaders = _ ),
string_upper(Meth, Method),
thread_self(ThisId),

options_headers(Options, Headers),

( memberchk(post(Data), Options) ; Data = [] ),

http2_request(Ctx, [':method'-Method, ':path'-Path|Headers],
Data,
simple_complete_cb(ThisId, Write)),
thread_get_message(finished(RespHeaders)),
extract_headers(Options, RespHeaders).

http2_simple_close(URL) :-
url_base_path(URL, Base, _),
existing_url_context(Base, Ctx), !,
http2_close(Ctx).
http2_simple_close(_).

% Sample usage
test :-
debug(xxx),
http2_simple_open('https://occasionallycogent.com/entries.html',
Stream,
[headers(Headers),
header('Content-Length', ContentLen),
header(x_frame_options, FrameOpts),
header(some_other_thing, Nope),
status_code(Status),
size(Size)
]),
read_string(Stream, 50, Body),
debug(xxx, "body ~w", [Body]),
debug(xxx, "Status code ~w", [Status]),
debug(xxx, "response headers ~w", [Headers]),
debug(xxx, "Frame opts ~w", [FrameOpts]),
debug(xxx, "content len ~w (= ~w)", [ContentLen, Size]),
debug(xxx, "nonexistant header ~k", [Nope]),
close(Stream),

% reusing the same connection
http2_simple_open('https://occasionallycogent.com/index.html', Stream2, []),
read_string(Stream2, 50, Body2),
close(Stream2),
debug(xxx, "index body ~w", [Body2]),

% reusing the same connection
http2_simple_open('https://occasionallycogent.com/about.html', Stream3, []),
read_string(Stream3, 50, Body3),
close(Stream3),
debug(xxx, "about body ~w", [Body3]),

http2_simple_close('https://nghttp2.org').