Skip to content

Instantly share code, notes, and snippets.

@saicologic
Forked from mfojtik/app.rb
Created May 14, 2012 07:16
Show Gist options
  • Select an option

  • Save saicologic/2692404 to your computer and use it in GitHub Desktop.

Select an option

Save saicologic/2692404 to your computer and use it in GitHub Desktop.

Revisions

  1. Michal Fojtik created this gist Apr 25, 2012.
    92 changes: 92 additions & 0 deletions app.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,92 @@
    require 'rack'
    require 'sinatra/base'
    require 'uuidtools'
    require 'rest-client'
    require 'eventmachine'
    require 'json'
    require_relative './cache'

    class Worker < EM::Connection
    attr_reader :query

    def receive_data(data)
    Thread.new {
    query_id, url = JSON::parse(data)
    puts 'W: Received [%s][%s]' % [query_id, url]
    notify(query_id, {:status => 'received'})
    result = RestClient.get url
    notify(query_id, {:status => 'processing'})
    sleep 30 # Simulate that we're doing something for 30 seconds
    puts 'W: Processed [%s][%s]' % [query_id, url]
    notify(query_id, {:status => 'finished', :results => result.to_s})
    }
    end

    private

    def notify(query_id, result)
    RestClient.post 'http://localhost:4567/broker/%s/results' % query_id, result.to_json, :content_type => 'text/html'
    end
    end

    class Broker

    def initialize(app, options = {})
    @app = app
    puts "B: Starting broker"
    EM::next_tick do
    @server = EM::connect('127.0.0.1', 4000, Worker, self)
    end
    end

    def call(env)
    env['broker'] = @server
    @app.call(env)
    end

    end

    class App < Sinatra::Base

    use Rack::CommonLogger
    use Broker

    set server: 'thin'
    enable :xhtml
    enable :dump_errors
    enable :show_errors
    disable :show_exceptions

    helpers do
    def broker; env['broker']; end
    end

    post '/query' do
    worker_id = UUIDTools::UUID.timestamp_create.to_s
    broker.send_data([worker_id, params[:url]].to_json)
    [202, { 'Location' => url('/worker/%s/results' % worker_id) }, '']
    end

    post '/broker/:id/results' do
    [201, {}, Cache[params[:id]] = request.body.read]
    end

    get '/broker/:id/results', provides: 'text/event-stream' do
    worker_id = params[:id]
    stream :keep_open do |out|
    timer = EventMachine::PeriodicTimer.new(1) do
    result = Cache[worker_id] || { :status => 'queued' }
    out << result
    timer.cancel if JSON::parse(result)['status'] == 'finished'
    end
    out.callback { timer.cancel }
    out.errback { timer.cancel }
    end
    end

    end

    EM::run {
    EventMachine::start_server '127.0.0.1', '4000', Worker
    App.run!
    }