70 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
			
		
		
	
	
			70 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
defmodule Phoenix.Tracker.DeltaGeneration do
 | 
						|
  @moduledoc false
 | 
						|
  require Logger
 | 
						|
  alias Phoenix.Tracker.{State, Clock, Replica}
 | 
						|
 | 
						|
  @doc """
  Extracts the minimal delta from `generations` that satisfies the remote clock.

  Searches `generations` for the first delta whose clock range fits
  `remote_context`. When one is found, the extraction is performed on that
  delta; otherwise it falls back to extracting from the entire CRDT `state`.

  In both cases the result is whatever `State.extract/3` returns for the chosen
  source, and a debug message naming the local replica is logged.
  """
  @spec extract(State.t, [State.delta], State.name, State.context) :: State.delta | State.t
  def extract(%State{mode: :normal} = state, generations, remote_ref, remote_context) do
    case delta_fullfilling_clock(generations, remote_context) do
      {delta, index} ->
        # `index` is the zero-based position produced by `Enum.with_index/1`, so it
        # is always an integer (truthy, even at 0); no nil check is needed before
        # logging. It is reported one-based.
        Logger.debug("#{inspect(state.replica)}: sending delta generation #{index + 1}")
        State.extract(delta, remote_ref, remote_context)

      nil ->
        Logger.debug("#{inspect(state.replica)}: falling back to sending entire crdt")
        State.extract(state, remote_ref, remote_context)
    end
  end
 | 
						|
 | 
						|
  @doc """
  Pushes `delta` onto the list of delta generations and returns the new list.

  `opts` holds one maximum size per generation, in the same order as the
  generations themselves. When `generations` is empty, the list is first seeded
  with one copy of `parent.delta` per entry in `opts`.
  """
  @spec push(State.t, [State.delta], State.delta, [pos_integer]) :: [State.delta]
  def push(%State{mode: :normal} = parent, [], %State{mode: :delta} = delta, opts) do
    # No generations yet: start every slot from the parent's current delta.
    seeded = List.duplicate(parent.delta, length(opts))
    do_push(seeded, delta, opts, {delta, []})
  end

  def push(%State{mode: :normal}, generations, %State{mode: :delta} = delta, opts) do
    do_push(generations, delta, opts, {delta, []})
  end
 | 
						|
  # Walks the generations and their size limits in lockstep, building the new
  # generation list in `acc` (reversed once both lists are exhausted).
  #
  # The accumulator tuple is `{prev, acc}`, where `prev` is the value carried
  # over from the previous slot (initially the pushed delta itself).
  defp do_push([], _delta, [], {_prev, acc}), do: Enum.reverse(acc)

  defp do_push([gen | rest_gens], delta, [limit | rest_limits], {prev, acc}) do
    {carry, kept} =
      case State.merge_deltas(gen, delta) do
        {:ok, merged} ->
          # Keep the merged delta while it fits within this slot's limit;
          # otherwise the slot takes the value carried from the previous one.
          if State.delta_size(merged) <= limit do
            {merged, merged}
          else
            {merged, prev}
          end

        {:error, :not_contiguous} ->
          # The delta cannot be merged into this generation; leave it untouched.
          {gen, gen}
      end

    do_push(rest_gens, delta, rest_limits, {carry, [kept | acc]})
  end
 | 
						|
 | 
						|
  @doc """
  Prunes a permanently downed replica from every delta in the generation list.

  Each generation must be a `%State{mode: :delta}`; any other element raises.
  """
  @spec remove_down_replicas([State.delta], Replica.replica_ref) :: [State.delta]
  def remove_down_replicas(generations, replica_ref) do
    prune = fn %State{mode: :delta} = generation ->
      State.remove_down_replicas(generation, replica_ref)
    end

    Enum.map(generations, prune)
  end
 | 
						|
 | 
						|
  # Finds the first generation whose clock range can serve `remote_context`.
  #
  # Returns `{generation, index}` (zero-based index from `Enum.with_index/1`) for
  # the first match, or `nil` when no generation qualifies.
  defp delta_fullfilling_clock(generations, remote_context) do
    # The remote's replica set is the same for every generation, so compute it
    # once here instead of twice per element inside the predicate.
    remote_replicas = Clock.replicas(remote_context)

    generations
    |> Enum.with_index()
    |> Enum.find(fn {%State{range: {range_start, range_end}}, _index} ->
      # Presumably restricts both ends of the range to the replicas the remote
      # clock knows about, so only those are compared -- confirm against Clock.
      local_start = Clock.filter_replicas(range_start, remote_replicas)
      local_end = Clock.filter_replicas(range_end, remote_replicas)

      # A generation qualifies when, for the filtered clocks:
      #   * its start does not already dominate-or-equal its end,
      #   * the remote clock dominates-or-equals its start, and
      #   * the remote clock does not dominate its end.
      not Clock.dominates_or_equal?(local_start, local_end) and
        Clock.dominates_or_equal?(remote_context, local_start) and
        not Clock.dominates?(remote_context, local_end)
    end)
  end
 | 
						|
end
 |