Skip to content

Commit

Permalink
Merge pull request #208 from inaka/run_api
Browse files Browse the repository at this point in the history
Extend the API with opaque runs
  • Loading branch information
elbrujohalcon authored Sep 23, 2024
2 parents 2f260af + 66a6d15 commit cd2e97f
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 43 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"-no_auto_compile -dir ebin -logdir log/ct --erl_args -smp enable -boot start_sasl"},
{cover_enabled, true},
{cover_export_enabled, true},
{cover_opts, [verbose]},
{cover_opts, [verbose, {min_coverage, 92}]},
{ct_opts, [{verbose, true}]},
{deps, [{katana, "1.0.0"}, {mixer, "1.2.0", {pkg, inaka_mixer}}, {meck, "0.9.2"}]},
{dialyzer,
Expand Down
76 changes: 67 additions & 9 deletions src/wpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
%%
%% Defaults to `5'. See {@link wpool_pool} for more details.

-type queue_type() :: wpool_queue_manager:queue_type().
-type queue_type() :: fifo | lifo.
%% Order in which requests will be stored and handled by workers.
%%
%% This option can take values `lifo' or `fifo'. Defaults to `fifo'.
Expand All @@ -164,6 +164,27 @@
%% Callbacks can be added and removed later by `wpool_pool:add_callback_module/2' and
%% `wpool_pool:remove_callback_module/2'.

-type run(Result) :: fun((name() | pid(), timeout()) -> Result).
%% A function to run with a given worker.
%%
%% It can be used to enable APIs that hide the gen_server behind a complex logic
%% that might for example curate parameters or run side-effects, for example, `supervisor'.
%%
%% For example:
%% ```
%% Opts =
%% #{workers => 3,
%% worker_shutdown => infinity,
%% worker => {supervisor, {Name, ModuleCallback, Args}}},
%% %% Note that the supervisor's `init/1' callback takes such 3-tuple.
%% {ok, Pid} = wpool:start_sup_pool(pool_of_supervisors, Opts),
%%
%% ...
%%
%% Run = fun(Sup, _) -> supervisor:start_child(Sup, Params) end,
%% {ok, Pid} = wpool:run(pool_of_supervisors, Run, next_worker),
%% '''

-type name() :: atom().
%% Name of the pool

Expand Down Expand Up @@ -273,13 +294,15 @@
{workers, [{pos_integer(), worker_stats()}]}].
%% Statistics about a given live pool.

-export_type([name/0, option/0, options/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]).
-export_type([name/0, option/0, options/0, custom_strategy/0, strategy/0,
queue_type/0, run/1, worker_stats/0, stats/0]).

-export([start/0, start/2, stop/0, stop/1]).
-export([child_spec/2, start_pool/1, start_pool/2, start_sup_pool/1, start_sup_pool/2]).
-export([stop_pool/1, stop_sup_pool/1]).
-export([call/2, cast/2, call/3, cast/3, call/4, broadcall/3, broadcast/2]).
-export([send_request/2, send_request/3, send_request/4]).
-export([call/2, call/3, call/4, cast/2, cast/3,
run/2, run/3, run/4, broadcall/3, broadcast/2,
send_request/2, send_request/3, send_request/4]).
-export([stats/0, stats/1, get_workers/1]).
-export([default_strategy/0]).

Expand Down Expand Up @@ -369,6 +392,38 @@ default_strategy() ->
Strategy
end.

%% @equiv run(Sup, Run, default_strategy())
-spec run(name(), run(Result)) -> Result.
run(Sup, Run) ->
run(Sup, Run, default_strategy()).

%% @equiv run(Sup, Run, Strategy, 5000)
-spec run(name(), run(Result), strategy()) -> Result.
run(Sup, Run, Strategy) ->
run(Sup, Run, Strategy, 5000).

%% @doc Picks a server and issues the run to it.
%%
%% For all strategies except available_worker, Timeout applies only to the
%% time spent on the actual run to the worker, because time spent finding
%% the worker in other strategies is negligible.
%% For available_worker the time used choosing a worker is also considered
-spec run(name(), run(Result), strategy(), timeout()) -> Result.
run(Sup, Run, available_worker, Timeout) ->
wpool_pool:run_with_available_worker(Sup, Run, Timeout);
run(Sup, Run, next_available_worker, Timeout) ->
wpool_process:run(wpool_pool:next_available_worker(Sup), Run, Timeout);
run(Sup, Run, next_worker, Timeout) ->
wpool_process:run(wpool_pool:next_worker(Sup), Run, Timeout);
run(Sup, Run, random_worker, Timeout) ->
wpool_process:run(wpool_pool:random_worker(Sup), Run, Timeout);
run(Sup, Run, best_worker, Timeout) ->
wpool_process:run(wpool_pool:best_worker(Sup), Run, Timeout);
run(Sup, Run, {hash_worker, HashKey}, Timeout) ->
wpool_process:run(wpool_pool:hash_worker(Sup, HashKey), Run, Timeout);
run(Sup, Run, Fun, Timeout) when is_function(Fun, 1) ->
wpool_process:run(Fun(Sup), Run, Timeout).

%% @equiv call(Sup, Call, default_strategy())
-spec call(name(), term()) -> term().
call(Sup, Call) ->
Expand All @@ -380,10 +435,11 @@ call(Sup, Call, Strategy) ->
call(Sup, Call, Strategy, 5000).

%% @doc Picks a server and issues the call to it.
%% For all strategies except available_worker, Timeout applies only to the
%% time spent on the actual call to the worker, because time spent finding
%% the worker in other strategies is negligible.
%% For available_worker the time used choosing a worker is also considered
%%
%% For all strategies except available_worker, Timeout applies only to the
%% time spent on the actual call to the worker, because time spent finding
%% the worker in other strategies is negligible.
%% For available_worker the time used choosing a worker is also considered
-spec call(name(), term(), strategy(), timeout()) -> term().
call(Sup, Call, available_worker, Timeout) ->
wpool_pool:call_available_worker(Sup, Call, Timeout);
Expand Down Expand Up @@ -434,7 +490,8 @@ send_request(Sup, Call, Strategy) ->
send_request(Sup, Call, Strategy, 5000).

%% @doc Picks a server and issues the call to it.
%% Timeout applies only for the time used choosing a worker in the available_worker strategy
%%
%% Timeout applies only for the time used choosing a worker in the available_worker strategy
-spec send_request(name(), term(), strategy(), timeout()) ->
noproc | timeout | gen_server:request_id().
send_request(Sup, Call, available_worker, Timeout) ->
Expand Down Expand Up @@ -486,6 +543,7 @@ stats(Sup) ->
wpool_pool:stats(Sup).

%% @doc Retrieves the list of worker registered names.
%%
%% This can be useful to manually inspect the workers or do custom work on them.
-spec get_workers(name()) -> [atom()].
get_workers(Sup) ->
Expand Down
40 changes: 33 additions & 7 deletions src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
%% API
-export([start_link/2]).
-export([best_worker/1, random_worker/1, next_worker/1, hash_worker/2,
next_available_worker/1, send_request_available_worker/3, call_available_worker/3]).
next_available_worker/1, send_request_available_worker/3, call_available_worker/3,
run_with_available_worker/3]).
-export([cast_to_available_worker/2, broadcast/2, broadcall/3]).
-export([stats/0, stats/1, get_workers/1]).
-export([worker_name/2, find_wpool/1]).
Expand Down Expand Up @@ -112,19 +113,44 @@ next_available_worker(Name) ->
end
end.

%% @doc Picks the first available worker and sends the call to it.
%% The timeout provided includes the time it takes to get a worker
%% and for it to process the call.
%% @throws no_workers | timeout
-spec run_with_available_worker(wpool:name(), wpool:run(Result), timeout()) -> Result.
run_with_available_worker(Name, Run, Timeout) ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
#wpool{qmanager = QManager} ->
case wpool_queue_manager:run_with_available_worker(QManager, Run, Timeout) of
noproc ->
exit(no_workers);
timeout ->
exit(timeout);
Result ->
Result
end
end.

%% @doc Picks the first available worker and sends the call to it.
%% The timeout provided includes the time it takes to get a worker
%% and for it to process the call.
%% @throws no_workers | timeout
-spec call_available_worker(wpool:name(), any(), timeout()) -> any().
call_available_worker(Name, Call, Timeout) ->
case wpool_queue_manager:call_available_worker(queue_manager_name(Name), Call, Timeout) of
noproc ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
timeout ->
exit(timeout);
Result ->
Result
#wpool{qmanager = QManager} ->
case wpool_queue_manager:call_available_worker(QManager, Call, Timeout) of
noproc ->
exit(no_workers);
timeout ->
exit(timeout);
Result ->
Result
end
end.

%% @doc Picks the first available worker and sends the request to it.
Expand Down
7 changes: 6 additions & 1 deletion src/wpool_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
-export_type([next_step/0]).

%% api
-export([start_link/4, call/3, cast/2, send_request/2]).
-export([start_link/4, run/3, call/3, cast/2, send_request/2]).

-ifdef(TEST).

Expand All @@ -91,6 +91,11 @@ start_link(Name, Module, InitArgs, Options) ->
{Name, Module, InitArgs, FullOpts},
WorkerOpt).

%% @doc Runs a function that takes as a parameter the given process
-spec run(wpool:name() | pid(), wpool:run(Result), timeout()) -> Result.
run(Process, Run, Timeout) ->
Run(Process, Timeout).

%% @equiv gen_server:call(Process, Call, Timeout)
-spec call(wpool:name() | pid(), term(), timeout()) -> term().
call(Process, Call, Timeout) ->
Expand Down
47 changes: 30 additions & 17 deletions src/wpool_queue_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

%% api
-export([start_link/2, start_link/3]).
-export([call_available_worker/3, cast_to_available_worker/2, new_worker/2, worker_dead/2,
send_request_available_worker/3, worker_ready/2, worker_busy/2, pending_task_count/1]).
-export([run_with_available_worker/3, call_available_worker/3, cast_to_available_worker/2,
new_worker/2, worker_dead/2, send_request_available_worker/3, worker_ready/2,
worker_busy/2, pending_task_count/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

Expand All @@ -28,7 +29,7 @@
clients :: queue:queue({cast | {pid(), _}, term()}),
workers :: gb_sets:set(atom()),
monitors :: #{atom() := monitored_from()},
queue_type :: queue_type()}).
queue_type :: wpool:queue_type()}).

-opaque state() :: #state{}.

Expand All @@ -50,15 +51,14 @@

-type arg() :: option() | pool.
-type queue_mgr() :: atom().
-type queue_type() :: fifo | lifo.
-type worker_event() :: new_worker | worker_dead | worker_busy | worker_ready.

-export_type([worker_event/0]).

-type call_request() :: {available_worker, infinity | pos_integer()} | pending_task_count.

-export_type([call_request/0]).
-export_type([queue_mgr/0, queue_type/0]).
-export_type([queue_mgr/0]).

%%%===================================================================
%%% API
Expand All @@ -71,13 +71,27 @@ start_link(WPool, Name) ->
start_link(WPool, Name, Options) ->
gen_server:start_link({local, Name}, ?MODULE, [{pool, WPool} | Options], []).

%% @doc returns the first available worker in the pool
-spec run_with_available_worker(queue_mgr(), wpool:run(Result), timeout()) ->
noproc | timeout | Result.
run_with_available_worker(QueueManager, Call, Timeout) ->
case get_available_worker(QueueManager, Call, Timeout) of
{ok, Worker, TimeLeft} when TimeLeft > 0 ->
wpool_process:run(Worker, Call, TimeLeft);
{ok, Worker, _} ->
worker_ready(QueueManager, Worker),
timeout;
Other ->
Other
end.

%% @doc returns the first available worker in the pool
-spec call_available_worker(queue_mgr(), any(), timeout()) -> noproc | timeout | any().
call_available_worker(QueueManager, Call, Timeout) ->
case get_available_worker(QueueManager, Call, Timeout) of
{ok, TimeLeft, Worker} when TimeLeft > 0 ->
{ok, Worker, TimeLeft} when TimeLeft > 0 ->
wpool_process:call(Worker, Call, TimeLeft);
{ok, _, Worker} ->
{ok, Worker, _} ->
worker_ready(QueueManager, Worker),
timeout;
Other ->
Expand All @@ -97,7 +111,7 @@ cast_to_available_worker(QueueManager, Cast) ->
noproc | timeout | gen_server:request_id().
send_request_available_worker(QueueManager, Call, Timeout) ->
case get_available_worker(QueueManager, Call, Timeout) of
{ok, _TimeLeft, Worker} ->
{ok, Worker, _} ->
wpool_process:send_request(Worker, Call);
Other ->
Other
Expand Down Expand Up @@ -237,18 +251,17 @@ handle_info(_Info, State) ->
%%% private
%%%===================================================================
-spec get_available_worker(queue_mgr(), any(), timeout()) ->
noproc | timeout | {ok, timeout(), any()}.
noproc | timeout | {ok, atom(), timeout()}.
get_available_worker(QueueManager, Call, Timeout) ->
Start = now_in_milliseconds(),
ExpiresAt = expires(Timeout, Start),
ExpiresAt = expires(Timeout),
try gen_server:call(QueueManager, {available_worker, ExpiresAt}, Timeout) of
{'EXIT', _, noproc} ->
noproc;
{'EXIT', Worker, Exit} ->
exit({Exit, {gen_server, call, [Worker, Call, Timeout]}});
{ok, Worker} ->
TimeLeft = time_left(ExpiresAt),
{ok, TimeLeft, Worker}
{ok, Worker, TimeLeft}
catch
_:{noproc, {gen_server, call, _}} ->
noproc;
Expand All @@ -268,19 +281,19 @@ inc(Key) ->
dec(Key) ->
put(Key, get(Key) - 1).

-spec expires(timeout(), integer()) -> timeout().
expires(infinity, _) ->
-spec expires(timeout()) -> timeout().
expires(infinity) ->
infinity;
expires(Timeout, NowMs) ->
NowMs + Timeout.
expires(Timeout) ->
now_in_milliseconds() + Timeout.

-spec time_left(timeout()) -> timeout().
time_left(infinity) ->
infinity;
time_left(ExpiresAt) ->
ExpiresAt - now_in_milliseconds().

-spec is_expired(integer()) -> boolean().
-spec is_expired(timeout()) -> boolean().
is_expired(ExpiresAt) ->
ExpiresAt > now_in_milliseconds().

Expand Down
5 changes: 5 additions & 0 deletions test/echo_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
-behaviour(gen_server).

%% gen_server callbacks
-export([start_link/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2,
handle_continue/2, format_status/1]).

Expand All @@ -26,6 +27,10 @@

-export_type([from/0]).

-spec start_link(term()) -> gen_server:start_ret().
start_link(Something) ->
gen_server:start_link(?MODULE, Something, []).

%%%===================================================================
%%% callbacks
%%%===================================================================
Expand Down
23 changes: 23 additions & 0 deletions test/echo_supervisor.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-module(echo_supervisor).

-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, noargs).

init(noargs) ->
Children =
#{id => undefined,
start => {echo_server, start_link, []},
restart => transient,
shutdown => 5000,
type => worker,
modules => [echo_server]},
Strategy =
#{strategy => simple_one_for_one,
intensity => 5,
period => 60},
{ok, {Strategy, [Children]}}.
Loading

0 comments on commit cd2e97f

Please sign in to comment.