Skip to content

Instantly share code, notes, and snippets.

@jdoconnor
Last active August 29, 2015 13:57
Show Gist options
  • Select an option

  • Save jdoconnor/9438386 to your computer and use it in GitHub Desktop.

Select an option

Save jdoconnor/9438386 to your computer and use it in GitHub Desktop.

Revisions

  1. Jay OConnor revised this gist Mar 9, 2014. 5 changed files with 112 additions and 79 deletions.
    3 changes: 3 additions & 0 deletions Gemfile
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,6 @@
    source 'https://rubygems.org'

    gem 'bunny'
    gem 'hashie'
    gem 'pry'
    gem 'json'
    71 changes: 0 additions & 71 deletions client.rb
    Original file line number Diff line number Diff line change
    @@ -1,71 +0,0 @@
    #!/usr/bin/env ruby
    # encoding: utf-8

    require "bunny"
    require "thread"

    conn = Bunny.new(:automatically_recover => false)
    conn.start

    ch = conn.create_channel


    class FibonacciClient
    attr_reader :reply_queue
    attr_accessor :response, :call_id
    attr_reader :lock, :condition

    def initialize(ch, server_queue)
    @ch = ch
    @x = ch.default_exchange

    @server_queue = server_queue
    @reply_queue = ch.queue("", :exclusive => true)


    @lock = Mutex.new
    @condition = ConditionVariable.new

    @reply_queue.subscribe do |delivery_info, properties, payload|
    puts "response_id #{properties[:correlation_id]}"
    puts properties[:correlation_id] == self.call_id ? "correct id" : "BAD id"
    if properties[:correlation_id] == self.call_id
    self.response = payload.to_i
    self.lock.synchronize{self.condition.signal}
    end
    end
    end

    def call(n)
    self.call_id = self.generate_uuid

    @x.publish(n.to_s,
    :routing_key => @server_queue,
    :correlation_id => call_id,
    :reply_to => @reply_queue.name)
    puts "call id #{call_id}"
    self.response = nil
    # params to synchronize are mutex, timeout_in_seconds
    lock.synchronize{condition.wait(lock, 1)}
    response
    end

    protected

    def generate_uuid
    # very naive but good enough for code
    # examples
    "#{rand}#{rand}#{rand}"
    end
    end


    client = FibonacciClient.new(ch, "rpc_queue")
    (1..1000).each do |i|
    puts " [x] Requesting fib(#{i % 20})"
    response = client.call(i % 20)
    puts " [.] Got #{response}"
    end

    ch.close
    conn.close
    81 changes: 81 additions & 0 deletions my_client.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,81 @@
    class MyClient
    require "bunny"
    require "thread"
    require "hashie"
    require 'pry'
    require 'json'

    attr_accessor :route
    attr_accessor :response, :call_id
    attr_reader :lock, :condition
    attr_reader :reply_queue

    def initialize(options = {})
    # make this connection part a singleton and a LOT of time is saved, as well as reusing the same connection
    conn = Bunny.new(:automatically_recover => false)
    conn.start
    @ch = conn.create_channel

    @defaults = Hashie::Mash.new({
    server_queue: nil,
    exchange: @ch.default_exchange
    }.merge(options)
    )

    @lock = Mutex.new
    @condition = ConditionVariable.new

    end

    def listen_for_response
    # listen on a new queue for this response
    @reply_queue = @ch.queue("", :exclusive => true)
    @reply_queue.subscribe do |delivery_info, properties, payload|
    puts "response_id #{properties[:correlation_id]}"
    puts properties[:correlation_id] == self.call_id ? "correct id" : "BAD id"
    if properties[:correlation_id] == self.call_id
    self.response = payload
    self.lock.synchronize{self.condition.signal}
    end
    end
    end

    def send_request(routing_options, method, params)
    self.call_id = self.generate_uuid

    data_string = {method: method, params: params}.to_json

    routing_options.exchange.publish(
    data_string,
    routing_key: routing_options.server_queue,
    correlation_id: call_id,
    reply_to: @reply_queue.name)
    puts "call id #{call_id}"
    self.response = nil
    # params to synchronize are mutex, timeout_in_seconds
    lock.synchronize{condition.wait(lock, 5)}
    response
    end

    def request(options = {})
    options = Hashie::Mash.new(options)
    # grab out the expected data
    method = options.delete(:method)
    params = options.delete(:params)

    # merge the connection options with the defaults
    routing_options = @defaults.merge(options)

    listen_for_response
    response = send_request(routing_options, method, params)
    # parse and return response
    Hashie::Mash.new(JSON.parse(response))
    end

    def generate_uuid
    # very naive but good enough for code
    # examples
    "#{rand}#{rand}#{rand}"
    end

    end
    21 changes: 13 additions & 8 deletions server.rb
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,10 @@
    #!/usr/bin/env ruby
    #!/usr/bin/env ruby
    # encoding: utf-8

    require "bunny"
    require 'hashie'
    require 'json'
    require 'pry'

    conn = Bunny.new(:automatically_recover => false)
    conn.start
    @@ -19,13 +22,15 @@ def start(queue_name)
    @x = @ch.default_exchange

    @q.subscribe(:block => true, ack: true) do |delivery_info, properties, payload|
    n = payload.to_i
    r = self.class.fib(n)

    puts " [.] fib(#{n})"

    @x.publish(r.to_s, :routing_key => properties.reply_to, :correlation_id => properties.correlation_id)
    @ch.acknowledge(delivery_info.delivery_tag, false)
    req = Hashie::Mash.new(JSON.parse(payload))
    if req[:method] == 'fib'
    n = req.params.number.to_i
    r = self.class.fib(n)
    puts " [.] fib(#{n})"
    data_string = {data: { value: r } }.to_json
    @x.publish(data_string, :routing_key => properties.reply_to, :correlation_id => properties.correlation_id)
    @ch.acknowledge(delivery_info.delivery_tag, false)
    end
    end
    end

    15 changes: 15 additions & 0 deletions user_code.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,15 @@
    # DSL I WANT to use
    # route = queue: 'user_service'
    # client = MyClient.new
    # response = client.request(route: route, method: 'show', params: {})
    # response.data = { some: hash_value }
    # response.error? # => false

    require './my_client'

    client = MyClient.new(server_queue: 'rpc_queue')

    (1..100).each do |c|
    response = client.request(method: :fib, params: { number: 20 })
    puts "i got this #{response.data.value}"
    end
  2. Jay OConnor revised this gist Mar 8, 2014. 1 changed file with 3 additions and 0 deletions.
    3 changes: 3 additions & 0 deletions Gemfile
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,3 @@
    source 'https://rubygems.org'

    gem 'bunny'
  3. Jay OConnor created this gist Mar 8, 2014.
    71 changes: 71 additions & 0 deletions client.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,71 @@
    #!/usr/bin/env ruby
    # encoding: utf-8

    require "bunny"
    require "thread"

    conn = Bunny.new(:automatically_recover => false)
    conn.start

    ch = conn.create_channel


    class FibonacciClient
    attr_reader :reply_queue
    attr_accessor :response, :call_id
    attr_reader :lock, :condition

    def initialize(ch, server_queue)
    @ch = ch
    @x = ch.default_exchange

    @server_queue = server_queue
    @reply_queue = ch.queue("", :exclusive => true)


    @lock = Mutex.new
    @condition = ConditionVariable.new

    @reply_queue.subscribe do |delivery_info, properties, payload|
    puts "response_id #{properties[:correlation_id]}"
    puts properties[:correlation_id] == self.call_id ? "correct id" : "BAD id"
    if properties[:correlation_id] == self.call_id
    self.response = payload.to_i
    self.lock.synchronize{self.condition.signal}
    end
    end
    end

    def call(n)
    self.call_id = self.generate_uuid

    @x.publish(n.to_s,
    :routing_key => @server_queue,
    :correlation_id => call_id,
    :reply_to => @reply_queue.name)
    puts "call id #{call_id}"
    self.response = nil
    # params to synchronize are mutex, timeout_in_seconds
    lock.synchronize{condition.wait(lock, 1)}
    response
    end

    protected

    def generate_uuid
    # very naive but good enough for code
    # examples
    "#{rand}#{rand}#{rand}"
    end
    end


    client = FibonacciClient.new(ch, "rpc_queue")
    (1..1000).each do |i|
    puts " [x] Requesting fib(#{i % 20})"
    response = client.call(i % 20)
    puts " [.] Got #{response}"
    end

    ch.close
    conn.close
    52 changes: 52 additions & 0 deletions server.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,52 @@
    #!/usr/bin/env ruby
    # encoding: utf-8

    require "bunny"

    conn = Bunny.new(:automatically_recover => false)
    conn.start

    ch = conn.create_channel

    class FibonacciServer

    def initialize(ch)
    @ch = ch
    end

    def start(queue_name)
    @q = @ch.queue(queue_name, durable: true)
    @x = @ch.default_exchange

    @q.subscribe(:block => true, ack: true) do |delivery_info, properties, payload|
    n = payload.to_i
    r = self.class.fib(n)

    puts " [.] fib(#{n})"

    @x.publish(r.to_s, :routing_key => properties.reply_to, :correlation_id => properties.correlation_id)
    @ch.acknowledge(delivery_info.delivery_tag, false)
    end
    end


    def self.fib(n)
    case n
    when 0 then 0
    when 1 then 1
    else
    fib(n - 1) + fib(n - 2)
    end
    end
    end

    begin
    server = FibonacciServer.new(ch)
    " [x] Awaiting RPC requests"
    server.start("rpc_queue")
    rescue Interrupt => _
    ch.close
    conn.close

    exit(0)
    end