# Author: Pieter Noordhuis
# Description: Simple demo to showcase Redis PubSub with EventMachine
#
# Update 7 Oct 2010:
# - This example does *not* appear to work with Chrome >=6.0. Apparently,
#   the WebSocket protocol implementation in the cramp gem does not work
#   well with Chrome's (newer) WebSocket implementation.
#
# Requirements:
# - rubygems: eventmachine, thin, cramp, sinatra, yajl-ruby
# - a browser with WebSocket support
#
# Usage:
#   ruby redis_pubsub_demo.rb
#

require 'rubygems'
require 'eventmachine'
require 'stringio'
require 'sinatra/base'
require 'cramp/controller'
require 'yajl'

# Incomplete evented Redis implementation specifically made for
# the new PubSub features in Redis.
#
# Only multi-bulk requests are sent, and only integer (":") and
# multi-bulk ("*") replies are understood; anything else raises.
class EventedRedis < EM::Connection
  # Opens a connection to the Redis server named by the REDIS_HOST and
  # REDIS_PORT environment variables (defaults: localhost, 6379).
  def self.connect
    host = (ENV['REDIS_HOST'] || 'localhost')
    port = (ENV['REDIS_PORT'] || 6379).to_i
    EM.connect host, port, self
  end

  # EventMachine hook: prepares the channel name => callback table.
  def post_init
    @blocks = {}
  end

  # Subscribes to +channels+ and registers +blk+ as the callback for each.
  # The callback receives the parsed reply array; returning +false+ from
  # it closes this connection (see #receive_data).
  def subscribe(*channels, &blk)
    channels.each { |c| @blocks[c.to_s] = blk }
    call_command('subscribe', *channels)
  end

  # Publishes +msg+ on +channel+.
  def publish(channel, msg)
    call_command('publish', channel, msg)
  end

  # Sends UNSUBSCRIBE without arguments.
  def unsubscribe
    call_command('unsubscribe')
  end

  # EventMachine hook: parses every reply contained in +data+ and hands
  # array replies to the callback registered for the channel in parts[1].
  #
  # NOTE: each chunk is parsed on its own, so a reply that is split across
  # two chunks is not reassembled and will fail to parse.
  def receive_data(data)
    buffer = StringIO.new(data)
    # `until` rather than a do-while: an empty chunk is simply ignored.
    until buffer.eof?
      parts = read_response(buffer)
      next unless parts.is_a?(Array)

      blk = @blocks[parts[1]]
      # No callback registered for this channel (or no channel at all).
      next unless blk

      # Only an explicit +false+ closes the connection; nil does not.
      close_connection if blk.call(parts) == false
    end
  end

  private

  # Reads one top-level reply from +buffer+.
  # Returns an Integer for ":" replies and an Array for "*" replies.
  # Raises RuntimeError for any other reply type.
  def read_response(buffer)
    type = buffer.read(1)
    case type
    when ':'
      buffer.gets.to_i
    when '*'
      size = buffer.gets.to_i
      size.times.map { read_object(buffer) }
    else
      raise "unsupported response type"
    end
  end

  # Reads one element of a multi-bulk reply from +data+.
  # Returns an Integer, a String, or nil for a nil bulk ("$-1").
  # Raises RuntimeError for any other element type.
  def read_object(data)
    type = data.read(1)
    case type
    when ':' # integer
      data.gets.to_i
    when '$'
      size = data.gets.to_i
      # A negative length marks a nil bulk: no payload and no CRLF follow.
      return nil if size < 0

      str = data.read(size)
      data.read(2) # crlf
      str
    else
      raise "read for object of type #{type} not implemented"
    end
  end

  # Serializes +args+ as a multi-bulk request and sends it.
  # only support multi-bulk
  def call_command(*args)
    command = "*#{args.size}\r\n"
    args.each do |a|
      arg = a.to_s
      # The length prefix is a byte count, not a character count;
      # String#size would under-count multi-byte (e.g. UTF-8) payloads.
      command << "$#{arg.bytesize}\r\n"
      command << arg
      command << "\r\n"
    end
    send_data command
  end
end

# WebSocket endpoint of the chat. Every socket owns two Redis connections:
# one for publishing and one for the 'chat' subscription.
class ChatController < Cramp::Controller::Websocket
  on_start :create_redis
  on_finish :handle_leave, :destroy_redis
  on_data :received_data

  # Opens the publish and subscribe connections.
  def create_redis
    @pub = EventedRedis.connect
    @sub = EventedRedis.connect
  end

  # Closes both Redis connections once pending writes have been flushed.
  def destroy_redis
    @pub.close_connection_after_writing
    @sub.close_connection_after_writing
  end

  # Dispatches one JSON frame from the browser on its :action field.
  # Frames that are not valid JSON objects are ignored, as are unknown
  # actions.
  def received_data(data)
    msg = begin
      parse_json(data)
    rescue Yajl::ParseError
      nil # malformed frame from the client
    end
    return unless msg.is_a?(Hash)

    case msg[:action]
    when 'join'
      handle_join(msg)
    when 'message'
      handle_message(msg)
    else
      # skip
    end
  end

  # Remembers the user name, subscribes to the channel and announces the
  # join to everyone.
  def handle_join(msg)
    @user = msg[:user]
    subscribe
    publish :action => 'control', :user => @user, :message => 'joined the chat room'
  end

  # Announces that the user left. Nothing is published for a socket that
  # never joined, since no "joined" message was sent for it either.
  def handle_leave
    return unless @user

    publish :action => 'control', :user => @user, :message => 'left the chat room'
  end

  # Broadcasts a chat message, stamped with this socket's user name.
  def handle_message(msg)
    publish msg.merge(:user => @user)
  end

  private

  # Forwards payloads published on 'chat' to the browser.
  def subscribe
    @sub.subscribe('chat') do |type, _channel, message|
      # Skip non-"message" replies such as the subscribe confirmation,
      # whose third element is a subscription count, not a payload.
      render message if type == 'message'
    end
  end

  # JSON-encodes +message+ and publishes it on 'chat'.
  def publish(message)
    @pub.publish('chat', encode_json(message))
  end

  def encode_json(obj)
    Yajl::Encoder.encode(obj)
  end

  def parse_json(str)
    Yajl::Parser.parse(str, :symbolize_keys => true)
  end
end

# Serves the chat page from the inline template at the end of this file.
class StaticController < Sinatra::Base
  enable :inline_templates
  get('/') { erb :main }
end

# WebSocket endpoint on port 8081, static page on port 8082.
EventMachine.run do
  Cramp::Controller::Websocket.backend = :thin
  Rack::Handler::Thin.run ChatController, :Port => 8081
  Rack::Handler::Thin.run StaticController, :Port => 8082
end

__END__

@@ main
Fork me on GitHub