Skip to content

Commit

Permalink
[#120] Partial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jfacorro committed Nov 19, 2015
1 parent db0c995 commit 445dc10
Showing 1 changed file with 127 additions and 80 deletions.
207 changes: 127 additions & 80 deletions src/shotgun.erl
Original file line number Diff line number Diff line change
Expand Up @@ -265,26 +265,32 @@ request(Pid, get, Uri, Headers0, Body, Options) ->
async := IsAsync,
async_mode := AsyncMode,
headers := Headers,
timeout := Timeout} = process_options(Options, Headers0, get),
timeout := Timeout} = process_options(Options, Headers0, get, Body),

Request = #{uri => Uri, headers => Headers, body => Body},

Event = case IsAsync of
true ->
{get_async,
{HandleEvent, AsyncMode},
{Uri, Headers, Body}};
false ->
{get, {Uri, Headers, Body}}
true -> { get_async
, {HandleEvent, AsyncMode}
, Request
};
false -> {get, Request}
end,

gen_fsm:sync_send_event(Pid, Event, Timeout)
catch
_:Reason -> {error, Reason}
end;
request(Pid, Method, Uri, Headers0, Body, Options) ->
try
check_uri(Uri),
#{headers := Headers, timeout := Timeout} =
process_options(Options, Headers0, Method),
Event = {Method, {Uri, Headers, Body}},
#{ headers := Headers
, timeout := Timeout
} = process_options(Options, Headers0, Method, Body),

Request = #{uri => Uri, headers => Headers, body => Body},
Event = {Method, Request},

