api-v2/deps/nimble_pool/lib/nimble_pool.ex
2025-04-16 10:03:13 -03:00

1121 lines
36 KiB
Elixir
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

defmodule NimblePool do
@external_resource "README.md"
@moduledoc "README.md"
|> File.read!()
|> String.split("<!-- MDOC !-->")
|> Enum.fetch!(1)
use GenServer
require Logger
@type from :: {pid, reference}
@type init_arg :: term
@type pool_state :: term
@type worker_state :: term
@type client_state :: term
@type user_reason :: term
@typedoc since: "1.1.0"
@type pool :: GenServer.server()
@doc """
Initializes the worker.
It receives the worker argument passed to `start_link/1` if `c:init_pool/1` is
not implemented, otherwise the pool state returned by `c:init_pool/1`. It must
return `{:ok, worker_state, pool_state}` or `{:async, fun, pool_state}`, where the `fun`
is a zero-arity function that must return the worker state.
If this callback returns `{:async, fun, pool_state}`, `fun` is executed in a **separate
one-off process**. Because of this, if you start resources that the pool needs to "own",
you need to transfer ownership to the pool process. For example, if your async `fun`
opens a `:gen_tcp` socket, you'll have to use `:gen_tcp.controlling_process/2` to transfer
ownership back to the pool.
> #### Blocking the pool {: .warning}
>
> This callback is synchronous and therefore will block the pool, potentially
> for a significant amount of time since it's executed in the pool process once
> per worker. > If you need to perform long initialization, consider using the
> `{:async, fun, pool_state}` return type.
"""
@doc callback: :worker
@callback init_worker(pool_state) ::
{:ok, worker_state, pool_state} | {:async, (-> worker_state), pool_state}
@doc """
Initializes the pool.
It receives the worker argument passed to `start_link/1` and must
return `{:ok, pool_state}` upon successful initialization,
`:ignore` to exit normally, or `{:stop, reason}` to exit with `reason`
and return `{:error, reason}`.
This is a good place to perform a registration, for example.
It must return the `pool_state`. The `pool_state` is given to
`init_worker`. By default, it simply returns the given arguments.
This callback is optional.
## Examples
@impl NimblePool
def init_pool(options) do
Registry.register(options[:registry], :some_key, :some_value)
end
"""
@doc callback: :pool
@callback init_pool(init_arg) :: {:ok, pool_state} | :ignore | {:stop, reason :: any()}
@doc """
Checks a worker out.
The `maybe_wrapped_command` is the `command` passed to `checkout!/4` if the worker
doesn't implement the `c:handle_enqueue/2` callback, otherwise it's the possibly-wrapped
command returned by `c:handle_enqueue/2`.
This callback must return one of:
* `{:ok, client_state, worker_state, pool_state}` — the client state is given to
the callback function passed to `checkout!/4`. `worker_state` and `pool_state`
can potentially update the state of the checked-out worker and the pool.
* `{:remove, reason, pool_state}` — `NimblePool` will remove the checked-out worker and
attempt to checkout another worker.
* `{:skip, Exception.t(), pool_state}` — `NimblePool` will skip the checkout, the client will
raise the returned exception, and the worker will be left ready for the next
checkout attempt.
> #### Blocking the pool {: .warning}
>
> This callback is synchronous and therefore will block the pool.
> Avoid performing long work in here. Instead, do as much work as
> possible on the client.
Once the worker is checked out, the worker won't handle any
messages targeted to `c:handle_info/2`.
"""
@doc callback: :worker
@callback handle_checkout(maybe_wrapped_command :: term, from, worker_state, pool_state) ::
{:ok, client_state, worker_state, pool_state}
| {:remove, user_reason, pool_state}
| {:skip, Exception.t(), pool_state}
@doc """
Checks a worker back in the pool.
It receives the potentially-updated `client_state`, returned by the `checkout!/4`
anonymous function, and it must return either
`{:ok, worker_state, pool_state}` or `{:remove, reason, pool_state}`.
> #### Blocking the pool {: .warning}
>
> This callback is synchronous and therefore will block the pool.
> Avoid performing long work in here, instead do as much work as
> possible on the client.
Once the connection is checked in, it may immediately be handed
to another client, without traversing any of the messages in the
pool inbox.
This callback is optional.
"""
@doc callback: :worker
@callback handle_checkin(client_state, from, worker_state, pool_state) ::
{:ok, worker_state, pool_state} | {:remove, user_reason, pool_state}
@doc """
Handles an update instruction from a checked out worker.
See `update/2` for more information.
This callback is optional.
"""
@doc callback: :worker
@callback handle_update(message :: term, worker_state, pool_state) ::
{:ok, worker_state, pool_state}
@doc """
Receives a message in the pool and handles it as each worker.
It receives the `message` and it must return either
`{:ok, worker_state}` to update the worker state, or `{:remove, reason}` to
remove the worker.
Since there is only a single pool process that can receive messages, this
callback is executed once for every worker when the pool receives `message`.
> #### Blocking the pool {: .warning}
>
> This callback is synchronous and therefore will block the pool while it
> executes for each worker. Avoid performing long work in here.
This callback is optional.
"""
@doc callback: :worker
@callback handle_info(message :: term, worker_state) ::
{:ok, worker_state} | {:remove, user_reason}
@doc """
Executed by the pool whenever a request to check out a worker is enqueued.
The `command` argument should be treated as an opaque value, but it can be
wrapped with some data to be used in `c:handle_checkout/4`.
It must return either `{:ok, maybe_wrapped_command, pool_state}` or
`{:skip, Exception.t(), pool_state}` if checkout is to be skipped.
> #### Blocking the pool {: .warning}
>
> This callback is synchronous and therefore will block the pool.
> Avoid performing long work in here.
This callback is optional.
## Examples
@impl NimblePool
def handle_enqueue(command, pool_state) do
{:ok, {:wrapped, command}, pool_state}
end
"""
@doc callback: :pool
@callback handle_enqueue(command :: term, pool_state) ::
{:ok, maybe_wrapped_command :: term, pool_state}
| {:skip, Exception.t(), pool_state}
@doc """
Terminates a worker.
The `reason` argument is:
* `:DOWN` whenever the client link breaks
* `:timeout` whenever the client times out
* one of `:throw`, `:error`, `:exit` whenever the client crashes with one
of the reasons above.
* `reason` if at any point you return `{:remove, reason}`
* if any callback raises, the raised exception will be given as `reason`.
It receives the latest known `worker_state`, which may not
be the latest state. For example, if a client checks out the
state and crashes, we don't fully know the `client_state`,
so the `c:terminate_worker/3` callback needs to take such scenarios
into account.
This callback must always return `{:ok, pool_state}` with the potentially-updated
pool state.
This callback is optional.
"""
@doc callback: :pool
@callback terminate_worker(
reason :: :DOWN | :timeout | :throw | :error | :exit | user_reason,
worker_state,
pool_state
) ::
{:ok, pool_state}
@doc """
Handle pings due to inactivity on the worker.
Executed whenever the idle worker periodic timer verifies that a worker has been idle
on the pool for longer than the `:worker_idle_timeout` pool configuration (in milliseconds).
This callback must return one of the following values:
* `{:ok, worker_state}`: Updates worker state.
* `{:remove, user_reason}`: The pool will proceed to the standard worker termination
defined in `terminate_worker/3`.
* `{:stop, user_reason}`: The entire pool process will be terminated, and `terminate_worker/3`
will be called for every worker on the pool.
This callback is optional.
## Max idle pings
The `:max_idle_pings` pool option is useful to prevent sequential termination of a large number
of workers. However, it is important to keep in mind the following behaviours whenever
utilizing it.
* If you are not terminating workers with `c:handle_ping/2`, you may end up pinging only
the same workers over and over again because each cycle will ping only the first
`:max_idle_pings` workers.
* If you are terminating workers with `c:handle_ping/2`, the last worker may be terminated
after up to `worker_idle_timeout + worker_idle_timeout * ceil(number_of_workers/max_idle_pings)`,
instead of `2 * worker_idle_timeout` milliseconds of idle time.
For instance consider a pool with 10 workers and a ping of 1 second.
Given a negligible worker termination time and a worst-case scenario where all the workers
go idle right after a verification cycle is started, then without `max_idle_pings` the
last worker will be terminated in the next cycle (2 seconds), whereas with a
`max_idle_pings` of 2 the last worker will be terminated only in the 5th cycle (6 seconds).
## Disclaimers
* On lazy pools, if no worker is currently on the pool the callback will never be called.
Therefore you can not rely on this callback to terminate empty lazy pools.
* On not lazy pools, if you return `{:remove, user_reason}` you may end up
terminating and initializing workers at the same time every idle verification cycle.
* On large pools, if many resources go idle at the same cycle, you may end up terminating
a large number of workers sequentially, which could lead to the pool being unable to
fulfill requests. See `:max_idle_pings` option to prevent this.
"""
@doc callback: :worker
@callback handle_ping(
worker_state,
pool_state
) ::
{:ok, worker_state} | {:remove, user_reason()} | {:stop, user_reason()}
@doc """
Handle pool termination.
The `reason` argmument is the same given to GenServer's terminate/2 callback.
It is not necessary to terminate workers here because the
`terminate_worker/3` callback has already been invoked.
This should be used only for clean up extra resources that can not be
handled by `terminate_worker/3` callback.
This callback is optional.
"""
@doc callback: :pool
@callback terminate_pool(
reason :: :DOWN | :timeout | :throw | :error | :exit | user_reason,
pool_state
) :: :ok
@doc """
Handle cancelled checkout requests.
This callback is executed when a checkout request is cancelled unexpectedly.
The context argument may be `:queued` or `:checked_out`:
* `:queued` means the cancellation happened before resource checkout. This may happen
when the pool is starving under load and can not serve resources.
* `:checked_out` means the cancellation happened after resource checkout. This may happen
when the function given to `checkout!/4` raises.
This callback is optional.
"""
@doc callback: :pool
@callback handle_cancelled(
context :: :queued | :checked_out,
pool_state
) :: :ok
@optional_callbacks init_pool: 1,
handle_checkin: 4,
handle_info: 2,
handle_enqueue: 2,
handle_update: 3,
handle_ping: 2,
terminate_worker: 3,
terminate_pool: 2,
handle_cancelled: 2
@doc """
Defines a pool to be started under the supervision tree.
It accepts the same options as `start_link/1` with the
addition or `:restart` and `:shutdown` that control the
"Child Specification".
## Examples
NimblePool.child_spec(worker: {__MODULE__, :some_arg}, restart: :temporary)
"""
@spec child_spec(keyword) :: Supervisor.child_spec()
def child_spec(opts) when is_list(opts) do
{worker, _} = Keyword.fetch!(opts, :worker)
{restart, opts} = Keyword.pop(opts, :restart, :permanent)
{shutdown, opts} = Keyword.pop(opts, :shutdown, 5_000)
%{
id: worker,
start: {__MODULE__, :start_link, [opts]},
shutdown: shutdown,
restart: restart
}
end
@doc """
Starts a pool.
## Options
* `:worker` - a `{worker_mod, worker_init_arg}` tuple with the worker
module that implements the `NimblePool` behaviour and the worker
initial argument. This argument is **required**.
* `:pool_size` - how many workers in the pool. Defaults to `10`.
* `:lazy` - When `true`, workers are started lazily, only when necessary.
Defaults to `false`.
* `:worker_idle_timeout` - Timeout in milliseconds to tag a worker as idle.
If not nil, starts a periodic timer on the same frequency that will ping
all idle workers using `handle_ping/2` optional callback .
Defaults to no timeout.
* `:max_idle_pings` - Defines a limit to the number of workers that can be pinged
for each cycle of the `handle_ping/2` optional callback.
Defaults to no limit. See `handle_ping/2` for more details.
"""
@spec start_link(keyword) :: GenServer.on_start()
def start_link(opts) when is_list(opts) do
{{worker, arg}, opts} =
Keyword.pop_lazy(opts, :worker, fn ->
raise ArgumentError, "missing required :worker option"
end)
{pool_size, opts} = Keyword.pop(opts, :pool_size, 10)
{lazy, opts} = Keyword.pop(opts, :lazy, false)
{worker_idle_timeout, opts} = Keyword.pop(opts, :worker_idle_timeout, nil)
{max_idle_pings, opts} = Keyword.pop(opts, :max_idle_pings, -1)
unless is_atom(worker) do
raise ArgumentError, "worker must be an atom, got: #{inspect(worker)}"
end
unless is_integer(pool_size) and pool_size > 0 do
raise ArgumentError, "pool_size must be a positive integer, got: #{inspect(pool_size)}"
end
GenServer.start_link(
__MODULE__,
{worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings},
opts
)
end
@doc """
Stops the given `pool`.
The pool exits with the given `reason`. The pool has `timeout` milliseconds
to terminate, otherwise it will be brutally terminated.
## Examples
NimblePool.stop(pool)
#=> :ok
"""
@spec stop(pool, reason :: term, timeout) :: :ok
def stop(pool, reason \\ :normal, timeout \\ :infinity) do
GenServer.stop(pool, reason, timeout)
end
@doc """
Checks out a worker from the pool.
It expects a command, which will be passed to the `c:handle_checkout/4`
callback. The `c:handle_checkout/4` callback will return a client state,
which is given to the `function`.
The `function` receives two arguments, the request
(`{pid(), reference()}`) and the `client_state`.
The function must return a two-element tuple, where the first element is the
return value for `checkout!/4`, and the second element is the updated `client_state`,
which will be given as the first argument to `c:handle_checkin/4`.
`checkout!/4` also has an optional `timeout` value. This value will be applied
to the checkout operation itself. The "check in" operation happens asynchronously.
"""
@spec checkout!(pool, command :: term, function, timeout) :: result
when function: (from, client_state -> {result, client_state}), result: var
def checkout!(pool, command, function, timeout \\ 5_000) when is_function(function, 2) do
# Re-implementation of gen.erl call to avoid multiple monitors.
pid = GenServer.whereis(pool)
unless pid do
exit!(:noproc, :checkout, [pool])
end
ref = Process.monitor(pid)
send_call(pid, ref, {:checkout, command, deadline(timeout)})
receive do
{^ref, {:skipped, exception}} ->
raise exception
{^ref, client_state} ->
Process.demonitor(ref, [:flush])
try do
function.({pid, ref}, client_state)
catch
kind, reason ->
send(pid, {__MODULE__, :cancel, ref, kind})
:erlang.raise(kind, reason, __STACKTRACE__)
else
{result, client_state} ->
send(pid, {__MODULE__, :checkin, ref, client_state})
result
end
{:DOWN, ^ref, _, _, :noconnection} ->
exit!({:nodedown, get_node(pid)}, :checkout, [pool])
{:DOWN, ^ref, _, _, reason} ->
exit!(reason, :checkout, [pool])
after
timeout ->
send(pid, {__MODULE__, :cancel, ref, :timeout})
Process.demonitor(ref, [:flush])
exit!(:timeout, :checkout, [pool])
end
end
@doc """
Sends an **update** instruction to the pool about the checked out worker.
This must be called inside the `checkout!/4` callback function with
the `from` value given to `c:handle_checkout/4`.
This is useful to update the pool's state before effectively
checking the state in, which is handy when transferring
resources requires two steps.
"""
@spec update(from, command :: term) :: :ok
def update({pid, ref} = _from, command) do
send(pid, {__MODULE__, :update, ref, command})
:ok
end
defp deadline(timeout) when is_integer(timeout) do
System.monotonic_time() + System.convert_time_unit(timeout, :millisecond, :native)
end
defp deadline(:infinity), do: :infinity
defp get_node({_, node}), do: node
defp get_node(pid) when is_pid(pid), do: node(pid)
defp send_call(pid, ref, message) do
# Auto-connect is asynchronous. But we still use :noconnect to make sure
# we send on the monitored connection, and not trigger a new auto-connect.
Process.send(pid, {:"$gen_call", {self(), ref}, message}, [:noconnect])
end
defp exit!(reason, fun, args) do
exit({reason, {__MODULE__, fun, args}})
end
## Callbacks
@impl true
def init({worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings}) do
Process.flag(:trap_exit, true)
case Code.ensure_loaded(worker) do
{:module, _} ->
:ok
{:error, reason} ->
raise ArgumentError, "failed to load worker module #{inspect(worker)}: #{inspect(reason)}"
end
lazy = if lazy, do: pool_size, else: nil
if worker_idle_timeout do
if function_exported?(worker, :handle_ping, 2) do
Process.send_after(self(), :check_idle, worker_idle_timeout)
else
IO.warn(
":worker_idle_timeout was given but the worker does not export a handle_ping/2 callback"
)
end
end
with {:ok, pool_state} <- do_init_pool(worker, arg) do
{pool_state, resources, async} =
if is_nil(lazy) do
Enum.reduce(1..pool_size, {pool_state, :queue.new(), %{}}, fn
_, {pool_state, resources, async} ->
init_worker(worker, pool_state, resources, async, worker_idle_timeout)
end)
else
{pool_state, :queue.new(), %{}}
end
state = %{
worker: worker,
queue: :queue.new(),
requests: %{},
monitors: %{},
resources: resources,
async: async,
state: pool_state,
lazy: lazy,
worker_idle_timeout: worker_idle_timeout,
max_idle_pings: max_idle_pings
}
{:ok, state}
end
end
@impl true
def handle_call({:checkout, command, deadline}, {pid, ref} = from, state) do
%{requests: requests, monitors: monitors, worker: worker, state: pool_state} = state
mon_ref = Process.monitor(pid)
requests = Map.put(requests, ref, {pid, mon_ref, :command, command, deadline})
monitors = Map.put(monitors, mon_ref, ref)
state = %{state | requests: requests, monitors: monitors}
case handle_enqueue(worker, command, pool_state) do
{:ok, command, pool_state} ->
{:noreply, maybe_checkout(command, mon_ref, deadline, from, %{state | state: pool_state})}
{:skip, exception, pool_state} ->
state = remove_request(%{state | state: pool_state}, ref, mon_ref)
{:reply, {:skipped, exception}, state}
end
end
@impl true
def handle_info({__MODULE__, :update, ref, command}, state) do
%{requests: requests, state: pool_state, worker: worker} = state
case requests do
%{^ref => {pid, mon_ref, :state, worker_state}} ->
{:ok, worker_state, pool_state} = worker.handle_update(command, worker_state, pool_state)
requests = Map.put(requests, ref, {pid, mon_ref, :state, worker_state})
{:noreply, %{state | requests: requests, state: pool_state}}
%{} ->
exit(:unexpected_precheckin)
end
end
@impl true
def handle_info({__MODULE__, :checkin, ref, worker_client_state}, state) do
%{
requests: requests,
resources: resources,
worker: worker,
state: pool_state,
worker_idle_timeout: worker_idle_timeout
} = state
case requests do
%{^ref => {pid, mon_ref, :state, worker_server_state}} ->
checkin =
if function_exported?(worker, :handle_checkin, 4) do
args = [worker_client_state, {pid, ref}, worker_server_state, pool_state]
apply_worker_callback(pool_state, worker, :handle_checkin, args)
else
{:ok, worker_server_state, pool_state}
end
{resources, state} =
case checkin do
{:ok, worker_server_state, pool_state} ->
{:queue.in({worker_server_state, get_metadata(worker_idle_timeout)}, resources),
%{state | state: pool_state}}
{:remove, reason, pool_state} ->
{resources,
remove_worker(reason, worker_server_state, %{state | state: pool_state})}
end
state = remove_request(state, ref, mon_ref)
{:noreply, maybe_checkout(%{state | resources: resources})}
%{} ->
exit(:unexpected_checkin)
end
end
@impl true
def handle_info({__MODULE__, :cancel, ref, reason}, state) do
cancel_request_ref(ref, reason, state)
end
@impl true
def handle_info({__MODULE__, :init_worker}, state) do
%{
async: async,
resources: resources,
worker: worker,
state: pool_state,
worker_idle_timeout: worker_idle_timeout
} = state
{pool_state, resources, async} =
init_worker(worker, pool_state, resources, async, worker_idle_timeout)
{:noreply, maybe_checkout(%{state | async: async, resources: resources, state: pool_state})}
end
@impl true
def handle_info({:DOWN, ref, _, _, _} = down, state) do
%{monitors: monitors, async: async} = state
case monitors do
%{^ref => request_ref} ->
cancel_request_ref(request_ref, :DOWN, state)
%{} ->
case async do
%{^ref => _} -> remove_async_ref(ref, state)
%{} -> maybe_handle_info(down, state)
end
end
end
@impl true
def handle_info({:EXIT, pid, _reason} = exit, state) do
%{async: async} = state
case async do
%{^pid => _} -> {:noreply, %{state | async: Map.delete(async, pid)}}
%{} -> maybe_handle_info(exit, state)
end
end
@impl true
def handle_info({ref, worker_state} = reply, state) when is_reference(ref) do
%{async: async, resources: resources, worker_idle_timeout: worker_idle_timeout} = state
case async do
%{^ref => _} ->
Process.demonitor(ref, [:flush])
resources = :queue.in({worker_state, get_metadata(worker_idle_timeout)}, resources)
async = Map.delete(async, ref)
state = %{state | async: async, resources: resources}
{:noreply, maybe_checkout(state)}
%{} ->
maybe_handle_info(reply, state)
end
end
@impl true
def handle_info(
:check_idle,
%{resources: resources, worker_idle_timeout: worker_idle_timeout} = state
) do
case check_idle_resources(resources, state) do
{:ok, new_resources, new_state} ->
Process.send_after(self(), :check_idle, worker_idle_timeout)
{:noreply, %{new_state | resources: new_resources}}
{:stop, reason, state} ->
{:stop, {:shutdown, reason}, state}
end
end
@impl true
def handle_info(msg, state) do
maybe_handle_info(msg, state)
end
@impl true
def terminate(reason, %{worker: worker, resources: resources} = state) do
for {worker_server_state, _} <- :queue.to_list(resources) do
maybe_terminate_worker(reason, worker_server_state, state)
end
if function_exported?(worker, :terminate_pool, 2) do
worker.terminate_pool(reason, state)
end
:ok
end
defp do_init_pool(worker, arg) do
if function_exported?(worker, :init_pool, 1) do
worker.init_pool(arg)
else
{:ok, arg}
end
end
defp remove_async_ref(ref, state) do
%{
async: async,
resources: resources,
worker: worker,
state: pool_state,
worker_idle_timeout: worker_idle_timeout
} = state
# If an async worker failed to start, we try to start another one
# immediately, even if the pool is lazy, as we assume there is an
# immediate need for this resource.
{pool_state, resources, async} =
init_worker(worker, pool_state, resources, Map.delete(async, ref), worker_idle_timeout)
{:noreply, %{state | resources: resources, async: async, state: pool_state}}
end
defp cancel_request_ref(
ref,
reason,
%{requests: requests, worker: worker, state: pool_state} = state
) do
case requests do
# Exited or timed out before we could serve it
%{^ref => {_, mon_ref, :command, _command, _deadline}} ->
if function_exported?(worker, :handle_cancelled, 2) do
args = [:queued, pool_state]
apply_worker_callback(worker, :handle_cancelled, args)
end
{:noreply, remove_request(state, ref, mon_ref)}
# Exited or errored during client processing
%{^ref => {_, mon_ref, :state, worker_server_state}} ->
if function_exported?(worker, :handle_cancelled, 2) do
args = [:checked_out, pool_state]
apply_worker_callback(worker, :handle_cancelled, args)
end
state = remove_request(state, ref, mon_ref)
{:noreply, remove_worker(reason, worker_server_state, state)}
# The client timed out, sent us a message, and we dropped the deadlined request
%{} ->
if function_exported?(worker, :handle_cancelled, 2) do
args = [:queued, pool_state]
apply_worker_callback(worker, :handle_cancelled, args)
end
{:noreply, state}
end
end
defp maybe_handle_info(msg, state) do
%{resources: resources, worker: worker, worker_idle_timeout: worker_idle_timeout} = state
if function_exported?(worker, :handle_info, 2) do
{resources, state} =
Enum.reduce(:queue.to_list(resources), {:queue.new(), state}, fn
{worker_server_state, _}, {resources, state} ->
case apply_worker_callback(worker, :handle_info, [msg, worker_server_state]) do
{:ok, worker_server_state} ->
{:queue.in({worker_server_state, get_metadata(worker_idle_timeout)}, resources),
state}
{:remove, reason} ->
{resources, remove_worker(reason, worker_server_state, state)}
end
end)
{:noreply, %{state | resources: resources}}
else
{:noreply, state}
end
end
defp maybe_checkout(%{queue: queue, requests: requests} = state) do
case :queue.out(queue) do
{{:value, {pid, ref}}, queue} ->
case requests do
# The request still exists, so we are good to go
%{^ref => {^pid, mon_ref, :command, command, deadline}} ->
maybe_checkout(command, mon_ref, deadline, {pid, ref}, %{state | queue: queue})
# It should never happen
%{^ref => _} ->
exit(:unexpected_checkout)
# The request is no longer active, do nothing
%{} ->
maybe_checkout(%{state | queue: queue})
end
{:empty, _queue} ->
state
end
end
defp maybe_checkout(command, mon_ref, deadline, {pid, ref} = from, state) do
if past_deadline?(deadline) do
state = remove_request(state, ref, mon_ref)
maybe_checkout(state)
else
%{resources: resources, requests: requests, worker: worker, queue: queue, state: pool_state} =
state = init_worker_if_lazy_and_empty(state)
case :queue.out(resources) do
{{:value, {worker_server_state, _}}, resources} ->
args = [command, from, worker_server_state, pool_state]
case apply_worker_callback(pool_state, worker, :handle_checkout, args) do
{:ok, worker_client_state, worker_server_state, pool_state} ->
GenServer.reply({pid, ref}, worker_client_state)
requests = Map.put(requests, ref, {pid, mon_ref, :state, worker_server_state})
%{state | resources: resources, requests: requests, state: pool_state}
{:remove, reason, pool_state} ->
state = remove_worker(reason, worker_server_state, %{state | state: pool_state})
maybe_checkout(command, mon_ref, deadline, from, %{state | resources: resources})
{:skip, exception, pool_state} ->
GenServer.reply({pid, ref}, {:skipped, exception})
remove_request(%{state | state: pool_state}, ref, mon_ref)
other ->
raise """
unexpected return from #{inspect(worker)}.handle_checkout/4.
Expected: {:ok, client_state, server_state, pool_state} | {:remove, reason, pool_state} | {:skip, Exception.t(), pool_state}
Got: #{inspect(other)}
"""
end
{:empty, _} ->
%{state | queue: :queue.in(from, queue)}
end
end
end
defp init_worker_if_lazy_and_empty(%{lazy: nil} = state), do: state
defp init_worker_if_lazy_and_empty(
%{lazy: lazy, resources: resources, worker_idle_timeout: worker_idle_timeout} = state
) do
if lazy > 0 and :queue.is_empty(resources) do
%{async: async, worker: worker, state: pool_state} = state
{pool_state, resources, async} =
init_worker(worker, pool_state, resources, async, worker_idle_timeout)
%{state | async: async, resources: resources, state: pool_state, lazy: lazy - 1}
else
state
end
end
defp past_deadline?(deadline) when is_integer(deadline) do
System.monotonic_time() >= deadline
end
defp past_deadline?(:infinity), do: false
defp remove_worker(reason, worker_server_state, state) do
state = maybe_terminate_worker(reason, worker_server_state, state)
if lazy = state.lazy do
%{state | lazy: lazy + 1}
else
schedule_init()
state
end
end
defp check_idle_resources(resources, state) do
now_in_ms = System.monotonic_time(:millisecond)
do_check_idle_resources(resources, now_in_ms, state, :queue.new(), state.max_idle_pings)
end
defp do_check_idle_resources(resources, _now_in_ms, state, new_resources, 0) do
{:ok, :queue.join(new_resources, resources), state}
end
defp do_check_idle_resources(resources, now_in_ms, state, new_resources, remaining_pings) do
case :queue.out(resources) do
{:empty, _} ->
{:ok, new_resources, state}
{{:value, resource_data}, next_resources} ->
{worker_server_state, worker_metadata} = resource_data
time_diff = now_in_ms - worker_metadata
if time_diff >= state.worker_idle_timeout do
case maybe_ping_worker(worker_server_state, state) do
{:ok, new_worker_state} ->
# We don't need to update the worker_metadata because, by definition,
# if we are checking for idle resources again and the timestamp is the same,
# it is because it has to be checked again.
new_resource_data = {new_worker_state, worker_metadata}
new_resources = :queue.in(new_resource_data, new_resources)
do_check_idle_resources(
next_resources,
now_in_ms,
state,
new_resources,
remaining_pings - 1
)
{:remove, user_reason} ->
new_state = remove_worker(user_reason, worker_server_state, state)
do_check_idle_resources(
next_resources,
now_in_ms,
new_state,
new_resources,
remaining_pings - 1
)
{:stop, reason} ->
{:stop, reason, state}
end
else
{:ok, :queue.join(new_resources, resources), state}
end
end
end
defp maybe_ping_worker(worker_server_state, state) do
%{worker: worker, state: pool_state} = state
args = [worker_server_state, pool_state]
case apply_worker_callback(worker, :handle_ping, args) do
{:ok, worker_state} ->
{:ok, worker_state}
{:remove, user_reason} ->
{:remove, user_reason}
{:stop, user_reason} ->
{:stop, user_reason}
other ->
raise """
unexpected return from #{inspect(worker)}.handle_ping/2.
Expected:
{:remove, reason}
| {:ok, worker_state}
| {:stop, reason}
Got: #{inspect(other)}
"""
end
end
defp maybe_terminate_worker(reason, worker_server_state, state) do
%{worker: worker, state: pool_state} = state
if function_exported?(worker, :terminate_worker, 3) do
args = [reason, worker_server_state, pool_state]
case apply_worker_callback(worker, :terminate_worker, args) do
{:ok, pool_state} ->
%{state | state: pool_state}
{:remove, _reason} ->
state
other ->
raise """
unexpected return from #{inspect(worker)}.terminate_worker/3.
Expected:
{:ok, pool_state}
Got: #{inspect(other)}
"""
end
else
state
end
end
defp init_worker(worker, pool_state, resources, async, worker_idle_timeout) do
case apply_worker_callback(worker, :init_worker, [pool_state]) do
{:ok, worker_state, pool_state} ->
{pool_state, :queue.in({worker_state, get_metadata(worker_idle_timeout)}, resources),
async}
{:async, fun, pool_state} when is_function(fun, 0) ->
%{ref: ref, pid: pid} = Task.Supervisor.async(NimblePool.TaskSupervisor, fun)
{pool_state, resources, async |> Map.put(ref, pid) |> Map.put(pid, ref)}
{:remove, _reason} ->
send(self(), {__MODULE__, :init_worker})
{pool_state, resources, async}
other ->
raise """
unexpected return from #{inspect(worker)}.init_worker/1.
Expected:
{:ok, worker_state, pool_state}
| {:async, (() -> worker_state), pool_state}
Got: #{inspect(other)}
"""
end
end
defp schedule_init() do
send(self(), {__MODULE__, :init_worker})
end
defp apply_worker_callback(worker, fun, args) do
do_apply_worker_callback(worker, fun, args, &{:remove, &1})
end
defp apply_worker_callback(pool_state, worker, fun, args) do
do_apply_worker_callback(worker, fun, args, &{:remove, &1, pool_state})
end
defp do_apply_worker_callback(worker, fun, args, catch_fun) do
try do
apply(worker, fun, args)
catch
kind, reason ->
reason = Exception.normalize(kind, reason, __STACKTRACE__)
Logger.error(
[
"Error during #{inspect(worker)}.#{fun}/#{length(args)} callback:\n"
| Exception.format(kind, reason, __STACKTRACE__)
],
crash_reason: {crash_reason(kind, reason), __STACKTRACE__}
)
catch_fun.(reason)
end
end
defp crash_reason(:throw, value), do: {:nocatch, value}
defp crash_reason(_, value), do: value
defp remove_request(pool_state, ref, mon_ref) do
requests = Map.delete(pool_state.requests, ref)
monitors = Map.delete(pool_state.monitors, mon_ref)
Process.demonitor(mon_ref, [:flush])
%{pool_state | requests: requests, monitors: monitors}
end
defp handle_enqueue(worker, command, pool_state) do
if function_exported?(worker, :handle_enqueue, 2) do
worker.handle_enqueue(command, pool_state)
else
{:ok, command, pool_state}
end
end
defp get_metadata(nil), do: nil
defp get_metadata(_worker_idle_timeout), do: System.monotonic_time(:millisecond)
end