using Amazon.SQS;
using Amazon.SQS.Model;
using Utility.Environment;
using Utility.Queue.Handler;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utility.Queue.Subscribers
{
    /// <summary>
    /// Polls a single Amazon SQS queue and dispatches each received message to a
    /// <c>HandlerCaller</c>. Credentials and the endpoint base URL are read from the
    /// supplied <see cref="IEnvironmentRequester"/> at construction time.
    /// </summary>
    /// <remarks>
    /// <see cref="SetQueueName"/> must be called before <c>Poll</c>, because it is the only
    /// place where the queue URL used by <c>Poll</c> is assigned.
    /// </remarks>
    public class SQSQueueSubscriber
    {
        private readonly IEnvironmentRequester EnvironmentRequester;
        private readonly IServiceProvider ServiceProvider;
        private readonly string QueueBaseUrl;
        private readonly string AccessKey;
        private readonly string SecretAccessKey;
        private readonly AmazonSQSConfig SQSConfig;
        private string QueueUrl;

        /// <summary>
        /// Reads the "AWSAccessKeyId", "AWSSecretAccessKey" and "SQSEndpointBase" variables
        /// and builds the SQS client configuration from them.
        /// </summary>
        /// <param name="environmentRequester">Source of the configuration variables.</param>
        /// <param name="serviceProvider">Service provider handed to every handler caller.</param>
        public SQSQueueSubscriber(IEnvironmentRequester environmentRequester, IServiceProvider serviceProvider)
        {
            AccessKey = environmentRequester.GetVariable("AWSAccessKeyId");
            SecretAccessKey = environmentRequester.GetVariable("AWSSecretAccessKey");
            QueueBaseUrl = environmentRequester.GetVariable("SQSEndpointBase");

            SQSConfig = new AmazonSQSConfig
            {
                // Locally /queue will be added for elasticmq, remove. Live urls will never have /queue.
                ServiceURL = QueueBaseUrl.Replace("/queue", "")
            };

            EnvironmentRequester = environmentRequester;
            ServiceProvider = serviceProvider;
        }

        /// <summary>
        /// Selects the queue to poll and issues a create-queue request for it.
        /// </summary>
        /// <param name="queueName">Name of the queue; appended to the endpoint base URL.</param>
        /// <returns>A task that completes when the create-queue request has returned.</returns>
        public async Task SetQueueName(string queueName)
        {
            QueueUrl = QueueBaseUrl + "/" + queueName;

            using (var client = new AmazonSQSClient(AccessKey, SecretAccessKey, SQSConfig))
            {
                CreateQueueRequest createQueueRequest = new CreateQueueRequest
                {
                    QueueName = queueName
                };

                await client.CreateQueueAsync(createQueueRequest);
            }
        }

        /// <summary>
        /// Long-polls the configured queue until <paramref name="stoppingToken"/> is cancelled.
        /// Each received message is handed to a handler caller on a background task and is
        /// deleted from the queue immediately after dispatch, without waiting for the handler.
        /// </summary>
        /// <typeparam name="T">Message handler type used to process each message.</typeparam>
        /// <param name="serviceProvider">
        /// Not used by this method; the provider passed to the constructor is used instead.
        /// Kept so existing callers continue to compile.
        /// </param>
        /// <param name="stoppingToken">Signals that polling should stop.</param>
        /// <returns>A task that completes once polling has stopped.</returns>
        public async Task Poll<T>(IServiceProvider serviceProvider, CancellationToken stoppingToken) where T : IMessageHandler
        {
            using (var client = new AmazonSQSClient(AccessKey, SecretAccessKey, SQSConfig))
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    string receiptHandle = "";

                    try
                    {
                        var request = new ReceiveMessageRequest
                        {
                            QueueUrl = QueueUrl,
                            MaxNumberOfMessages = 1,
                            WaitTimeSeconds = 20
                        };

                        // Pass the token so a shutdown request interrupts the 20 second wait
                        // instead of being noticed only after the receive call returns.
                        var response = await client.ReceiveMessageAsync(request, stoppingToken);

                        // The message list can be null (not just empty) when nothing was received.
                        var messages = response.Messages;
                        if (messages == null || messages.Count == 0)
                        {
                            continue;
                        }

                        var sqsMessage = messages[0];
                        receiptHandle = sqsMessage.ReceiptHandle;

                        // NOTE(review): the generic argument was absent in the original text even
                        // though T is otherwise unused; HandlerCaller is assumed to be generic
                        // over the handler type - confirm against its declaration.
                        var caller = new HandlerCaller<T>(EnvironmentRequester, ServiceProvider);
                        Task handlerTask = Task.Run(() => caller.CallHandler(sqsMessage.MessageId, sqsMessage.Body));

                        // The handler is deliberately not awaited, so its failure would otherwise
                        // go unnoticed; attach a logger for the faulted case.
                        LogHandlerFailure(handlerTask, sqsMessage.MessageId);

                        // Not cancellable on purpose: the handler has already been started, so
                        // the message must be removed to avoid it being delivered again.
                        await client.DeleteMessageAsync(QueueUrl, sqsMessage.ReceiptHandle);
                    }
                    catch (Exception ex)
                    {
                        if (ex is OperationCanceledException && stoppingToken.IsCancellationRequested)
                        {
                            // Normal shutdown while waiting for messages; nothing to report.
                            break;
                        }

                        Console.WriteLine(ex.ToString());

                        if (!string.IsNullOrEmpty(receiptHandle))
                        {
                            try
                            {
                                await client.DeleteMessageAsync(QueueUrl, receiptHandle);
                            }
                            catch (Exception deleteEx)
                            {
                                // A failed clean-up must not terminate the polling loop.
                                Console.WriteLine(deleteEx.ToString());
                            }
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Writes the exception of a faulted handler task to the console so that failures of
        /// un-awaited handlers are not lost.
        /// </summary>
        private static void LogHandlerFailure(Task handlerTask, string messageId)
        {
            handlerTask.ContinueWith(
                t => Console.WriteLine("Handler for message " + messageId + " failed: " + t.Exception),
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted,
                TaskScheduler.Default);
        }
    }
}