# rubocop:disable Style/HashSyntax # shoryuken用のタスク namespace :queues do desc 'Create default SNS & SQS' task 'setup' => :environment do # http://qiita.com/takeyuweb/items/cdc262d97c3e863c15ff # TODO end desc "List all SQS queues" task :list => :environment do queues.each { |queue_url| puts queue_url } end desc "Get details for a specific SQS Queue" task :queue, [:identifier] => :environment do |_t, args| if args.identifier.present? puts queue(args.identifier) else fail 'No SQS queue name or url was specified' end end desc "Ensure existence of a SQS Dead Letter Queue" task :dead_letter_queue => :environment do name = ENV['SQS_FAILURE_QUEUE'] fail 'SQS_FAILURE_QUEUE env variable is not set' if name.blank? if queue_names.include?(name) puts "SQS Dead Letter Queue '#{name}' exists" else sqs.create_queue(queue_name: name) puts "SQS Dead Letter Queue '#{name}' created" end end desc "Create a new SQS queue with dead job support" task :create, [:name, :retries] => :dead_letter_queue do |_t, args| fail "An SQS queue name must be specified" if args.name.blank? if queue_names.include?(name) puts "SQS Queue '#{name}' exists" else Rake::Task['queues:dead_letter_queue'].invoke retries = args.retries || 7 arn = queue(ENV['SQS_FAILURE_QUEUE'])['QueueArn'] attrs = { 'RedrivePolicy' => %({"maxReceiveCount":"#{retries}", "deadLetterTargetArn":"#{arn}"}") } sqs.create_queue(queue_name: name, attributes: attrs) puts "SQS Queue '#{name}' created" end end desc "Setup the default SQS queues for a new project" task :setup, [:retries] => :environment do |_t, args| name = ENV['SQS_DEFAULT_QUEUE'] fail 'SQS_DEFAULT_QUEUE env variable is not set' if name.blank? retries = args.retries || 7 Rake::Task['queues:create'].invoke(name, retries) end desc "Delete an SQS queue" task :delete, [:identifier] => :environment do fail 'No SQS queue name or url was specified' if args.identifier.blank? sqs.delete_queue(queue_url: queue_url(args.identifier), attributes: attrs) puts "SQS Queue '#{name}' deleted" end desc "Delete all SQS queues" task :delete_all => :environment do STDOUT.puts "Are sure you want to delete all SQS Queues? Type 'CONFIRM' to confirm:" input = STDIN.gets.chomp unless input == 'CONFIRM' fail "Aborting deletion of SQS Queues. You entered: #{input}" end queues.each do |queue| Rake::Task['queues:delete'].invoke(queue) end end def sqs @sqs ||= Aws::SQS::Client.new end def sns @sns ||= Aws::SNS::Client.new end def queues sqs.list_queues.inject([]) do |list, page| list.concat(page.queue_urls) end end def queue_names queues.map { |queue| queue.rpartition('/').last } end def queue_url(identifier) if %r{^https?://}.match(identifier) identifier else queues.find { |queue| queue.rpartition('/').last == identifier } end end def queue(identifier) sqs.get_queue_attributes(queue_url: queue_url(identifier), attribute_names: ['All']).attributes end def exist_sns_topic?(topic_name) arn = Shoryuken::SnsArn.new(topic_name).to_s sns.get_topic_attributes(arn) rescue Aws::SNS::Errors::NotFound nil end def create_topic_and_queue(identifier) return false if exist_sns_topic?(identifier) # 通知先SNSトピック作成 resp = sns.create_topic( name: identifier # required ) topic_arn = resp.topic_arn # 通知先キュー作成 resp = sqs.create_queue( queue_name: identifier, # required attributes: { ReceiveMessageWaitTimeSeconds: '20' } ) qurl = resp.queue_url # トピックへのメッセージを作成した通知先キューへ送るように購読の設定 # キューのARNを取得 resp = sqs.get_queue_attributes( queue_url: qurl, attribute_names: %w(QueueArn) ) queue_arn = resp.attributes['QueueArn'] # 取得したARNを使って購読申込 resp = sns.subscribe( topic_arn: topic_arn, # required protocol: 'sqs', # required endpoint: queue_arn ) subscription_arn = resp.subscription_arn # 今回はSNSによるメタ情報は不要なので送信したメッセージをそのままキューに送る sns.set_subscription_attributes( subscription_arn: subscription_arn, # required attribute_name: 'RawMessageDelivery', attribute_value: 'true' ) # SNSトピックからSQSキューへのメッセージの追加を許可 # http://docs.aws.amazon.com/ja_jp/sns/latest/dg/SendMessageToSQS.html#SendMessageToSQS.sqs.permissions policy = { Version: '2012-10-17', Statement: [ { Sid: 'NotificationsToSQS', Effect: 'Allow', Principal: '*', Action: 'sqs:SendMessage', Resource: queue_arn, Condition: { ArnEquals: { 'aws:SourceArn' => topic_arn } } } ] } sqs.set_queue_attributes( queue_url: qurl, attributes: { Policy: policy.to_json } ) end end