Skip to content

Instantly share code, notes, and snippets.

@rjungemann
Forked from tobi/client.rb
Created May 29, 2010 18:09
Show Gist options
  • Save rjungemann/418422 to your computer and use it in GitHub Desktop.
Save rjungemann/418422 to your computer and use it in GitHub Desktop.

Revisions

  1. tobi revised this gist May 29, 2010. 2 changed files with 22 additions and 0 deletions.
    13 changes: 13 additions & 0 deletions client.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,13 @@
    STDOUT.sync = true

    require 'queue'

    start_time = Time.now.to_i
    msg = 0

    queue = Queue.new("testing")
    queue.subscribe do |obj|
    msg += 1
    seconds = Time.now.to_i - start_time
    puts "%.3f ... " % [msg.to_f / seconds]
    end
    9 changes: 9 additions & 0 deletions server.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,9 @@
    require 'queue'

    queue = Queue.new("testing")
    queue.clear

    loop do
    queue.push 1
    puts queue.size
    end
  2. tobi revised this gist May 29, 2010. 1 changed file with 25 additions and 13 deletions.
    38 changes: 25 additions & 13 deletions queue.rb
    Original file line number Diff line number Diff line change
    @@ -1,35 +1,47 @@
    Very efficient message queue backed by redis, using msgpack for serialization.


    require 'rubygems'
    require 'redis'
    require 'digest/md5'
    require 'msgpack'
    #require 'msgpack'

    $redis = Redis.new

    # Very efficient message queue.
    # Performance: ~ 2100 messages per second on i7 iMac
    class Queue

    def initialize(name)
    @name = name
    @queue_name = "queue:#{name}"
    end

    def clear
    $redis.del(@queue_name)
    end

    def size
    $redis.llen(@queue_name)
    end

    def push(object)
    hash = Digest::MD5.hexdigest(object.to_s)
    $redis.set("msg:#{hash}", MessagePack.pack(object))
    $redis.rpush("queue:#{@name}", hash)
    $redis.rpush(@queue_name, hash)
    end

    def subscribe
    loop do
    hash = $redis.blpop("queue:#{@name}", 0)[1]
    hash = $redis.blpop(@queue_name, 0)[1]
    object = $redis.get("msg:#{hash}")
    begin
    yield MessagePack.unpack(object)
    $redis.del("msg:#{hash}")
    rescue
    $redis.lpush("queue:#{@name}", hash)
    end
    if object
    begin
    yield MessagePack.unpack(object)
    # Done, remove message from redis.
    $redis.del("msg:#{hash}")
    rescue
    # Error, add the message again to the end of the queue
    $redis.rpush(@queue_name, hash)
    raise
    end
    end
    end
    end
    end
  3. tobi created this gist May 29, 2010.
    36 changes: 36 additions & 0 deletions queue.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,36 @@
    Very efficient message queue backed by redis, using msgpack for serialization.


    require 'rubygems'
    require 'redis'
    require 'digest/md5'
    require 'msgpack'

    $redis = Redis.new

    class Queue

    def initialize(name)
    @name = name
    end

    def push(object)
    hash = Digest::MD5.hexdigest(object.to_s)
    $redis.set("msg:#{hash}", MessagePack.pack(object))
    $redis.rpush("queue:#{@name}", hash)
    end

    def subscribe
    loop do
    hash = $redis.blpop("queue:#{@name}", 0)[1]
    object = $redis.get("msg:#{hash}")
    begin
    yield MessagePack.unpack(object)
    $redis.del("msg:#{hash}")
    rescue
    $redis.lpush("queue:#{@name}", hash)
    end
    end
    end
    end