1121 lines
36 KiB
Elixir
1121 lines
36 KiB
Elixir
defmodule NimblePool do
|
||
@external_resource "README.md"
|
||
@moduledoc "README.md"
|
||
|> File.read!()
|
||
|> String.split("<!-- MDOC !-->")
|
||
|> Enum.fetch!(1)
|
||
|
||
use GenServer
|
||
require Logger
|
||
|
||
@type from :: {pid, reference}
|
||
@type init_arg :: term
|
||
@type pool_state :: term
|
||
@type worker_state :: term
|
||
@type client_state :: term
|
||
@type user_reason :: term
|
||
|
||
@typedoc since: "1.1.0"
|
||
@type pool :: GenServer.server()
|
||
|
||
@doc """
|
||
Initializes the worker.
|
||
|
||
It receives the worker argument passed to `start_link/1` if `c:init_pool/1` is
|
||
not implemented, otherwise the pool state returned by `c:init_pool/1`. It must
|
||
return `{:ok, worker_state, pool_state}` or `{:async, fun, pool_state}`, where the `fun`
|
||
is a zero-arity function that must return the worker state.
|
||
|
||
If this callback returns `{:async, fun, pool_state}`, `fun` is executed in a **separate
|
||
one-off process**. Because of this, if you start resources that the pool needs to "own",
|
||
you need to transfer ownership to the pool process. For example, if your async `fun`
|
||
opens a `:gen_tcp` socket, you'll have to use `:gen_tcp.controlling_process/2` to transfer
|
||
ownership back to the pool.
|
||
|
||
> #### Blocking the pool {: .warning}
|
||
>
|
||
> This callback is synchronous and therefore will block the pool, potentially
|
||
> for a significant amount of time since it's executed in the pool process once
|
||
> per worker. > If you need to perform long initialization, consider using the
|
||
> `{:async, fun, pool_state}` return type.
|
||
"""
|
||
@doc callback: :worker
|
||
@callback init_worker(pool_state) ::
|
||
{:ok, worker_state, pool_state} | {:async, (-> worker_state), pool_state}
|
||
|
||
@doc """
|
||
Initializes the pool.
|
||
|
||
It receives the worker argument passed to `start_link/1` and must
|
||
return `{:ok, pool_state}` upon successful initialization,
|
||
`:ignore` to exit normally, or `{:stop, reason}` to exit with `reason`
|
||
and return `{:error, reason}`.
|
||
|
||
This is a good place to perform a registration, for example.
|
||
|
||
It must return the `pool_state`. The `pool_state` is given to
|
||
`init_worker`. By default, it simply returns the given arguments.
|
||
|
||
This callback is optional.
|
||
|
||
## Examples
|
||
|
||
@impl NimblePool
|
||
def init_pool(options) do
|
||
Registry.register(options[:registry], :some_key, :some_value)
|
||
end
|
||
|
||
"""
|
||
@doc callback: :pool
|
||
@callback init_pool(init_arg) :: {:ok, pool_state} | :ignore | {:stop, reason :: any()}
|
||
|
||
@doc """
|
||
Checks a worker out.
|
||
|
||
The `maybe_wrapped_command` is the `command` passed to `checkout!/4` if the worker
|
||
doesn't implement the `c:handle_enqueue/2` callback, otherwise it's the possibly-wrapped
|
||
command returned by `c:handle_enqueue/2`.
|
||
|
||
This callback must return one of:
|
||
|
||
* `{:ok, client_state, worker_state, pool_state}` — the client state is given to
|
||
the callback function passed to `checkout!/4`. `worker_state` and `pool_state`
|
||
can potentially update the state of the checked-out worker and the pool.
|
||
|
||
* `{:remove, reason, pool_state}` — `NimblePool` will remove the checked-out worker and
|
||
attempt to checkout another worker.
|
||
|
||
* `{:skip, Exception.t(), pool_state}` — `NimblePool` will skip the checkout, the client will
|
||
raise the returned exception, and the worker will be left ready for the next
|
||
checkout attempt.
|
||
|
||
> #### Blocking the pool {: .warning}
|
||
>
|
||
> This callback is synchronous and therefore will block the pool.
|
||
> Avoid performing long work in here. Instead, do as much work as
|
||
> possible on the client.
|
||
|
||
Once the worker is checked out, the worker won't handle any
|
||
messages targeted to `c:handle_info/2`.
|
||
"""
|
||
@doc callback: :worker
|
||
@callback handle_checkout(maybe_wrapped_command :: term, from, worker_state, pool_state) ::
|
||
{:ok, client_state, worker_state, pool_state}
|
||
| {:remove, user_reason, pool_state}
|
||
| {:skip, Exception.t(), pool_state}
|
||
|
||
@doc """
|
||
Checks a worker back in the pool.
|
||
|
||
It receives the potentially-updated `client_state`, returned by the `checkout!/4`
|
||
anonymous function, and it must return either
|
||
`{:ok, worker_state, pool_state}` or `{:remove, reason, pool_state}`.
|
||
|
||
> #### Blocking the pool {: .warning}
|
||
>
|
||
> This callback is synchronous and therefore will block the pool.
|
||
> Avoid performing long work in here, instead do as much work as
|
||
> possible on the client.
|
||
|
||
Once the connection is checked in, it may immediately be handed
|
||
to another client, without traversing any of the messages in the
|
||
pool inbox.
|
||
|
||
This callback is optional.
|
||
"""
|
||
@doc callback: :worker
|
||
@callback handle_checkin(client_state, from, worker_state, pool_state) ::
|
||
{:ok, worker_state, pool_state} | {:remove, user_reason, pool_state}
|
||
|
||
@doc """
|
||
Handles an update instruction from a checked out worker.
|
||
|
||
See `update/2` for more information.
|
||
|
||
This callback is optional.
|
||
"""
|
||
@doc callback: :worker
|
||
@callback handle_update(message :: term, worker_state, pool_state) ::
|
||
{:ok, worker_state, pool_state}
|
||
|
||
@doc """
|
||
Receives a message in the pool and handles it as each worker.
|
||
|
||
It receives the `message` and it must return either
|
||
`{:ok, worker_state}` to update the worker state, or `{:remove, reason}` to
|
||
remove the worker.
|
||
|
||
Since there is only a single pool process that can receive messages, this
|
||
callback is executed once for every worker when the pool receives `message`.
|
||
|
||
> #### Blocking the pool {: .warning}
|
||
>
|
||
> This callback is synchronous and therefore will block the pool while it
|
||
> executes for each worker. Avoid performing long work in here.
|
||
|
||
This callback is optional.
|
||
"""
|
||
@doc callback: :worker
|
||
@callback handle_info(message :: term, worker_state) ::
|
||
{:ok, worker_state} | {:remove, user_reason}
|
||
|
||
@doc """
|
||
Executed by the pool whenever a request to check out a worker is enqueued.
|
||
|
||
The `command` argument should be treated as an opaque value, but it can be
|
||
wrapped with some data to be used in `c:handle_checkout/4`.
|
||
|
||
It must return either `{:ok, maybe_wrapped_command, pool_state}` or
|
||
`{:skip, Exception.t(), pool_state}` if checkout is to be skipped.
|
||
|
||
> #### Blocking the pool {: .warning}
|
||
>
|
||
> This callback is synchronous and therefore will block the pool.
|
||
> Avoid performing long work in here.
|
||
|
||
This callback is optional.
|
||
|
||
## Examples
|
||
|
||
@impl NimblePool
|
||
def handle_enqueue(command, pool_state) do
|
||
{:ok, {:wrapped, command}, pool_state}
|
||
end
|
||
|
||
"""
|
||
@doc callback: :pool
|
||
@callback handle_enqueue(command :: term, pool_state) ::
|
||
{:ok, maybe_wrapped_command :: term, pool_state}
|
||
| {:skip, Exception.t(), pool_state}
|
||
|
||
@doc """
|
||
Terminates a worker.
|
||
|
||
The `reason` argument is:
|
||
|
||
* `:DOWN` whenever the client link breaks
|
||
* `:timeout` whenever the client times out
|
||
* one of `:throw`, `:error`, `:exit` whenever the client crashes with one
|
||
of the reasons above.
|
||
* `reason` if at any point you return `{:remove, reason}`
|
||
* if any callback raises, the raised exception will be given as `reason`.
|
||
|
||
It receives the latest known `worker_state`, which may not
|
||
be the latest state. For example, if a client checks out the
|
||
state and crashes, we don't fully know the `client_state`,
|
||
so the `c:terminate_worker/3` callback needs to take such scenarios
|
||
into account.
|
||
|
||
This callback must always return `{:ok, pool_state}` with the potentially-updated
|
||
pool state.
|
||
|
||
This callback is optional.
|
||
"""
|
||
@doc callback: :pool
|
||
@callback terminate_worker(
|
||
reason :: :DOWN | :timeout | :throw | :error | :exit | user_reason,
|
||
worker_state,
|
||
pool_state
|
||
) ::
|
||
{:ok, pool_state}
|
||
|
||
@doc """
|
||
Handle pings due to inactivity on the worker.
|
||
|
||
Executed whenever the idle worker periodic timer verifies that a worker has been idle
|
||
on the pool for longer than the `:worker_idle_timeout` pool configuration (in milliseconds).
|
||
|
||
This callback must return one of the following values:
|
||
|
||
* `{:ok, worker_state}`: Updates worker state.
|
||
|
||
* `{:remove, user_reason}`: The pool will proceed to the standard worker termination
|
||
defined in `terminate_worker/3`.
|
||
|
||
* `{:stop, user_reason}`: The entire pool process will be terminated, and `terminate_worker/3`
|
||
will be called for every worker on the pool.
|
||
|
||
This callback is optional.
|
||
|
||
## Max idle pings
|
||
|
||
The `:max_idle_pings` pool option is useful to prevent sequential termination of a large number
|
||
of workers. However, it is important to keep in mind the following behaviours whenever
|
||
utilizing it.
|
||
|
||
* If you are not terminating workers with `c:handle_ping/2`, you may end up pinging only
|
||
the same workers over and over again because each cycle will ping only the first
|
||
`:max_idle_pings` workers.
|
||
|
||
* If you are terminating workers with `c:handle_ping/2`, the last worker may be terminated
|
||
after up to `worker_idle_timeout + worker_idle_timeout * ceil(number_of_workers/max_idle_pings)`,
|
||
instead of `2 * worker_idle_timeout` milliseconds of idle time.
|
||
|
||
For instance consider a pool with 10 workers and a ping of 1 second.
|
||
|
||
Given a negligible worker termination time and a worst-case scenario where all the workers
|
||
go idle right after a verification cycle is started, then without `max_idle_pings` the
|
||
last worker will be terminated in the next cycle (2 seconds), whereas with a
|
||
`max_idle_pings` of 2 the last worker will be terminated only in the 5th cycle (6 seconds).
|
||
|
||
## Disclaimers
|
||
|
||
* On lazy pools, if no worker is currently on the pool the callback will never be called.
|
||
Therefore you can not rely on this callback to terminate empty lazy pools.
|
||
|
||
* On not lazy pools, if you return `{:remove, user_reason}` you may end up
|
||
terminating and initializing workers at the same time every idle verification cycle.
|
||
|
||
* On large pools, if many resources go idle at the same cycle, you may end up terminating
|
||
a large number of workers sequentially, which could lead to the pool being unable to
|
||
fulfill requests. See `:max_idle_pings` option to prevent this.
|
||
|
||
"""
|
||
@doc callback: :worker
|
||
@callback handle_ping(
|
||
worker_state,
|
||
pool_state
|
||
) ::
|
||
{:ok, worker_state} | {:remove, user_reason()} | {:stop, user_reason()}
|
||
|
||
@doc """
|
||
Handle pool termination.
|
||
|
||
The `reason` argmument is the same given to GenServer's terminate/2 callback.
|
||
|
||
It is not necessary to terminate workers here because the
|
||
`terminate_worker/3` callback has already been invoked.
|
||
|
||
This should be used only for clean up extra resources that can not be
|
||
handled by `terminate_worker/3` callback.
|
||
|
||
This callback is optional.
|
||
"""
|
||
@doc callback: :pool
|
||
@callback terminate_pool(
|
||
reason :: :DOWN | :timeout | :throw | :error | :exit | user_reason,
|
||
pool_state
|
||
) :: :ok
|
||
|
||
@doc """
|
||
Handle cancelled checkout requests.
|
||
|
||
This callback is executed when a checkout request is cancelled unexpectedly.
|
||
|
||
The context argument may be `:queued` or `:checked_out`:
|
||
|
||
* `:queued` means the cancellation happened before resource checkout. This may happen
|
||
when the pool is starving under load and can not serve resources.
|
||
|
||
* `:checked_out` means the cancellation happened after resource checkout. This may happen
|
||
when the function given to `checkout!/4` raises.
|
||
|
||
This callback is optional.
|
||
"""
|
||
@doc callback: :pool
|
||
@callback handle_cancelled(
|
||
context :: :queued | :checked_out,
|
||
pool_state
|
||
) :: :ok
|
||
|
||
@optional_callbacks init_pool: 1,
|
||
handle_checkin: 4,
|
||
handle_info: 2,
|
||
handle_enqueue: 2,
|
||
handle_update: 3,
|
||
handle_ping: 2,
|
||
terminate_worker: 3,
|
||
terminate_pool: 2,
|
||
handle_cancelled: 2
|
||
|
||
@doc """
|
||
Defines a pool to be started under the supervision tree.
|
||
|
||
It accepts the same options as `start_link/1` with the
|
||
addition or `:restart` and `:shutdown` that control the
|
||
"Child Specification".
|
||
|
||
## Examples
|
||
|
||
NimblePool.child_spec(worker: {__MODULE__, :some_arg}, restart: :temporary)
|
||
|
||
"""
|
||
@spec child_spec(keyword) :: Supervisor.child_spec()
|
||
def child_spec(opts) when is_list(opts) do
|
||
{worker, _} = Keyword.fetch!(opts, :worker)
|
||
{restart, opts} = Keyword.pop(opts, :restart, :permanent)
|
||
{shutdown, opts} = Keyword.pop(opts, :shutdown, 5_000)
|
||
|
||
%{
|
||
id: worker,
|
||
start: {__MODULE__, :start_link, [opts]},
|
||
shutdown: shutdown,
|
||
restart: restart
|
||
}
|
||
end
|
||
|
||
@doc """
|
||
Starts a pool.
|
||
|
||
## Options
|
||
|
||
* `:worker` - a `{worker_mod, worker_init_arg}` tuple with the worker
|
||
module that implements the `NimblePool` behaviour and the worker
|
||
initial argument. This argument is **required**.
|
||
|
||
* `:pool_size` - how many workers in the pool. Defaults to `10`.
|
||
|
||
* `:lazy` - When `true`, workers are started lazily, only when necessary.
|
||
Defaults to `false`.
|
||
|
||
* `:worker_idle_timeout` - Timeout in milliseconds to tag a worker as idle.
|
||
If not nil, starts a periodic timer on the same frequency that will ping
|
||
all idle workers using `handle_ping/2` optional callback .
|
||
Defaults to no timeout.
|
||
|
||
* `:max_idle_pings` - Defines a limit to the number of workers that can be pinged
|
||
for each cycle of the `handle_ping/2` optional callback.
|
||
Defaults to no limit. See `handle_ping/2` for more details.
|
||
|
||
"""
|
||
@spec start_link(keyword) :: GenServer.on_start()
|
||
def start_link(opts) when is_list(opts) do
|
||
{{worker, arg}, opts} =
|
||
Keyword.pop_lazy(opts, :worker, fn ->
|
||
raise ArgumentError, "missing required :worker option"
|
||
end)
|
||
|
||
{pool_size, opts} = Keyword.pop(opts, :pool_size, 10)
|
||
{lazy, opts} = Keyword.pop(opts, :lazy, false)
|
||
{worker_idle_timeout, opts} = Keyword.pop(opts, :worker_idle_timeout, nil)
|
||
{max_idle_pings, opts} = Keyword.pop(opts, :max_idle_pings, -1)
|
||
|
||
unless is_atom(worker) do
|
||
raise ArgumentError, "worker must be an atom, got: #{inspect(worker)}"
|
||
end
|
||
|
||
unless is_integer(pool_size) and pool_size > 0 do
|
||
raise ArgumentError, "pool_size must be a positive integer, got: #{inspect(pool_size)}"
|
||
end
|
||
|
||
GenServer.start_link(
|
||
__MODULE__,
|
||
{worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings},
|
||
opts
|
||
)
|
||
end
|
||
|
||
@doc """
|
||
Stops the given `pool`.
|
||
|
||
The pool exits with the given `reason`. The pool has `timeout` milliseconds
|
||
to terminate, otherwise it will be brutally terminated.
|
||
|
||
## Examples
|
||
|
||
NimblePool.stop(pool)
|
||
#=> :ok
|
||
|
||
"""
|
||
@spec stop(pool, reason :: term, timeout) :: :ok
|
||
def stop(pool, reason \\ :normal, timeout \\ :infinity) do
|
||
GenServer.stop(pool, reason, timeout)
|
||
end
|
||
|
||
@doc """
|
||
Checks out a worker from the pool.
|
||
|
||
It expects a command, which will be passed to the `c:handle_checkout/4`
|
||
callback. The `c:handle_checkout/4` callback will return a client state,
|
||
which is given to the `function`.
|
||
|
||
The `function` receives two arguments, the request
|
||
(`{pid(), reference()}`) and the `client_state`.
|
||
The function must return a two-element tuple, where the first element is the
|
||
return value for `checkout!/4`, and the second element is the updated `client_state`,
|
||
which will be given as the first argument to `c:handle_checkin/4`.
|
||
|
||
`checkout!/4` also has an optional `timeout` value. This value will be applied
|
||
to the checkout operation itself. The "check in" operation happens asynchronously.
|
||
"""
|
||
@spec checkout!(pool, command :: term, function, timeout) :: result
|
||
when function: (from, client_state -> {result, client_state}), result: var
|
||
def checkout!(pool, command, function, timeout \\ 5_000) when is_function(function, 2) do
|
||
# Re-implementation of gen.erl call to avoid multiple monitors.
|
||
pid = GenServer.whereis(pool)
|
||
|
||
unless pid do
|
||
exit!(:noproc, :checkout, [pool])
|
||
end
|
||
|
||
ref = Process.monitor(pid)
|
||
send_call(pid, ref, {:checkout, command, deadline(timeout)})
|
||
|
||
receive do
|
||
{^ref, {:skipped, exception}} ->
|
||
raise exception
|
||
|
||
{^ref, client_state} ->
|
||
Process.demonitor(ref, [:flush])
|
||
|
||
try do
|
||
function.({pid, ref}, client_state)
|
||
catch
|
||
kind, reason ->
|
||
send(pid, {__MODULE__, :cancel, ref, kind})
|
||
:erlang.raise(kind, reason, __STACKTRACE__)
|
||
else
|
||
{result, client_state} ->
|
||
send(pid, {__MODULE__, :checkin, ref, client_state})
|
||
result
|
||
end
|
||
|
||
{:DOWN, ^ref, _, _, :noconnection} ->
|
||
exit!({:nodedown, get_node(pid)}, :checkout, [pool])
|
||
|
||
{:DOWN, ^ref, _, _, reason} ->
|
||
exit!(reason, :checkout, [pool])
|
||
after
|
||
timeout ->
|
||
send(pid, {__MODULE__, :cancel, ref, :timeout})
|
||
Process.demonitor(ref, [:flush])
|
||
exit!(:timeout, :checkout, [pool])
|
||
end
|
||
end
|
||
|
||
@doc """
|
||
Sends an **update** instruction to the pool about the checked out worker.
|
||
|
||
This must be called inside the `checkout!/4` callback function with
|
||
the `from` value given to `c:handle_checkout/4`.
|
||
|
||
This is useful to update the pool's state before effectively
|
||
checking the state in, which is handy when transferring
|
||
resources requires two steps.
|
||
"""
|
||
@spec update(from, command :: term) :: :ok
|
||
def update({pid, ref} = _from, command) do
|
||
send(pid, {__MODULE__, :update, ref, command})
|
||
:ok
|
||
end
|
||
|
||
defp deadline(timeout) when is_integer(timeout) do
|
||
System.monotonic_time() + System.convert_time_unit(timeout, :millisecond, :native)
|
||
end
|
||
|
||
defp deadline(:infinity), do: :infinity
|
||
|
||
defp get_node({_, node}), do: node
|
||
defp get_node(pid) when is_pid(pid), do: node(pid)
|
||
|
||
defp send_call(pid, ref, message) do
|
||
# Auto-connect is asynchronous. But we still use :noconnect to make sure
|
||
# we send on the monitored connection, and not trigger a new auto-connect.
|
||
Process.send(pid, {:"$gen_call", {self(), ref}, message}, [:noconnect])
|
||
end
|
||
|
||
defp exit!(reason, fun, args) do
|
||
exit({reason, {__MODULE__, fun, args}})
|
||
end
|
||
|
||
## Callbacks
|
||
|
||
@impl true
|
||
def init({worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings}) do
|
||
Process.flag(:trap_exit, true)
|
||
|
||
case Code.ensure_loaded(worker) do
|
||
{:module, _} ->
|
||
:ok
|
||
|
||
{:error, reason} ->
|
||
raise ArgumentError, "failed to load worker module #{inspect(worker)}: #{inspect(reason)}"
|
||
end
|
||
|
||
lazy = if lazy, do: pool_size, else: nil
|
||
|
||
if worker_idle_timeout do
|
||
if function_exported?(worker, :handle_ping, 2) do
|
||
Process.send_after(self(), :check_idle, worker_idle_timeout)
|
||
else
|
||
IO.warn(
|
||
":worker_idle_timeout was given but the worker does not export a handle_ping/2 callback"
|
||
)
|
||
end
|
||
end
|
||
|
||
with {:ok, pool_state} <- do_init_pool(worker, arg) do
|
||
{pool_state, resources, async} =
|
||
if is_nil(lazy) do
|
||
Enum.reduce(1..pool_size, {pool_state, :queue.new(), %{}}, fn
|
||
_, {pool_state, resources, async} ->
|
||
init_worker(worker, pool_state, resources, async, worker_idle_timeout)
|
||
end)
|
||
else
|
||
{pool_state, :queue.new(), %{}}
|
||
end
|
||
|
||
state = %{
|
||
worker: worker,
|
||
queue: :queue.new(),
|
||
requests: %{},
|
||
monitors: %{},
|
||
resources: resources,
|
||
async: async,
|
||
state: pool_state,
|
||
lazy: lazy,
|
||
worker_idle_timeout: worker_idle_timeout,
|
||
max_idle_pings: max_idle_pings
|
||
}
|
||
|
||
{:ok, state}
|
||
end
|
||
end
|
||
|
||
@impl true
|
||
def handle_call({:checkout, command, deadline}, {pid, ref} = from, state) do
|
||
%{requests: requests, monitors: monitors, worker: worker, state: pool_state} = state
|
||
mon_ref = Process.monitor(pid)
|
||
requests = Map.put(requests, ref, {pid, mon_ref, :command, command, deadline})
|
||
monitors = Map.put(monitors, mon_ref, ref)
|
||
state = %{state | requests: requests, monitors: monitors}
|
||
|
||
case handle_enqueue(worker, command, pool_state) do
|
||
{:ok, command, pool_state} ->
|
||
{:noreply, maybe_checkout(command, mon_ref, deadline, from, %{state | state: pool_state})}
|
||
|
||
{:skip, exception, pool_state} ->
|
||
state = remove_request(%{state | state: pool_state}, ref, mon_ref)
|
||
{:reply, {:skipped, exception}, state}
|
||
end
|
||
end
|
||
|
||
@impl true
|
||
def handle_info({__MODULE__, :update, ref, command}, state) do
|
||
%{requests: requests, state: pool_state, worker: worker} = state
|
||
|
||
case requests do
|
||
%{^ref => {pid, mon_ref, :state, worker_state}} ->
|
||
{:ok, worker_state, pool_state} = worker.handle_update(command, worker_state, pool_state)
|
||
requests = Map.put(requests, ref, {pid, mon_ref, :state, worker_state})
|
||
{:noreply, %{state | requests: requests, state: pool_state}}
|
||
|
||
%{} ->
|
||
exit(:unexpected_precheckin)
|
||
end
|
||
end
|
||
|
||
@impl true
|
||
def handle_info({__MODULE__, :checkin, ref, worker_client_state}, state) do
|
||
%{
|
||
requests: requests,
|
||
resources: resources,
|
||
worker: worker,
|
||
state: pool_state,
|
||
worker_idle_timeout: worker_idle_timeout
|
||
} = state
|
||
|
||
case requests do
|
||
%{^ref => {pid, mon_ref, :state, worker_server_state}} ->
|
||
checkin =
|
||
if function_exported?(worker, :handle_checkin, 4) do
|
||
args = [worker_client_state, {pid, ref}, worker_server_state, pool_state]
|
||
apply_worker_callback(pool_state, worker, :handle_checkin, args)
|
||
else
|
||
{:ok, worker_server_state, pool_state}
|
||
end
|
||
|
||
{resources, state} =
|
||
case checkin do
|
||
{:ok, worker_server_state, pool_state} ->
|
||
{:queue.in({worker_server_state, get_metadata(worker_idle_timeout)}, resources),
|
||
%{state | state: pool_state}}
|
||
|
||
{:remove, reason, pool_state} ->
|
||
{resources,
|
||
remove_worker(reason, worker_server_state, %{state | state: pool_state})}
|
||
end
|
||
|
||
state = remove_request(state, ref, mon_ref)
|
||
{:noreply, maybe_checkout(%{state | resources: resources})}
|
||
|
||
%{} ->
|
||
exit(:unexpected_checkin)
|
||
end
|
||
end
|
||
|
||
@impl true
|
||
def handle_info({__MODULE__, :cancel, ref, reason}, state) do
|
||
cancel_request_ref(ref, reason, state)
|
||
end
|
||
|
||
@impl true
|
||
def handle_info({__MODULE__, :init_worker}, state) do
|
||
%{
|
||
async: async,
|
||
resources: resources,
|
||
worker: worker,
|
||
state: pool_state,
|
||
worker_idle_timeout: worker_idle_timeout
|
||
} = state
|
||
|
||
{pool_state, resources, async} =
|
||
init_worker(worker, pool_state, resources, async, worker_idle_timeout)
|
||
|
||
{:noreply, maybe_checkout(%{state | async: async, resources: resources, state: pool_state})}
|
||
end
|
||
|
||
@impl true
|
||
def handle_info({:DOWN, ref, _, _, _} = down, state) do
|
||
%{monitors: monitors, async: async} = state
|
||
|
||
case monitors do
|
||
%{^ref => request_ref} ->
|
||
cancel_request_ref(request_ref, :DOWN, state)
|
||
|
||
%{} ->
|
||
case async do
|
||
%{^ref => _} -> remove_async_ref(ref, state)
|
||
%{} -> maybe_handle_info(down, state)
|
||
end
|
||
end
|
||
end
|
||
|
||
@impl true
|
||
def handle_info({:EXIT, pid, _reason} = exit, state) do
|
||
%{async: async} = state
|
||
|
||
case async do
|
||
%{^pid => _} -> {:noreply, %{state | async: Map.delete(async, pid)}}
|
||
%{} -> maybe_handle_info(exit, state)
|
||
end
|
||
end
|
||
|
||
@impl true
|
||
def handle_info({ref, worker_state} = reply, state) when is_reference(ref) do
|
||
%{async: async, resources: resources, worker_idle_timeout: worker_idle_timeout} = state
|
||
|
||
case async do
|
||
%{^ref => _} ->
|
||
Process.demonitor(ref, [:flush])
|
||
resources = :queue.in({worker_state, get_metadata(worker_idle_timeout)}, resources)
|
||
async = Map.delete(async, ref)
|
||
state = %{state | async: async, resources: resources}
|
||
{:noreply, maybe_checkout(state)}
|
||
|
||
%{} ->
|
||
maybe_handle_info(reply, state)
|
||
end
|
||
end
|
||
|
||
@impl true
|
||
def handle_info(
|
||
:check_idle,
|
||
%{resources: resources, worker_idle_timeout: worker_idle_timeout} = state
|
||
) do
|
||
case check_idle_resources(resources, state) do
|
||
{:ok, new_resources, new_state} ->
|
||
Process.send_after(self(), :check_idle, worker_idle_timeout)
|
||
{:noreply, %{new_state | resources: new_resources}}
|
||
|
||
{:stop, reason, state} ->
|
||
{:stop, {:shutdown, reason}, state}
|
||
end
|
||
end
|
||
|
||
@impl true
|
||
def handle_info(msg, state) do
|
||
maybe_handle_info(msg, state)
|
||
end
|
||
|
||
@impl true
|
||
def terminate(reason, %{worker: worker, resources: resources} = state) do
|
||
for {worker_server_state, _} <- :queue.to_list(resources) do
|
||
maybe_terminate_worker(reason, worker_server_state, state)
|
||
end
|
||
|
||
if function_exported?(worker, :terminate_pool, 2) do
|
||
worker.terminate_pool(reason, state)
|
||
end
|
||
|
||
:ok
|
||
end
|
||
|
||
defp do_init_pool(worker, arg) do
|
||
if function_exported?(worker, :init_pool, 1) do
|
||
worker.init_pool(arg)
|
||
else
|
||
{:ok, arg}
|
||
end
|
||
end
|
||
|
||
defp remove_async_ref(ref, state) do
|
||
%{
|
||
async: async,
|
||
resources: resources,
|
||
worker: worker,
|
||
state: pool_state,
|
||
worker_idle_timeout: worker_idle_timeout
|
||
} = state
|
||
|
||
# If an async worker failed to start, we try to start another one
|
||
# immediately, even if the pool is lazy, as we assume there is an
|
||
# immediate need for this resource.
|
||
{pool_state, resources, async} =
|
||
init_worker(worker, pool_state, resources, Map.delete(async, ref), worker_idle_timeout)
|
||
|
||
{:noreply, %{state | resources: resources, async: async, state: pool_state}}
|
||
end
|
||
|
||
defp cancel_request_ref(
|
||
ref,
|
||
reason,
|
||
%{requests: requests, worker: worker, state: pool_state} = state
|
||
) do
|
||
case requests do
|
||
# Exited or timed out before we could serve it
|
||
%{^ref => {_, mon_ref, :command, _command, _deadline}} ->
|
||
if function_exported?(worker, :handle_cancelled, 2) do
|
||
args = [:queued, pool_state]
|
||
apply_worker_callback(worker, :handle_cancelled, args)
|
||
end
|
||
|
||
{:noreply, remove_request(state, ref, mon_ref)}
|
||
|
||
# Exited or errored during client processing
|
||
%{^ref => {_, mon_ref, :state, worker_server_state}} ->
|
||
if function_exported?(worker, :handle_cancelled, 2) do
|
||
args = [:checked_out, pool_state]
|
||
apply_worker_callback(worker, :handle_cancelled, args)
|
||
end
|
||
|
||
state = remove_request(state, ref, mon_ref)
|
||
{:noreply, remove_worker(reason, worker_server_state, state)}
|
||
|
||
# The client timed out, sent us a message, and we dropped the deadlined request
|
||
%{} ->
|
||
if function_exported?(worker, :handle_cancelled, 2) do
|
||
args = [:queued, pool_state]
|
||
apply_worker_callback(worker, :handle_cancelled, args)
|
||
end
|
||
|
||
{:noreply, state}
|
||
end
|
||
end
|
||
|
||
defp maybe_handle_info(msg, state) do
|
||
%{resources: resources, worker: worker, worker_idle_timeout: worker_idle_timeout} = state
|
||
|
||
if function_exported?(worker, :handle_info, 2) do
|
||
{resources, state} =
|
||
Enum.reduce(:queue.to_list(resources), {:queue.new(), state}, fn
|
||
{worker_server_state, _}, {resources, state} ->
|
||
case apply_worker_callback(worker, :handle_info, [msg, worker_server_state]) do
|
||
{:ok, worker_server_state} ->
|
||
{:queue.in({worker_server_state, get_metadata(worker_idle_timeout)}, resources),
|
||
state}
|
||
|
||
{:remove, reason} ->
|
||
{resources, remove_worker(reason, worker_server_state, state)}
|
||
end
|
||
end)
|
||
|
||
{:noreply, %{state | resources: resources}}
|
||
else
|
||
{:noreply, state}
|
||
end
|
||
end
|
||
|
||
defp maybe_checkout(%{queue: queue, requests: requests} = state) do
|
||
case :queue.out(queue) do
|
||
{{:value, {pid, ref}}, queue} ->
|
||
case requests do
|
||
# The request still exists, so we are good to go
|
||
%{^ref => {^pid, mon_ref, :command, command, deadline}} ->
|
||
maybe_checkout(command, mon_ref, deadline, {pid, ref}, %{state | queue: queue})
|
||
|
||
# It should never happen
|
||
%{^ref => _} ->
|
||
exit(:unexpected_checkout)
|
||
|
||
# The request is no longer active, do nothing
|
||
%{} ->
|
||
maybe_checkout(%{state | queue: queue})
|
||
end
|
||
|
||
{:empty, _queue} ->
|
||
state
|
||
end
|
||
end
|
||
|
||
defp maybe_checkout(command, mon_ref, deadline, {pid, ref} = from, state) do
|
||
if past_deadline?(deadline) do
|
||
state = remove_request(state, ref, mon_ref)
|
||
maybe_checkout(state)
|
||
else
|
||
%{resources: resources, requests: requests, worker: worker, queue: queue, state: pool_state} =
|
||
state = init_worker_if_lazy_and_empty(state)
|
||
|
||
case :queue.out(resources) do
|
||
{{:value, {worker_server_state, _}}, resources} ->
|
||
args = [command, from, worker_server_state, pool_state]
|
||
|
||
case apply_worker_callback(pool_state, worker, :handle_checkout, args) do
|
||
{:ok, worker_client_state, worker_server_state, pool_state} ->
|
||
GenServer.reply({pid, ref}, worker_client_state)
|
||
|
||
requests = Map.put(requests, ref, {pid, mon_ref, :state, worker_server_state})
|
||
%{state | resources: resources, requests: requests, state: pool_state}
|
||
|
||
{:remove, reason, pool_state} ->
|
||
state = remove_worker(reason, worker_server_state, %{state | state: pool_state})
|
||
maybe_checkout(command, mon_ref, deadline, from, %{state | resources: resources})
|
||
|
||
{:skip, exception, pool_state} ->
|
||
GenServer.reply({pid, ref}, {:skipped, exception})
|
||
remove_request(%{state | state: pool_state}, ref, mon_ref)
|
||
|
||
other ->
|
||
raise """
|
||
unexpected return from #{inspect(worker)}.handle_checkout/4.
|
||
|
||
Expected: {:ok, client_state, server_state, pool_state} | {:remove, reason, pool_state} | {:skip, Exception.t(), pool_state}
|
||
Got: #{inspect(other)}
|
||
"""
|
||
end
|
||
|
||
{:empty, _} ->
|
||
%{state | queue: :queue.in(from, queue)}
|
||
end
|
||
end
|
||
end
|
||
|
||
defp init_worker_if_lazy_and_empty(%{lazy: nil} = state), do: state
|
||
|
||
defp init_worker_if_lazy_and_empty(
|
||
%{lazy: lazy, resources: resources, worker_idle_timeout: worker_idle_timeout} = state
|
||
) do
|
||
if lazy > 0 and :queue.is_empty(resources) do
|
||
%{async: async, worker: worker, state: pool_state} = state
|
||
|
||
{pool_state, resources, async} =
|
||
init_worker(worker, pool_state, resources, async, worker_idle_timeout)
|
||
|
||
%{state | async: async, resources: resources, state: pool_state, lazy: lazy - 1}
|
||
else
|
||
state
|
||
end
|
||
end
|
||
|
||
defp past_deadline?(deadline) when is_integer(deadline) do
|
||
System.monotonic_time() >= deadline
|
||
end
|
||
|
||
defp past_deadline?(:infinity), do: false
|
||
|
||
defp remove_worker(reason, worker_server_state, state) do
|
||
state = maybe_terminate_worker(reason, worker_server_state, state)
|
||
|
||
if lazy = state.lazy do
|
||
%{state | lazy: lazy + 1}
|
||
else
|
||
schedule_init()
|
||
state
|
||
end
|
||
end
|
||
|
||
defp check_idle_resources(resources, state) do
|
||
now_in_ms = System.monotonic_time(:millisecond)
|
||
do_check_idle_resources(resources, now_in_ms, state, :queue.new(), state.max_idle_pings)
|
||
end
|
||
|
||
defp do_check_idle_resources(resources, _now_in_ms, state, new_resources, 0) do
|
||
{:ok, :queue.join(new_resources, resources), state}
|
||
end
|
||
|
||
defp do_check_idle_resources(resources, now_in_ms, state, new_resources, remaining_pings) do
|
||
case :queue.out(resources) do
|
||
{:empty, _} ->
|
||
{:ok, new_resources, state}
|
||
|
||
{{:value, resource_data}, next_resources} ->
|
||
{worker_server_state, worker_metadata} = resource_data
|
||
time_diff = now_in_ms - worker_metadata
|
||
|
||
if time_diff >= state.worker_idle_timeout do
|
||
case maybe_ping_worker(worker_server_state, state) do
|
||
{:ok, new_worker_state} ->
|
||
# We don't need to update the worker_metadata because, by definition,
|
||
# if we are checking for idle resources again and the timestamp is the same,
|
||
# it is because it has to be checked again.
|
||
new_resource_data = {new_worker_state, worker_metadata}
|
||
new_resources = :queue.in(new_resource_data, new_resources)
|
||
|
||
do_check_idle_resources(
|
||
next_resources,
|
||
now_in_ms,
|
||
state,
|
||
new_resources,
|
||
remaining_pings - 1
|
||
)
|
||
|
||
{:remove, user_reason} ->
|
||
new_state = remove_worker(user_reason, worker_server_state, state)
|
||
|
||
do_check_idle_resources(
|
||
next_resources,
|
||
now_in_ms,
|
||
new_state,
|
||
new_resources,
|
||
remaining_pings - 1
|
||
)
|
||
|
||
{:stop, reason} ->
|
||
{:stop, reason, state}
|
||
end
|
||
else
|
||
{:ok, :queue.join(new_resources, resources), state}
|
||
end
|
||
end
|
||
end
|
||
|
||
defp maybe_ping_worker(worker_server_state, state) do
|
||
%{worker: worker, state: pool_state} = state
|
||
|
||
args = [worker_server_state, pool_state]
|
||
|
||
case apply_worker_callback(worker, :handle_ping, args) do
|
||
{:ok, worker_state} ->
|
||
{:ok, worker_state}
|
||
|
||
{:remove, user_reason} ->
|
||
{:remove, user_reason}
|
||
|
||
{:stop, user_reason} ->
|
||
{:stop, user_reason}
|
||
|
||
other ->
|
||
raise """
|
||
unexpected return from #{inspect(worker)}.handle_ping/2.
|
||
|
||
Expected:
|
||
|
||
{:remove, reason}
|
||
| {:ok, worker_state}
|
||
| {:stop, reason}
|
||
|
||
Got: #{inspect(other)}
|
||
"""
|
||
end
|
||
end
|
||
|
||
defp maybe_terminate_worker(reason, worker_server_state, state) do
|
||
%{worker: worker, state: pool_state} = state
|
||
|
||
if function_exported?(worker, :terminate_worker, 3) do
|
||
args = [reason, worker_server_state, pool_state]
|
||
|
||
case apply_worker_callback(worker, :terminate_worker, args) do
|
||
{:ok, pool_state} ->
|
||
%{state | state: pool_state}
|
||
|
||
{:remove, _reason} ->
|
||
state
|
||
|
||
other ->
|
||
raise """
|
||
unexpected return from #{inspect(worker)}.terminate_worker/3.
|
||
|
||
Expected:
|
||
|
||
{:ok, pool_state}
|
||
|
||
Got: #{inspect(other)}
|
||
"""
|
||
end
|
||
else
|
||
state
|
||
end
|
||
end
|
||
|
||
defp init_worker(worker, pool_state, resources, async, worker_idle_timeout) do
|
||
case apply_worker_callback(worker, :init_worker, [pool_state]) do
|
||
{:ok, worker_state, pool_state} ->
|
||
{pool_state, :queue.in({worker_state, get_metadata(worker_idle_timeout)}, resources),
|
||
async}
|
||
|
||
{:async, fun, pool_state} when is_function(fun, 0) ->
|
||
%{ref: ref, pid: pid} = Task.Supervisor.async(NimblePool.TaskSupervisor, fun)
|
||
{pool_state, resources, async |> Map.put(ref, pid) |> Map.put(pid, ref)}
|
||
|
||
{:remove, _reason} ->
|
||
send(self(), {__MODULE__, :init_worker})
|
||
{pool_state, resources, async}
|
||
|
||
other ->
|
||
raise """
|
||
unexpected return from #{inspect(worker)}.init_worker/1.
|
||
|
||
Expected:
|
||
|
||
{:ok, worker_state, pool_state}
|
||
| {:async, (() -> worker_state), pool_state}
|
||
|
||
Got: #{inspect(other)}
|
||
"""
|
||
end
|
||
end
|
||
|
||
defp schedule_init() do
|
||
send(self(), {__MODULE__, :init_worker})
|
||
end
|
||
|
||
defp apply_worker_callback(worker, fun, args) do
|
||
do_apply_worker_callback(worker, fun, args, &{:remove, &1})
|
||
end
|
||
|
||
defp apply_worker_callback(pool_state, worker, fun, args) do
|
||
do_apply_worker_callback(worker, fun, args, &{:remove, &1, pool_state})
|
||
end
|
||
|
||
defp do_apply_worker_callback(worker, fun, args, catch_fun) do
|
||
try do
|
||
apply(worker, fun, args)
|
||
catch
|
||
kind, reason ->
|
||
reason = Exception.normalize(kind, reason, __STACKTRACE__)
|
||
|
||
Logger.error(
|
||
[
|
||
"Error during #{inspect(worker)}.#{fun}/#{length(args)} callback:\n"
|
||
| Exception.format(kind, reason, __STACKTRACE__)
|
||
],
|
||
crash_reason: {crash_reason(kind, reason), __STACKTRACE__}
|
||
)
|
||
|
||
catch_fun.(reason)
|
||
end
|
||
end
|
||
|
||
defp crash_reason(:throw, value), do: {:nocatch, value}
|
||
defp crash_reason(_, value), do: value
|
||
|
||
defp remove_request(pool_state, ref, mon_ref) do
|
||
requests = Map.delete(pool_state.requests, ref)
|
||
monitors = Map.delete(pool_state.monitors, mon_ref)
|
||
Process.demonitor(mon_ref, [:flush])
|
||
%{pool_state | requests: requests, monitors: monitors}
|
||
end
|
||
|
||
defp handle_enqueue(worker, command, pool_state) do
|
||
if function_exported?(worker, :handle_enqueue, 2) do
|
||
worker.handle_enqueue(command, pool_state)
|
||
else
|
||
{:ok, command, pool_state}
|
||
end
|
||
end
|
||
|
||
defp get_metadata(nil), do: nil
|
||
defp get_metadata(_worker_idle_timeout), do: System.monotonic_time(:millisecond)
|
||
end
|