|  |  | @@ -0,0 +1,353 @@ | 
    
    |  |  | # Author: Pieter Noordhuis | 
    
    |  |  | # Description: Simple demo to showcase Redis PubSub with EventMachine | 
    
    |  |  | # | 
    
    |  |  | # Required rubygems: | 
    
    |  |  | # eventmachine, thin, cramp, sinatra, yajl-ruby | 
    
    |  |  | # | 
    
    |  |  | # Usage: | 
    
    |  |  | # ruby redis_pubsub_demo.rb | 
    
    |  |  | # | 
    
    |  |  | 
 | 
    
    |  |  | require 'rubygems' | 
    
    |  |  | require 'eventmachine' | 
    
    |  |  | require 'stringio' | 
    
    |  |  | require 'sinatra/base' | 
    
    |  |  | require 'cramp/controller' | 
    
    |  |  | require 'yajl' | 
    
    |  |  | 
 | 
    
    |  |  | # Incomplete evented Redis implementation specifically made for | 
    
    |  |  | # the new PubSub features in Redis. | 
    
    |  |  | class EventedRedis < EM::Connection | 
    
    |  |  | def self.connect | 
    
    |  |  | EM.connect 'localhost', 6379, self | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def post_init | 
    
    |  |  | @blocks = {} | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def subscribe(*channels, &blk) | 
    
    |  |  | channels.each { |c| @blocks[c.to_s] = blk } | 
    
    |  |  | call_command('subscribe', *channels) | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def publish(channel, msg) | 
    
    |  |  | call_command('publish', channel, msg) | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def unsubscribe | 
    
    |  |  | call_command('unsubscribe') | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def receive_data(data) | 
    
    |  |  | buffer = StringIO.new(data) | 
    
    |  |  | begin | 
    
    |  |  | parts = read_response(buffer) | 
    
    |  |  | if parts.is_a?(Array) | 
    
    |  |  | ret = @blocks[parts[1]].call(parts) | 
    
    |  |  | close_connection if ret === false | 
    
    |  |  | end | 
    
    |  |  | end while !buffer.eof? | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | private | 
    
    |  |  | def read_response(buffer) | 
    
    |  |  | type = buffer.read(1) | 
    
    |  |  | case type | 
    
    |  |  | when ':' | 
    
    |  |  | buffer.gets.to_i | 
    
    |  |  | when '*' | 
    
    |  |  | size = buffer.gets.to_i | 
    
    |  |  | parts = size.times.map { read_object(buffer) } | 
    
    |  |  | else | 
    
    |  |  | raise "unsupported response type" | 
    
    |  |  | end | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def read_object(data) | 
    
    |  |  | type = data.read(1) | 
    
    |  |  | case type | 
    
    |  |  | when ':' # integer | 
    
    |  |  | data.gets.to_i | 
    
    |  |  | when '$' | 
    
    |  |  | size = data.gets | 
    
    |  |  | str = data.read(size.to_i) | 
    
    |  |  | data.read(2) # crlf | 
    
    |  |  | str | 
    
    |  |  | else | 
    
    |  |  | raise "read for object of type #{type} not implemented" | 
    
    |  |  | end | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | # only support multi-bulk | 
    
    |  |  | def call_command(*args) | 
    
    |  |  | command = "*#{args.size}\r\n" | 
    
    |  |  | args.each { |a| | 
    
    |  |  | command << "$#{a.to_s.size}\r\n" | 
    
    |  |  | command << a.to_s | 
    
    |  |  | command << "\r\n" | 
    
    |  |  | } | 
    
    |  |  | send_data command | 
    
    |  |  | end | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | class ChatController < Cramp::Controller::Websocket | 
    
    |  |  | on_start :create_redis | 
    
    |  |  | on_finish :handle_leave, :destroy_redis | 
    
    |  |  | on_data :received_data | 
    
    |  |  | 
 | 
    
    |  |  | def create_redis | 
    
    |  |  | @pub = EventedRedis.connect | 
    
    |  |  | @sub = EventedRedis.connect | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def destroy_redis | 
    
    |  |  | @pub.close_connection_after_writing | 
    
    |  |  | @sub.close_connection_after_writing | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def received_data(data) | 
    
    |  |  | msg = parse_json(data) | 
    
    |  |  | case msg[:action] | 
    
    |  |  | when 'join' | 
    
    |  |  | handle_join(msg) | 
    
    |  |  | when 'message' | 
    
    |  |  | handle_message(msg) | 
    
    |  |  | else | 
    
    |  |  | # skip | 
    
    |  |  | end | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def handle_join(msg) | 
    
    |  |  | @user = msg[:user] | 
    
    |  |  | subscribe | 
    
    |  |  | publish :action => 'control', :user => @user, :message => 'joined the chat room' | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def handle_leave | 
    
    |  |  | publish :action => 'control', :user => @user, :message => 'left the chat room' | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def handle_message(msg) | 
    
    |  |  | publish msg.merge(:user => @user) | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | private | 
    
    |  |  | def subscribe | 
    
    |  |  | @sub.subscribe('chat') do |type,channel,message| | 
    
    |  |  | render message | 
    
    |  |  | end | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def publish(message) | 
    
    |  |  | @pub.publish('chat', encode_json(message)) | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def encode_json(obj) | 
    
    |  |  | Yajl::Encoder.encode(obj) | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def parse_json(str) | 
    
    |  |  | Yajl::Parser.parse(str, :symbolize_keys => true) | 
    
    |  |  | end | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | class StaticController < Sinatra::Base | 
    
    |  |  | enable :inline_templates | 
    
    |  |  | get('/') { erb :main } | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | EventMachine.run { | 
    
    |  |  | Cramp::Controller::Websocket.backend = :thin | 
    
    |  |  | Rack::Handler::Thin.run ChatController, :Port => 8081 | 
    
    |  |  | Rack::Handler::Thin.run StaticController, :Port => 8082 | 
    
    |  |  | } | 
    
    |  |  | 
 | 
    
    |  |  | __END__ | 
    
    |  |  | @@ main | 
    
    |  |  | <html> | 
    
    |  |  | <head> | 
    
    |  |  | <script src='http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js'></script> | 
    
    |  |  | <script src='http://jquery-json.googlecode.com/files/jquery.json-2.2.min.js'></script> | 
    
    |  |  | <script> | 
    
    |  |  | $(document).ready(function(){ | 
    
    |  |  | if (typeof WebSocket != 'undefined') { | 
    
    |  |  | $('#ask').show(); | 
    
    |  |  | } else { | 
    
    |  |  | $('#error').show(); | 
    
    |  |  | } | 
    
    |  |  | 
 | 
    
    |  |  | // join on enter | 
    
    |  |  | $('#ask input').keydown(function(event) { | 
    
    |  |  | if (event.keyCode == 13) { | 
    
    |  |  | $('#ask a').click(); | 
    
    |  |  | } | 
    
    |  |  | }) | 
    
    |  |  | 
 | 
    
    |  |  | // join on click | 
    
    |  |  | $('#ask a').click(function() { | 
    
    |  |  | join($('#ask input').val()); | 
    
    |  |  | $('#ask').hide(); | 
    
    |  |  | $('#channel').show(); | 
    
    |  |  | $('input#message').focus(); | 
    
    |  |  | }); | 
    
    |  |  | 
 | 
    
    |  |  | function join(name) { | 
    
    |  |  | var host = window.location.host.split(':')[0]; | 
    
    |  |  | var ws = new WebSocket("ws://" + host + ":8081/websocket"); | 
    
    |  |  | 
 | 
    
    |  |  | var container = $('div#msgs'); | 
    
    |  |  | ws.onmessage = function(evt) { | 
    
    |  |  | var obj = $.evalJSON(evt.data); | 
    
    |  |  | if (typeof obj != 'object') return; | 
    
    |  |  | 
 | 
    
    |  |  | var action = obj['action']; | 
    
    |  |  | var struct = container.find('li.' + action + ':first'); | 
    
    |  |  | if (struct.length < 1) { | 
    
    |  |  | console.log("Could not handle: " + evt.data); | 
    
    |  |  | return; | 
    
    |  |  | } | 
    
    |  |  | 
 | 
    
    |  |  | var msg = struct.clone(); | 
    
    |  |  | if (action == 'message') { | 
    
    |  |  | var matches; | 
    
    |  |  | if (matches = obj['message'].match(/^\s*[\/\\]me\s(.*)/)) { | 
    
    |  |  | msg.find('.user').text(obj['user'] + ' ' + matches[1]); | 
    
    |  |  | msg.find('.user').css('font-weight', 'bold'); | 
    
    |  |  | } else { | 
    
    |  |  | msg.find('.user').text(obj['user']); | 
    
    |  |  | msg.find('.message').text(': ' + obj['message']); | 
    
    |  |  | } | 
    
    |  |  | } else if (action == 'control') { | 
    
    |  |  | msg.find('.user').text(obj['user']); | 
    
    |  |  | msg.find('.message').text(obj['message']); | 
    
    |  |  | msg.addClass('control'); | 
    
    |  |  | } | 
    
    |  |  | 
 | 
    
    |  |  | if (obj['user'] == name) msg.find('.user').addClass('self'); | 
    
    |  |  | container.find('ul').append(msg.show()); | 
    
    |  |  | container.scrollTop(container.find('ul').innerHeight()); | 
    
    |  |  | } | 
    
    |  |  | 
 | 
    
    |  |  | $('#channel form').submit(function(event) { | 
    
    |  |  | event.preventDefault(); | 
    
    |  |  | var input = $(this).find(':input'); | 
    
    |  |  | var msg = input.val(); | 
    
    |  |  | ws.send($.toJSON({ action: 'message', message: msg })); | 
    
    |  |  | input.val(''); | 
    
    |  |  | }); | 
    
    |  |  | 
 | 
    
    |  |  | // send name when joining | 
    
    |  |  | ws.onopen = function() { | 
    
    |  |  | ws.send($.toJSON({ action: 'join', user: name })); | 
    
    |  |  | } | 
    
    |  |  | } | 
    
    |  |  | }); | 
    
    |  |  | </script> | 
    
    |  |  | <style type="text/css" media="screen"> | 
    
    |  |  | * { | 
    
    |  |  | font-family: Georgia; | 
    
    |  |  | } | 
    
    |  |  | a { | 
    
    |  |  | color: #000; | 
    
    |  |  | text-decoration: none; | 
    
    |  |  | } | 
    
    |  |  | a:hover { | 
    
    |  |  | text-decoration: underline; | 
    
    |  |  | } | 
    
    |  |  | div.bordered { | 
    
    |  |  | margin: 0 auto; | 
    
    |  |  | margin-top: 100px; | 
    
    |  |  | width: 600px; | 
    
    |  |  | padding: 20px; | 
    
    |  |  | text-align: center; | 
    
    |  |  | border: 10px solid #ddd; | 
    
    |  |  | -webkit-border-radius: 20px; | 
    
    |  |  | } | 
    
    |  |  | #error { | 
    
    |  |  | background-color: #BA0000; | 
    
    |  |  | color: #fff; | 
    
    |  |  | font-weight: bold; | 
    
    |  |  | } | 
    
    |  |  | #ask { | 
    
    |  |  | font-size: 20pt; | 
    
    |  |  | } | 
    
    |  |  | #ask input { | 
    
    |  |  | font-size: 20pt; | 
    
    |  |  | padding: 10px; | 
    
    |  |  | margin: 0 10px; | 
    
    |  |  | } | 
    
    |  |  | #ask span.join { | 
    
    |  |  | padding: 10px; | 
    
    |  |  | background-color: #ddd; | 
    
    |  |  | -webkit-border-radius: 10px; | 
    
    |  |  | } | 
    
    |  |  | #channel { | 
    
    |  |  | margin-top: 20px; | 
    
    |  |  | height: 480px; | 
    
    |  |  | } | 
    
    |  |  | div#msgs { | 
    
    |  |  | overflow-y: scroll; | 
    
    |  |  | height: 400px; | 
    
    |  |  | } | 
    
    |  |  | div#msgs ul { | 
    
    |  |  | list-style: none; | 
    
    |  |  | padding: 0; | 
    
    |  |  | margin: 0; | 
    
    |  |  | text-align: left; | 
    
    |  |  | } | 
    
    |  |  | div#msgs li { | 
    
    |  |  | line-height: 20px; | 
    
    |  |  | } | 
    
    |  |  | div#msgs li span.user { | 
    
    |  |  | color: #ff9900; | 
    
    |  |  | } | 
    
    |  |  | div#msgs li span.user.self { | 
    
    |  |  | color: #aa2211; | 
    
    |  |  | } | 
    
    |  |  | div#msgs li.control { | 
    
    |  |  | text-align: center; | 
    
    |  |  | } | 
    
    |  |  | div#msgs li.control span.message { | 
    
    |  |  | color: #aaa; | 
    
    |  |  | } | 
    
    |  |  | div#input { | 
    
    |  |  | text-align: left; | 
    
    |  |  | margin-top: 20px; | 
    
    |  |  | } | 
    
    |  |  | div#input #message { | 
    
    |  |  | width: 600px; | 
    
    |  |  | border: 5px solid #bbb; | 
    
    |  |  | -webkit-border-radius: 3px; | 
    
    |  |  | font-size: 30pt; | 
    
    |  |  | } | 
    
    |  |  | </style> | 
    
    |  |  | </head> | 
    
    |  |  | <body> | 
    
    |  |  | <a href="http://github.com/you"> | 
    
    |  |  | <img style="position: absolute; top: 0; right: 0; border: 0;" src="http://s3.amazonaws.com/github/ribbons/forkme_right_darkblue_121621.png" alt="Fork me on GitHub" /> | 
    
    |  |  | </a> | 
    
    |  |  | <div id="error" class="bordered" style="display: none;"> | 
    
    |  |  | This browser has no native WebSocket support.<br/> | 
    
    |  |  | Use a WebKit nightly or Google Chrome. | 
    
    |  |  | </div> | 
    
    |  |  | <div id="ask" class="bordered" style="display: none;"> | 
    
    |  |  | Name: <input type="text" id="name" /> <a href="#"><span class="join">Join!</span></a> | 
    
    |  |  | </div> | 
    
    |  |  | <div id="channel" class="bordered" style="display: none;"> | 
    
    |  |  | <div id="msgs"> | 
    
    |  |  | <ul> | 
    
    |  |  | <li class="message" style="display: none"> | 
    
    |  |  | <span class="user"></span><span class="message"></span> | 
    
    |  |  | </li> | 
    
    |  |  | <li class="control" style="display: none"> | 
    
    |  |  | <span class="user"></span> <span class="message"></span> | 
    
    |  |  | </li> | 
    
    |  |  | </ul> | 
    
    |  |  | </div> | 
    
    |  |  | <div id="input"> | 
    
    |  |  | <form><input type="text" id="message" /></form> | 
    
    |  |  | </div> | 
    
    |  |  | </div> | 
    
    |  |  | </body> | 
    
    |  |  | </html> |