Skip to content

Instantly share code, notes, and snippets.

@calebickler
Last active August 1, 2022 16:00
Show Gist options
  • Save calebickler/899b577088b8e4189dd3a8e3ba07ebaa to your computer and use it in GitHub Desktop.
Save calebickler/899b577088b8e4189dd3a8e3ba07ebaa to your computer and use it in GitHub Desktop.

Revisions

  1. Cale Bickler revised this gist Jun 23, 2019. No changes.
  2. Cale Bickler revised this gist Jun 23, 2019. 2 changed files with 1 addition and 17 deletions.
    16 changes: 0 additions & 16 deletions IQueueSubscriber.cs
    Original file line number Diff line number Diff line change
    @@ -1,16 +0,0 @@
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Models.Message;
    using Utility.Queue.Handler;

    namespace Utility.Queue.Subscribers
    {
    public interface IQueueSubscriber
    {
    Task SetQueueName(string queueName);
    Task Poll<T, U>(IServiceProvider serviceProvider, CancellationToken stoppingToken) where T : IMessageHandler<U>;
    }
    }
    2 changes: 1 addition & 1 deletion SQSQueueSubscriber.cs
    Original file line number Diff line number Diff line change
    @@ -11,7 +11,7 @@

    namespace Utility.Queue.Subscribers
    {
    public class SQSQueueSubscriber : IQueueSubscriber
    public class SQSQueueSubscriber
    {
    private readonly IEnvironmentRequester EnvironmentRequester;
    private readonly IServiceProvider ServiceProvider;
  3. Cale Bickler revised this gist Jun 23, 2019. 2 changed files with 0 additions and 84 deletions.
    13 changes: 0 additions & 13 deletions IMessageHandler.cs
    Original file line number Diff line number Diff line change
    @@ -1,13 +0,0 @@
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Models.Message;

    namespace Utility.Queue.Handler
    {
    public interface IMessageHandler<T>
    {
    Task<string> HandleMessage(int portalId, T message);
    }
    }
    71 changes: 0 additions & 71 deletions MessageHandlerCaller.cs
    Original file line number Diff line number Diff line change
    @@ -1,71 +0,0 @@
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Text;
    using System.Threading.Tasks;
    using Models.Message;
    using Utility.Environment;
    using Utility.Logging;
    using Utility.Queue.Handler;
    using Utility.RESTControllers.Filters;
    using Microsoft.Extensions.DependencyInjection;
    using Newtonsoft.Json;

    namespace Utility.Queue.Subscribers
    {
    public class MessageHandlerCaller
    {
    private readonly IServiceProvider ServiceProvider;
    private readonly string ServiceName;

    public HandlerCaller(IEnvironmentRequester environmentRequester,
    IServiceProvider serviceProvider)
    {
    ServiceName = environmentRequester.GetVariable("ServiceName");
    ServiceProvider = serviceProvider;
    }

    public async Task CallHandler<T, U>(string messageId, string body) where T : IMessageHandler<U>
    {
    Stopwatch stopWatch = Stopwatch.StartNew();

    U message = JsonConvert.DeserializeObject<U>(body);

    var messageBase = message as MessageBase;

    int responseCode = 200;
    string result;

    try
    {
    var handler = ActivatorUtilities.CreateInstance<T>(ServiceProvider);
    result = await handler.HandleMessage(messageBase.PortalId, message);
    }
    catch (Exception ex)
    {
    var exceptionResponse = ExceptionFilter.GetExceptionResponse(ex, out responseCode);

    result = JsonConvert.SerializeObject(exceptionResponse);
    }

    stopWatch.Stop();

    var logger = new Logger
    {
    Service = ServiceName.ToLower(),
    RequestTime = stopWatch.ElapsedMilliseconds,
    Action = $"{ServiceName.ToLower()}_{typeof(T).Name.ToLower()}",
    Method = "queue",
    IPAddress = "",
    RequestBody = body,
    PortalId = messageBase.PortalId,
    ContainerId = System.Environment.MachineName,
    RequestId = messageId,
    ResponseCode = responseCode,
    Response = result,
    };

    logger.Log();
    }
    }
    }
  4. Cale Bickler revised this gist Jun 23, 2019. No changes.
  5. Cale Bickler created this gist Jun 23, 2019.
    13 changes: 13 additions & 0 deletions IMessageHandler.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,13 @@
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Models.Message;

    namespace Utility.Queue.Handler
    {
    public interface IMessageHandler<T>
    {
    Task<string> HandleMessage(int portalId, T message);
    }
    }
    16 changes: 16 additions & 0 deletions IQueueSubscriber.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,16 @@
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Models.Message;
    using Utility.Queue.Handler;

    namespace Utility.Queue.Subscribers
    {
    public interface IQueueSubscriber
    {
    Task SetQueueName(string queueName);
    Task Poll<T, U>(IServiceProvider serviceProvider, CancellationToken stoppingToken) where T : IMessageHandler<U>;
    }
    }
    71 changes: 71 additions & 0 deletions MessageHandlerCaller.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,71 @@
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Text;
    using System.Threading.Tasks;
    using Models.Message;
    using Utility.Environment;
    using Utility.Logging;
    using Utility.Queue.Handler;
    using Utility.RESTControllers.Filters;
    using Microsoft.Extensions.DependencyInjection;
    using Newtonsoft.Json;

    namespace Utility.Queue.Subscribers
    {
    public class MessageHandlerCaller
    {
    private readonly IServiceProvider ServiceProvider;
    private readonly string ServiceName;

    public HandlerCaller(IEnvironmentRequester environmentRequester,
    IServiceProvider serviceProvider)
    {
    ServiceName = environmentRequester.GetVariable("ServiceName");
    ServiceProvider = serviceProvider;
    }

    public async Task CallHandler<T, U>(string messageId, string body) where T : IMessageHandler<U>
    {
    Stopwatch stopWatch = Stopwatch.StartNew();

    U message = JsonConvert.DeserializeObject<U>(body);

    var messageBase = message as MessageBase;

    int responseCode = 200;
    string result;

    try
    {
    var handler = ActivatorUtilities.CreateInstance<T>(ServiceProvider);
    result = await handler.HandleMessage(messageBase.PortalId, message);
    }
    catch (Exception ex)
    {
    var exceptionResponse = ExceptionFilter.GetExceptionResponse(ex, out responseCode);

    result = JsonConvert.SerializeObject(exceptionResponse);
    }

    stopWatch.Stop();

    var logger = new Logger
    {
    Service = ServiceName.ToLower(),
    RequestTime = stopWatch.ElapsedMilliseconds,
    Action = $"{ServiceName.ToLower()}_{typeof(T).Name.ToLower()}",
    Method = "queue",
    IPAddress = "",
    RequestBody = body,
    PortalId = messageBase.PortalId,
    ContainerId = System.Environment.MachineName,
    RequestId = messageId,
    ResponseCode = responseCode,
    Response = result,
    };

    logger.Log();
    }
    }
    }
    36 changes: 36 additions & 0 deletions QueueSubscriberService.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,36 @@
    using Utility.Environment;
    using Utility.Queue.Handler;
    using Utility.Queue.Subscribers;
    using Microsoft.Extensions.Hosting;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    namespace Utility.Queue
    {
    public class QueueSubscriberService<T, U> : BackgroundService where T : IMessageHandler<U>
    {
    private readonly IServiceProvider ServiceProvider;
    private readonly IEnvironmentRequester EnvironmentRequester;

    public QueueSubscriber(IServiceProvider serviceProvider,
    IEnvironmentRequester environmentRequester)
    {
    ServiceProvider = serviceProvider;
    EnvironmentRequester = environmentRequester;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
    var subscriber = new SQSSubscriber(EnvironmentRequester, ServiceProvider);

    var nameResolver = new NameResolver(EnvironmentRequester);

    await subscriber.SetQueueName(nameResolver.GetQueueName<U>());

    await subscriber.PollSQS<T, U>(ServiceProvider, stoppingToken);
    }
    }
    }
    102 changes: 102 additions & 0 deletions SQSQueueSubscriber.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,102 @@
    using Amazon.SQS;
    using Amazon.SQS.Model;
    using Utility.Environment;
    using Utility.Queue.Handler;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    namespace Utility.Queue.Subscribers
    {
    public class SQSQueueSubscriber : IQueueSubscriber
    {
    private readonly IEnvironmentRequester EnvironmentRequester;
    private readonly IServiceProvider ServiceProvider;
    private readonly string QueueBaseUrl;
    private readonly string AccessKey;
    private readonly string SecretAccessKey;
    private readonly AmazonSQSConfig SQSConfig;
    private string QueueUrl;

    public SQSSubscriber(IEnvironmentRequester environmentRequester,
    IServiceProvider serviceProvider)
    {
    AccessKey = environmentRequester.GetVariable("AWSAccessKeyId");
    SecretAccessKey = environmentRequester.GetVariable("AWSSecretAccessKey");

    QueueBaseUrl = environmentRequester.GetVariable("SQSEndpointBase");

    SQSConfig = new AmazonSQSConfig
    {
    // Locally /queue will be added for elasticmq, remove. Live urls will never have /queue.
    ServiceURL = QueueBaseUrl.Replace("/queue", "")
    };

    EnvironmentRequester = environmentRequester;
    ServiceProvider = serviceProvider;
    }

    public async Task SetQueueName(string queueName)
    {
    QueueUrl = QueueBaseUrl + "/" + queueName;

    using (var client = new AmazonSQSClient(AccessKey, SecretAccessKey, SQSConfig))
    {
    CreateQueueRequest createQueueRequest = new CreateQueueRequest
    {
    QueueName = queueName
    };

    await client.CreateQueueAsync(createQueueRequest);
    }
    }

    public async Task Poll<T, U>(IServiceProvider serviceProvider, CancellationToken stoppingToken) where T : IMessageHandler<U>
    {
    using (var client = new AmazonSQSClient(AccessKey, SecretAccessKey, SQSConfig))
    {
    while (!stoppingToken.IsCancellationRequested)
    {
    string receiptHandle = "";

    try
    {
    var request = new ReceiveMessageRequest
    {
    QueueUrl = QueueUrl,
    MaxNumberOfMessages = 1,
    WaitTimeSeconds = 20
    };

    var response = await client.ReceiveMessageAsync(request);

    if (response.Messages.Count > 0)
    {
    var sqsMessage = response.Messages.FirstOrDefault();

    receiptHandle = sqsMessage.ReceiptHandle;

    var caller = new HandlerCaller(EnvironmentRequester, ServiceProvider);

    Task task = Task.Run(() => caller.CallHandler<T, U>(sqsMessage.MessageId, sqsMessage.Body));

    await client.DeleteMessageAsync(QueueUrl, sqsMessage.ReceiptHandle);
    }
    }
    catch (Exception ex)
    {
    Console.WriteLine(ex.ToString());

    if (!string.IsNullOrEmpty(receiptHandle))
    {
    await client.DeleteMessageAsync(QueueUrl, receiptHandle);
    }
    }
    }
    }
    }
    }
    }