1121 lines
		
	
	
		
			36 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
			
		
		
	
	
			1121 lines
		
	
	
		
			36 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
defmodule NimblePool do
  @external_resource "README.md"
  @moduledoc "README.md"
             |> File.read!()
             |> String.split("<!-- MDOC !-->")
             |> Enum.fetch!(1)

  use GenServer
  require Logger

  # Identifies a checkout request: the client pid plus the unique
  # reference created for that checkout.
  @type from :: {pid, reference}
  # The worker argument given to `start_link/1` under the `:worker` option.
  @type init_arg :: term
  # State shared by the whole pool, threaded through every callback.
  @type pool_state :: term
  # Per-worker state, as returned by `c:init_worker/1`.
  @type worker_state :: term
  # State handed to the client function given to `checkout!/4`.
  @type client_state :: term
  # Arbitrary reason supplied by callbacks when removing a worker.
  @type user_reason :: term

  @typedoc since: "1.1.0"
  @type pool :: GenServer.server()
 | 
						||
 | 
						||
  @doc """
  Initializes the worker.

  It receives the worker argument passed to `start_link/1` if `c:init_pool/1` is
  not implemented, otherwise the pool state returned by `c:init_pool/1`. It must
  return `{:ok, worker_state, pool_state}` or `{:async, fun, pool_state}`, where the `fun`
  is a zero-arity function that must return the worker state.

  If this callback returns `{:async, fun, pool_state}`, `fun` is executed in a **separate
  one-off process**. Because of this, if you start resources that the pool needs to "own",
  you need to transfer ownership to the pool process. For example, if your async `fun`
  opens a `:gen_tcp` socket, you'll have to use `:gen_tcp.controlling_process/2` to transfer
  ownership back to the pool.

  > #### Blocking the pool {: .warning}
  >
  > This callback is synchronous and therefore will block the pool, potentially
  > for a significant amount of time since it's executed in the pool process once
  > per worker. If you need to perform long initialization, consider using the
  > `{:async, fun, pool_state}` return type.
  """
  @doc callback: :worker
  @callback init_worker(pool_state) ::
              {:ok, worker_state, pool_state} | {:async, (-> worker_state), pool_state}

  @doc """
  Initializes the pool.

  It receives the worker argument passed to `start_link/1` and must
  return `{:ok, pool_state}` upon successful initialization,
  `:ignore` to exit normally, or `{:stop, reason}` to exit with `reason`
  and return `{:error, reason}`.

  This is a good place to perform a registration, for example.

  It must return the `pool_state`. The `pool_state` is given to
  `init_worker`. By default, it simply returns the given arguments.

  This callback is optional.

  ## Examples

      @impl NimblePool
      def init_pool(options) do
        Registry.register(options[:registry], :some_key, :some_value)
      end

  """
  @doc callback: :pool
  @callback init_pool(init_arg) :: {:ok, pool_state} | :ignore | {:stop, reason :: any()}

  @doc """
  Checks a worker out.

  The `maybe_wrapped_command` is the `command` passed to `checkout!/4` if the worker
  doesn't implement the `c:handle_enqueue/2` callback, otherwise it's the possibly-wrapped
  command returned by `c:handle_enqueue/2`.

  This callback must return one of:

    * `{:ok, client_state, worker_state, pool_state}` — the client state is given to
      the callback function passed to `checkout!/4`. `worker_state` and `pool_state`
      can potentially update the state of the checked-out worker and the pool.

    * `{:remove, reason, pool_state}` — `NimblePool` will remove the checked-out worker and
      attempt to checkout another worker.

    * `{:skip, Exception.t(), pool_state}` — `NimblePool` will skip the checkout, the client will
      raise the returned exception, and the worker will be left ready for the next
      checkout attempt.

  > #### Blocking the pool {: .warning}
  >
  > This callback is synchronous and therefore will block the pool.
  > Avoid performing long work in here. Instead, do as much work as
  > possible on the client.

  Once the worker is checked out, the worker won't handle any
  messages targeted to `c:handle_info/2`.
  """
  @doc callback: :worker
  @callback handle_checkout(maybe_wrapped_command :: term, from, worker_state, pool_state) ::
              {:ok, client_state, worker_state, pool_state}
              | {:remove, user_reason, pool_state}
              | {:skip, Exception.t(), pool_state}

  @doc """
  Checks a worker back in the pool.

  It receives the potentially-updated `client_state`, returned by the `checkout!/4`
  anonymous function, and it must return either
  `{:ok, worker_state, pool_state}` or `{:remove, reason, pool_state}`.

  > #### Blocking the pool {: .warning}
  >
  > This callback is synchronous and therefore will block the pool.
  > Avoid performing long work in here, instead do as much work as
  > possible on the client.

  Once the connection is checked in, it may immediately be handed
  to another client, without traversing any of the messages in the
  pool inbox.

  This callback is optional.
  """
  @doc callback: :worker
  @callback handle_checkin(client_state, from, worker_state, pool_state) ::
              {:ok, worker_state, pool_state} | {:remove, user_reason, pool_state}

  @doc """
  Handles an update instruction from a checked out worker.

  See `update/2` for more information.

  This callback is optional.
  """
  @doc callback: :worker
  @callback handle_update(message :: term, worker_state, pool_state) ::
              {:ok, worker_state, pool_state}
 | 
						||
 | 
						||
  @doc """
  Receives a message in the pool and handles it as each worker.

  It receives the `message` and it must return either
  `{:ok, worker_state}` to update the worker state, or `{:remove, reason}` to
  remove the worker.

  Since there is only a single pool process that can receive messages, this
  callback is executed once for every worker when the pool receives `message`.

  > #### Blocking the pool {: .warning}
  >
  > This callback is synchronous and therefore will block the pool while it
  > executes for each worker. Avoid performing long work in here.

  This callback is optional.
  """
  @doc callback: :worker
  @callback handle_info(message :: term, worker_state) ::
              {:ok, worker_state} | {:remove, user_reason}

  @doc """
  Executed by the pool whenever a request to check out a worker is enqueued.

  The `command` argument should be treated as an opaque value, but it can be
  wrapped with some data to be used in `c:handle_checkout/4`.

  It must return either `{:ok, maybe_wrapped_command, pool_state}` or
  `{:skip, Exception.t(), pool_state}` if checkout is to be skipped.

  > #### Blocking the pool {: .warning}
  >
  > This callback is synchronous and therefore will block the pool.
  > Avoid performing long work in here.

  This callback is optional.

  ## Examples

      @impl NimblePool
      def handle_enqueue(command, pool_state) do
        {:ok, {:wrapped, command}, pool_state}
      end

  """
  @doc callback: :pool
  @callback handle_enqueue(command :: term, pool_state) ::
              {:ok, maybe_wrapped_command :: term, pool_state}
              | {:skip, Exception.t(), pool_state}

  @doc """
  Terminates a worker.

  The `reason` argument is:

    * `:DOWN` whenever the client link breaks
    * `:timeout` whenever the client times out
    * one of `:throw`, `:error`, `:exit` whenever the client crashes with one
      of the reasons above.
    * `reason` if at any point you return `{:remove, reason}`
    * if any callback raises, the raised exception will be given as `reason`.

  It receives the latest known `worker_state`, which may not
  be the latest state. For example, if a client checks out the
  state and crashes, we don't fully know the `client_state`,
  so the `c:terminate_worker/3` callback needs to take such scenarios
  into account.

  This callback must always return `{:ok, pool_state}` with the potentially-updated
  pool state.

  This callback is optional.
  """
  @doc callback: :pool
  @callback terminate_worker(
              reason :: :DOWN | :timeout | :throw | :error | :exit | user_reason,
              worker_state,
              pool_state
            ) ::
              {:ok, pool_state}
 | 
						||
 | 
						||
  @doc """
  Handle pings due to inactivity on the worker.

  Executed whenever the idle worker periodic timer verifies that a worker has been idle
  on the pool for longer than the `:worker_idle_timeout` pool configuration (in milliseconds).

  This callback must return one of the following values:

    * `{:ok, worker_state}`: Updates worker state.

    * `{:remove, user_reason}`: The pool will proceed to the standard worker termination
        defined in `terminate_worker/3`.

    * `{:stop, user_reason}`: The entire pool process will be terminated, and `terminate_worker/3`
        will be called for every worker on the pool.

  This callback is optional.

  ## Max idle pings

  The `:max_idle_pings` pool option is useful to prevent sequential termination of a large number
  of workers. However, it is important to keep in mind the following behaviours whenever
  utilizing it.

    * If you are not terminating workers with `c:handle_ping/2`, you may end up pinging only
      the same workers over and over again because each cycle will ping only the first
      `:max_idle_pings` workers.

    * If you are terminating workers with `c:handle_ping/2`, the last worker may be terminated
      after up to `worker_idle_timeout + worker_idle_timeout * ceil(number_of_workers/max_idle_pings)`,
      instead of `2 * worker_idle_timeout` milliseconds of idle time.

  For instance consider a pool with 10 workers and a ping of 1 second.

  Given a negligible worker termination time and a worst-case scenario where all the workers
  go idle right after a verification cycle is started, then without `max_idle_pings` the
  last worker will be terminated in the next cycle (2 seconds), whereas with a
  `max_idle_pings` of 2 the last worker will be terminated only in the 5th cycle (6 seconds).

  ## Disclaimers

    * On lazy pools, if no worker is currently on the pool the callback will never be called.
      Therefore you can not rely on this callback to terminate empty lazy pools.

    * On not lazy pools, if you return `{:remove, user_reason}` you may end up
      terminating and initializing workers at the same time every idle verification cycle.

    * On large pools, if many resources go idle at the same cycle, you may end up terminating
      a large number of workers sequentially, which could lead to the pool being unable to
      fulfill requests. See `:max_idle_pings` option to prevent this.

  """
  @doc callback: :worker
  @callback handle_ping(
              worker_state,
              pool_state
            ) ::
              {:ok, worker_state} | {:remove, user_reason()} | {:stop, user_reason()}

  @doc """
  Handle pool termination.

  The `reason` argument is the same given to GenServer's `terminate/2` callback.

  It is not necessary to terminate workers here because the
  `terminate_worker/3` callback has already been invoked.

  This should be used only to clean up extra resources that can not be
  handled by the `terminate_worker/3` callback.

  This callback is optional.
  """
  @doc callback: :pool
  @callback terminate_pool(
              reason :: :DOWN | :timeout | :throw | :error | :exit | user_reason,
              pool_state
            ) :: :ok

  @doc """
  Handle cancelled checkout requests.

  This callback is executed when a checkout request is cancelled unexpectedly.

  The context argument may be `:queued` or `:checked_out`:

  * `:queued` means the cancellation happened before resource checkout. This may happen
  when the pool is starving under load and can not serve resources.

  * `:checked_out` means the cancellation happened after resource checkout. This may happen
  when the function given to `checkout!/4` raises.

  This callback is optional.
  """
  @doc callback: :pool
  @callback handle_cancelled(
              context :: :queued | :checked_out,
              pool_state
            ) :: :ok

  @optional_callbacks init_pool: 1,
                      handle_checkin: 4,
                      handle_info: 2,
                      handle_enqueue: 2,
                      handle_update: 3,
                      handle_ping: 2,
                      terminate_worker: 3,
                      terminate_pool: 2,
                      handle_cancelled: 2
 | 
						||
 | 
						||
  @doc """
  Defines a pool to be started under the supervision tree.

  It accepts the same options as `start_link/1` with the
  addition of `:restart` and `:shutdown` that control the
  "Child Specification".

  ## Examples

      NimblePool.child_spec(worker: {__MODULE__, :some_arg}, restart: :temporary)

  """
  @spec child_spec(keyword) :: Supervisor.child_spec()
  def child_spec(opts) when is_list(opts) do
    # The worker module doubles as the child id; :restart and :shutdown are
    # stripped before the remaining options are handed to start_link/1.
    {worker_mod, _worker_arg} = Keyword.fetch!(opts, :worker)
    {restart, opts} = Keyword.pop(opts, :restart, :permanent)
    {shutdown, start_opts} = Keyword.pop(opts, :shutdown, 5_000)

    %{
      id: worker_mod,
      restart: restart,
      shutdown: shutdown,
      start: {__MODULE__, :start_link, [start_opts]}
    }
  end
 | 
						||
 | 
						||
  @doc """
  Starts a pool.

  ## Options

    * `:worker` - a `{worker_mod, worker_init_arg}` tuple with the worker
      module that implements the `NimblePool` behaviour and the worker
      initial argument. This argument is **required**.

    * `:pool_size` - how many workers in the pool. Defaults to `10`.

    * `:lazy` - When `true`, workers are started lazily, only when necessary.
      Defaults to `false`.

    * `:worker_idle_timeout` - Timeout in milliseconds to tag a worker as idle.
      If not nil, starts a periodic timer on the same frequency that will ping
      all idle workers using the `handle_ping/2` optional callback.
      Defaults to no timeout.

    * `:max_idle_pings` - Defines a limit to the number of workers that can be pinged
      for each cycle of the `handle_ping/2` optional callback.
      Defaults to no limit. See `handle_ping/2` for more details.

  """
  @spec start_link(keyword) :: GenServer.on_start()
  def start_link(opts) when is_list(opts) do
    {worker_option, opts} =
      Keyword.pop_lazy(opts, :worker, fn ->
        raise ArgumentError, "missing required :worker option"
      end)

    # Validate the shape explicitly so callers get a descriptive error
    # instead of a MatchError when :worker is not a {module, arg} tuple.
    {worker, arg} =
      case worker_option do
        {worker, arg} ->
          {worker, arg}

        other ->
          raise ArgumentError,
                "worker must be a {worker_mod, worker_init_arg} tuple, got: #{inspect(other)}"
      end

    {pool_size, opts} = Keyword.pop(opts, :pool_size, 10)
    {lazy, opts} = Keyword.pop(opts, :lazy, false)
    {worker_idle_timeout, opts} = Keyword.pop(opts, :worker_idle_timeout, nil)
    # -1 means "no limit" for the idle-ping cycle.
    {max_idle_pings, opts} = Keyword.pop(opts, :max_idle_pings, -1)

    unless is_atom(worker) do
      raise ArgumentError, "worker must be an atom, got: #{inspect(worker)}"
    end

    unless is_integer(pool_size) and pool_size > 0 do
      raise ArgumentError, "pool_size must be a positive integer, got: #{inspect(pool_size)}"
    end

    # Remaining opts (e.g. :name) are forwarded to GenServer.
    GenServer.start_link(
      __MODULE__,
      {worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings},
      opts
    )
  end
 | 
						||
 | 
						||
  @doc """
  Stops the given `pool`.

  The pool exits with the given `reason`. The pool has `timeout` milliseconds
  to terminate, otherwise it will be brutally terminated.

  ## Examples

      NimblePool.stop(pool)
      #=> :ok

  """
  @spec stop(pool, reason :: term, timeout) :: :ok
  def stop(pool, reason \\ :normal, timeout \\ :infinity),
    do: GenServer.stop(pool, reason, timeout)
 | 
						||
 | 
						||
  @doc """
  Checks out a worker from the pool.

  It expects a command, which will be passed to the `c:handle_checkout/4`
  callback. The `c:handle_checkout/4` callback will return a client state,
  which is given to the `function`.

  The `function` receives two arguments, the request
  (`{pid(), reference()}`) and the `client_state`.
  The function must return a two-element tuple, where the first element is the
  return value for `checkout!/4`, and the second element is the updated `client_state`,
  which will be given as the first argument to `c:handle_checkin/4`.

  `checkout!/4` also has an optional `timeout` value. This value will be applied
  to the checkout operation itself. The "check in" operation happens asynchronously.
  """
  @spec checkout!(pool, command :: term, function, timeout) :: result
        when function: (from, client_state -> {result, client_state}), result: var
  def checkout!(pool, command, function, timeout \\ 5_000) when is_function(function, 2) do
    # Re-implementation of gen.erl call to avoid multiple monitors.
    pid = GenServer.whereis(pool)

    unless pid do
      exit!(:noproc, :checkout, [pool])
    end

    # The monitor reference doubles as the request identifier on the pool side.
    ref = Process.monitor(pid)
    send_call(pid, ref, {:checkout, command, deadline(timeout)})

    receive do
      {^ref, {:skipped, exception}} ->
        # The pool (via handle_enqueue/handle_checkout) skipped this checkout.
        raise exception

      {^ref, client_state} ->
        Process.demonitor(ref, [:flush])

        try do
          function.({pid, ref}, client_state)
        catch
          kind, reason ->
            # Notify the pool the client crashed so it can clean up the worker,
            # then re-raise with the original stacktrace.
            send(pid, {__MODULE__, :cancel, ref, kind})
            :erlang.raise(kind, reason, __STACKTRACE__)
        else
          {result, client_state} ->
            # Check-in is asynchronous: the pool processes it from its inbox.
            send(pid, {__MODULE__, :checkin, ref, client_state})
            result
        end

      {:DOWN, ^ref, _, _, :noconnection} ->
        exit!({:nodedown, get_node(pid)}, :checkout, [pool])

      {:DOWN, ^ref, _, _, reason} ->
        exit!(reason, :checkout, [pool])
    after
      timeout ->
        # Cancel before demonitoring so the pool learns about the timeout.
        send(pid, {__MODULE__, :cancel, ref, :timeout})
        Process.demonitor(ref, [:flush])
        exit!(:timeout, :checkout, [pool])
    end
  end
 | 
						||
 | 
						||
  @doc """
  Sends an **update** instruction to the pool about the checked out worker.

  This must be called inside the `checkout!/4` callback function with
  the `from` value given to `c:handle_checkout/4`.

  This is useful to update the pool's state before effectively
  checking the state in, which is handy when transferring
  resources requires two steps.
  """
  @spec update(from, command :: term) :: :ok
  def update({pool, ref} = _from, command) do
    # Fire-and-forget: the pool handles this via handle_update/3.
    message = {__MODULE__, :update, ref, command}
    send(pool, message)
    :ok
  end
 | 
						||
 | 
						||
  # Converts a millisecond timeout into an absolute monotonic-time deadline,
  # passing :infinity through unchanged.
  defp deadline(:infinity), do: :infinity

  defp deadline(timeout) when is_integer(timeout) do
    native_timeout = System.convert_time_unit(timeout, :millisecond, :native)
    System.monotonic_time() + native_timeout
  end
 | 
						||
 | 
						||
  # Extracts the node from either a {name, node} remote tuple or a pid.
  defp get_node({_name, node_name}), do: node_name
  defp get_node(pid) when is_pid(pid), do: node(pid)
 | 
						||
 | 
						||
  defp send_call(pid, ref, message) do
    # Auto-connect is asynchronous. But we still use :noconnect to make sure
    # we send on the monitored connection, and not trigger a new auto-connect.
    gen_call = {:"$gen_call", {self(), ref}, message}
    Process.send(pid, gen_call, [:noconnect])
  end
 | 
						||
 | 
						||
  # Exits with a gen-style reason: {reason, {module, function, arguments}},
  # matching the shape GenServer.call/3 produces on failure.
  defp exit!(reason, function, arguments) do
    exit({reason, {__MODULE__, function, arguments}})
  end
 | 
						||
 | 
						||
  ## Callbacks
 | 
						||
 | 
						||
  @impl true
  def init({worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings}) do
    # Trap exits so terminate/2 runs on shutdown and workers are cleaned up.
    Process.flag(:trap_exit, true)

    case Code.ensure_loaded(worker) do
      {:module, _} ->
        :ok

      {:error, reason} ->
        raise ArgumentError, "failed to load worker module #{inspect(worker)}: #{inspect(reason)}"
    end

    # For lazy pools, `lazy` tracks how many workers may still be started on
    # demand; nil means the pool is eager.
    lazy = if lazy, do: pool_size, else: nil

    if worker_idle_timeout do
      if function_exported?(worker, :handle_ping, 2) do
        # Arm the periodic idle-verification timer.
        Process.send_after(self(), :check_idle, worker_idle_timeout)
      else
        IO.warn(
          ":worker_idle_timeout was given but the worker does not export a handle_ping/2 callback"
        )
      end
    end

    with {:ok, pool_state} <- do_init_pool(worker, arg) do
      # Eager pools start every worker up front; lazy pools start with none.
      # `async` maps refs/pids of in-flight async inits ({:async, fun, _}).
      {pool_state, resources, async} =
        if is_nil(lazy) do
          Enum.reduce(1..pool_size, {pool_state, :queue.new(), %{}}, fn
            _, {pool_state, resources, async} ->
              init_worker(worker, pool_state, resources, async, worker_idle_timeout)
          end)
        else
          {pool_state, :queue.new(), %{}}
        end

      state = %{
        worker: worker,
        # Queue of pending checkout requests.
        queue: :queue.new(),
        # request ref => request data.
        requests: %{},
        # client monitor ref => request ref.
        monitors: %{},
        # Queue of available {worker_state, metadata} entries.
        resources: resources,
        async: async,
        state: pool_state,
        lazy: lazy,
        worker_idle_timeout: worker_idle_timeout,
        max_idle_pings: max_idle_pings
      }

      {:ok, state}
    end
  end
 | 
						||
 | 
						||
  @impl true
  def handle_call({:checkout, command, deadline}, {pid, ref} = from, state) do
    %{requests: requests, monitors: monitors, worker: worker, state: pool_state} = state
    # Monitor the client so the worker can be reclaimed if the client dies.
    mon_ref = Process.monitor(pid)
    requests = Map.put(requests, ref, {pid, mon_ref, :command, command, deadline})
    monitors = Map.put(monitors, mon_ref, ref)
    state = %{state | requests: requests, monitors: monitors}

    case handle_enqueue(worker, command, pool_state) do
      {:ok, command, pool_state} ->
        # No reply here: maybe_checkout replies once a worker is available.
        {:noreply, maybe_checkout(command, mon_ref, deadline, from, %{state | state: pool_state})}

      {:skip, exception, pool_state} ->
        # handle_enqueue/2 skipped the checkout; the client raises `exception`.
        state = remove_request(%{state | state: pool_state}, ref, mon_ref)
        {:reply, {:skipped, exception}, state}
    end
  end
 | 
						||
 | 
						||
  @impl true
  def handle_info({__MODULE__, :update, ref, command}, state) do
    %{requests: requests, state: pool_state, worker: worker} = state

    case requests do
      # Only a checked-out worker (tagged :state) may receive updates.
      %{^ref => {pid, mon_ref, :state, worker_state}} ->
        {:ok, worker_state, pool_state} = worker.handle_update(command, worker_state, pool_state)
        requests = Map.put(requests, ref, {pid, mon_ref, :state, worker_state})
        {:noreply, %{state | requests: requests, state: pool_state}}

      %{} ->
        # update/2 called outside of a live checkout: crash the pool loudly.
        exit(:unexpected_precheckin)
    end
  end

  @impl true
  def handle_info({__MODULE__, :checkin, ref, worker_client_state}, state) do
    %{
      requests: requests,
      resources: resources,
      worker: worker,
      state: pool_state,
      worker_idle_timeout: worker_idle_timeout
    } = state

    case requests do
      %{^ref => {pid, mon_ref, :state, worker_server_state}} ->
        # handle_checkin/4 is optional; default keeps the worker state as-is.
        checkin =
          if function_exported?(worker, :handle_checkin, 4) do
            args = [worker_client_state, {pid, ref}, worker_server_state, pool_state]
            apply_worker_callback(pool_state, worker, :handle_checkin, args)
          else
            {:ok, worker_server_state, pool_state}
          end

        {resources, state} =
          case checkin do
            {:ok, worker_server_state, pool_state} ->
              # Re-enqueue the worker with fresh idle metadata.
              {:queue.in({worker_server_state, get_metadata(worker_idle_timeout)}, resources),
               %{state | state: pool_state}}

            {:remove, reason, pool_state} ->
              {resources,
               remove_worker(reason, worker_server_state, %{state | state: pool_state})}
          end

        state = remove_request(state, ref, mon_ref)
        # A freed worker may immediately serve a queued request.
        {:noreply, maybe_checkout(%{state | resources: resources})}

      %{} ->
        exit(:unexpected_checkin)
    end
  end

  @impl true
  def handle_info({__MODULE__, :cancel, ref, reason}, state) do
    # The client timed out or crashed while waiting for / holding a worker.
    cancel_request_ref(ref, reason, state)
  end
 | 
						||
 | 
						||
  @impl true
 | 
						||
  def handle_info({__MODULE__, :init_worker}, state) do
 | 
						||
    %{
 | 
						||
      async: async,
 | 
						||
      resources: resources,
 | 
						||
      worker: worker,
 | 
						||
      state: pool_state,
 | 
						||
      worker_idle_timeout: worker_idle_timeout
 | 
						||
    } = state
 | 
						||
 | 
						||
    {pool_state, resources, async} =
 | 
						||
      init_worker(worker, pool_state, resources, async, worker_idle_timeout)
 | 
						||
 | 
						||
    {:noreply, maybe_checkout(%{state | async: async, resources: resources, state: pool_state})}
 | 
						||
  end
 | 
						||
 | 
						||
  # A monitored process went down. The ref can belong to a client with a
  # request in flight, to an async worker-initialization task, or to something
  # the worker module itself is watching (forwarded to its `handle_info/2`).
  @impl true
  def handle_info({:DOWN, ref, _, _, _} = down, state) do
    %{monitors: monitors, async: async} = state

    cond do
      # A client that had a pending or checked-out request died.
      request_ref = monitors[ref] ->
        cancel_request_ref(request_ref, :DOWN, state)

      # An async `init_worker` task died before delivering its worker.
      Map.has_key?(async, ref) ->
        remove_async_ref(ref, state)

      true ->
        maybe_handle_info(down, state)
    end
  end

  # EXIT from a linked process. Async init tasks are linked, so their normal
  # exit is expected: just drop the pid entry (the ref entry is cleaned up by
  # the task-reply or DOWN clauses). Anything else goes to the worker callback.
  @impl true
  def handle_info({:EXIT, pid, _reason} = exit, state) do
    %{async: async} = state

    if Map.has_key?(async, pid) do
      {:noreply, %{state | async: Map.delete(async, pid)}}
    else
      maybe_handle_info(exit, state)
    end
  end

  # Reply from an async `init_worker` task: the task delivered a worker state.
  # Flush the task's DOWN message, enqueue the new resource with a fresh idle
  # timestamp, and try to serve any waiting checkout.
  @impl true
  def handle_info({ref, worker_state} = reply, state) when is_reference(ref) do
    %{async: async} = state

    if Map.has_key?(async, ref) do
      Process.demonitor(ref, [:flush])

      resources =
        :queue.in({worker_state, get_metadata(state.worker_idle_timeout)}, state.resources)

      state = %{state | async: Map.delete(async, ref), resources: resources}
      {:noreply, maybe_checkout(state)}
    else
      maybe_handle_info(reply, state)
    end
  end

  # Periodic idle scan: pings workers that have been idle too long, reschedules
  # the next scan, or stops the pool if a ping asked for it.
  @impl true
  def handle_info(:check_idle, %{worker_idle_timeout: idle_timeout} = state) do
    case check_idle_resources(state.resources, state) do
      {:ok, remaining_resources, checked_state} ->
        Process.send_after(self(), :check_idle, idle_timeout)
        {:noreply, %{checked_state | resources: remaining_resources}}

      {:stop, reason, stopped_state} ->
        {:stop, {:shutdown, reason}, stopped_state}
    end
  end

  # Fallback clause: any other message is offered to the worker module's
  # optional `handle_info/2` callback.
  @impl true
  def handle_info(msg, state), do: maybe_handle_info(msg, state)

  # Pool shutdown: gives every currently checked-in worker a chance to clean up
  # via `terminate_worker/3`, then invokes the optional `terminate_pool/2`
  # callback. Checked-out workers are not visited here.
  @impl true
  def terminate(reason, %{worker: worker, resources: resources} = state) do
    # NOTE(review): the updated state returned by `maybe_terminate_worker/3` is
    # discarded, so pool-state changes made while terminating one worker are not
    # visible when terminating the next — presumably acceptable during shutdown,
    # but worth confirming.
    for {worker_server_state, _} <- :queue.to_list(resources) do
      maybe_terminate_worker(reason, worker_server_state, state)
    end

    if function_exported?(worker, :terminate_pool, 2) do
      worker.terminate_pool(reason, state)
    end

    :ok
  end

  # Runs the worker's optional `init_pool/1` callback; when it is not defined,
  # the start argument becomes the pool state unchanged.
  defp do_init_pool(worker, arg) do
    case function_exported?(worker, :init_pool, 1) do
      true -> worker.init_pool(arg)
      false -> {:ok, arg}
    end
  end

  # Called when an async worker-initialization task died. Drops the dead task's
  # ref and immediately starts a replacement worker — even for lazy pools,
  # because a task was only running if the resource was actually needed.
  defp remove_async_ref(ref, state) do
    %{worker: worker, state: pool_state} = state

    {pool_state, resources, async} =
      init_worker(
        worker,
        pool_state,
        state.resources,
        Map.delete(state.async, ref),
        state.worker_idle_timeout
      )

    {:noreply, %{state | resources: resources, async: async, state: pool_state}}
  end

  # Cancels the request identified by `ref` because the client exited, timed
  # out, or explicitly cancelled. The worker's optional `handle_cancelled/2`
  # callback is notified with the stage the request was in (`:queued` or
  # `:checked_out`); a checked-out worker is removed from the pool since its
  # state can no longer be trusted.
  #
  # The callback invocation was previously duplicated in all three branches;
  # it is now factored into `notify_cancelled/2`.
  defp cancel_request_ref(ref, reason, %{requests: requests} = state) do
    case requests do
      # Exited or timed out before we could serve it
      %{^ref => {_, mon_ref, :command, _command, _deadline}} ->
        notify_cancelled(state, :queued)
        {:noreply, remove_request(state, ref, mon_ref)}

      # Exited or errored during client processing
      %{^ref => {_, mon_ref, :state, worker_server_state}} ->
        notify_cancelled(state, :checked_out)
        state = remove_request(state, ref, mon_ref)
        {:noreply, remove_worker(reason, worker_server_state, state)}

      # The client timed out, sent us a message, and we dropped the deadlined request
      %{} ->
        notify_cancelled(state, :queued)
        {:noreply, state}
    end
  end

  # Invokes the worker's optional `handle_cancelled/2` callback (result is
  # intentionally ignored; errors are logged by `apply_worker_callback/3`).
  defp notify_cancelled(%{worker: worker, state: pool_state}, context) do
    if function_exported?(worker, :handle_cancelled, 2) do
      apply_worker_callback(worker, :handle_cancelled, [context, pool_state])
    end
  end

  # Forwards `msg` to the worker's optional `handle_info/2` callback, once per
  # currently idle (checked-in) worker. Workers answering `{:ok, new_state}`
  # are re-queued with a fresh idle timestamp; workers answering
  # `{:remove, reason}` are terminated. Checked-out workers are not notified.
  # No-op when the callback is not defined.
  defp maybe_handle_info(msg, state) do
    %{resources: resources, worker: worker, worker_idle_timeout: worker_idle_timeout} = state

    if function_exported?(worker, :handle_info, 2) do
      # Rebuild the resource queue from scratch, preserving order, while
      # threading the pool state (which `remove_worker/3` may update).
      {resources, state} =
        Enum.reduce(:queue.to_list(resources), {:queue.new(), state}, fn
          {worker_server_state, _}, {resources, state} ->
            case apply_worker_callback(worker, :handle_info, [msg, worker_server_state]) do
              {:ok, worker_server_state} ->
                {:queue.in({worker_server_state, get_metadata(worker_idle_timeout)}, resources),
                 state}

              {:remove, reason} ->
                {resources, remove_worker(reason, worker_server_state, state)}
            end
        end)

      {:noreply, %{state | resources: resources}}
    else
      {:noreply, state}
    end
  end

  # Attempts to serve the next queued checkout request. Requests whose entry
  # was already removed (cancelled or past deadline) are skipped recursively.
  # Returns the updated pool state.
  defp maybe_checkout(%{queue: queue, requests: requests} = state) do
    case :queue.out(queue) do
      {{:value, {pid, ref}}, queue} ->
        case requests do
          # The request still exists, so we are good to go
          %{^ref => {^pid, mon_ref, :command, command, deadline}} ->
            maybe_checkout(command, mon_ref, deadline, {pid, ref}, %{state | queue: queue})

          # It should never happen
          %{^ref => _} ->
            exit(:unexpected_checkout)

          # The request is no longer active, do nothing
          %{} ->
            maybe_checkout(%{state | queue: queue})
        end

      {:empty, _queue} ->
        state
    end
  end

  # Serves one checkout request `from` with command `command`. Expired requests
  # are dropped and the next queued request is tried instead. Otherwise a lazy
  # worker may be started, then the head resource is handed to the worker's
  # `handle_checkout/4` callback:
  #   * `{:ok, ...}`     — reply to the client and record the checked-out state
  #   * `{:remove, ...}` — discard that worker and retry with the next resource
  #   * `{:skip, ...}`   — reply with `{:skipped, exception}` and drop the request
  # With no resource available, the request is put (back) on the wait queue.
  defp maybe_checkout(command, mon_ref, deadline, {pid, ref} = from, state) do
    if past_deadline?(deadline) do
      state = remove_request(state, ref, mon_ref)
      maybe_checkout(state)
    else
      %{resources: resources, requests: requests, worker: worker, queue: queue, state: pool_state} =
        state = init_worker_if_lazy_and_empty(state)

      case :queue.out(resources) do
        {{:value, {worker_server_state, _}}, resources} ->
          args = [command, from, worker_server_state, pool_state]

          case apply_worker_callback(pool_state, worker, :handle_checkout, args) do
            {:ok, worker_client_state, worker_server_state, pool_state} ->
              GenServer.reply({pid, ref}, worker_client_state)

              requests = Map.put(requests, ref, {pid, mon_ref, :state, worker_server_state})
              %{state | resources: resources, requests: requests, state: pool_state}

            {:remove, reason, pool_state} ->
              state = remove_worker(reason, worker_server_state, %{state | state: pool_state})
              # Retry the same request against the remaining resources.
              maybe_checkout(command, mon_ref, deadline, from, %{state | resources: resources})

            {:skip, exception, pool_state} ->
              GenServer.reply({pid, ref}, {:skipped, exception})
              remove_request(%{state | state: pool_state}, ref, mon_ref)

            other ->
              raise """
              unexpected return from #{inspect(worker)}.handle_checkout/4.

              Expected: {:ok, client_state, server_state, pool_state} | {:remove, reason, pool_state} | {:skip, Exception.t(), pool_state}

              Got: #{inspect(other)}
              """
          end

        {:empty, _} ->
          %{state | queue: :queue.in(from, queue)}
      end
    end
  end

  # Lazy pools start workers on demand: when the pool is lazy, still has an
  # unstarted-worker budget (`lazy > 0`), and no idle resource is available,
  # kick off one worker initialization and decrement the budget. Eager pools
  # (`lazy: nil`) pass through untouched.
  defp init_worker_if_lazy_and_empty(%{lazy: nil} = state), do: state

  defp init_worker_if_lazy_and_empty(%{lazy: lazy} = state) do
    if lazy > 0 and :queue.is_empty(state.resources) do
      %{async: async, worker: worker, state: pool_state} = state

      {pool_state, resources, async} =
        init_worker(worker, pool_state, state.resources, async, state.worker_idle_timeout)

      %{state | async: async, resources: resources, state: pool_state, lazy: lazy - 1}
    else
      state
    end
  end

  # Whether a queued request's deadline (native monotonic-time units, or
  # `:infinity` for no deadline) has already passed.
  defp past_deadline?(:infinity), do: false

  defp past_deadline?(deadline) when is_integer(deadline),
    do: System.monotonic_time() >= deadline

  # Removes a worker from the pool, running its optional `terminate_worker/3`
  # callback. A lazy pool just grows its unstarted budget back; an eager pool
  # schedules an immediate replacement.
  defp remove_worker(reason, worker_server_state, state) do
    state = maybe_terminate_worker(reason, worker_server_state, state)

    case state.lazy do
      nil ->
        schedule_init()
        state

      lazy ->
        %{state | lazy: lazy + 1}
    end
  end

  # Entry point for the periodic idle scan: walks the resource queue pinging
  # workers idle for at least `worker_idle_timeout` ms, visiting at most
  # `max_idle_pings` workers per scan.
  defp check_idle_resources(resources, state) do
    now = System.monotonic_time(:millisecond)
    do_check_idle_resources(resources, now, state, :queue.new(), state.max_idle_pings)
  end

  # Ping budget exhausted: keep the remaining, unvisited resources as-is.
  defp do_check_idle_resources(resources, _now_in_ms, state, new_resources, 0) do
    {:ok, :queue.join(new_resources, resources), state}
  end

  # Walks `resources` head-first (oldest idle timestamp first, since workers
  # are enqueued on check-in). Each worker idle for at least
  # `worker_idle_timeout` ms is pinged via `maybe_ping_worker/2`; surviving
  # workers are appended to `new_resources`, removed ones are terminated, and
  # `{:stop, reason}` aborts the scan. The first not-yet-idle worker ends the
  # walk, because everything behind it is necessarily younger.
  defp do_check_idle_resources(resources, now_in_ms, state, new_resources, remaining_pings) do
    case :queue.out(resources) do
      {:empty, _} ->
        {:ok, new_resources, state}

      {{:value, resource_data}, next_resources} ->
        {worker_server_state, worker_metadata} = resource_data
        # worker_metadata is the monotonic ms timestamp recorded at check-in.
        time_diff = now_in_ms - worker_metadata

        if time_diff >= state.worker_idle_timeout do
          case maybe_ping_worker(worker_server_state, state) do
            {:ok, new_worker_state} ->
              # We don't need to update the worker_metadata because, by definition,
              # if we are checking for idle resources again and the timestamp is the same,
              # it is because it has to be checked again.
              new_resource_data = {new_worker_state, worker_metadata}
              new_resources = :queue.in(new_resource_data, new_resources)

              do_check_idle_resources(
                next_resources,
                now_in_ms,
                state,
                new_resources,
                remaining_pings - 1
              )

            {:remove, user_reason} ->
              new_state = remove_worker(user_reason, worker_server_state, state)

              do_check_idle_resources(
                next_resources,
                now_in_ms,
                new_state,
                new_resources,
                remaining_pings - 1
              )

            {:stop, reason} ->
              {:stop, reason, state}
          end
        else
          # Head is not idle yet, so neither is anything behind it: rejoin and stop.
          {:ok, :queue.join(new_resources, resources), state}
        end
    end
  end

  # Runs the worker's `handle_ping/2` callback for an idle worker and passes
  # its well-formed results straight through; anything outside the documented
  # contract raises.
  defp maybe_ping_worker(worker_server_state, state) do
    %{worker: worker, state: pool_state} = state

    case apply_worker_callback(worker, :handle_ping, [worker_server_state, pool_state]) do
      {:ok, _worker_state} = ok ->
        ok

      {:remove, _user_reason} = remove ->
        remove

      {:stop, _user_reason} = stop ->
        stop

      other ->
        raise """
        unexpected return from #{inspect(worker)}.handle_ping/2.

        Expected:

          {:remove, reason}
          | {:ok, worker_state}
          | {:stop, reason}

        Got: #{inspect(other)}
        """
    end
  end

  # Runs the optional `terminate_worker/3` callback for a worker being removed
  # from the pool, returning the (possibly updated) pool state. When the
  # callback is not defined, the state is returned unchanged.
  defp maybe_terminate_worker(reason, worker_server_state, state) do
    %{worker: worker, state: pool_state} = state

    if function_exported?(worker, :terminate_worker, 3) do
      result =
        apply_worker_callback(worker, :terminate_worker, [reason, worker_server_state, pool_state])

      case result do
        {:ok, pool_state} ->
          %{state | state: pool_state}

        # The callback itself crashed; the error was already logged.
        {:remove, _reason} ->
          state

        other ->
          raise """
          unexpected return from #{inspect(worker)}.terminate_worker/3.

          Expected:

              {:ok, pool_state}

          Got: #{inspect(other)}
          """
      end
    else
      state
    end
  end

  # Starts one worker via the `init_worker/1` callback and returns the updated
  # `{pool_state, resources, async}` triple:
  #   * `{:ok, ...}`    — the worker is enqueued immediately with idle metadata
  #   * `{:async, ...}` — a supervised task runs `fun`; both ref->pid and
  #     pid->ref are tracked in `async` so DOWN/EXIT/reply messages can be
  #     matched either way
  #   * `{:remove, _}`  — the callback crashed (normalized by
  #     `apply_worker_callback/3`); schedule another attempt
  defp init_worker(worker, pool_state, resources, async, worker_idle_timeout) do
    case apply_worker_callback(worker, :init_worker, [pool_state]) do
      {:ok, worker_state, pool_state} ->
        {pool_state, :queue.in({worker_state, get_metadata(worker_idle_timeout)}, resources),
         async}

      {:async, fun, pool_state} when is_function(fun, 0) ->
        %{ref: ref, pid: pid} = Task.Supervisor.async(NimblePool.TaskSupervisor, fun)
        {pool_state, resources, async |> Map.put(ref, pid) |> Map.put(pid, ref)}

      {:remove, _reason} ->
        send(self(), {__MODULE__, :init_worker})
        {pool_state, resources, async}

      other ->
        raise """
        unexpected return from #{inspect(worker)}.init_worker/1.

        Expected:

            {:ok, worker_state, pool_state}
            | {:async, (() -> worker_state), pool_state}

        Got: #{inspect(other)}
        """
    end
  end

  # Queues a worker start on the pool's own mailbox, handled by the
  # `{__MODULE__, :init_worker}` clause of `handle_info/2`.
  defp schedule_init(), do: send(self(), {__MODULE__, :init_worker})

  # Applies a worker callback, converting a crash into `{:remove, reason}`.
  defp apply_worker_callback(worker, fun, args),
    do: do_apply_worker_callback(worker, fun, args, &{:remove, &1})

  # Same, but a crash yields `{:remove, reason, pool_state}` so callers that
  # thread the pool state (e.g. `handle_checkout`) keep their shape.
  defp apply_worker_callback(pool_state, worker, fun, args),
    do: do_apply_worker_callback(worker, fun, args, &{:remove, &1, pool_state})

  # Applies `worker.fun(args...)`, catching any throw/error/exit. On a crash
  # the normalized reason is logged (with `:crash_reason` metadata for log
  # handlers) and `catch_fun` converts it into the caller's error shape.
  defp do_apply_worker_callback(worker, fun, args, catch_fun) do
    try do
      apply(worker, fun, args)
    catch
      kind, reason ->
        reason = Exception.normalize(kind, reason, __STACKTRACE__)

        Logger.error(
          [
            "Error during #{inspect(worker)}.#{fun}/#{length(args)} callback:\n"
            | Exception.format(kind, reason, __STACKTRACE__)
          ],
          crash_reason: {crash_reason(kind, reason), __STACKTRACE__}
        )

        catch_fun.(reason)
    end
  end

  # Normalizes a caught `kind`/`value` pair into the shape Logger expects for
  # its `:crash_reason` metadata (throws are wrapped in `{:nocatch, value}`).
  defp crash_reason(kind, value) do
    case kind do
      :throw -> {:nocatch, value}
      _other -> value
    end
  end

  # Forgets a request: demonitors the client (flushing any pending DOWN) and
  # drops its entries from both the `requests` and `monitors` maps.
  defp remove_request(state, ref, mon_ref) do
    Process.demonitor(mon_ref, [:flush])

    %{
      state
      | requests: Map.delete(state.requests, ref),
        monitors: Map.delete(state.monitors, mon_ref)
    }
  end

  # Runs the worker's optional `handle_enqueue/2` callback before a checkout
  # request is queued; without the callback the command passes through as-is.
  defp handle_enqueue(worker, command, pool_state) do
    case function_exported?(worker, :handle_enqueue, 2) do
      true -> worker.handle_enqueue(command, pool_state)
      false -> {:ok, command, pool_state}
    end
  end

  # Per-worker idle metadata: a monotonic millisecond timestamp when idle
  # tracking is enabled, `nil` when no `worker_idle_timeout` is configured.
  defp get_metadata(nil), do: nil

  defp get_metadata(_worker_idle_timeout) do
    System.monotonic_time(:millisecond)
  end

end
 |