api-v2/deps/dns_cluster/lib/dns_cluster.ex
2025-04-16 10:03:13 -03:00

203 lines
6.0 KiB
Elixir

defmodule DNSCluster do
  @moduledoc """
  Simple DNS based cluster discovery.

  A DNS query is made every `:interval` milliseconds to discover new ips.
  Nodes will only be joined if their node basename matches the basename of the
  current node. For example if `node()` is `myapp-123@fdaa:1:36c9:a7b:198:c4b1:73c6:1`,
  a `Node.connect/1` attempt will be made against every IP returned by the DNS query,
  but will only be successful if there is a node running on the remote host with the same
  basename, for example `myapp-123@fdaa:1:36c9:a7b:198:c4b1:73c6:2`. Nodes running on
  remote hosts, but with different basenames will fail to connect and will be ignored.

  ## Examples

  To start in your supervision tree, add the child:

      children = [
        ...,
        {DNSCluster, query: "myapp.internal"}
      ]

  See the `start_link/1` docs for all available options.

  If you require more advanced clustering options and strategies, see the
  [libcluster](https://hexdocs.pm/libcluster) library.
  """
  use GenServer
  require Logger

  defmodule Resolver do
    @moduledoc false
    # Default implementation of the calls the server makes against the node and
    # DNS layers. `DNSCluster.init/1` reads an alternative module from the
    # `:resolver` option, so any replacement must export these four functions.

    require Record
    Record.defrecord(:hostent, Record.extract(:hostent, from_lib: "kernel/include/inet.hrl"))

    # Returns the part of the node name before the "@" as a string.
    # Raises a MatchError unless the name contains exactly one "@".
    def basename(node_name) when is_atom(node_name) do
      [basename, _] = String.split(to_string(node_name), "@")
      basename
    end

    # Delegates to `Node.connect/1` and returns its result unchanged.
    def connect_node(node_name) when is_atom(node_name), do: Node.connect(node_name)

    # Lists the currently connected visible nodes.
    def list_nodes, do: Node.list(:visible)

    # Resolves `query` for the given record type and returns the address list
    # of the answer; any lookup error is reported as an empty list.
    def lookup(query, type) when is_binary(query) and type in [:a, :aaaa] do
      case :inet_res.getbyname(~c"#{query}", type) do
        {:ok, hostent(h_addr_list: addr_list)} -> addr_list
        {:error, _} -> []
      end
    end
  end

  @doc ~S"""
  Starts DNS based cluster discovery.

  ## Options

    * `:name` - the name of the cluster. Defaults to `DNSCluster`.
    * `:query` - the required DNS query for node discovery, for example: `"myapp.internal"`.
      The value `:ignore` can be used to ignore starting the DNSCluster.
    * `:interval` - the millisec interval between DNS queries. Defaults to `5000`.
    * `:connect_timeout` - the millisec timeout to allow discovered nodes to connect.
      Defaults to `10_000`. An attempt that exceeds the timeout is abandoned; the
      remaining attempts and subsequent polls are unaffected.
    * `:log` - the `Logger` level used to report each successful connection, or
      `false` to disable logging. Defaults to `false`.

  ## Examples

      iex> DNSCluster.start_link(query: "myapp.internal")
      {:ok, pid}

      iex> DNSCluster.start_link(query: :ignore)
      :ignore
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, __MODULE__))
  end

  @impl true
  def init(opts) do
    case Keyword.fetch(opts, :query) do
      {:ok, :ignore} ->
        :ignore

      {:ok, query} when is_binary(query) ->
        warn_on_invalid_dist()
        resolver = Keyword.get(opts, :resolver, Resolver)

        state = %{
          interval: Keyword.get(opts, :interval, 5_000),
          basename: resolver.basename(node()),
          query: query,
          log: Keyword.get(opts, :log, false),
          poll_timer: nil,
          connect_timeout: Keyword.get(opts, :connect_timeout, 10_000),
          resolver: resolver
        }

        # The first discovery pass runs in handle_continue/2 so that init/1
        # returns without waiting on DNS lookups or connection attempts.
        {:ok, state, {:continue, :discover_ips}}

      {:ok, other} ->
        raise ArgumentError, "expected :query to be a string, got: #{inspect(other)}"

      :error ->
        raise ArgumentError, "missing required :query option in #{inspect(opts)}"
    end
  end

  @impl true
  def handle_continue(:discover_ips, state) do
    {:noreply, do_discovery(state)}
  end

  @impl true
  def handle_info(:discover_ips, state) do
    {:noreply, do_discovery(state)}
  end

  # Runs one discovery pass and arms the timer for the next one.
  defp do_discovery(state) do
    state
    |> connect_new_nodes()
    |> schedule_next_poll()
  end

  # Attempts to connect to every discovered IP that is not already in the list
  # of visible nodes. Attempts run concurrently, one task per candidate.
  defp connect_new_nodes(%{resolver: resolver, connect_timeout: timeout} = state) do
    connected = for name <- resolver.list_nodes(), into: MapSet.new(), do: to_string(name)
    ips = discover_ips(state)

    ips
    |> Enum.map(fn ip -> "#{state.basename}@#{ip}" end)
    |> Enum.reject(&MapSet.member?(connected, &1))
    |> Task.async_stream(
      fn new_name ->
        # Node names have to be atoms; the candidates come from the DNS answer.
        if resolver.connect_node(:"#{new_name}") do
          log(state, "#{node()} connected to #{new_name}")
        end
      end,
      max_concurrency: max(1, length(ips)),
      timeout: timeout,
      # The default (:exit) would exit this server whenever a single peer is
      # slow to answer. Killing only the slow task keeps discovery running.
      on_timeout: :kill_task
    )
    |> Stream.run()

    state
  end

  # Logs `msg` at the configured level; a `false`/`nil` level disables logging.
  defp log(state, msg) do
    if level = state.log, do: Logger.log(level, msg)
  end

  # Schedules the next :discover_ips message and stores the timer reference.
  defp schedule_next_poll(state) do
    %{state | poll_timer: Process.send_after(self(), :discover_ips, state.interval)}
  end

  # Looks up both A and AAAA records for the query and returns the unique
  # addresses formatted as strings.
  defp discover_ips(%{resolver: resolver, query: query}) do
    [:a, :aaaa]
    |> Enum.flat_map(&resolver.lookup(query, &1))
    |> Enum.uniq()
    |> Enum.map(&to_string(:inet.ntoa(&1)))
  end

  # Emits a warning when the node is not distributed or is not using longnames.
  # The check is skipped entirely when :net_kernel.get_state/0 is not exported.
  defp warn_on_invalid_dist do
    release? = is_binary(System.get_env("RELEASE_NAME"))
    net_state = if function_exported?(:net_kernel, :get_state, 0), do: :net_kernel.get_state()

    cond do
      !net_state ->
        :ok

      net_state.started == :no and release? ->
        Logger.warning("""
        node not running in distributed mode. Ensure the following exports are set in your rel/env.sh.eex file:
        #!/bin/sh
        export RELEASE_DISTRIBUTION=name
        export RELEASE_NODE="myapp@fully-qualified-host-or-ip"
        """)

      net_state.started == :no or
          (!release? and net_state.started != :no and net_state[:name_domain] != :longnames) ->
        Logger.warning("""
        node not running in distributed mode. When running outside of a release, you must start net_kernel manually with
        longnames.
        https://www.erlang.org/doc/man/net_kernel.html#start-2
        """)

      net_state[:name_domain] != :longnames and release? ->
        Logger.warning("""
        node not running with longnames which are required for DNS discovery.
        Ensure the following exports are set in your rel/env.sh.eex file:
        #!/bin/sh
        export RELEASE_DISTRIBUTION=name
        export RELEASE_NODE="myapp@fully-qualified-host-or-ip"
        """)

      true ->
        :ok
    end
  end
end