Last active
November 28, 2022 06:59
-
-
Save aaronjensen/33cc2aeb74746cac3bcb40dcefdd9c09 to your computer and use it in GitHub Desktop.
Revisions
-
aaronjensen revised this gist
May 20, 2020 . 1 changed file with 3 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,3 +1,6 @@ # ATTENTION: This is now supported in plug_cowboy as of 2.1.0: # https://hexdocs.pm/plug_cowboy/Plug.Cowboy.Drainer.html defmodule DrainStop do @moduledoc """ DrainStop Attempts to gracefully shutdown an endpoint when a normal shutdown -
aaronjensen revised this gist
Jun 2, 2016 . 1 changed file with 7 additions and 13 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -87,22 +87,16 @@ defmodule DrainStop do end def pending_requests(endpoint) do for pid <- ranch_listener_sup_pids(endpoint), {:ranch_conns_sup, sup_pid, _, _} <- which_children(pid), {_, request_pid, _, _} <- which_children(sup_pid), do: request_pid end def ranch_listener_sup_pids(endpoint) do for {Phoenix.Endpoint.Server, pid, _, _} <- which_children(endpoint), {{:ranch_listener_sup, _}, pid, _, _} <- which_children(pid), do: pid end def stop_listening(endpoint) do -
aaronjensen revised this gist
Jun 2, 2016 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -110,4 +110,4 @@ defmodule DrainStop do |> ranch_listener_sup_pids |> Enum.each(&terminate_child(&1, :ranch_acceptors_sup)) end end -
aaronjensen revised this gist
Jun 2, 2016 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -110,4 +110,4 @@ defmodule DrainStop do |> ranch_listener_sup_pids |> Enum.each(&terminate_child(&1, :ranch_acceptors_sup)) end end -
aaronjensen created this gist
May 27, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,113 @@ defmodule DrainStop do @moduledoc """ DrainStop Attempts to gracefully shutdown an endpoint when a normal shutdown occurs. It first shuts down the acceptor, ensuring that no new requests can be made. It then waits for all pending requests to complete. If the timeout expires before this happens, it stops waiting, allowing the supervision tree to continue its shutdown order. DrainStop should be installed in your supervision tree *after* the EndPoint it is going to drain stop. DrainStop takes two options: `endpoint`: The `Phoenix.Endpoint` to drain stop. Required. `timeout`: The amount of time to allow for requests to finish in msec. Defaults to `5000`. For example: children = [ supervisor(MyApp.Endpoint, []), worker( DrainStop, [[timeout: 10_000, endpoint: MyApp.Endpoint]], [shutdown: 15_000] ) ] """ use GenServer require Logger import Supervisor, only: [which_children: 1, terminate_child: 2] def start_link(options) do GenServer.start_link(DrainStop, options) end def init(options) do Process.flag(:trap_exit, true) endpoint = Keyword.fetch!(options, :endpoint) timeout = Keyword.get(options, :timeout, 5000) {:ok, {endpoint, timeout}} end def terminate(:shutdown, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout) def terminate({:shutdown, _}, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout) def terminate(:normal, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout) def terminate(_, _), do: :ok def drain_endpoint(endpoint, timeout) do stop_listening(endpoint) wait_for_requests(endpoint, timeout) end def wait_for_requests(endpoint, timeout) do Logger.info("DrainStop starting graceful shutdown with timeout: #{timeout}") timer_ref = :erlang.start_timer(timeout, self, :timeout) do_wait_for_requests(endpoint, timer_ref, %{}) end defp do_wait_for_requests(endpoint, timer_ref, refs) do get_monitor = fn pid -> refs[pid] || Process.monitor(pid) end refs = endpoint |> pending_requests |> Map.new(&{&1, get_monitor.(&1)}) case Map.size(refs) do 0 -> Logger.info("DrainStop Successful, no more connections") :erlang.cancel_timer(timer_ref) n -> time_left = :erlang.read_timer(timer_ref) Logger.info("DrainStop waiting #{time_left} msec for #{n} more connections to shutdown") receive do {:DOWN, _monitor_ref, _, _, _} -> do_wait_for_requests(endpoint, timer_ref, refs) {:timeout, ^timer_ref, :timeout} -> Logger.error("DrainStop timeout") msg -> Logger.error("DrainStop unexpected msg: #{inspect msg}") do_wait_for_requests(endpoint, timer_ref, refs) end end end def pending_requests(endpoint) do endpoint |> ranch_listener_sup_pids |> Enum.map(fn pid -> for {:ranch_conns_sup, sup_pid, _, _} <- which_children(pid) do for {_, request_pid, _, _} <- which_children(sup_pid), do: request_pid end end) |> List.flatten end def ranch_listener_sup_pids(endpoint) do pids = for {Phoenix.Endpoint.Server, pid, _, _} <- which_children(endpoint) do for {{:ranch_listener_sup, _}, pid, _, _} <- which_children(pid), do: pid end List.flatten(pids) end def stop_listening(endpoint) do endpoint |> ranch_listener_sup_pids |> Enum.each(&terminate_child(&1, :ranch_acceptors_sup)) end end This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,93 @@ defmodule DrainStopTest do use ExUnit.Case, async: false use Phoenix.ConnTest defmodule TestSupervisor do @drain_stop_timeout 100 def start_link do import Supervisor.Spec, warn: false children = [ # Start the endpoint when the application starts supervisor(DrainStopTest.TestEndpoint, []), worker( DrainStop, [[timeout: @drain_stop_timeout, endpoint: DrainStopTest.TestEndpoint]], [shutdown: @drain_stop_timeout * 2] ), ] opts = [strategy: :one_for_one] Supervisor.start_link(children, opts) end end defmodule TestPlug do def init(opts), do: opts def call(conn, _) do pid = Application.get_env(:drain_stop_test, :test_pid) conn = Plug.Conn.fetch_query_params(conn) send pid, :request_start {time, _} = Integer.parse(conn.params["sleep"]) :timer.sleep(time) send pid, :request_end conn end end defmodule TestEndpoint do use Phoenix.Endpoint, otp_app: :drain_stop_test plug DrainStopTest.TestPlug end @endpoint DrainStopTest.TestEndpoint setup do Application.put_env(:drain_stop_test, :test_pid, self) Application.put_env(:drain_stop_test, DrainStopTest.TestEndpoint, http: [port: "4807"], url: [host: "example.com"], server: true) {:ok, pid} = DrainStopTest.TestSupervisor.start_link Process.flag(:trap_exit, true) on_exit(fn -> if Process.whereis(DrainStopTest.TestEndpoint) do Supervisor.stop(DrainStopTest.TestEndpoint) end Process.exit(pid, :brutal_kill) end) {:ok, pid: pid} end test "waits for request to finish", %{pid: pid} do Task.async(fn -> HTTPoison.get("http://localhost:4807/?sleep=50") end) assert_receive :request_start, 1000 Supervisor.stop(pid, :shutdown) assert_received :request_end end test "truncates requests that don't finish in time", %{pid: pid} do Task.async(fn -> HTTPoison.get("http://localhost:4807/?sleep=500") end) assert_receive :request_start, 1000 Supervisor.stop(pid, :shutdown) refute_received :request_end end test "does not allow new requests" do # This is harder to test without reaching in to internals... DrainStop.stop_listening(DrainStopTest.TestEndpoint) assert_raise(HTTPoison.Error, fn -> HTTPoison.get!("http://localhost:4807/?sleep=500") end) end end