Skip to content

Commit

Permalink
Link option
Browse files Browse the repository at this point in the history
  • Loading branch information
chrismccord committed Mar 13, 2024
1 parent 05e078c commit a7acb24
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 51 deletions.
29 changes: 27 additions & 2 deletions lib/flame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ defmodule FLAME do
The executed function will also be terminated on the remote flame if
the timeout is reached.
* `:link` – Whether the caller should be linked to the remote call. Defaults to `true`
to avoid long-running orphaned resources. Set to `false` to support long-running work
that you want to complete within the `:shutdown_timeout` of the remote runner, even
when the parent process or node is terminated.
## Examples
def my_expensive_thing(arg) do
Expand All @@ -196,9 +201,17 @@ defmodule FLAME do

@doc """
Casts a function to a remote runner for the given `FLAME.Pool`.
## Options
* `:link` – Whether the caller should be linked to the remote call. Defaults to `true`
to avoid long-running orphaned resources. Set to `false` to support long-running work
that you want to complete within the `:shutdown_timeout` of the remote runner, even
when the parent process or node is terminated.
"""
def cast(pool, func) when is_atom(pool) and is_function(func, 0) do
FLAME.Pool.cast(pool, func)
def cast(pool, func, opts \\ [])
when is_atom(pool) and is_function(func, 0) and is_list(opts) do
FLAME.Pool.cast(pool, func, opts)
end

@doc """
Expand All @@ -217,6 +230,18 @@ defmodule FLAME do
exits. If you want restart behavior, you need to monitor on the parent node and
replace the child yourself.
## Options
* `:timeout` - The timeout the caller is willing to wait for a response before an
exit with `:timeout`. Defaults to the configured timeout of the pool.
The executed function will also be terminated on the remote flame if
the timeout is reached.
* `:link` – Whether the caller should be linked to the remote call. Defaults to `true`
to avoid long-running orphaned resources. Set to `false` to support long-running work
that you want to complete within the `:shutdown_timeout` of the remote runner, even
when the parent process or node is terminated.
Accepts any child spec.
## Examples
Expand Down
48 changes: 42 additions & 6 deletions lib/flame/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,47 @@ defmodule FLAME.Pool do
See `FLAME.call/3` for more information.
"""
def call(name, func, opts \\ []) when is_function(func, 0) and is_list(opts) do
opts = put_link(opts, self())
do_call(name, func, opts)
end

# opts are normalized before this to set caller :link
defp do_call(name, func, opts) do
caller_checkout!(name, opts, :call, [name, func, opts], fn runner_pid, remaining_timeout ->
{:cancel, :ok, Runner.call(runner_pid, func, remaining_timeout)}
opts = Keyword.put_new(opts, :timeout, remaining_timeout)
{:cancel, :ok, Runner.call(runner_pid, func, opts)}
end)
end

defp put_link(opts, pid) do
case Keyword.fetch(opts, :link) do
{:ok, true} -> Keyword.put(opts, :link, pid)
{:ok, val} when val in [false, nil] -> Keyword.put(opts, :link, false)
:error -> Keyword.put(opts, :link, pid)
end
end

@doc """
Casts a function to a remote runner for the given `FLAME.Pool`.
See `FLAME.cast/2` for more information.
See `FLAME.cast/3` for more information.
"""
def cast(name, func) when is_function(func, 0) do
def cast(name, func, opts) when is_function(func, 0) and is_list(opts) do
%{task_sup: task_sup} = lookup_meta(name)

Task.Supervisor.async_nolink(task_sup, fn -> call(name, func, timeout: :infinity) end)
opts =
opts
|> Keyword.put_new(:timeout, :infinity)
|> put_link(self())

# we don't care about the result
wrapped = fn ->
func.()
:ok
end

Task.Supervisor.start_child(task_sup, fn -> do_call(name, wrapped, opts) end)

:ok
end