gen_fsm:sync_send_event(Pid, Event, Timeout)
catch
_:Reason -> {error, Reason}
Expand Down Expand Up @@ -349,17 +355,18 @@ init([Host, Port, Type, Opts]) ->
Timeout = maps:get(timeout, Opts, 5000),
{ok, Pid} = gun:open(Host, Port, GunOpts),
case gun:await_up(Pid, Timeout) of
{ok, _} ->
State = clean_state(),
{ok, at_rest, State#{pid => Pid}};
%The only apparent timeout for gun:open is the connection timeout of the
%underlying transport. So, a timeout message here comes from gun:await_up.
{error, timeout} ->
{stop, gun_open_timeout};
%gun currently terminates with reason normal if gun:open fails to open
%the requested connection. This bubbles up through gun:await_up.
{error, normal} ->
{stop, gun_open_failed}
{ok, _} ->
State = clean_state(),
{ok, at_rest, State#{pid => Pid}};
%% The only apparent timeout for gun:open is the connection timeout of
%% the underlying transport. So, a timeout message here comes from
%% gun:await_up.
{error, timeout} ->
{stop, gun_open_timeout};
%% gun currently terminates with reason normal if gun:open fails to
%% open the requested connection. This bubbles up through gun:await_up.
{error, normal} ->
{stop, gun_open_failed}
end.

%% @private
Expand Down Expand Up @@ -422,110 +429,133 @@ at_rest(Event, From, State) ->

%% @private
-spec wait_response(term(), pid(), term()) -> term().
wait_response( {data, Data, FinNoFin}
, _From
wait_response({data, Data, FinNoFin}
, From
, #{stream := StreamRef, pid := Pid} = State) ->
ok = gun:data(Pid, StreamRef, FinNoFin, Data),
{reply, ok, wait_response, State};

NewState = State#{from => From},
case FinNoFin of
fin -> {next_state, wait_response, NewState};
nofin -> {reply, ok, wait_response, NewState}
end;
wait_response(Event, From, State) ->
enqueue_work_or_stop(wait_response, Event, From, State).

%% @private
-spec receive_data(term(), pid(), term()) -> term().
receive_data({data, _, _} = Event, _From, State) ->
unexpected_event_warning(at_rest, Event),
unexpected_event_warning(receive_data, Event),
{reply, {error, unexpected}, receive_data, State};
receive_data(Event, From, State) ->
enqueue_work_or_stop(receive_data, Event, From, State).

%% @private
-spec receive_chunk(term(), pid(), term()) -> term().
receive_chunk({data, _, _} = Event, _From, State) ->
unexpected_event_warning(at_rest, Event),
unexpected_event_warning(receive_chunk, Event),
{reply, {error, unexpected}, receive_chunk, State};
receive_chunk(Event, From, State) ->
enqueue_work_or_stop(receive_chunk, Event, From, State).

%See if we have work. If we do, dispatch.
%If we don't, stay in at_rest.
%% @doc See if we have work. If we do, dispatch.
%% If we don't, stay in at_rest.
%% @private
-spec at_rest(any(), state()) -> {next_state, atom(), state()}.
at_rest(timeout, State) ->
case get_work(State) of
no_work ->
{next_state, at_rest, State};
{ok, Work, NewState} ->
ok = gen_fsm:send_event(self(), Work),
{next_state, at_rest, NewState}
end;
at_rest({get_async, {HandleEvent, AsyncMode}, Args, From},
NewState = case get_work(State) of
no_work ->
State;
{ok, Work, State1} ->
ok = gen_fsm:send_event(self(), Work),
State1
end,

{next_state, at_rest, NewState};
at_rest({get_async, {HandleEvent, AsyncMode}, Request, From},
State = #{pid := Pid}) ->
StreamRef = do_http_verb(get, Pid, Args),
StreamRef = do_http_verb(get, Pid, Request),
CleanState = clean_state(State),
NewState = CleanState#{
from => From,
pid => Pid,
stream => StreamRef,
handle_event => HandleEvent,
async => true,
async_mode => AsyncMode
},
NewState = CleanState#{ from => From
, pid => Pid
, stream => StreamRef
, handle_event => HandleEvent
, async => true
, async_mode => AsyncMode
, request => Request
},

{next_state, wait_response, NewState};
at_rest({HttpVerb, {_, _, Body} = Args, From}, State = #{pid := Pid}) ->
StreamRef = do_http_verb(HttpVerb, Pid, Args),
at_rest({HttpVerb, Request, From}, State = #{pid := Pid}) ->
StreamRef = do_http_verb(HttpVerb, Pid, Request),
CleanState = clean_state(State),
NewState = CleanState#{ pid => Pid
, stream => StreamRef
, from => From
NewState = CleanState#{ pid => Pid
, stream => StreamRef
, from => From
, request => Request
},
case Body of
body_chunked ->
gen_fsm:send_event(self(), body_chunked);
_ -> ok
end,

{next_state, wait_response, NewState}.

%% @private
-spec wait_response(term(), term()) -> term().
wait_response({'DOWN', _, _, _, Reason}, _State) ->
exit(Reason);
wait_response({gun_response, _Pid, StreamRef, nofin, StatusCode, Headers},
#{ from := From
, request := #{body := ReqBody}
, stream := StreamRef
, async := Async} = State) ->
io:format("gun_response nofin ~p~n", [StatusCode]),

Response = #{ status_code => StatusCode
, headers => Headers
, body => undefined
},
Result = {ok, Response},

TransferEncoding = lists:keyfind(<<"transfer-encoding">>, 1, Headers),
NextStateName =
case {TransferEncoding, ReqBody} of
{{_, <<"chunked">>}, _} when Async == true ->
gen_fsm:reply(From, Result),
receive_chunk;
{_, body_chunked} ->
gen_fsm:reply(From, Result),
wait_response;
_ ->
receive_data
end,
io:format("gun_response nofin next_state ~p~n", [NextStateName]),

{ next_state
, NextStateName
, State#{status_code := StatusCode, headers := Headers}
};
wait_response({gun_response, _Pid, _StreamRef, fin, StatusCode, Headers},
#{from := From,
async := Async,
responses := Responses} = State) ->
Response = #{status_code => StatusCode, headers => Headers},
io:format("gun_response fin - aync? ~p~n", [Async]),
Response = #{ status_code => StatusCode
, headers => Headers
, body => undefined},
NewResponses =
case Async of
false ->
io:format("gun_response fin - replying to ~p~n", [From]),
gen_fsm:reply(From, {ok, Response}),
Responses;
true ->
gen_fsm:reply(From, {ok, Response})
queue:in(Response, Responses)
end,

{next_state, at_rest, State#{responses => NewResponses}, 0};
wait_response({gun_response, _Pid, _StreamRef, nofin, StatusCode, Headers},
#{from := From, stream := StreamRef, async := Async} = State) ->
StateName =
case lists:keyfind(<<"transfer-encoding">>, 1, Headers) of
{<<"transfer-encoding">>, <<"chunked">>} when Async == true->
Result = {ok, StreamRef},
gen_fsm:reply(From, Result),
receive_chunk;
_ ->
receive_data
end,
{ next_state
, StateName
, State#{status_code := StatusCode, headers := Headers}
};
wait_response({gun_error, _Pid, _StreamRef, Error},
#{from := From} = State) ->
io:format("gun_error~n"),
gen_fsm:reply(From, {error, Error}),
{next_state, at_rest, State, 0};
wait_response(body_chunked,
#{stream := StreamRef, from := From} = State) ->
gen_fsm:reply(From, {ok, StreamRef}),
{next_state, wait_response, State};
wait_response(Event, State) ->
{stop, {unexpected, Event}, State}.

Expand All @@ -536,11 +566,13 @@ receive_data({'DOWN', _, _, _, _Reason}, _State) ->
error(incomplete);
receive_data({gun_data, _Pid, StreamRef, nofin, Data},
#{stream := StreamRef, data := DataAcc} = State) ->
io:format("gun_data nofin ~p~n", [Data]),
NewData = <<DataAcc/binary, Data/binary>>,
{next_state, receive_data, State#{data => NewData}};
receive_data({gun_data, _Pid, _StreamRef, fin, Data},
#{data := DataAcc, from := From, status_code
:= StatusCode, headers := Headers} = State) ->
io:format("gun_data fin ~p~n", [Data]),
NewData = <<DataAcc/binary, Data/binary>>,
Result = {ok, #{status_code => StatusCode,
headers => Headers,
Expand Down Expand Up @@ -597,9 +629,16 @@ clean_state(Reqs) ->

%% @private
-spec do_http_verb(http_verb(), pid(), tuple()) -> reference().
do_http_verb(Method, Pid, {Uri, Headers, body_chunked}) ->
do_http_verb(Method, Pid, #{body := body_chunked} = Request) ->
#{ uri := Uri
, headers := Headers
} = Request,
gun:request(Pid, http_verb_bin(Method), Uri, Headers);
do_http_verb(Method, Pid, {Uri, Headers, Body}) ->
do_http_verb(Method, Pid, Request) ->
#{ uri := Uri
, headers := Headers
, body := Body
} = Request,
gun:request(Pid, http_verb_bin(Method), Uri, Headers, Body).

-spec http_verb_bin(atom()) -> binary().
Expand Down Expand Up @@ -639,8 +678,9 @@ manage_chunk(IsFin, StreamRef, Data,
NewState.

%% @private
process_options(Options, HeadersMap, HttpVerb) ->
Headers = basic_auth_header(HeadersMap),
process_options(Options, Headers0, HttpVerb, Body) ->
Headers1 = basic_auth_header(Headers0),
Headers = expect_header(Headers1, Body),
HandleEvent = maps:get(handle_event, Options, undefined),
Async = maps:get(async, Options, false),
AsyncMode = maps:get(async_mode, Options, binary),
Expand All @@ -650,6 +690,7 @@ process_options(Options, HeadersMap, HttpVerb) ->
{true, Other} -> throw({async_unsupported, Other});
_ -> ok
end,

#{handle_event => HandleEvent,
async => Async,
async_mode => AsyncMode,
Expand All @@ -676,6 +717,12 @@ encode_basic_auth([], []) ->
encode_basic_auth(Username, Password) ->
base64:encode(Username ++ [$: | Password]).

%% @private
expect_header(Headers, body_chunked) ->
[{<<"Expect">>, <<"100-continue">>} | Headers];
expect_header(Headers, _) ->
Headers.

%% @private
sse_events(IsFin, Data, State = #{buffer := Buffer}) ->
NewBuffer = <<Buffer/binary, Data/binary>>,
Expand Down

0 comments on commit 445dc10

Please sign in to comment.