Skip to content

Instantly share code, notes, and snippets.

@aaronjensen
Last active November 28, 2022 06:59
Show Gist options
  • Save aaronjensen/33cc2aeb74746cac3bcb40dcefdd9c09 to your computer and use it in GitHub Desktop.
Save aaronjensen/33cc2aeb74746cac3bcb40dcefdd9c09 to your computer and use it in GitHub Desktop.

Revisions

  1. aaronjensen revised this gist May 20, 2020. 1 changed file with 3 additions and 0 deletions.
    3 changes: 3 additions & 0 deletions drain_stop.ex
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,6 @@
    # ATTENTION: This is now supported in plug_cowboy as of 2.1.0:
    # https://hexdocs.pm/plug_cowboy/Plug.Cowboy.Drainer.html

    defmodule DrainStop do
    @moduledoc """
    DrainStop Attempts to gracefully shutdown an endpoint when a normal shutdown
  2. aaronjensen revised this gist Jun 2, 2016. 1 changed file with 7 additions and 13 deletions.
    20 changes: 7 additions & 13 deletions drain_stop.ex
    Original file line number Diff line number Diff line change
    @@ -87,22 +87,16 @@ defmodule DrainStop do
    end

    def pending_requests(endpoint) do
    endpoint
    |> ranch_listener_sup_pids
    |> Enum.map(fn pid ->
    for {:ranch_conns_sup, sup_pid, _, _} <- which_children(pid) do
    for {_, request_pid, _, _} <- which_children(sup_pid), do: request_pid
    end
    end)
    |> List.flatten
    for pid <- ranch_listener_sup_pids(endpoint),
    {:ranch_conns_sup, sup_pid, _, _} <- which_children(pid),
    {_, request_pid, _, _} <- which_children(sup_pid),
    do: request_pid
    end

    def ranch_listener_sup_pids(endpoint) do
    pids =
    for {Phoenix.Endpoint.Server, pid, _, _} <- which_children(endpoint) do
    for {{:ranch_listener_sup, _}, pid, _, _} <- which_children(pid), do: pid
    end
    List.flatten(pids)
    for {Phoenix.Endpoint.Server, pid, _, _} <- which_children(endpoint),
    {{:ranch_listener_sup, _}, pid, _, _} <- which_children(pid),
    do: pid
    end

    def stop_listening(endpoint) do
  3. aaronjensen revised this gist Jun 2, 2016. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion drain_stop.ex
    Original file line number Diff line number Diff line change
    @@ -110,4 +110,4 @@ defmodule DrainStop do
    |> ranch_listener_sup_pids
    |> Enum.each(&terminate_child(&1, :ranch_acceptors_sup))
    end
    end
    end
  4. aaronjensen revised this gist Jun 2, 2016. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion drain_stop.ex
    Original file line number Diff line number Diff line change
    @@ -110,4 +110,4 @@ defmodule DrainStop do
    |> ranch_listener_sup_pids
    |> Enum.each(&terminate_child(&1, :ranch_acceptors_sup))
    end
    end
    end
  5. aaronjensen created this gist May 27, 2016.
    113 changes: 113 additions & 0 deletions drain_stop.ex
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,113 @@
    defmodule DrainStop do
      @moduledoc """
      Attempts to gracefully shut down an endpoint when a normal shutdown occurs.

      It first shuts down the acceptors, ensuring that no new requests can be
      made. It then waits for all pending requests to complete. If the timeout
      expires before this happens, it stops waiting, allowing the supervision
      tree to continue its shutdown order.

      DrainStop should be installed in your supervision tree *after* the
      endpoint it is going to drain stop.

      DrainStop takes two options:

        * `endpoint`: The `Phoenix.Endpoint` to drain stop. Required.
        * `timeout`: The amount of time to allow for requests to finish in
          msec. Defaults to `5000`.

      For example:

          children = [
            supervisor(MyApp.Endpoint, []),
            worker(
              DrainStop,
              [[timeout: 10_000, endpoint: MyApp.Endpoint]],
              [shutdown: 15_000]
            )
          ]
      """
      use GenServer
      require Logger
      import Supervisor, only: [which_children: 1, terminate_child: 2]

      @doc """
      Starts the DrainStop process.

      `options` is a keyword list; see the module documentation for the
      supported keys.
      """
      def start_link(options) do
        GenServer.start_link(__MODULE__, options)
      end

      @impl true
      def init(options) do
        # Trap exits so that `terminate/2` is invoked when the supervisor shuts
        # this process down; the draining happens there.
        Process.flag(:trap_exit, true)
        endpoint = Keyword.fetch!(options, :endpoint)
        timeout = Keyword.get(options, :timeout, 5000)
        {:ok, {endpoint, timeout}}
      end

      # Only drain on orderly shutdowns; any other exit reason skips draining.
      @impl true
      def terminate(:shutdown, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout)
      def terminate({:shutdown, _}, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout)
      def terminate(:normal, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout)
      def terminate(_, _), do: :ok

      @doc """
      Stops the acceptors of `endpoint`, then waits up to `timeout` msec for
      the pending requests to finish.
      """
      def drain_endpoint(endpoint, timeout) do
        stop_listening(endpoint)
        wait_for_requests(endpoint, timeout)
      end

      @doc """
      Blocks the calling process until `endpoint` has no pending requests or
      `timeout` msec have elapsed, whichever comes first.
      """
      def wait_for_requests(endpoint, timeout) do
        Logger.info("DrainStop starting graceful shutdown with timeout: #{timeout}")
        timer_ref = :erlang.start_timer(timeout, self(), :timeout)

        do_wait_for_requests(endpoint, timer_ref, %{})
      end

      # Monitors every pending request process and loops until none are left or
      # the timer started by `wait_for_requests/2` fires. `refs` maps request
      # pids to their monitor references so a pid is never monitored twice.
      defp do_wait_for_requests(endpoint, timer_ref, refs) do
        refs =
          endpoint
          |> pending_requests()
          |> Map.new(fn pid ->
            {pid, Map.get_lazy(refs, pid, fn -> Process.monitor(pid) end)}
          end)

        case map_size(refs) do
          0 ->
            Logger.info("DrainStop Successful, no more connections")
            :erlang.cancel_timer(timer_ref)

          n ->
            time_left = :erlang.read_timer(timer_ref)
            Logger.info("DrainStop waiting #{time_left} msec for #{n} more connections to shutdown")

            receive do
              {:DOWN, _monitor_ref, _, _, _} ->
                # A request finished; re-read the connection list and keep waiting.
                do_wait_for_requests(endpoint, timer_ref, refs)

              {:timeout, ^timer_ref, :timeout} ->
                Logger.error("DrainStop timeout")

              msg ->
                Logger.error("DrainStop unexpected msg: #{inspect(msg)}")
                do_wait_for_requests(endpoint, timer_ref, refs)
            end
        end
      end

      @doc """
      Returns the pids of all children of every `:ranch_conns_sup` found under
      the listeners of `endpoint`.
      """
      def pending_requests(endpoint) do
        for listener_pid <- ranch_listener_sup_pids(endpoint),
            {:ranch_conns_sup, conns_sup_pid, _, _} <- which_children(listener_pid),
            {_, request_pid, _, _} <- which_children(conns_sup_pid),
            do: request_pid
      end

      @doc """
      Returns the pids of the `{:ranch_listener_sup, _}` children found under
      the `Phoenix.Endpoint.Server` children of `endpoint`.
      """
      def ranch_listener_sup_pids(endpoint) do
        for {Phoenix.Endpoint.Server, server_pid, _, _} <- which_children(endpoint),
            {{:ranch_listener_sup, _}, listener_pid, _, _} <- which_children(server_pid),
            do: listener_pid
      end

      @doc """
      Terminates the `:ranch_acceptors_sup` child of every listener of
      `endpoint` so that no new connections are accepted.
      """
      def stop_listening(endpoint) do
        endpoint
        |> ranch_listener_sup_pids()
        |> Enum.each(&terminate_child(&1, :ranch_acceptors_sup))
      end
    end
    93 changes: 93 additions & 0 deletions drain_stop_test.exs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,93 @@
    defmodule DrainStopTest do
      # Exercises DrainStop against a real endpoint listening on a fixed port,
      # so these tests cannot run concurrently with each other.
      use ExUnit.Case, async: false
      use Phoenix.ConnTest

      defmodule TestSupervisor do
        # Supervises the test endpoint followed by DrainStop, mirroring the
        # setup recommended in the DrainStop moduledoc.
        @drain_stop_timeout 100

        def start_link do
          import Supervisor.Spec, warn: false

          children = [
            # Start the endpoint when the application starts
            supervisor(DrainStopTest.TestEndpoint, []),
            worker(
              DrainStop,
              [[timeout: @drain_stop_timeout, endpoint: DrainStopTest.TestEndpoint]],
              # Give DrainStop twice its own timeout before the supervisor
              # kills it.
              [shutdown: @drain_stop_timeout * 2]
            )
          ]

          opts = [strategy: :one_for_one]
          Supervisor.start_link(children, opts)
        end
      end

      defmodule TestPlug do
        # Sleeps for the number of milliseconds given in the `sleep` query
        # parameter, notifying the test process before and after the sleep.
        def init(opts), do: opts

        def call(conn, _) do
          pid = Application.get_env(:drain_stop_test, :test_pid)

          conn = Plug.Conn.fetch_query_params(conn)
          send(pid, :request_start)
          {time, _} = Integer.parse(conn.params["sleep"])
          :timer.sleep(time)
          send(pid, :request_end)

          conn
        end
      end

      defmodule TestEndpoint do
        use Phoenix.Endpoint, otp_app: :drain_stop_test
        plug DrainStopTest.TestPlug
      end

      @endpoint DrainStopTest.TestEndpoint

      setup do
        Application.put_env(:drain_stop_test, :test_pid, self())

        Application.put_env(
          :drain_stop_test,
          DrainStopTest.TestEndpoint,
          http: [port: "4807"],
          url: [host: "example.com"],
          server: true
        )

        {:ok, pid} = DrainStopTest.TestSupervisor.start_link()
        Process.flag(:trap_exit, true)

        on_exit(fn ->
          if Process.whereis(DrainStopTest.TestEndpoint) do
            Supervisor.stop(DrainStopTest.TestEndpoint)
          end

          # `:kill` is the untrappable exit reason; `:brutal_kill` is only a
          # supervisor shutdown option and would be sent as an ordinary,
          # trappable exit signal.
          Process.exit(pid, :kill)
        end)

        {:ok, pid: pid}
      end

      test "waits for request to finish", %{pid: pid} do
        Task.async(fn ->
          HTTPoison.get("http://localhost:4807/?sleep=50")
        end)

        assert_receive :request_start, 1000
        Supervisor.stop(pid, :shutdown)
        assert_received :request_end
      end

      test "truncates requests that don't finish in time", %{pid: pid} do
        Task.async(fn ->
          HTTPoison.get("http://localhost:4807/?sleep=500")
        end)

        assert_receive :request_start, 1000
        Supervisor.stop(pid, :shutdown)
        refute_received :request_end
      end

      test "does not allow new requests" do
        # This is harder to test without reaching in to internals...
        DrainStop.stop_listening(DrainStopTest.TestEndpoint)

        assert_raise(HTTPoison.Error, fn ->
          HTTPoison.get!("http://localhost:4807/?sleep=500")
        end)
      end
    end