-
-
Save aaronjensen/33cc2aeb74746cac3bcb40dcefdd9c09 to your computer and use it in GitHub Desktop.
defmodule DrainStop do
  @moduledoc """
  DrainStop attempts to gracefully shut down an endpoint when a normal shutdown
  occurs. It first shuts down the acceptor, ensuring that no new requests can be
  made. It then waits for all pending requests to complete. If the timeout
  expires before this happens, it stops waiting, allowing the supervision tree
  to continue its shutdown order.

  DrainStop should be installed in your supervision tree *after* the
  endpoint it is going to drain stop.

  DrainStop takes two options:

    * `endpoint`: The `Phoenix.Endpoint` to drain stop. Required.
    * `timeout`: The amount of time to allow for requests to finish in msec.
      Defaults to `5000`.

  For example:

      children = [
        supervisor(MyApp.Endpoint, []),
        worker(
          DrainStop,
          [[timeout: 10_000, endpoint: MyApp.Endpoint]],
          [shutdown: 15_000]
        )
      ]
  """
  use GenServer
  require Logger
  import Supervisor, only: [which_children: 1, terminate_child: 2]

  @default_timeout 5000

  @doc """
  Starts the DrainStop server with the given `options` (see the module doc).
  """
  def start_link(options) do
    GenServer.start_link(__MODULE__, options)
  end

  @doc false
  def init(options) do
    # Trap exits so that `terminate/2` is invoked when the supervisor shuts
    # this process down; the draining happens there.
    Process.flag(:trap_exit, true)
    endpoint = Keyword.fetch!(options, :endpoint)
    timeout = Keyword.get(options, :timeout, @default_timeout)
    {:ok, {endpoint, timeout}}
  end

  # Only orderly shutdown reasons trigger a drain; any other reason returns
  # `:ok` immediately without draining.
  @doc false
  def terminate(:shutdown, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout)
  def terminate({:shutdown, _}, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout)
  def terminate(:normal, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout)
  def terminate(_, _), do: :ok

  @doc """
  Stops `endpoint` from accepting new connections, then waits up to `timeout`
  msec for the pending requests to finish.
  """
  def drain_endpoint(endpoint, timeout) do
    stop_listening(endpoint)
    wait_for_requests(endpoint, timeout)
  end

  @doc """
  Blocks the calling process until `endpoint` has no pending requests left or
  `timeout` msec have elapsed, whichever comes first.
  """
  def wait_for_requests(endpoint, timeout) do
    Logger.info("DrainStop starting graceful shutdown with timeout: #{timeout}")

    timer_ref = :erlang.start_timer(timeout, self(), :timeout)
    do_wait_for_requests(endpoint, timer_ref, %{})
  end

  # Re-reads the set of pending request processes, monitoring any that are not
  # monitored yet, and loops until the set is empty or the timer fires.
  # `refs` maps request pid => monitor reference from the previous iteration,
  # so each process is monitored at most once.
  defp do_wait_for_requests(endpoint, timer_ref, refs) do
    refs =
      endpoint
      |> pending_requests()
      |> Map.new(fn pid ->
        {pid, Map.get_lazy(refs, pid, fn -> Process.monitor(pid) end)}
      end)

    case map_size(refs) do
      0 ->
        Logger.info("DrainStop Successful, no more connections")
        :erlang.cancel_timer(timer_ref)

      n ->
        time_left = :erlang.read_timer(timer_ref)
        Logger.info("DrainStop waiting #{time_left} msec for #{n} more connections to shutdown")

        receive do
          {:DOWN, _monitor_ref, _, _, _} ->
            do_wait_for_requests(endpoint, timer_ref, refs)

          {:timeout, ^timer_ref, :timeout} ->
            Logger.error("DrainStop timeout")

          msg ->
            Logger.error("DrainStop unexpected msg: #{inspect(msg)}")
            do_wait_for_requests(endpoint, timer_ref, refs)
        end
    end
  end

  @doc """
  Returns the pids of the processes currently supervised by the
  `:ranch_conns_sup` of every ranch listener under `endpoint`.
  """
  def pending_requests(endpoint) do
    # `which_children/1` may report `:undefined` or `:restarting` in place of
    # a pid; those entries cannot be monitored, so they are skipped.
    for listener_sup <- ranch_listener_sup_pids(endpoint),
        {:ranch_conns_sup, conns_sup, _, _} <- which_children(listener_sup),
        {_, request_pid, _, _} <- which_children(conns_sup),
        is_pid(request_pid),
        do: request_pid
  end

  @doc """
  Returns the pids of the `:ranch_listener_sup` supervisors found under the
  `Phoenix.Endpoint.Server` children of `endpoint`.
  """
  def ranch_listener_sup_pids(endpoint) do
    for {Phoenix.Endpoint.Server, server_pid, _, _} <- which_children(endpoint),
        {{:ranch_listener_sup, _}, listener_sup, _, _} <- which_children(server_pid),
        do: listener_sup
  end

  @doc """
  Terminates the `:ranch_acceptors_sup` child of every ranch listener under
  `endpoint`, so that no new connections are accepted.
  """
  def stop_listening(endpoint) do
    endpoint
    |> ranch_listener_sup_pids()
    |> Enum.each(&terminate_child(&1, :ranch_acceptors_sup))
  end