Expand All @@ -184,9 +211,18 @@ defmodule FLAME.Pool do
def place_child(name, child_spec, opts) do
caller_checkout!(name, opts, :place_child, [name, child_spec, opts], fn runner_pid,
remaining_timeout ->
case Runner.place_child(runner_pid, child_spec, opts[:timeout] || remaining_timeout) do
place_opts =
opts
|> Keyword.put(:timeout, opts[:timeout] || remaining_timeout)
|> put_link(self())

case Runner.place_child(runner_pid, child_spec, place_opts) do
{:ok, child_pid} = result ->
Process.link(child_pid)
# we are placing the link back on the parent node, but we are protected
# from racing the link on the child FLAME because the terminator on
# the remote flame is monitoring the caller and will terminator the child
# if we go away
if place_opts[:link], do: Process.link(child_pid)
{:cancel, {:replace, child_pid}, result}

:ignore ->
Expand Down
40 changes: 14 additions & 26 deletions lib/flame/runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ defmodule FLAME.Runner do
# {:ok, runner} = Runner.start_link(backend: FLAME.FlyBackend)
# :ok = Runner.remote_boot(runner)
# Runner.call(runner, fn -> :operation1 end)
# Runner.cast(runner, fn -> :operation2 end)
# Runner.shutdown(runner)
#
# When a caller exits or crashes, the remote node will automatically be terminated.
Expand Down Expand Up @@ -93,8 +92,8 @@ defmodule FLAME.Runner do
to ensure that the child is not restarted if it exits. If you want restart
behavior, you must monitor the process yourself on the parent node and replace it.
"""
def place_child(runner_pid, child_spec, timeout)
when is_pid(runner_pid) and (is_integer(timeout) or timeout in [:infinity, nil]) do
def place_child(runner_pid, child_spec, opts)
when is_pid(runner_pid) and is_list(opts) do
# we must rewrite :temporary restart strategy for the spec to avoid restarting placed children
new_spec = Supervisor.child_spec(child_spec, restart: :temporary)
caller = self()
Expand All @@ -104,23 +103,31 @@ defmodule FLAME.Runner do
fn terminator ->
Terminator.place_child(terminator, caller, new_spec)
end,
timeout
opts
)
end

@doc """
Calls a function on the remote node.
"""
def call(runner_pid, func, timeout \\ nil) when is_pid(runner_pid) and is_function(func) do
def call(runner_pid, func, opts \\ [])
when is_pid(runner_pid) and is_function(func) and is_list(opts) do
link_caller_pid =
case Keyword.fetch(opts, :link) do
{:ok, pid} when is_pid(pid) -> pid
{:ok, false} -> false
:error -> false
end

timeout = opts[:timeout] || nil
{ref, %Runner{} = runner, backend_state} = checkout(runner_pid)
%Runner{terminator: terminator} = runner
call_timeout = timeout || runner.timeout
caller_pid = self()

result =
remote_call(runner, backend_state, call_timeout, fn ->
:ok = Terminator.deadline_me(terminator, call_timeout)
Process.link(caller_pid)
if link_caller_pid, do: Process.link(link_caller_pid)
if is_function(func, 1), do: func.(terminator), else: func.()
end)

Expand All @@ -140,25 +147,6 @@ defmodule FLAME.Runner do
end
end

@doc """
Casts a function to the remote node.
"""
def cast(runner_pid, func) when is_pid(runner_pid) and is_function(func, 0) do
{ref, runner, backend_state} = checkout(runner_pid)

%Runner{backend: backend, timeout: timeout, terminator: terminator} =
runner

{:ok, {_remote_pid, _remote_monitor_ref}} =
backend.remote_spawn_monitor(backend_state, fn ->
# This runs on the remote node
:ok = Terminator.deadline_me(terminator, timeout)
func.()
end)

:ok = checkin(runner_pid, ref)
end

defp checkout(runner_pid) do
GenServer.call(runner_pid, :checkout)
end
Expand Down
2 changes: 0 additions & 2 deletions lib/flame/terminator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ defmodule FLAME.Terminator do

{:noreply, new_state}
else
IO.inspect({:DOWN, reason}, label: "[TERMINATOR]")
{:noreply, drop_caller(state, ref)}
end
end
Expand Down Expand Up @@ -266,7 +265,6 @@ defmodule FLAME.Terminator do
|> cancel_idle_shutdown()
|> system_stop("terminating")

IO.inspect(state.calls, label: "[TERMINATE] calls")
# supervisor will force kill us if we take longer than configured shutdown_timeout
Enum.each(state.calls, fn
# skip callers that placed a child since they are on the remote node
Expand Down
Loading

0 comments on commit a7acb24

Please sign in to comment.