Skip to content

Instantly share code, notes, and snippets.

@chanks
Last active May 26, 2023 05:07
Show Gist options
  • Select an option

  • Save chanks/c2e7e0efbd3d038775208047abb68524 to your computer and use it in GitHub Desktop.

Select an option

Save chanks/c2e7e0efbd3d038775208047abb68524 to your computer and use it in GitHub Desktop.

Revisions

  1. chanks revised this gist Jan 8, 2020. No changes.
  2. chanks revised this gist Oct 7, 2019. 1 changed file with 5 additions and 0 deletions.
    5 changes: 5 additions & 0 deletions script.lua
    Original file line number Diff line number Diff line change
    @@ -31,6 +31,11 @@ local xinfo_groups = redis.call("XINFO", "GROUPS", KEYS[1])
    local last_delivered_ids = {}
    local groups = {}

    if #xinfo_groups == 0 then
    -- When there's no groups, there's nothing to delete.
    return 0
    end

    for _, group_info_array in ipairs(xinfo_groups) do
    -- Redis passes us a flattened array of key, value pairs, so before
    -- anything else, convert it into a proper hash-style table so that it's
  3. chanks created this gist Jan 12, 2019.
    128 changes: 128 additions & 0 deletions script.lua
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,128 @@
    -- The goal of this script is to trim messages that have been processed by
    -- all extant groups from the a given Redis stream. It returns the number
    -- of messages that were deleted from the stream, if any. I make no
    -- guarantees about its performance, particularly if the stream is large
    -- and not fully processed (so a simple XTRIM isn't possible).

    -- First off, bail out early if the stream doesn't exist.
    if redis.call("EXISTS", KEYS[1]) == 0 then
    return false
    end

    -- To figure out what messages are deletable, we fetch the "last-
    -- delivered-id" for each consumer group of the stream, and set the lowest
    -- one of those ids as our upper bound. Next, we scan the pending lists
    -- for each group, because we also don't want to delete any events that
    -- are delivered but not acknowledged. The lowest unacknowledged id (if
    -- any exists) then becomes our new upper bound.

    -- "last-delivered-id" isn't mentioned in the Redis docs, for some reason,
    -- but it's in there, as of 5.0.3.

    -- In the (common?) case where there are no pending messages, and the
    -- lowest last-delivered-id equals the most recent id on the stream, we
    -- can just do a simpler (and much more efficient) XTRIM stream MAXLEN 0.
    -- If we can't do that, we'll have to pull in all the message ids before
    -- the lowest unacknowledged id, and XDEL them all.

    -- First, use XINFO GROUPS to get all group names and the most recently
    -- distributed ids.
    local xinfo_groups = redis.call("XINFO", "GROUPS", KEYS[1])
    local last_delivered_ids = {}
    local groups = {}

    for _, group_info_array in ipairs(xinfo_groups) do
    -- Redis passes us a flattened array of key, value pairs, so before
    -- anything else, convert it into a proper hash-style table so that it's
    -- easier to use.

    local group_info = {}
    for i = 1, #group_info_array, 2 do
    group_info[group_info_array[i]] = group_info_array[i+1]
    end

    table.insert(groups, group_info["name"])
    table.insert(last_delivered_ids, group_info["last-delivered-id"])
    end

    local lowest_pending_ids = {}

    for _, group_name in ipairs(groups) do
    local pending = redis.call("XPENDING", KEYS[1], group_name)

    local lowest_id = pending[2]
    if not lowest_id == false then
    table.insert(lowest_pending_ids, lowest_id)
    end
    end

    local function string_id_to_table(s)
    local t = {}

    for k, v in string.gmatch(s, "(%d+)-(%d+)") do
    table.insert(t, tonumber(k))
    table.insert(t, tonumber(v))
    end

    return t
    end

    -- Returns true if a < b, or if a == b (which is important later).
    local function compare_ids(a, b)
    local a_t = string_id_to_table(a)
    local b_t = string_id_to_table(b)

    return ((a_t[1] <= b_t[1]) and (a_t[2] <= b_t[2]))
    end

    table.sort(last_delivered_ids, compare_ids)
    table.sort(lowest_pending_ids, compare_ids)

    -- Here's our XTRIM optimization.
    if #lowest_pending_ids == 0 then
    local stream_info_array = redis.call("XINFO", "STREAM", KEYS[1])

    local stream_info = {}
    for i = 1, #stream_info_array, 2 do
    stream_info[stream_info_array[i]] = stream_info_array[i+1]
    end

    if last_delivered_ids[1] == stream_info["last-generated-id"] then
    -- Yay!
    return redis.call("XTRIM", KEYS[1], "MAXLEN", 0)
    end
    end

    -- If we've gotten here, looks like we need to do a big XDEL, so find our
    -- lower bound.
    local lowest_id = last_delivered_ids[1]

    -- We can include the lowest delivered id in the deletion, so long as it
    -- isn't pending, which we'll check for next.
    local protect_lowest_id = false

    if #lowest_pending_ids > 0 then
    -- We rely here on compare_ids returning true if the ids are equal.
    if compare_ids(lowest_pending_ids[1], lowest_id) then
    lowest_id = lowest_pending_ids[1]
    protect_lowest_id = true
    end
    end

    local messages = redis.call("XRANGE", KEYS[1], "-", lowest_id)

    if #messages == 0 then
    -- Nothing to delete.
    return 0
    end

    local delete_command = {"XDEL", KEYS[1]}

    for _,t in pairs(messages) do
    local id = t[1]
    if (lowest_id ~= id) or (not protect_lowest_id) then
    table.insert(delete_command, id)
    end
    end

    return redis.call(unpack(delete_command))