using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Amazon;
using Amazon.SQS;
using Amazon.SQS.Model;
using Amazon.SQS.Util;

namespace Sixeyed.Blogging.Aws
{
    /// <summary>
    /// Thin wrapper around <see cref="AmazonSQSClient"/> bound to a single named queue
    /// in the EU West 1 region. The queue is looked up, and created if missing, when
    /// the client is constructed.
    /// </summary>
    public class QueueClient
    {
        private readonly AmazonSQSClient _sqsClient;

        // Null until Subscribe is called; replaced on every new subscription.
        private CancellationTokenSource _cancellationTokenSource;

        /// <summary>Name of the queue this client is bound to.</summary>
        public string QueueName { get; private set; }

        /// <summary>URL of the queue; assigned by <see cref="Exists"/> or <see cref="Ensure"/>.</summary>
        internal string QueueUrl { get; private set; }

        /// <summary>ARN of the queue. Not assigned anywhere in this class.</summary>
        internal string QueueArn { get; private set; }

        /// <summary>
        /// Creates a client for <paramref name="queueName"/> and makes sure the queue exists.
        /// </summary>
        /// <param name="queueName">Name of the queue to bind to.</param>
        /// <exception cref="ArgumentNullException"><paramref name="queueName"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="queueName"/> is empty.</exception>
        public QueueClient(string queueName)
        {
            if (queueName == null)
            {
                throw new ArgumentNullException("queueName");
            }

            if (queueName.Length == 0)
            {
                // An empty name would make the suffix match in Exists() accept any queue URL.
                throw new ArgumentException("Queue name must not be empty.", "queueName");
            }

            _sqsClient = new AmazonSQSClient(RegionEndpoint.EUWest1);
            QueueName = queueName;
            Ensure();
        }

        /// <summary>
        /// Creates the queue if <see cref="Exists"/> does not find it, recording the new
        /// queue's URL in <see cref="QueueUrl"/>.
        /// </summary>
        public void Ensure()
        {
            if (Exists())
            {
                return;
            }

            var request = new CreateQueueRequest();
            request.QueueName = QueueName;

            var response = _sqsClient.CreateQueue(request);
            QueueUrl = response.QueueUrl;
        }

        /// <summary>
        /// Checks whether a queue called <see cref="QueueName"/> is listed for this account
        /// and, if so, stores its URL in <see cref="QueueUrl"/>.
        /// </summary>
        /// <returns><c>true</c> when a queue URL ending in "/{QueueName}" is found.</returns>
        public bool Exists()
        {
            // Ask only for queues starting with our name instead of listing every queue.
            var request = new ListQueuesRequest { QueueNamePrefix = QueueName };
            var response = _sqsClient.ListQueues(request);

            if (response.QueueUrls == null)
            {
                // Defensive: treat a missing list the same as an empty one.
                return false;
            }

            // Match on "/name" so that e.g. "orders" does not also match ".../big-orders".
            var urlSuffix = string.Format("/{0}", QueueName);
            var queueUrl = response.QueueUrls.FirstOrDefault(
                x => x != null && x.EndsWith(urlSuffix, StringComparison.Ordinal));

            if (queueUrl == null)
            {
                return false;
            }

            QueueUrl = queueUrl;
            return true;
        }

        /// <summary>
        /// Starts receiving messages and passes each one to <paramref name="receiveAction"/>.
        /// A message is deleted from the queue after the handler returns. Calling this again
        /// stops the previous subscription before starting the new one.
        /// </summary>
        /// <param name="receiveAction">Handler invoked once per received message.</param>
        /// <exception cref="ArgumentNullException"><paramref name="receiveAction"/> is null.</exception>
        public void Subscribe(Action<Message> receiveAction)
        {
            if (receiveAction == null)
            {
                throw new ArgumentNullException("receiveAction");
            }

            // Stop any earlier loop; otherwise it would keep polling alongside the new one.
            var previous = _cancellationTokenSource;
            if (previous != null)
            {
                previous.Cancel();
            }

            var current = new CancellationTokenSource();
            _cancellationTokenSource = current;
            ReceiveLoop(receiveAction, current.Token);
        }

        /// <summary>Stops the current subscription, if there is one.</summary>
        public void Unsubscribe()
        {
            var source = _cancellationTokenSource;
            if (source != null)
            {
                source.Cancel();
            }
        }

        /// <summary>
        /// Reports whether a subscription has been started and not yet cancelled.
        /// </summary>
        /// <returns><c>false</c> before the first <see cref="Subscribe"/> call and after
        /// <see cref="Unsubscribe"/>; <c>true</c> in between.</returns>
        public bool IsListening()
        {
            var source = _cancellationTokenSource;
            return source != null && !source.IsCancellationRequested;
        }

        /// <summary>Sends the body of <paramref name="message"/> to the queue.</summary>
        /// <param name="message">Message whose <c>Body</c> is sent.</param>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public void Send(Message message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            var request = new SendMessageRequest();
            request.QueueUrl = QueueUrl;
            request.MessageBody = message.Body;
            _sqsClient.SendMessage(request);
        }

        /// <summary>
        /// Queries the queue's approximate message count.
        /// </summary>
        /// <returns><c>true</c> when the reported approximate count is greater than zero.</returns>
        public bool HasMessages()
        {
            var request = new GetQueueAttributesRequest
            {
                QueueUrl = QueueUrl,
                AttributeNames = new List<string> { SQSConstants.ATTRIBUTE_APPROXIMATE_NUMBER_OF_MESSAGES }
            };

            var response = _sqsClient.GetQueueAttributes(request);
            return response.ApproximateNumberOfMessages > 0;
        }

        /// <summary>
        /// Polls the queue until <paramref name="token"/> is cancelled, handing every received
        /// message to <paramref name="handler"/> and then deleting it.
        /// </summary>
        /// <remarks>
        /// Deliberately fire-and-forget (<c>async void</c>): nothing awaits the loop, and an
        /// unexpected failure (SDK error or a throwing handler) keeps propagating to the
        /// calling context as before instead of being parked on a task nobody observes.
        /// Only cancellation is treated as a normal way to stop.
        /// </remarks>
        private async void ReceiveLoop(Action<Message> handler, CancellationToken token)
        {
            try
            {
                while (!token.IsCancellationRequested)
                {
                    var request = new ReceiveMessageRequest
                    {
                        QueueUrl = QueueUrl,
                        MaxNumberOfMessages = 10
                    };

                    var result = await _sqsClient.ReceiveMessageAsync(request, token);
                    if (result.Messages == null)
                    {
                        continue;
                    }

                    // A batch that was already received is processed in full, even if
                    // cancellation is requested part-way through it.
                    foreach (var message in result.Messages)
                    {
                        if (message == null)
                        {
                            continue;
                        }

                        handler(message);
                        DeleteMessage(message.ReceiptHandle);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Unsubscribe() cancels the in-flight receive; that is the expected shutdown
                // path. Anything not caused by our own token is still a real error.
                if (!token.IsCancellationRequested)
                {
                    throw;
                }
            }
        }

        /// <summary>Deletes the message identified by <paramref name="receiptHandle"/> from the queue.</summary>
        private DeleteMessageResponse DeleteMessage(string receiptHandle)
        {
            var request = new DeleteMessageRequest();
            request.QueueUrl = QueueUrl;
            request.ReceiptHandle = receiptHandle;
            return _sqsClient.DeleteMessage(request);
        }
    }
}