|
|
@@ -0,0 +1,92 @@ |
|
|
require 'rack' |
|
|
require 'sinatra/base' |
|
|
require 'uuidtools' |
|
|
require 'rest-client' |
|
|
require 'eventmachine' |
|
|
require 'json' |
|
|
require_relative './cache' |
|
|
|
|
|
class Worker < EM::Connection |
|
|
attr_reader :query |
|
|
|
|
|
def receive_data(data) |
|
|
Thread.new { |
|
|
query_id, url = JSON::parse(data) |
|
|
puts 'W: Received [%s][%s]' % [query_id, url] |
|
|
notify(query_id, {:status => 'received'}) |
|
|
result = RestClient.get url |
|
|
notify(query_id, {:status => 'processing'}) |
|
|
sleep 30 # Simulate that we're doing something for 30 seconds |
|
|
puts 'W: Processed [%s][%s]' % [query_id, url] |
|
|
notify(query_id, {:status => 'finished', :results => result.to_s}) |
|
|
} |
|
|
end |
|
|
|
|
|
private |
|
|
|
|
|
def notify(query_id, result) |
|
|
RestClient.post 'http://localhost:4567/broker/%s/results' % query_id, result.to_json, :content_type => 'text/html' |
|
|
end |
|
|
end |
|
|
|
|
|
class Broker |
|
|
|
|
|
def initialize(app, options = {}) |
|
|
@app = app |
|
|
puts "B: Starting broker" |
|
|
EM::next_tick do |
|
|
@server = EM::connect('127.0.0.1', 4000, Worker, self) |
|
|
end |
|
|
end |
|
|
|
|
|
def call(env) |
|
|
env['broker'] = @server |
|
|
@app.call(env) |
|
|
end |
|
|
|
|
|
end |
|
|
|
|
|
class App < Sinatra::Base |
|
|
|
|
|
use Rack::CommonLogger |
|
|
use Broker |
|
|
|
|
|
set server: 'thin' |
|
|
enable :xhtml |
|
|
enable :dump_errors |
|
|
enable :show_errors |
|
|
disable :show_exceptions |
|
|
|
|
|
helpers do |
|
|
def broker; env['broker']; end |
|
|
end |
|
|
|
|
|
post '/query' do |
|
|
worker_id = UUIDTools::UUID.timestamp_create.to_s |
|
|
broker.send_data([worker_id, params[:url]].to_json) |
|
|
[202, { 'Location' => url('/worker/%s/results' % worker_id) }, ''] |
|
|
end |
|
|
|
|
|
post '/broker/:id/results' do |
|
|
[201, {}, Cache[params[:id]] = request.body.read] |
|
|
end |
|
|
|
|
|
get '/broker/:id/results', provides: 'text/event-stream' do |
|
|
worker_id = params[:id] |
|
|
stream :keep_open do |out| |
|
|
timer = EventMachine::PeriodicTimer.new(1) do |
|
|
result = Cache[worker_id] || { :status => 'queued' } |
|
|
out << result |
|
|
timer.cancel if JSON::parse(result)['status'] == 'finished' |
|
|
end |
|
|
out.callback { timer.cancel } |
|
|
out.errback { timer.cancel } |
|
|
end |
|
|
end |
|
|
|
|
|
end |
|
|
|
|
|
EM::run { |
|
|
EventMachine::start_server '127.0.0.1', '4000', Worker |
|
|
App.run! |
|
|
} |