Skip to content

Commit

Permalink
Merge pull request #207 from inaka/use_maps_as_opts
Browse files Browse the repository at this point in the history
Use maps as opts (and other small cosmetic changes)
  • Loading branch information
elbrujohalcon authored Aug 26, 2024
2 parents e7d83ae + 30fa495 commit b34a0db
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 166 deletions.
8 changes: 4 additions & 4 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@
%% == Dependencies and plugins ==

{project_plugins,
[{rebar3_hank, "~> 1.4.0"},
{rebar3_hex, "~> 7.0.7"},
[{rebar3_hank, "~> 1.4.1"},
{rebar3_hex, "~> 7.0.8"},
{rebar3_format, "~> 1.3.0"},
{rebar3_lint, "~> 3.1.0"},
{rebar3_ex_doc, "~> 0.2.20"},
{rebar3_lint, "~> 3.2.6"},
{rebar3_ex_doc, "~> 0.2.23"},
{rebar3_depup, "~> 0.4.0"},
{covertool, "~> 2.0.6"}]}.

Expand Down
133 changes: 82 additions & 51 deletions src/wpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
%%
%% These are the same as described at the `gen_server' documentation.

-type worker_shutdown() :: worker_shutdown().
-type worker_shutdown() :: brutal_kill | timeout().
%% The `shutdown' option to be used over the individual workers.
%%
%% Defaults to `5000'. See {@link wpool_process_sup} for more details.
Expand Down Expand Up @@ -187,7 +187,27 @@
%% `child_spec/2', `start_pool/2', `start_sup_pool/2' are the callbacks
%% that take a list of these options as a parameter.

-type custom_strategy() :: fun(([atom()]) -> Atom :: atom()).
-type options() :: #{workers => workers(),
worker => worker(),
worker_opt => [worker_opt()],
strategy => supervisor_strategy(),
worker_shutdown => worker_shutdown(),
overrun_handler => overrun_handler(),
overrun_warning => overrun_warning(),
max_overrun_warnings => max_overrun_warnings(),
pool_sup_intensity => pool_sup_intensity(),
pool_sup_shutdown => pool_sup_shutdown(),
pool_sup_period => pool_sup_period(),
queue_type => queue_type(),
enable_callbacks => enable_callbacks(),
callbacks => callbacks(),
_ => _}.
%% Options that can be provided to a new pool.
%%
%% `child_spec/2', `start_pool/2', `start_sup_pool/2' are the callbacks
%% that take a list of these options as a parameter.

-type custom_strategy() :: fun((atom()) -> Atom :: atom()).
%% A callback that gets the pool name and returns a worker's name.

-type strategy() ::
Expand Down Expand Up @@ -246,14 +266,14 @@
-type stats() ::
[{pool, name()} |
{supervisor, pid()} |
{options, [option()]} |
{options, [option()] | options()} |
{size, non_neg_integer()} |
{next_worker, pos_integer()} |
{total_message_queue_len, non_neg_integer()} |
{workers, [{pos_integer(), worker_stats()}]}].
%% Statistics about a given live pool.

-export_type([name/0, option/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]).
-export_type([name/0, option/0, options/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]).

-export([start/0, start/2, stop/0, stop/1]).
-export([child_spec/2, start_pool/1, start_pool/2, start_sup_pool/1, start_sup_pool/2]).
Expand All @@ -280,7 +300,7 @@ stop() ->
%% BEHAVIOUR CALLBACKS
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @private
-spec start(any(), any()) -> {ok, pid()} | {error, term()}.
-spec start(any(), any()) -> supervisor:startlink_ret().
start(_StartType, _StartArgs) ->
wpool_sup:start_link().

Expand All @@ -293,20 +313,19 @@ stop(_State) ->
%% PUBLIC API
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @equiv start_pool(Name, [])
-spec start_pool(name()) -> {ok, pid()}.
-spec start_pool(name()) -> supervisor:startlink_ret().
start_pool(Name) ->
start_pool(Name, []).

%% @doc Starts (and links) a pool of N wpool_processes.
%% The result pid belongs to a supervisor (in case you want to add it to a
%% supervisor tree)
-spec start_pool(name(), [option()]) ->
{ok, pid()} | {error, {already_started, pid()} | term()}.
-spec start_pool(name(), [option()] | options()) -> supervisor:startlink_ret().
start_pool(Name, Options) ->
wpool_pool:start_link(Name, wpool_utils:add_defaults(Options)).

%% @doc Builds a child specification to pass to a supervisor.
-spec child_spec(name(), [option()]) -> supervisor:child_spec().
-spec child_spec(name(), [option()] | options()) -> supervisor:child_spec().
child_spec(Name, Options) ->
FullOptions = wpool_utils:add_defaults(Options),
#{id => Name,
Expand All @@ -326,13 +345,12 @@ stop_pool(Name) ->
end.

%% @equiv start_sup_pool(Name, [])
-spec start_sup_pool(name()) -> {ok, pid()} | {error, {already_started, pid()} | term()}.
-spec start_sup_pool(name()) -> supervisor:startchild_ret().
start_sup_pool(Name) ->
start_sup_pool(Name, []).

%% @doc Starts a pool of N wpool_processes supervised by `wpool_sup'
-spec start_sup_pool(name(), [option()]) ->
{ok, pid()} | {error, {already_started, pid()} | term()}.
-spec start_sup_pool(name(), [option()] | options()) -> supervisor:startchild_ret().
start_sup_pool(Name, Options) ->
wpool_sup:start_pool(Name, wpool_utils:add_defaults(Options)).

Expand Down Expand Up @@ -369,14 +387,18 @@ call(Sup, Call, Strategy) ->
-spec call(name(), term(), strategy(), timeout()) -> term().
call(Sup, Call, available_worker, Timeout) ->
wpool_pool:call_available_worker(Sup, Call, Timeout);
call(Sup, Call, next_available_worker, Timeout) ->
wpool_process:call(wpool_pool:next_available_worker(Sup), Call, Timeout);
call(Sup, Call, next_worker, Timeout) ->
wpool_process:call(wpool_pool:next_worker(Sup), Call, Timeout);
call(Sup, Call, random_worker, Timeout) ->
wpool_process:call(wpool_pool:random_worker(Sup), Call, Timeout);
call(Sup, Call, best_worker, Timeout) ->
wpool_process:call(wpool_pool:best_worker(Sup), Call, Timeout);
call(Sup, Call, {hash_worker, HashKey}, Timeout) ->
wpool_process:call(
wpool_pool:hash_worker(Sup, HashKey), Call, Timeout);
call(Sup, Call, Fun, Timeout) when is_function(Fun) ->
wpool_process:call(Fun(Sup), Call, Timeout);
call(Sup, Call, Strategy, Timeout) ->
wpool_process:call(
wpool_pool:Strategy(Sup), Call, Timeout).
wpool_process:call(wpool_pool:hash_worker(Sup, HashKey), Call, Timeout);
call(Sup, Call, Fun, Timeout) when is_function(Fun, 1) ->
wpool_process:call(Fun(Sup), Call, Timeout).

%% @equiv cast(Sup, Cast, default_strategy())
-spec cast(name(), term()) -> ok.
Expand All @@ -387,14 +409,18 @@ cast(Sup, Cast) ->
-spec cast(name(), term(), strategy()) -> ok.
cast(Sup, Cast, available_worker) ->
wpool_pool:cast_to_available_worker(Sup, Cast);
cast(Sup, Cast, next_available_worker) ->
wpool_process:cast(wpool_pool:next_available_worker(Sup), Cast);
cast(Sup, Cast, next_worker) ->
wpool_process:cast(wpool_pool:next_worker(Sup), Cast);
cast(Sup, Cast, random_worker) ->
wpool_process:cast(wpool_pool:random_worker(Sup), Cast);
cast(Sup, Cast, best_worker) ->
wpool_process:cast(wpool_pool:best_worker(Sup), Cast);
cast(Sup, Cast, {hash_worker, HashKey}) ->
wpool_process:cast(
wpool_pool:hash_worker(Sup, HashKey), Cast);
cast(Sup, Cast, Fun) when is_function(Fun) ->
wpool_process:cast(Fun(Sup), Cast);
cast(Sup, Cast, Strategy) ->
wpool_process:cast(
wpool_pool:Strategy(Sup), Cast).
wpool_process:cast(wpool_pool:hash_worker(Sup, HashKey), Cast);
cast(Sup, Cast, Fun) when is_function(Fun, 1) ->
wpool_process:cast(Fun(Sup), Cast).

%% @equiv send_request(Sup, Call, default_strategy(), 5000)
-spec send_request(name(), term()) -> noproc | timeout | gen_server:request_id().
Expand All @@ -413,14 +439,37 @@ send_request(Sup, Call, Strategy) ->
noproc | timeout | gen_server:request_id().
send_request(Sup, Call, available_worker, Timeout) ->
wpool_pool:send_request_available_worker(Sup, Call, Timeout);
send_request(Sup, Call, next_available_worker, _Timeout) ->
wpool_process:send_request(wpool_pool:next_available_worker(Sup), Call);
send_request(Sup, Call, next_worker, _Timeout) ->
wpool_process:send_request(wpool_pool:next_worker(Sup), Call);
send_request(Sup, Call, random_worker, _Timeout) ->
wpool_process:send_request(wpool_pool:random_worker(Sup), Call);
send_request(Sup, Call, best_worker, _Timeout) ->
wpool_process:send_request(wpool_pool:best_worker(Sup), Call);
send_request(Sup, Call, {hash_worker, HashKey}, _Timeout) ->
wpool_process:send_request(
wpool_pool:hash_worker(Sup, HashKey), Call);
send_request(Sup, Call, Fun, _Timeout) when is_function(Fun) ->
wpool_process:send_request(Fun(Sup), Call);
send_request(Sup, Call, Strategy, _Timeout) ->
wpool_process:send_request(
wpool_pool:Strategy(Sup), Call).
wpool_process:send_request(wpool_pool:hash_worker(Sup, HashKey), Call);
send_request(Sup, Call, Fun, _Timeout) when is_function(Fun, 1) ->
wpool_process:send_request(Fun(Sup), Call).


%% @doc Casts a message to all the workers within the given pool.
%%
%% <b>NOTE:</b> These messages don't get queued, they go straight to the worker's message queues, so
%% if you're using available_worker strategy to balance the charge and you have some tasks queued up
%% waiting for the next available worker, the broadcast will reach all the workers <b>before</b> the
%% queued up tasks.
-spec broadcast(wpool:name(), term()) -> ok.
broadcast(Sup, Cast) ->
wpool_pool:broadcast(Sup, Cast).

%% @doc Calls all the workers within the given pool async and waits for the responses synchronously.
%%
%% If one worker times out, the entire call is considered timed-out.
-spec broadcall(wpool:name(), term(), timeout()) ->
{[Replies :: term()], [Errors :: term()]}.
broadcall(Sup, Call, Timeout) ->
wpool_pool:broadcall(Sup, Call, Timeout).

%% @doc Retrieves a snapshot of statistics for all pools.
%%
Expand All @@ -441,21 +490,3 @@ stats(Sup) ->
-spec get_workers(name()) -> [atom()].
get_workers(Sup) ->
wpool_pool:get_workers(Sup).

%% @doc Casts a message to all the workers within the given pool.
%%
%% <b>NOTE:</b> These messages don't get queued, they go straight to the worker's message queues, so
%% if you're using available_worker strategy to balance the charge and you have some tasks queued up
%% waiting for the next available worker, the broadcast will reach all the workers <b>before</b> the
%% queued up tasks.
-spec broadcast(wpool:name(), term()) -> ok.
broadcast(Sup, Cast) ->
wpool_pool:broadcast(Sup, Cast).

%% @doc Calls all the workers within the given pool async and waits for the responses synchronously.
%%
%% If one worker times out, the entire call is considered timed-out.
-spec broadcall(wpool:name(), term(), timeout()) ->
{[Replies :: term()], [Errors :: term()]}.
broadcall(Sup, Call, Timeout) ->
wpool_pool:broadcall(Sup, Call, Timeout).
Loading

0 comments on commit b34a0db

Please sign in to comment.