Last active
July 12, 2016 13:12
-
-
Save mahesh-singh/9214295 to your computer and use it in GitHub Desktop.
Revisions
-
mahesh-singh revised this gist
Feb 26, 2014 . 1 changed file with 121 additions and 80 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,111 +1,152 @@ //Message subscriber implementation public class AuditSubscriber : IMessageSubscriber { public IList<string> SubscribedRouteKeys { get { return new List<string>() { "*.inceitive.attested.*" }; } } public async Task<bool> Process(Core.MessageInfo MessageItem) { //Start new task to process the message bool _ProcessedResult = await Task<bool>.Factory.StartNew(() => MessageProcesser(MessageItem), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default); return _ProcessedResult; } protected bool MessageProcesser(MessageInfo MessageItem) { Thread.Sleep(1000); //Acthual work return true; } abstract protected IList<string> SetSubscribedRoutes(); } public class RabbitMQMessageConsumer : AbstractRabbitMQClient, IMessageConsumer { //Message consumer method, which will initiate number of tasks based upon the available subscriber. public void Consume(CancellationToken token) { //Start Rabbit MQ connection StartConnection(_ConnectionFactory.Get()); List<Task> tasks = new List<Task>(); foreach (SubscriberType subscriberType in (SubscriberType[])Enum.GetValues(typeof(SubscriberType))) { //Start listeing to all queues based upon the number of subscriber type availbale in the system Task task = Task.Factory.StartNew(() => ConsumeMessage(subscriberType, token), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default); tasks.Add(task); } Task.WhenAll(tasks); } //Listen to queue async Task ConsumeMessage(SubscriberType subscriberType, CancellationToken token) { try { //Get message subscriber which will process the message IMessageSubscriber _MessageSubscriber = _MessageSubscriberFactory.GetMessageSubscriber(subscriberType); using (IModel _ConsumerChannel = _Connection.CreateModel()) { _ConsumerChannel.ExchangeDeclare(_ExchangeProperties.Name, _ExchangeProperties.Type, _ExchangeProperties.Durable); string _QueueName = Enum.GetName(typeof(SubscriberType), subscriberType); _ConsumerChannel.QueueDeclare(_QueueName, _QueueProperties.Durable, _QueueProperties.Exclusive, _QueueProperties.AutoDelete, _QueueProperties.Arguments); foreach (string routeKey in _MessageSubscriber.SubscribedRouteKeys) { _ConsumerChannel.QueueBind(_QueueName, _ExchangeProperties.Name, routeKey); } var consumer = new QueueingBasicConsumer(_ConsumerChannel); _ConsumerChannel.BasicConsume(_QueueName, false, consumer); //Infinite loop to listen the queueu while (true) { if (token.IsCancellationRequested) { break; } try { BasicDeliverEventArgs eventArgs; //Get meesage or time out if (consumer.Queue.Dequeue(1000, out eventArgs)) { if (eventArgs != null) { MessageInfo _MessageItem = ByteArrayToMessageInfo(eventArgs.Body); //Message process by async method var messageProcesser = _MessageSubscriber.Process(_MessageItem); //Wait for result bool _MessageProcessed = await messageProcesser; if (_MessageProcessed) { _ConsumerChannel.BasicAck(eventArgs.DeliveryTag, false); } else { _ConsumerChannel.BasicNack(eventArgs.DeliveryTag, false, true); } } else { //connnection is dead } } } catch (EndOfStreamException ex) { Console.WriteLine(ex.Message); throw; } } } } catch (Exception ex) { Console.WriteLine(ex.Message); //TODO: Restart the task again throw; } } } -
mahesh-singh created this gist
Feb 25, 2014 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,111 @@ public void Consume(CancellationToken token) { StartConnection(_ConnectionFactory.Get()); List<Task> tasks = new List<Task>(); foreach (SubscriberType subscriberType in (SubscriberType[])Enum.GetValues(typeof(SubscriberType))) { //Start listing Task task = Task.Factory.StartNew(() => ConsumeMessage(subscriberType, token), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default); tasks.Add(task); } Task.WhenAll(tasks); } async Task ConsumeMessage(SubscriberType subscriberType, CancellationToken token) { try { //Get message subscriber which will process the message IMessageSubscriber _MessageSubscriber = _MessageSubscriberFactory.GetMessageSubscriber(subscriberType); using (IModel _ConsumerChannel = _Connection.CreateModel()) { _ConsumerChannel.ExchangeDeclare(_ExchangeProperties.Name, _ExchangeProperties.Type, _ExchangeProperties.Durable); string _QueueName = Enum.GetName(typeof(SubscriberType), subscriberType); _ConsumerChannel.QueueDeclare(_QueueName, _QueueProperties.Durable, _QueueProperties.Exclusive, _QueueProperties.AutoDelete, _QueueProperties.Arguments); foreach (string routeKey in _MessageSubscriber.SubscribedRouteKeys) { _ConsumerChannel.QueueBind(_QueueName, _ExchangeProperties.Name, routeKey); } var consumer = new QueueingBasicConsumer(_ConsumerChannel); _ConsumerChannel.BasicConsume(_QueueName, false, consumer); while (true) { if (token.IsCancellationRequested) { break; } try { BasicDeliverEventArgs eventArgs; if (consumer.Queue.Dequeue(1000, out eventArgs)) { if (eventArgs != null) { MessageInfo _MessageItem = ByteArrayToMessageInfo(eventArgs.Body); //Message process by async method var messageProcesser = _MessageSubscriber.Process(_MessageItem); //Wait for result bool _MessageProcessed = await messageProcesser; if (_MessageProcessed) { _ConsumerChannel.BasicAck(eventArgs.DeliveryTag, false); } else { _ConsumerChannel.BasicNack(eventArgs.DeliveryTag, false, true); } } else { //connnection is dead } } } catch (EndOfStreamException ex) { Console.WriteLine(ex.Message); throw; } } } } catch (Exception ex) { Console.WriteLine(ex.Message); //TODO: Restart the task again throw; } }