624 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
			
		
		
	
	
			624 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
defmodule Phoenix.Tracker.State do
 | 
						|
  @moduledoc false
 | 
						|
  alias Phoenix.Tracker.{State, Clock}
 | 
						|
 | 
						|
  @type name       :: term
 | 
						|
  @type topic      :: String.t
 | 
						|
  @type key        :: term
 | 
						|
  @type meta       :: map
 | 
						|
  @type ets_id     :: :ets.tid
 | 
						|
  @type clock      :: pos_integer
 | 
						|
  @type tag        :: {name, clock}
 | 
						|
  @type cloud      :: MapSet.t
 | 
						|
  @type clouds     :: %{name => cloud}
 | 
						|
  @type context    :: %{name => clock}
 | 
						|
  @type values     :: ets_id | :extracted | %{tag => {pid, topic, key, meta}}
 | 
						|
  @type value      :: {{topic, pid, key}, meta, tag}
 | 
						|
  @type key_meta   :: {key, meta}
 | 
						|
  @type delta      :: %State{mode: :delta}
 | 
						|
  @type pid_lookup :: {pid, topic, key}
 | 
						|
 | 
						|
  @type t :: %State{
 | 
						|
    replica:  name,
 | 
						|
    context:  context,
 | 
						|
    clouds:   clouds,
 | 
						|
    values:   values,
 | 
						|
    pids:     ets_id,
 | 
						|
    mode:     :unset | :delta | :normal,
 | 
						|
    delta:    :unset | delta,
 | 
						|
    replicas: %{name => :up | :down},
 | 
						|
    range:    {context, context}
 | 
						|
  }
 | 
						|
 | 
						|
  defstruct replica: nil,
 | 
						|
            context: %{},
 | 
						|
            clouds: %{},
 | 
						|
            values: nil,
 | 
						|
            pids: nil,
 | 
						|
            mode: :unset,
 | 
						|
            delta: :unset,
 | 
						|
            replicas: %{},
 | 
						|
            range: {%{}, %{}}
 | 
						|
 | 
						|
  @compile {:inline, tag: 1, clock: 1, put_tag: 2, delete_tag: 2, remove_delta_tag: 2}
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Creates a new set for the replica.
 | 
						|
 | 
						|
  ## Examples
 | 
						|
 | 
						|
      iex> Phoenix.Tracker.State.new(:replica1)
 | 
						|
      %Phoenix.Tracker.State{...}
 | 
						|
 | 
						|
  """
 | 
						|
  @spec new(name, atom) :: t
 | 
						|
  def new(replica, shard_name) do
 | 
						|
    reset_delta(%State{
 | 
						|
      replica: replica,
 | 
						|
      context: %{replica => 0},
 | 
						|
      mode: :normal,
 | 
						|
      values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]),
 | 
						|
      pids: :ets.new(:pids, [:duplicate_bag]),
 | 
						|
      replicas: %{replica => :up}})
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Returns the causal context for the set.
 | 
						|
  """
 | 
						|
  @spec clocks(t) :: {name, context}
 | 
						|
  def clocks(%State{replica: rep, context: ctx} = state) do
 | 
						|
    # Exclude down replicas from clocks as they are also not included in
 | 
						|
    # deltas. Otherwise if this node knows of a down node X in permdown grace
 | 
						|
    # period, another node Y which came up after X went down will keep
 | 
						|
    # requesting full state from this node as the clock of Y will be dominated
 | 
						|
    # by the clock of this node.
 | 
						|
    {rep, Map.drop(ctx, down_replicas(state))}
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Adds a new element to the set.
 | 
						|
  """
 | 
						|
  @spec join(t, pid, topic, key, meta) :: t
 | 
						|
  def join(%State{} = state, pid, topic, key, meta \\ %{}) do
 | 
						|
    add(state, pid, topic, key, meta)
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Updates an element via leave and join.
 | 
						|
 | 
						|
  Atomically updates ETS local entry.
 | 
						|
  """
 | 
						|
  @spec leave_join(t, pid, topic, key, meta) :: t
 | 
						|
  def leave_join(state, pid, topic, key, meta) do
 | 
						|
    # Produce remove-like delta
 | 
						|
    [{{^topic, ^pid, ^key}, _meta, tag}] = :ets.lookup(state.values, {topic, pid, key})
 | 
						|
    pruned_clouds = delete_tag(state.clouds, tag)
 | 
						|
    new_delta = remove_delta_tag(state.delta, tag)
 | 
						|
    state = bump_clock(%State{state | clouds: pruned_clouds, delta: new_delta})
 | 
						|
 | 
						|
    # Update ETS entry and produce add-like delta
 | 
						|
    state = bump_clock(state)
 | 
						|
    tag = tag(state)
 | 
						|
    true = :ets.insert(state.values, {{topic, pid, key}, meta, tag})
 | 
						|
    new_delta = %State{state.delta | values: Map.put(state.delta.values, tag, {pid, topic, key, meta})}
 | 
						|
    %State{state | delta: new_delta}
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Removes an element from the set.
 | 
						|
  """
 | 
						|
  @spec leave(t, pid, topic, key) :: t
 | 
						|
  def leave(%State{pids: pids} = state, pid, topic, key) do
 | 
						|
    pids
 | 
						|
    |> :ets.match_object({pid, topic, key})
 | 
						|
    |> case do
 | 
						|
      [{^pid, ^topic, ^key}] -> remove(state, pid, topic, key)
 | 
						|
      [] -> state
 | 
						|
    end
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Removes all elements from the set for the given pid.
 | 
						|
  """
 | 
						|
  @spec leave(t, pid) :: t
 | 
						|
  def leave(%State{pids: pids} = state, pid) do
 | 
						|
    pids
 | 
						|
    |> :ets.lookup(pid)
 | 
						|
    |> Enum.reduce(state, fn {^pid, topic, key}, acc ->
 | 
						|
      remove(acc, pid, topic, key)
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Returns a list of elements in the set belonging to an online replica.
 | 
						|
  """
 | 
						|
  @spec online_list(t) :: [value]
 | 
						|
  def online_list(%State{values: values} = state) do
 | 
						|
    replicas = down_replicas(state)
 | 
						|
    :ets.select(values, [{ {:_, :_, {:"$1", :_}},
 | 
						|
      not_in(:"$1", replicas), [:"$_"]}])
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Returns a list of elements for the topic who belong to an online replica.
 | 
						|
  """
 | 
						|
  @spec get_by_topic(t, topic) :: [key_meta]
 | 
						|
  def get_by_topic(%State{values: values} = state, topic) do
 | 
						|
    tracked_values(values, topic, down_replicas(state))
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Returns a list of elements for the topic who belong to an online replica.
 | 
						|
  """
 | 
						|
  @spec get_by_key(t, topic, key) :: [key_meta]
 | 
						|
  def get_by_key(%State{values: values} = state, topic, key) do
 | 
						|
    case tracked_key(values, topic, key, down_replicas(state)) do
 | 
						|
      [] -> []
 | 
						|
      [_|_] = metas -> metas
 | 
						|
    end
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Performs table lookup for tracked elements in the topic.
 | 
						|
 | 
						|
  Filters out those present on downed replicas.
 | 
						|
  """
 | 
						|
  def tracked_values(table, topic, down_replicas) do
 | 
						|
    :ets.select(table,
 | 
						|
      [{{{topic, :_, :"$1"}, :"$2", {:"$3", :_}},
 | 
						|
        not_in(:"$3", down_replicas),
 | 
						|
        [{{:"$1", :"$2"}}]}])
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Performs table lookup for tracked key in the topic.
 | 
						|
 | 
						|
  Filters out those present on downed replicas.
 | 
						|
  """
 | 
						|
  def tracked_key(table, topic, key, down_replicas) do
 | 
						|
    :ets.select(table,
 | 
						|
      [{{{topic, :"$1", key}, :"$2", {:"$3", :_}},
 | 
						|
        not_in(:"$3", down_replicas),
 | 
						|
        [{{:"$1", :"$2"}}]}])
 | 
						|
  end
 | 
						|
 | 
						|
  defp not_in(_pos, []), do: []
 | 
						|
  defp not_in(pos, replicas), do: [not: ors(pos, replicas)]
 | 
						|
  defp ors(pos, [rep]), do: {:"=:=", pos, {rep}}
 | 
						|
  defp ors(pos, [rep | rest]), do: {:or, {:"=:=", pos, {rep}}, ors(pos, rest)}
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Returns the element matching the pid, topic, and key.
 | 
						|
  """
 | 
						|
  @spec get_by_pid(t, pid, topic, key) :: value | nil
 | 
						|
  def get_by_pid(%State{values: values}, pid, topic, key) do
 | 
						|
    case :ets.lookup(values, {topic, pid, key}) do
 | 
						|
      [] -> nil
 | 
						|
      [one] -> one
 | 
						|
    end
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Returns all elements for the pid.
 | 
						|
  """
 | 
						|
  @spec get_by_pid(t, pid) :: [value]
 | 
						|
  def get_by_pid(%State{pids: pids, values: values}, pid) do
 | 
						|
    case :ets.lookup(pids, pid) do
 | 
						|
      [] -> []
 | 
						|
      matches ->
 | 
						|
        :ets.select(values, Enum.map(matches, fn {^pid, topic, key} ->
 | 
						|
          {{{topic, pid, key}, :_, :_}, [], [:"$_"]}
 | 
						|
        end))
 | 
						|
    end
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Checks if set has a non-empty delta.
 | 
						|
  """
 | 
						|
  @spec has_delta?(t) :: boolean
 | 
						|
  def has_delta?(%State{delta: %State{clouds: clouds}}) do
 | 
						|
    Enum.find(clouds, fn {_name, cloud} -> MapSet.size(cloud) != 0 end)
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Resets the set's delta.
 | 
						|
  """
 | 
						|
  @spec reset_delta(t) :: t
 | 
						|
  def reset_delta(%State{context: ctx, replica: replica} = state) do
 | 
						|
    delta_ctx = Map.take(ctx, [replica])
 | 
						|
    delta = %State{replica: replica,
 | 
						|
                   values: %{},
 | 
						|
                   range: {delta_ctx, delta_ctx},
 | 
						|
                   mode: :delta}
 | 
						|
    %State{state | delta: delta}
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Extracts the set's elements from ets into a mergeable list.
 | 
						|
 | 
						|
  Used when merging two sets.
 | 
						|
  """
 | 
						|
 | 
						|
  @spec extract(t, remote_ref :: name, context) :: t | {t, values}
 | 
						|
  def extract(%State{mode: :delta, values: values, clouds: clouds} = state, remote_ref, remote_context) do
 | 
						|
    {start_ctx, end_ctx} = state.range
 | 
						|
    known_keys = Map.keys(remote_context)
 | 
						|
    pruned_clouds = Map.take(clouds, known_keys)
 | 
						|
    pruned_start = Map.take(start_ctx, known_keys)
 | 
						|
    pruned_end = Map.take(end_ctx, known_keys)
 | 
						|
    map = Enum.reduce(values, [], fn
 | 
						|
      {{^remote_ref, _clock}, _data}, acc -> acc
 | 
						|
      {{replica, _clock} = tag, data}, acc ->
 | 
						|
        if Map.has_key?(remote_context, replica) do
 | 
						|
          [{tag, data} | acc]
 | 
						|
        else
 | 
						|
          acc
 | 
						|
        end
 | 
						|
    end) |> :maps.from_list()
 | 
						|
 | 
						|
    %State{state | values: map, clouds: pruned_clouds, range: {pruned_start, pruned_end}}
 | 
						|
  end
 | 
						|
  def extract(%State{mode: :normal, values: values, clouds: clouds} = state, remote_ref, remote_context) do
 | 
						|
    known_keys = Map.keys(remote_context)
 | 
						|
    pruned_clouds = Map.take(clouds, known_keys)
 | 
						|
    pruned_context = Map.take(state.context, known_keys)
 | 
						|
    # fn {{topic, pid, key}, meta, {replica, clock}} when replica !== remote_ref ->
 | 
						|
    #  {{replica, clock}, {pid, topic, key, meta}}
 | 
						|
    # end
 | 
						|
    ms = [{
 | 
						|
      {{:"$1", :"$2", :"$3"}, :"$4", {:"$5", :"$6"}},
 | 
						|
      [{:"=/=", :"$5", {:const, remote_ref}}],
 | 
						|
      [{{{{:"$5", :"$6"}}, {{:"$2", :"$1", :"$3", :"$4"}}}}]
 | 
						|
    }]
 | 
						|
    data =
 | 
						|
      foldl(values, [], ms, fn {{replica, _} = tag, data}, acc ->
 | 
						|
        if match?(%{^replica => _}, remote_context) do
 | 
						|
          [{tag, data} | acc]
 | 
						|
        else
 | 
						|
          acc
 | 
						|
        end
 | 
						|
      end)
 | 
						|
 | 
						|
    {%State{state |
 | 
						|
        clouds: pruned_clouds,
 | 
						|
        context: pruned_context,
 | 
						|
        pids: nil,
 | 
						|
        values: nil,
 | 
						|
        delta: :unset}, Map.new(data)}
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Merges two sets, or a delta into a set.
 | 
						|
 | 
						|
  Returns a 3-tuple of the updated set, and the joined and left elements.
 | 
						|
 | 
						|
  ## Examples
 | 
						|
 | 
						|
      iex> {s1, joined, left} =
 | 
						|
           Phoenix.Tracker.State.merge(s1, Phoenix.Tracker.State.extract(s2))
 | 
						|
 | 
						|
      {%Phoenix.Tracker.State{}, [...], [...]}
 | 
						|
  """
 | 
						|
  @spec merge(local :: t, {remote :: t, values} | delta) :: {new_local :: t, joins :: [value], leaves :: [value]}
 | 
						|
  def merge(%State{} = local, %State{mode: :delta, values: remote_map} = remote) do
 | 
						|
    merge(local, remote, remote_map)
 | 
						|
  end
 | 
						|
  def merge(%State{} = local, {%State{} = remote, remote_map}) do
 | 
						|
    merge(local, remote, remote_map)
 | 
						|
  end
 | 
						|
 | 
						|
  defp merge(local, remote, remote_map) do
 | 
						|
    {added_pids, joins} = accumulate_joins(local, remote_map)
 | 
						|
    {clouds, delta, leaves, removed_pids} = observe_removes(local, remote, remote_map)
 | 
						|
 | 
						|
    # We diff ETS deletes and inserts, this way if there is an update
 | 
						|
    # operation (leave + join) we handle it atomically via insert into
 | 
						|
    # the :ordered_set table
 | 
						|
 | 
						|
    added_value_keys = for {value_key, _meta, _tag} <- joins, do: value_key
 | 
						|
    removed_value_keys = for {value_key, _meta, _tag} <- leaves, do: value_key
 | 
						|
    value_keys_to_remove = removed_value_keys -- added_value_keys
 | 
						|
 | 
						|
    pids_to_remove = removed_pids -- added_pids
 | 
						|
    pids_to_add = added_pids -- removed_pids
 | 
						|
 | 
						|
    for value_key <- value_keys_to_remove do
 | 
						|
      :ets.delete(local.values, value_key)
 | 
						|
    end
 | 
						|
 | 
						|
    for pid <- pids_to_remove do
 | 
						|
      :ets.match_delete(local.pids, pid)
 | 
						|
    end
 | 
						|
 | 
						|
    true = :ets.insert(local.values, joins)
 | 
						|
    true = :ets.insert(local.pids, pids_to_add)
 | 
						|
 | 
						|
    known_remote_context = Map.take(remote.context, Map.keys(local.context))
 | 
						|
    ctx = Clock.upperbound(local.context, known_remote_context)
 | 
						|
    new_state =
 | 
						|
      %State{local | clouds: clouds, delta: delta}
 | 
						|
      |> put_context(ctx)
 | 
						|
      |> compact()
 | 
						|
 | 
						|
    {new_state, joins, leaves}
 | 
						|
  end
 | 
						|
 | 
						|
  @spec accumulate_joins(t, values) :: joins :: {[pid_lookup], [values]}
 | 
						|
  defp accumulate_joins(local, remote_map) do
 | 
						|
    %State{context: context, clouds: clouds} = local
 | 
						|
    Enum.reduce(remote_map, {[], []}, fn {{replica, _} = tag, {pid, topic, key, meta}}, {pids, adds} ->
 | 
						|
      if not match?(%{^replica => _}, context) or in?(context, clouds, tag) do
 | 
						|
        {pids, adds}
 | 
						|
      else
 | 
						|
        {[{pid, topic, key} | pids], [{{topic, pid, key}, meta, tag} | adds]}
 | 
						|
      end
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  @spec observe_removes(t, t, map) :: {clouds, delta, leaves :: [value], removed_pids :: [pid_lookup]}
 | 
						|
  defp observe_removes(%State{values: values, delta: delta} = local, remote, remote_map) do
 | 
						|
    unioned_clouds = union_clouds(local, remote)
 | 
						|
    %State{context: remote_context, clouds: remote_clouds} = remote
 | 
						|
    init = {unioned_clouds, delta, [], []}
 | 
						|
    local_replica = local.replica
 | 
						|
    # fn {_, _, {replica, _}} = result when replica != local_replica -> result end
 | 
						|
    ms = [{
 | 
						|
      {:_, :_, {:"$1", :_}},
 | 
						|
      [{:"/=", :"$1", {:const, local_replica}}],
 | 
						|
      [:"$_"]
 | 
						|
    }]
 | 
						|
 | 
						|
    foldl(values, init, ms, fn {{topic, pid, key}, _, tag} = el, {clouds, delta, leaves, removed_pids} ->
 | 
						|
      if not match?(%{^tag => _}, remote_map) and in?(remote_context, remote_clouds, tag) do
 | 
						|
        {delete_tag(clouds, tag), remove_delta_tag(delta, tag), [el | leaves], [{pid, topic, key} | removed_pids]}
 | 
						|
      else
 | 
						|
        {clouds, delta, leaves, removed_pids}
 | 
						|
      end
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  defp put_tag(clouds, {name, _clock} = tag) do
 | 
						|
    case clouds do
 | 
						|
      %{^name => cloud} -> %{clouds | name => MapSet.put(cloud, tag)}
 | 
						|
      _ -> Map.put(clouds, name, MapSet.new([tag]))
 | 
						|
    end
 | 
						|
  end
 | 
						|
 | 
						|
  defp delete_tag(clouds, {name, _clock} = tag) do
 | 
						|
    case clouds do
 | 
						|
      %{^name => cloud} -> %{clouds | name => MapSet.delete(cloud, tag)}
 | 
						|
      _ -> clouds
 | 
						|
    end
 | 
						|
  end
 | 
						|
 | 
						|
  defp union_clouds(%State{mode: :delta} = local, %State{} = remote) do
 | 
						|
    Enum.reduce(remote.clouds, local.clouds, fn {name, remote_cloud}, acc ->
 | 
						|
      Map.update(acc, name, remote_cloud, &MapSet.union(&1, remote_cloud))
 | 
						|
    end)
 | 
						|
  end
 | 
						|
  defp union_clouds(%State{mode: :normal, context: local_ctx} = local, %State{} = remote) do
 | 
						|
    Enum.reduce(remote.clouds, local.clouds, fn {name, remote_cloud}, acc ->
 | 
						|
      if Map.has_key?(local_ctx, name) do
 | 
						|
        Map.update(acc, name, remote_cloud, &MapSet.union(&1, remote_cloud))
 | 
						|
      else
 | 
						|
        acc
 | 
						|
      end
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  def merge_deltas(%State{mode: :delta} = local, %State{mode: :delta, values: remote_values} = remote) do
 | 
						|
    %{values: local_values, range: {local_start, local_end}, context: local_context, clouds: local_clouds} = local
 | 
						|
    %{range: {remote_start, remote_end}, context: remote_context, clouds: remote_clouds} = remote
 | 
						|
 | 
						|
    if (Clock.dominates_or_equal?(remote_end, local_start) and
 | 
						|
        Clock.dominates_or_equal?(local_end, remote_start)) or
 | 
						|
       (Clock.dominates_or_equal?(local_end, remote_start) and
 | 
						|
        Clock.dominates_or_equal?(remote_end, local_start)) do
 | 
						|
      new_start = Clock.lowerbound(local_start, remote_start)
 | 
						|
      new_end = Clock.upperbound(local_end, remote_end)
 | 
						|
      clouds = union_clouds(local, remote)
 | 
						|
 | 
						|
      filtered_locals = for {tag, value} <- local_values,
 | 
						|
                        match?(%{^tag => _}, remote_values) or not in?(remote_context, remote_clouds, tag),
 | 
						|
                        do: {tag, value}
 | 
						|
 | 
						|
      merged_vals = for {tag, value} <- remote_values,
 | 
						|
                    not match?(%{^tag => _}, local_values) and not in?(local_context, local_clouds, tag),
 | 
						|
                    do: {tag, value}
 | 
						|
 | 
						|
      all_vals = filtered_locals ++ merged_vals
 | 
						|
 | 
						|
      {:ok, %State{local | clouds: clouds, values: Map.new(all_vals), range: {new_start, new_end}}}
 | 
						|
    else
 | 
						|
      {:error, :not_contiguous}
 | 
						|
    end
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Marks a replica as up in the set and returns rejoined users.
 | 
						|
  """
 | 
						|
  @spec replica_up(t, name) :: {t, joins :: [values], leaves :: []}
 | 
						|
  def replica_up(%State{replicas: replicas, context: ctx} = state, replica) do
 | 
						|
    {%State{state |
 | 
						|
            context: Map.put_new(ctx, replica, 0),
 | 
						|
            replicas: Map.put(replicas, replica, :up)}, replica_users(state, replica), []}
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Marks a replica as down in the set and returns left users.
 | 
						|
  """
 | 
						|
  @spec replica_down(t, name) :: {t, joins:: [], leaves :: [values]}
 | 
						|
  def replica_down(%State{replicas: replicas} = state, replica) do
 | 
						|
    {%State{state | replicas: Map.put(replicas, replica, :down)}, [], replica_users(state, replica)}
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Removes all elements for replicas that are permanently gone.
 | 
						|
  """
 | 
						|
  @spec remove_down_replicas(t, name) :: t
 | 
						|
  def remove_down_replicas(%State{mode: :normal, context: ctx, values: values, pids: pids} = state, replica) do
 | 
						|
    new_ctx = Map.delete(ctx, replica)
 | 
						|
    # fn {key, _, {^replica, _}} -> key end
 | 
						|
    ms = [{{:"$1", :_, {replica, :_}}, [], [:"$1"]}]
 | 
						|
 | 
						|
 | 
						|
    foldl(values, nil, ms, fn {topic, pid, key} = values_key, _ ->
 | 
						|
      :ets.delete(values, values_key)
 | 
						|
      :ets.match_delete(pids, {pid, topic, key})
 | 
						|
      nil
 | 
						|
    end)
 | 
						|
    new_clouds = Map.delete(state.clouds, replica)
 | 
						|
    new_delta = remove_down_replicas(state.delta, replica)
 | 
						|
 | 
						|
    %State{state | context: new_ctx, clouds: new_clouds, delta: new_delta}
 | 
						|
  end
 | 
						|
  def remove_down_replicas(%State{mode: :delta, range: range} = delta, replica) do
 | 
						|
    {start_ctx, end_ctx} = range
 | 
						|
    new_start = Map.delete(start_ctx, replica)
 | 
						|
    new_end = Map.delete(end_ctx, replica)
 | 
						|
    new_clouds = Map.delete(delta.clouds, replica)
 | 
						|
    new_vals = Enum.reduce(delta.values, delta.values, fn
 | 
						|
      {{^replica, _clock} = tag, {_pid, _topic, _key, _meta}}, vals ->
 | 
						|
        Map.delete(vals, tag)
 | 
						|
      {{_replica, _clock} = _tag, {_pid, _topic, _key, _meta}}, vals ->
 | 
						|
        vals
 | 
						|
    end)
 | 
						|
 | 
						|
    %State{delta | range: {new_start, new_end}, clouds: new_clouds, values: new_vals}
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Returns the dize of the delta.
 | 
						|
  """
 | 
						|
  @spec delta_size(delta) :: pos_integer
 | 
						|
  def delta_size(%State{mode: :delta, clouds: clouds, values: values}) do
 | 
						|
    Enum.reduce(clouds, map_size(values), fn {_name, cloud}, sum ->
 | 
						|
      sum + MapSet.size(cloud)
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  @spec add(t, pid, topic, key, meta) :: t
 | 
						|
  defp add(%State{} = state, pid, topic, key, meta) do
 | 
						|
    state
 | 
						|
    |> bump_clock()
 | 
						|
    |> do_add(pid, topic, key, meta)
 | 
						|
  end
 | 
						|
  defp do_add(%State{delta: delta} = state, pid, topic, key, meta) do
 | 
						|
    tag = tag(state)
 | 
						|
    true = :ets.insert(state.values, {{topic, pid, key}, meta, tag})
 | 
						|
    true = :ets.insert(state.pids, {pid, topic, key})
 | 
						|
    new_delta = %State{delta | values: Map.put(delta.values, tag, {pid, topic, key, meta})}
 | 
						|
    %State{state | delta: new_delta}
 | 
						|
  end
 | 
						|
 | 
						|
  @spec remove(t, pid, topic, key) :: t
 | 
						|
  defp remove(%State{pids: pids, values: values} = state, pid, topic, key) do
 | 
						|
    [{{^topic, ^pid, ^key}, _meta, tag}] = :ets.lookup(values, {topic, pid, key})
 | 
						|
    1 = :ets.select_delete(values, [{{{topic, pid, key}, :_, :_}, [], [true]}])
 | 
						|
    1 = :ets.select_delete(pids, [{{pid, topic, key}, [], [true]}])
 | 
						|
    pruned_clouds = delete_tag(state.clouds, tag)
 | 
						|
    new_delta = remove_delta_tag(state.delta, tag)
 | 
						|
 | 
						|
    bump_clock(%State{state | clouds: pruned_clouds, delta: new_delta})
 | 
						|
  end
 | 
						|
 | 
						|
  @spec remove_delta_tag(delta, tag) :: delta
 | 
						|
  defp remove_delta_tag(%{mode: :delta, values: values, clouds: clouds} = delta, tag) do
 | 
						|
    %{delta | clouds: put_tag(clouds, tag), values: Map.delete(values, tag)}
 | 
						|
  end
 | 
						|
 | 
						|
  @doc """
 | 
						|
  Compacts a sets causal history.
 | 
						|
 | 
						|
  Called as needed and after merges.
 | 
						|
  """
 | 
						|
  @spec compact(t) :: t
 | 
						|
  def compact(%State{context: ctx, clouds: clouds} = state) do
 | 
						|
    {new_ctx, new_clouds} =
 | 
						|
      Enum.reduce(clouds, {ctx, clouds}, fn {name, cloud}, {ctx_acc, clouds_acc} ->
 | 
						|
        {new_ctx, new_cloud} = do_compact(ctx_acc, Enum.sort(MapSet.to_list(cloud)))
 | 
						|
        {new_ctx, Map.put(clouds_acc, name, MapSet.new(new_cloud))}
 | 
						|
      end)
 | 
						|
 | 
						|
    put_context(%State{state | clouds: new_clouds}, new_ctx)
 | 
						|
  end
 | 
						|
  @spec do_compact(context, sorted_cloud_list :: list) :: {context, cloud}
 | 
						|
  defp do_compact(ctx, cloud) do
 | 
						|
    Enum.reduce(cloud, {ctx, []}, fn {replica, clock} = tag, {ctx_acc, cloud_acc} ->
 | 
						|
      case ctx_acc do
 | 
						|
        %{^replica => ctx_clock} when ctx_clock + 1 == clock ->
 | 
						|
          {%{ctx_acc | replica => clock}, cloud_acc}
 | 
						|
        %{^replica => ctx_clock} when ctx_clock >= clock ->
 | 
						|
          {ctx_acc, cloud_acc}
 | 
						|
        _ when clock == 1 ->
 | 
						|
          {Map.put(ctx_acc, replica, clock), cloud_acc}
 | 
						|
        _ ->
 | 
						|
          {ctx_acc, [tag | cloud_acc]}
 | 
						|
      end
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  @compile {:inline, in?: 3, in_ctx?: 3, in_clouds?: 3}
 | 
						|
 | 
						|
  defp in?(context, clouds, {replica, clock} = tag) do
 | 
						|
    in_ctx?(context, replica, clock) or in_clouds?(clouds, replica, tag)
 | 
						|
  end
 | 
						|
  defp in_ctx?(ctx, replica, clock) do
 | 
						|
    case ctx do
 | 
						|
      %{^replica => replica_clock} -> replica_clock >= clock
 | 
						|
      _ -> false
 | 
						|
    end
 | 
						|
  end
 | 
						|
  defp in_clouds?(clouds, replica, tag) do
 | 
						|
    case clouds do
 | 
						|
      %{^replica => cloud} -> MapSet.member?(cloud, tag)
 | 
						|
      _ -> false
 | 
						|
    end
 | 
						|
  end
 | 
						|
 | 
						|
  @spec tag(t) :: tag
 | 
						|
  defp tag(%State{replica: rep} = state), do: {rep, clock(state)}
 | 
						|
 | 
						|
  @spec clock(t) :: clock
 | 
						|
  defp clock(%State{replica: rep, context: ctx}), do: Map.get(ctx, rep, 0)
 | 
						|
 | 
						|
  @spec bump_clock(t) :: t
 | 
						|
  defp bump_clock(%State{mode: :normal, replica: rep, clouds: clouds, context: ctx, delta: delta} = state) do
 | 
						|
    new_clock = clock(state) + 1
 | 
						|
    new_ctx = Map.put(ctx, rep, new_clock)
 | 
						|
 | 
						|
    %State{state |
 | 
						|
           clouds: put_tag(clouds, {rep, new_clock}),
 | 
						|
           delta: %State{delta | clouds: put_tag(delta.clouds, {rep, new_clock})}}
 | 
						|
    |> put_context(new_ctx)
 | 
						|
  end
 | 
						|
  defp put_context(%State{delta: delta, replica: rep} = state, new_ctx) do
 | 
						|
    {start_clock, end_clock} = delta.range
 | 
						|
    new_end = Map.put(end_clock, rep, Map.get(new_ctx, rep, 0))
 | 
						|
    %State{state |
 | 
						|
           context: new_ctx,
 | 
						|
           delta: %State{delta | range: {start_clock, new_end}}}
 | 
						|
  end
 | 
						|
 | 
						|
  @spec down_replicas(t) :: [name]
 | 
						|
  defp down_replicas(%State{replicas: replicas})  do
 | 
						|
    for {replica, :down} <- replicas, do: replica
 | 
						|
  end
 | 
						|
 | 
						|
  @spec replica_users(t, name) :: [value]
 | 
						|
  defp replica_users(%State{values: values}, replica) do
 | 
						|
    :ets.match_object(values, {:_, :_, {replica, :_}})
 | 
						|
  end
 | 
						|
 | 
						|
  @fold_batch_size 1000
 | 
						|
 | 
						|
  defp foldl(table, initial, ms, func) do
 | 
						|
    foldl(:ets.select(table, ms, @fold_batch_size), initial, func)
 | 
						|
  end
 | 
						|
 | 
						|
  defp foldl(:"$end_of_table", acc, _func), do: acc
 | 
						|
  defp foldl({objects, cont}, acc, func) do
 | 
						|
    foldl(:ets.select(cont), Enum.reduce(objects, acc, func), func)
 | 
						|
  end
 | 
						|
end
 |