end
defmodule DrainStopTest do
  # Not async: the tests bind a fixed TCP port and use global application env.
  use ExUnit.Case, async: false
  use Phoenix.ConnTest

  defmodule TestSupervisor do
    # Supervises the test endpoint followed by DrainStop, mirroring the
    # ordering recommended in the DrainStop module documentation.
    @drain_stop_timeout 100

    def start_link do
      import Supervisor.Spec, warn: false

      children = [
        # Start the endpoint when the application starts
        supervisor(DrainStopTest.TestEndpoint, []),
        worker(
          DrainStop,
          [[timeout: @drain_stop_timeout, endpoint: DrainStopTest.TestEndpoint]],
          shutdown: @drain_stop_timeout * 2
        )
      ]

      opts = [strategy: :one_for_one]
      Supervisor.start_link(children, opts)
    end
  end

  defmodule TestPlug do
    # Sleeps for the number of milliseconds given in the `sleep` query
    # parameter, notifying the test process before and after the sleep.
    def init(opts), do: opts

    def call(conn, _) do
      pid = Application.get_env(:drain_stop_test, :test_pid)
      conn = Plug.Conn.fetch_query_params(conn)
      send(pid, :request_start)
      {time, _} = Integer.parse(conn.params["sleep"])
      :timer.sleep(time)
      send(pid, :request_end)
      conn
    end
  end

  defmodule TestEndpoint do
    use Phoenix.Endpoint, otp_app: :drain_stop_test

    plug DrainStopTest.TestPlug
  end

  @endpoint DrainStopTest.TestEndpoint

  setup do
    Application.put_env(:drain_stop_test, :test_pid, self())

    Application.put_env(
      :drain_stop_test,
      DrainStopTest.TestEndpoint,
      http: [port: "4807"],
      url: [host: "example.com"],
      server: true
    )

    {:ok, pid} = DrainStopTest.TestSupervisor.start_link()
    # The supervisor is linked to the test process; trap exits so stopping it
    # inside a test does not take the test process down with it.
    Process.flag(:trap_exit, true)

    on_exit(fn ->
      if Process.whereis(DrainStopTest.TestEndpoint) do
        Supervisor.stop(DrainStopTest.TestEndpoint)
      end

      # `:kill` is the untrappable exit reason; `:brutal_kill` is only a
      # supervisor shutdown strategy and would be delivered as a normal
      # exit signal that a process trapping exits can ignore.
      Process.exit(pid, :kill)
    end)

    {:ok, pid: pid}
  end

  test "waits for request to finish", %{pid: pid} do
    Task.async(fn ->
      HTTPoison.get("http://localhost:4807/?sleep=50")
    end)

    assert_receive :request_start, 1000
    Supervisor.stop(pid, :shutdown)
    assert_received :request_end
  end

  test "truncates requests that don't finish in time", %{pid: pid} do
    Task.async(fn ->
      HTTPoison.get("http://localhost:4807/?sleep=500")
    end)

    assert_receive :request_start, 1000
    Supervisor.stop(pid, :shutdown)
    refute_received :request_end
  end

  test "does not allow new requests" do
    # This is harder to test without reaching in to internals...
    DrainStop.stop_listening(DrainStopTest.TestEndpoint)

    assert_raise(HTTPoison.Error, fn ->
      HTTPoison.get!("http://localhost:4807/?sleep=500")
    end)
  end
end
Thanks for posting this 👍 very helpful for a use case that I have.
Thanks a lot for the work! But how do I use it? I added DrainStop to the children list, following the doc,
and ran:
iex -S mix phoenix.server
Erlang/OTP 19 [erts-8.0.2] [source-9503fff] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]
[info] Running MyPro.Endpoint with Cowboy using http://localhost:4000
Interactive Elixir (1.3.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Application.stop(:my_pro)
[info] stop listening
[info] DrainStop starting graceful shutdown with timeout: 10000
[info] DrainStop Successful, no more connections
:ok
[info] Application star_way_bff exited: :stopped
Perfect! It works! But how can I do this in production? The Phoenix server will be started as a daemon; how can I stop the application then?
Is there a mechanism in Elixir that can handle Unix kill signals?
You guys have a better idea?
@xingxing not sure if you're still looking for a solution. But I found https://github.com/tsutsu/signal_handler which lets Elixir applications intercept POSIX signals.
@aaronjensen are you planning to package this functionality into a library?
Anybody know if this works with Elixir 1.4.5 and Phoenix 1.3? It doesn't seem to work for me. I put a :timer.sleep/1 in a controller and when I call :init.stop() the connection is severed.
Doesn't seem to work; Elixir 1.5.2, Phoenix 1.3. It still kills open connections that are being blocked by :timer.sleep/1, as reported above. Is this just an artifact of using sleep? Will it work in other scenarios? How can I trust it?
For people trying to use it with Phoenix 1.3, maybe take a look at https://github.com/lyokato/the_end
This is already supported by plug_cowboy (release 2.1.0)
https://hexdocs.pm/plug_cowboy/Plug.Cowboy.Drainer.html
That's great! I'll update the file at the top to say that this is now supported as of plug_cowboy 2.1.0. Thanks for pointing it out.
Thanks for this. I think it'll be a big help for us!