defmodule Finch do
  @external_resource "README.md"
  @moduledoc "README.md"
             |> File.read!()
             |> String.split("<!-- MDOC !-->")
             |> Enum.fetch!(1)

  alias Finch.{PoolManager, Request, Response}

  require Finch.Pool

  use Supervisor

  @default_pool_size 50
  @default_pool_count 1

  @default_connect_timeout 5_000

  @pool_config_schema [
    protocol: [
      type: {:in, [:http2, :http1]},
      deprecated: "Use :protocols instead."
    ],
    protocols: [
      type: {:list, {:in, [:http1, :http2]}},
      doc: """
      The type of connections to support.

      If using `:http1` only, an HTTP1 pool without multiplexing is used. \
      If using `:http2` only, an HTTP2 pool with multiplexing is used. \
      If both are listed, then both HTTP1/HTTP2 connections are \
      supported (via ALPN), but there is no multiplexing.
      """,
      default: [:http1]
    ],
    size: [
      type: :pos_integer,
      doc: """
      Number of connections to maintain in each pool. Used only by HTTP1 pools \
      since HTTP2 is able to multiplex requests through a single connection. In \
      other words, for HTTP2, the size is always 1 and the `:count` should be \
      configured in order to increase capacity.
      """,
      default: @default_pool_size
    ],
    count: [
      type: :pos_integer,
      doc: """
      Number of pools to start. HTTP1 pools are able to re-use connections in the \
      same pool and establish new ones only when necessary. However, if there is a \
      high pool count and few requests are made, these requests will be scattered \
      across pools, reducing connection reuse. It is recommended to increase the pool \
      count for HTTP1 only if you are experiencing high checkout times.
      """,
      default: @default_pool_count
    ],
    max_idle_time: [
      type: :timeout,
      doc: """
      The maximum number of milliseconds an HTTP1 connection is allowed to be idle \
      before being closed during a checkout attempt.
      """,
      deprecated: "Use :conn_max_idle_time instead."
    ],
    conn_opts: [
      type: :keyword_list,
      doc: """
      These options are passed to `Mint.HTTP.connect/4` whenever a new connection is established. \
      `:mode` is not configurable as Finch must control this setting. Typically these options are \
      used to configure proxying, https settings, or connect timeouts.
      """,
      default: []
    ],
    pool_max_idle_time: [
      type: :timeout,
      doc: """
      The maximum number of milliseconds that a pool can be idle before being terminated, used only by HTTP1 pools. \
      This option is forwarded to NimblePool and it starts an idle verification cycle that may impact \
      performance if misused. For instance, setting a very low timeout may lead to pool restarts. \
      For more information see NimblePool's `handle_ping/2` documentation.
      """,
      default: :infinity
    ],
    conn_max_idle_time: [
      type: :timeout,
      doc: """
      The maximum number of milliseconds an HTTP1 connection is allowed to be idle \
      before being closed during a checkout attempt.
      """,
      default: :infinity
    ],
    start_pool_metrics?: [
      type: :boolean,
      doc: "When true, pool metrics will be collected and available through `Finch.get_pool_status/2`.",
      default: false
    ]
  ]

  @typedoc """
  The `:name` provided to Finch in `start_link/1`.
  """
  @type name() :: atom()

  @type scheme() :: :http | :https

  @type scheme_host_port() :: {scheme(), host :: String.t(), port :: :inet.port_number()}

  @type request_opt() ::
          {:pool_timeout, timeout()}
          | {:receive_timeout, timeout()}
          | {:request_timeout, timeout()}

  @typedoc """
  Options used by request functions.
  """
  @type request_opts() :: [request_opt()]

  @typedoc """
  The reference used to identify a request sent using `async_request/3`.
  """
  @opaque request_ref() :: Finch.Pool.request_ref()

  @typedoc """
  The stream function given to `stream/5`.
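
  For example, an accumulator function that collects the response into a map
  might look like this (illustrative only):

      fn
        {:status, status}, acc -> Map.put(acc, :status, status)
        {:headers, headers}, acc -> Map.update(acc, :headers, headers, &(&1 ++ headers))
        {:data, data}, acc -> Map.update(acc, :body, data, &(&1 <> data))
        {:trailers, trailers}, acc -> Map.update(acc, :trailers, trailers, &(&1 ++ trailers))
      end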
""" @type stream(acc) :: ({:status, integer} | {:headers, Mint.Types.headers()} | {:data, binary} | {:trailers, Mint.Types.headers()}, acc -> acc) @typedoc """ The stream function given to `stream_while/5`. """ @type stream_while(acc) :: ({:status, integer} | {:headers, Mint.Types.headers()} | {:data, binary} | {:trailers, Mint.Types.headers()}, acc -> {:cont, acc} | {:halt, acc}) @doc """ Start an instance of Finch. ## Options * `:name` - The name of your Finch instance. This field is required. * `:pools` - A map specifying the configuration for your pools. The keys should be URLs provided as binaries, a tuple `{scheme, {:local, unix_socket}}` where `unix_socket` is the path for the socket, or the atom `:default` to provide a catch-all configuration to be used for any unspecified URLs. See "Pool Configuration Options" below for details on the possible map values. Default value is `%{default: [size: #{@default_pool_size}, count: #{@default_pool_count}]}`. ### Pool Configuration Options #{NimbleOptions.docs(@pool_config_schema)} """ def start_link(opts) do name = finch_name!(opts) pools = Keyword.get(opts, :pools, []) |> pool_options!() {default_pool_config, pools} = Map.pop(pools, :default) config = %{ registry_name: name, manager_name: manager_name(name), supervisor_name: pool_supervisor_name(name), default_pool_config: default_pool_config, pools: pools } Supervisor.start_link(__MODULE__, config, name: supervisor_name(name)) end def child_spec(opts) do %{ id: finch_name!(opts), start: {__MODULE__, :start_link, [opts]} } end @impl true def init(config) do children = [ {Registry, [keys: :duplicate, name: config.registry_name, meta: [config: config]]}, {DynamicSupervisor, name: config.supervisor_name, strategy: :one_for_one}, {PoolManager, config} ] Supervisor.init(children, strategy: :one_for_all) end defp finch_name!(opts) do Keyword.get(opts, :name) || raise(ArgumentError, "must supply a name") end defp pool_options!(pools) do {:ok, default} = NimbleOptions.validate([], @pool_config_schema) Enum.reduce(pools, %{default: valid_opts_to_map(default)}, fn {destination, opts}, acc -> with {:ok, valid_destination} <- cast_destination(destination), {:ok, valid_pool_opts} <- cast_pool_opts(opts) do Map.put(acc, valid_destination, valid_pool_opts) else {:error, reason} -> raise reason end end) end defp cast_destination(destination) do case destination do :default -> {:ok, destination} {scheme, {:local, path}} when is_atom(scheme) and is_binary(path) -> {:ok, {scheme, {:local, path}, 0}} url when is_binary(url) -> cast_binary_destination(url) _ -> {:error, %ArgumentError{message: "invalid destination: #{inspect(destination)}"}} end end defp cast_binary_destination(url) when is_binary(url) do {scheme, host, port, _path, _query} = Finch.Request.parse_url(url) {:ok, {scheme, host, port}} end defp cast_pool_opts(opts) do with {:ok, valid} <- NimbleOptions.validate(opts, @pool_config_schema) do {:ok, valid_opts_to_map(valid)} end end defp valid_opts_to_map(valid) do # We need to enable keepalive and set the nodelay flag to true by default. 
    transport_opts =
      valid
      |> get_in([:conn_opts, :transport_opts])
      |> List.wrap()
      |> Keyword.put_new(:timeout, @default_connect_timeout)
      |> Keyword.put_new(:nodelay, true)
      |> Keyword.put(:keepalive, true)

    conn_opts = valid[:conn_opts] |> List.wrap()

    ssl_key_log_file =
      Keyword.get(conn_opts, :ssl_key_log_file) || System.get_env("SSLKEYLOGFILE")

    ssl_key_log_file_device = ssl_key_log_file && File.open!(ssl_key_log_file, [:append])

    conn_opts =
      conn_opts
      |> Keyword.put(:ssl_key_log_file_device, ssl_key_log_file_device)
      |> Keyword.put(:transport_opts, transport_opts)
      |> Keyword.put(:protocols, valid[:protocols])

    # TODO: Remove :protocol on v0.18
    mod =
      case valid[:protocol] do
        :http1 ->
          Finch.HTTP1.Pool

        :http2 ->
          Finch.HTTP2.Pool

        nil ->
          if :http1 in valid[:protocols] do
            Finch.HTTP1.Pool
          else
            Finch.HTTP2.Pool
          end
      end

    %{
      mod: mod,
      size: valid[:size],
      count: valid[:count],
      conn_opts: conn_opts,
      conn_max_idle_time: to_native(valid[:max_idle_time] || valid[:conn_max_idle_time]),
      pool_max_idle_time: valid[:pool_max_idle_time],
      start_pool_metrics?: valid[:start_pool_metrics?]
    }
  end

  defp to_native(:infinity), do: :infinity
  defp to_native(time), do: System.convert_time_unit(time, :millisecond, :native)

  defp supervisor_name(name), do: :"#{name}.Supervisor"
  defp manager_name(name), do: :"#{name}.PoolManager"
  defp pool_supervisor_name(name), do: :"#{name}.PoolSupervisor"

  defmacrop request_span(request, name, do: block) do
    quote do
      start_meta = %{request: unquote(request), name: unquote(name)}

      Finch.Telemetry.span(:request, start_meta, fn ->
        result = unquote(block)
        end_meta = Map.put(start_meta, :result, result)
        {result, end_meta}
      end)
    end
  end

  @doc """
  Builds an HTTP request to be sent with `request/3` or `stream/5`.

  It is possible to send the request body in a streaming fashion. In order to do so, the
  `body` parameter needs to take the form of a tuple `{:stream, body_stream}`, where
  `body_stream` is a `Stream`.
  """
  @spec build(Request.method(), Request.url(), Request.headers(), Request.body(), Keyword.t()) ::
          Request.t()
  defdelegate build(method, url, headers \\ [], body \\ nil, opts \\ []), to: Request

  @doc """
  Streams an HTTP request and returns the accumulator.

  A function of arity 2 is expected as argument. The first argument
  is a tuple, as listed below, and the second argument is the
  accumulator. The function must return a potentially updated
  accumulator.

  See also `stream_while/5`.

  > ### HTTP2 streaming and back-pressure {: .warning}
  >
  > At the moment, streaming over HTTP2 connections does not provide
  > any back-pressure mechanism: this means the response will be
  > sent to the client as quickly as possible. Therefore, you must
  > not use streaming over HTTP2 for non-terminating responses or
  > when streaming large responses which you do not intend to keep
  > in memory.

  ## Stream commands

    * `{:status, status}` - the HTTP response status
    * `{:headers, headers}` - the HTTP response headers
    * `{:data, data}` - a streaming section of the HTTP response body
    * `{:trailers, trailers}` - the HTTP response trailers

  ## Options

  Shares options with `request/3`.
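
  For instance, a `:receive_timeout` can be passed just as with `request/3`
  (the value below is illustrative):

      Finch.stream(request, MyFinch, nil, fun, receive_timeout: 30_000)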

  ## Examples

      path = "/tmp/archive.zip"
      file = File.open!(path, [:write, :exclusive])
      url = "https://example.com/archive.zip"
      request = Finch.build(:get, url)

      Finch.stream(request, MyFinch, nil, fn
        {:status, status}, _acc ->
          IO.inspect(status)

        {:headers, headers}, _acc ->
          IO.inspect(headers)

        {:data, data}, _acc ->
          IO.binwrite(file, data)
      end)

      File.close(file)
  """
  @spec stream(Request.t(), name(), acc, stream(acc), request_opts()) ::
          {:ok, acc} | {:error, Exception.t()}
        when acc: term()
  def stream(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do
    fun = fn entry, acc -> {:cont, fun.(entry, acc)} end
    stream_while(req, name, acc, fun, opts)
  end

  @doc """
  Streams an HTTP request until it finishes or `fun` returns `{:halt, acc}`.

  A function of arity 2 is expected as argument. The first argument
  is a tuple, as listed below, and the second argument is the
  accumulator.

  The function must return:

    * `{:cont, acc}` to continue streaming
    * `{:halt, acc}` to halt streaming

  See also `stream/5`.

  > ### HTTP2 streaming and back-pressure {: .warning}
  >
  > At the moment, streaming over HTTP2 connections does not provide
  > any back-pressure mechanism: this means the response will be
  > sent to the client as quickly as possible. Therefore, you must
  > not use streaming over HTTP2 for non-terminating responses or
  > when streaming large responses which you do not intend to keep
  > in memory.

  ## Stream commands

    * `{:status, status}` - the HTTP response status
    * `{:headers, headers}` - the HTTP response headers
    * `{:data, data}` - a streaming section of the HTTP response body
    * `{:trailers, trailers}` - the HTTP response trailers

  ## Options

  Shares options with `request/3`.

  ## Examples

      path = "/tmp/archive.zip"
      file = File.open!(path, [:write, :exclusive])
      url = "https://example.com/archive.zip"
      request = Finch.build(:get, url)

      Finch.stream_while(request, MyFinch, nil, fn
        {:status, status}, acc ->
          IO.inspect(status)
          {:cont, acc}

        {:headers, headers}, acc ->
          IO.inspect(headers)
          {:cont, acc}

        {:data, data}, acc ->
          IO.binwrite(file, data)
          {:cont, acc}
      end)

      File.close(file)
  """
  @spec stream_while(Request.t(), name(), acc, stream_while(acc), request_opts()) ::
          {:ok, acc} | {:error, Exception.t()}
        when acc: term()
  def stream_while(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do
    request_span req, name do
      __stream__(req, name, acc, fun, opts)
    end
  end

  defp __stream__(%Request{} = req, name, acc, fun, opts) do
    {pool, pool_mod} = get_pool(req, name)
    pool_mod.request(pool, req, acc, fun, name, opts)
  end

  @doc """
  Sends an HTTP request and returns a `Finch.Response` struct.

  ## Options

    * `:pool_timeout` - This timeout is applied when we check out a connection from the pool.
      Default value is `5_000`.

    * `:receive_timeout` - The maximum time to wait for each chunk to be received before
      returning an error. Default value is `15_000`.

    * `:request_timeout` - The amount of time to wait for a complete response before returning
      an error. This timeout only applies to HTTP/1, and its current implementation is a
      best-effort timeout; it does not guarantee the call will return precisely when the time
      has elapsed. Default value is `:infinity`.
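
  ## Examples

  A minimal call, assuming a Finch instance named `MyFinch` has been started
  (the URL and timeout below are illustrative):

      request = Finch.build(:get, "https://example.com")
      {:ok, response} = Finch.request(request, MyFinch, receive_timeout: 30_000)
      response.status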
""" @spec request(Request.t(), name(), request_opts()) :: {:ok, Response.t()} | {:error, Exception.t()} def request(req, name, opts \\ []) def request(%Request{} = req, name, opts) do request_span req, name do acc = {nil, [], [], []} fun = fn {:status, value}, {_, headers, body, trailers} -> {:cont, {value, headers, body, trailers}} {:headers, value}, {status, headers, body, trailers} -> {:cont, {status, headers ++ value, body, trailers}} {:data, value}, {status, headers, body, trailers} -> {:cont, {status, headers, [body | value], trailers}} {:trailers, value}, {status, headers, body, trailers} -> {:cont, {status, headers, body, trailers ++ value}} end with {:ok, {status, headers, body, trailers}} <- __stream__(req, name, acc, fun, opts) do {:ok, %Response{ status: status, headers: headers, body: IO.iodata_to_binary(body), trailers: trailers }} end end end # Catch-all for backwards compatibility below def request(name, method, url) do request(name, method, url, []) end @doc false def request(name, method, url, headers, body \\ nil, opts \\ []) do IO.warn("Finch.request/6 is deprecated, use Finch.build/5 + Finch.request/3 instead") build(method, url, headers, body) |> request(name, opts) end @doc """ Sends an HTTP request and returns a `Finch.Response` struct or raises an exception in case of failure. See `request/3` for more detailed information. """ @spec request!(Request.t(), name(), request_opts()) :: Response.t() def request!(%Request{} = req, name, opts \\ []) do case request(req, name, opts) do {:ok, resp} -> resp {:error, exception} -> raise exception end end @doc """ Sends an HTTP request asynchronously, returning a request reference. If the request is sent using HTTP1, an extra process is spawned to consume messages from the underlying socket. The messages are sent to the current process as soon as they arrive, as a firehose. If you wish to maximize request rate or have more control over how messages are streamed, a strategy using `request/3` or `stream/5` should be used instead. ## Receiving the response Response information is sent to the calling process as it is received in `{ref, response}` tuples. If the calling process exits before the request has completed, the request will be canceled. Responses include: * `{:status, status}` - HTTP response status * `{:headers, headers}` - HTTP response headers * `{:data, data}` - section of the HTTP response body * `{:error, exception}` - an error occurred during the request * `:done` - request has completed successfully On a successful request, a single `:status` message will be followed by a single `:headers` message, after which more than one `:data` messages may be sent. If trailing headers are present, a final `:headers` message may be sent. Any `:done` or `:error` message indicates that the request has succeeded or failed and no further messages are expected. ## Example iex> req = Finch.build(:get, "https://httpbin.org/stream/5") iex> ref = Finch.async_request(req, MyFinch) iex> flush() {ref, {:status, 200}} {ref, {:headers, [...]}} {ref, {:data, "..."}} {ref, :done} ## Options Shares options with `request/3`. """ @spec async_request(Request.t(), name(), request_opts()) :: request_ref() def async_request(%Request{} = req, name, opts \\ []) do {pool, pool_mod} = get_pool(req, name) pool_mod.async_request(pool, req, name, opts) end @doc """ Cancels a request sent with `async_request/3`. 
""" @spec cancel_async_request(request_ref()) :: :ok def cancel_async_request(request_ref) when Finch.Pool.is_request_ref(request_ref) do {pool_mod, _cancel_ref} = request_ref pool_mod.cancel_async_request(request_ref) end defp get_pool(%Request{scheme: scheme, unix_socket: unix_socket}, name) when is_binary(unix_socket) do PoolManager.get_pool(name, {scheme, {:local, unix_socket}, 0}) end defp get_pool(%Request{scheme: scheme, host: host, port: port}, name) do PoolManager.get_pool(name, {scheme, host, port}) end @doc """ Get pool metrics list. The number of items present on the metrics list depends on the `:count` option each metric will have a `pool_index` going from 1 to `:count`. The metrics struct depends on the pool scheme defined on the `:protocols` option `Finch.HTTP1.PoolMetrics` for `:http1` and `Finch.HTTP2.PoolMetrics` for `:http2`. See the `Finch.HTTP1.PoolMetrics` and `Finch.HTTP2.PoolMetrics` for more details. `{:error, :not_found}` may return on 2 scenarios: - There is no pool registered for the given pair finch instance and url - The pool is configured with `start_pool_metrics?` option false (default) ## Example iex> Finch.get_pool_status(MyFinch, "https://httpbin.org") {:ok, [ %Finch.HTTP1.PoolMetrics{ pool_index: 1, pool_size: 50, available_connections: 43, in_use_connections: 7 }, %Finch.HTTP1.PoolMetrics{ pool_index: 2, pool_size: 50, available_connections: 37, in_use_connections: 13 }] } """ @spec get_pool_status(name(), url :: String.t() | scheme_host_port()) :: {:ok, list(Finch.HTTP1.PoolMetrics.t())} | {:ok, list(Finch.HTTP2.PoolMetrics.t())} | {:error, :not_found} def get_pool_status(finch_name, url) when is_binary(url) do {s, h, p, _, _} = Request.parse_url(url) get_pool_status(finch_name, {s, h, p}) end def get_pool_status(finch_name, shp) when is_tuple(shp) do case PoolManager.get_pool(finch_name, shp, auto_start?: false) do {_pool, pool_mod} -> pool_mod.get_pool_status(finch_name, shp) :not_found -> {:error, :not_found} end end end