Skip to content

Instantly share code, notes, and snippets.

@thecarlo
Forked from sixeyed/QueueClient.cs
Created April 28, 2019 15:33
Show Gist options
  • Save thecarlo/ade7cc4e7d0ddc60f855022c6afa739d to your computer and use it in GitHub Desktop.
Save thecarlo/ade7cc4e7d0ddc60f855022c6afa739d to your computer and use it in GitHub Desktop.

Revisions

  1. @sixeyed sixeyed created this gist Oct 11, 2013.
    122 changes: 122 additions & 0 deletions QueueClient.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,122 @@
    using Amazon;
    using Amazon.SQS;
    using Amazon.SQS.Model;
    using Amazon.SQS.Util;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;

    namespace Sixeyed.Blogging.Aws
    {
    public class QueueClient
    {
    private AmazonSQSClient _sqsClient;
    public string QueueName { get; private set; }
    internal string QueueUrl { get; private set; }
    internal string QueueArn { get; private set; }
    private Action<Message> _receiveAction;
    private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

    public QueueClient(string queueName)
    {
    _sqsClient = new AmazonSQSClient(RegionEndpoint.EUWest1);
    QueueName = queueName;
    Ensure();
    }

    public void Ensure()
    {
    if (!Exists())
    {
    var request = new CreateQueueRequest();
    request.QueueName = QueueName;
    var response = _sqsClient.CreateQueue(request);
    QueueUrl = response.QueueUrl;
    }
    }

    public bool Exists()
    {
    var exists = false;
    var queues = _sqsClient.ListQueues();
    var matchString = string.Format("/{0}", QueueName);
    var matches = queues.QueueUrls.Where(x => x.EndsWith(QueueName));
    if (matches.Count() == 1)
    {
    exists = true;
    QueueUrl = matches.ElementAt(0);
    }
    return exists;
    }

    public void Unsubscribe()
    {
    _cancellationTokenSource.Cancel();
    }

    private async void Subscribe()
    {
    if (!_cancellationTokenSource.IsCancellationRequested)
    {
    var request = new ReceiveMessageRequest { MaxNumberOfMessages = 10 };
    request.QueueUrl = QueueUrl;
    var result = await _sqsClient.ReceiveMessageAsync(request, _cancellationTokenSource.Token);
    if (result.Messages.Count > 0)
    {
    foreach (var message in result.Messages)
    {
    if (_receiveAction != null && message != null)
    {
    _receiveAction(message);
    DeleteMessage(message.ReceiptHandle);
    }
    }
    }
    }
    if (!_cancellationTokenSource.IsCancellationRequested)
    {
    Subscribe();
    }
    }

    private DeleteMessageResponse DeleteMessage(string receiptHandle)
    {
    var request = new DeleteMessageRequest();
    request.QueueUrl = QueueUrl;
    request.ReceiptHandle = receiptHandle;
    return _sqsClient.DeleteMessage(request);
    }

    public void Subscribe(Action<Message> receiveAction)
    {
    _receiveAction = receiveAction;
    _cancellationTokenSource = new CancellationTokenSource();
    Subscribe();
    }

    public void Send(Message message)
    {
    var request = new SendMessageRequest();
    request.QueueUrl = QueueUrl;
    request.MessageBody = message.Body;
    _sqsClient.SendMessage(request);
    }

    public bool HasMessages()
    {
    var request = new GetQueueAttributesRequest
    {
    QueueUrl = QueueUrl,
    AttributeNames = new List<string>(new string[] { SQSConstants.ATTRIBUTE_APPROXIMATE_NUMBER_OF_MESSAGES})
    };
    var response = _sqsClient.GetQueueAttributes(request);
    return response.ApproximateNumberOfMessages > 0;
    }

    public bool IsListening()
    {
    return !_cancellationTokenSource.IsCancellationRequested;
    }
    }
    }