Skip to content

Instantly share code, notes, and snippets.

@sorentwo
Last active August 2, 2022 14:33
Show Gist options
  • Save sorentwo/aa75df4f3991fce8f516bcaff1c7882c to your computer and use it in GitHub Desktop.
Save sorentwo/aa75df4f3991fce8f516bcaff1c7882c to your computer and use it in GitHub Desktop.

Revisions

  1. sorentwo revised this gist Aug 1, 2022. 1 changed file with 10 additions and 0 deletions.
    10 changes: 10 additions & 0 deletions queue_breaker.ex
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,14 @@
    defmodule QueueBreaker do
    @moduledoc """
    Automatic queue pausing/resuming based on accumulated errors.
    ## Example
    Pause after 1000 errors:
    QueueBreaker.attach("risky-queue", 1000)
    """

    def attach(queue, limit) do
    counter = :counters.new(1, [:write_concurrency])
    conf = {queue, limit, counter}
  2. sorentwo created this gist Aug 1, 2022.
    28 changes: 28 additions & 0 deletions queue_breaker.ex
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,28 @@
    defmodule QueueBreaker do
    def attach(queue, limit) do
    counter = :counters.new(1, [:write_concurrency])
    conf = {queue, limit, counter}

    :telemetry.attach("queue-break", [:oban, :job, :exception], &__MODULE__.handler/4, conf)
    end

    def handle(_, _, %{conf: conf, job: job, queue: queue}, {queue, limit, counter}) do
    :counters.add(counter, 1, 1)

    if :counters.get(counter, 1) >= limit do
    Oban.pause_queue(conf.name, queue: queue)

    30
    |> :timer.seconds()
    |> :timer.apply_after(__MODULE__, :reset, [conf, queue, counter])
    end
    end

    def handle(_events, _measure, _meta, _conf), do: :ok

    def reset(conf, queue, counter) do
    :counters.put(counter, 1, 0)

    Oban.resume_queue(conf.name, queue: queue)
    end
    end