Skip to content

Instantly share code, notes, and snippets.

@mahesh-singh
Last active July 12, 2016 13:12
Show Gist options
  • Save mahesh-singh/9214295 to your computer and use it in GitHub Desktop.
Save mahesh-singh/9214295 to your computer and use it in GitHub Desktop.

Revisions

  1. mahesh-singh revised this gist Feb 26, 2014. 1 changed file with 121 additions and 80 deletions.
    201 changes: 121 additions & 80 deletions gistfile1.cs
    Original file line number Diff line number Diff line change
    @@ -1,111 +1,152 @@
    public void Consume(CancellationToken token)
    {
    StartConnection(_ConnectionFactory.Get());



    List<Task> tasks = new List<Task>();
    foreach (SubscriberType subscriberType in (SubscriberType[])Enum.GetValues(typeof(SubscriberType)))
    //Message subscriber implementation
    public class AuditSubscriber : IMessageSubscriber
    {
    //Start listing
    Task task = Task.Factory.StartNew(() => ConsumeMessage(subscriberType, token), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    tasks.Add(task);
    }
    public IList<string> SubscribedRouteKeys
    {
    get { return new List<string>()
    {
    "*.inceitive.attested.*"
    };
    }
    }

    Task.WhenAll(tasks);
    }
    public async Task<bool> Process(Core.MessageInfo MessageItem)
    {
    //Start new task to process the message
    bool _ProcessedResult = await Task<bool>.Factory.StartNew(() => MessageProcesser(MessageItem), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);

    async Task ConsumeMessage(SubscriberType subscriberType, CancellationToken token)
    {
    try
    {

    //Get message subscriber which will process the message
    IMessageSubscriber _MessageSubscriber = _MessageSubscriberFactory.GetMessageSubscriber(subscriberType);
    return _ProcessedResult;
    }

    using (IModel _ConsumerChannel = _Connection.CreateModel())
    protected bool MessageProcesser(MessageInfo MessageItem)
    {
    _ConsumerChannel.ExchangeDeclare(_ExchangeProperties.Name, _ExchangeProperties.Type, _ExchangeProperties.Durable);
    Thread.Sleep(1000); //Acthual work
    return true;
    }

    string _QueueName = Enum.GetName(typeof(SubscriberType), subscriberType);
    abstract protected IList<string> SetSubscribedRoutes();

    }


    _ConsumerChannel.QueueDeclare(_QueueName, _QueueProperties.Durable, _QueueProperties.Exclusive, _QueueProperties.AutoDelete, _QueueProperties.Arguments);
    public class RabbitMQMessageConsumer : AbstractRabbitMQClient, IMessageConsumer
    {

    foreach (string routeKey in _MessageSubscriber.SubscribedRouteKeys)
    //Message consumer method, which will initiate number of tasks based upon the available subscriber.
    public void Consume(CancellationToken token)
    {
    _ConsumerChannel.QueueBind(_QueueName, _ExchangeProperties.Name, routeKey);
    }


    var consumer = new QueueingBasicConsumer(_ConsumerChannel);
    _ConsumerChannel.BasicConsume(_QueueName, false, consumer);


    //Start Rabbit MQ connection
    StartConnection(_ConnectionFactory.Get());



    while (true)
    {

    if (token.IsCancellationRequested)
    List<Task> tasks = new List<Task>();
    foreach (SubscriberType subscriberType in (SubscriberType[])Enum.GetValues(typeof(SubscriberType)))
    {
    break;
    //Start listeing to all queues based upon the number of subscriber type availbale in the system
    Task task = Task.Factory.StartNew(() => ConsumeMessage(subscriberType, token), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    tasks.Add(task);
    }


    Task.WhenAll(tasks);
    }

    //Listen to queue
    async Task ConsumeMessage(SubscriberType subscriberType, CancellationToken token)
    {
    try
    {
    BasicDeliverEventArgs eventArgs;

    if (consumer.Queue.Dequeue(1000, out eventArgs))
    {

    //Get message subscriber which will process the message
    IMessageSubscriber _MessageSubscriber = _MessageSubscriberFactory.GetMessageSubscriber(subscriberType);

    using (IModel _ConsumerChannel = _Connection.CreateModel())
    {

    if (eventArgs != null)
    _ConsumerChannel.ExchangeDeclare(_ExchangeProperties.Name, _ExchangeProperties.Type, _ExchangeProperties.Durable);

    string _QueueName = Enum.GetName(typeof(SubscriberType), subscriberType);


    _ConsumerChannel.QueueDeclare(_QueueName, _QueueProperties.Durable, _QueueProperties.Exclusive, _QueueProperties.AutoDelete, _QueueProperties.Arguments);

    foreach (string routeKey in _MessageSubscriber.SubscribedRouteKeys)
    {

    MessageInfo _MessageItem = ByteArrayToMessageInfo(eventArgs.Body);


    //Message process by async method
    var messageProcesser = _MessageSubscriber.Process(_MessageItem);

    //Wait for result
    bool _MessageProcessed = await messageProcesser;


    if (_MessageProcessed)
    _ConsumerChannel.QueueBind(_QueueName, _ExchangeProperties.Name, routeKey);
    }


    var consumer = new QueueingBasicConsumer(_ConsumerChannel);
    _ConsumerChannel.BasicConsume(_QueueName, false, consumer);


    //Infinite loop to listen the queueu
    while (true)
    {

    if (token.IsCancellationRequested)
    {
    _ConsumerChannel.BasicAck(eventArgs.DeliveryTag, false);
    break;
    }

    try
    {
    BasicDeliverEventArgs eventArgs;

    //Get meesage or time out
    if (consumer.Queue.Dequeue(1000, out eventArgs))
    {

    if (eventArgs != null)
    {

    MessageInfo _MessageItem = ByteArrayToMessageInfo(eventArgs.Body);


    //Message process by async method
    var messageProcesser = _MessageSubscriber.Process(_MessageItem);

    //Wait for result
    bool _MessageProcessed = await messageProcesser;


    if (_MessageProcessed)
    {
    _ConsumerChannel.BasicAck(eventArgs.DeliveryTag, false);
    }
    else
    {

    _ConsumerChannel.BasicNack(eventArgs.DeliveryTag, false, true);
    }
    }
    else
    {
    //connnection is dead
    }

    }
    }
    else
    catch (EndOfStreamException ex)
    {

    _ConsumerChannel.BasicNack(eventArgs.DeliveryTag, false, true);
    Console.WriteLine(ex.Message);
    throw;
    }


    }
    else
    {
    //connnection is dead
    }

    }
    }
    catch (EndOfStreamException ex)
    catch (Exception ex)
    {

    Console.WriteLine(ex.Message);
    //TODO: Restart the task again
    throw;
    }



    }
    }
    }
    catch (Exception ex)
    {

    Console.WriteLine(ex.Message);
    //TODO: Restart the task again
    throw;
    }



    }
  2. mahesh-singh created this gist Feb 25, 2014.
    111 changes: 111 additions & 0 deletions gistfile1.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,111 @@
    public void Consume(CancellationToken token)
    {
    StartConnection(_ConnectionFactory.Get());



    List<Task> tasks = new List<Task>();
    foreach (SubscriberType subscriberType in (SubscriberType[])Enum.GetValues(typeof(SubscriberType)))
    {
    //Start listing
    Task task = Task.Factory.StartNew(() => ConsumeMessage(subscriberType, token), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    tasks.Add(task);
    }

    Task.WhenAll(tasks);
    }

    async Task ConsumeMessage(SubscriberType subscriberType, CancellationToken token)
    {
    try
    {

    //Get message subscriber which will process the message
    IMessageSubscriber _MessageSubscriber = _MessageSubscriberFactory.GetMessageSubscriber(subscriberType);

    using (IModel _ConsumerChannel = _Connection.CreateModel())
    {
    _ConsumerChannel.ExchangeDeclare(_ExchangeProperties.Name, _ExchangeProperties.Type, _ExchangeProperties.Durable);

    string _QueueName = Enum.GetName(typeof(SubscriberType), subscriberType);


    _ConsumerChannel.QueueDeclare(_QueueName, _QueueProperties.Durable, _QueueProperties.Exclusive, _QueueProperties.AutoDelete, _QueueProperties.Arguments);

    foreach (string routeKey in _MessageSubscriber.SubscribedRouteKeys)
    {
    _ConsumerChannel.QueueBind(_QueueName, _ExchangeProperties.Name, routeKey);
    }


    var consumer = new QueueingBasicConsumer(_ConsumerChannel);
    _ConsumerChannel.BasicConsume(_QueueName, false, consumer);



    while (true)
    {

    if (token.IsCancellationRequested)
    {
    break;
    }

    try
    {
    BasicDeliverEventArgs eventArgs;

    if (consumer.Queue.Dequeue(1000, out eventArgs))
    {

    if (eventArgs != null)
    {

    MessageInfo _MessageItem = ByteArrayToMessageInfo(eventArgs.Body);


    //Message process by async method
    var messageProcesser = _MessageSubscriber.Process(_MessageItem);

    //Wait for result
    bool _MessageProcessed = await messageProcesser;


    if (_MessageProcessed)
    {
    _ConsumerChannel.BasicAck(eventArgs.DeliveryTag, false);
    }
    else
    {

    _ConsumerChannel.BasicNack(eventArgs.DeliveryTag, false, true);
    }
    }
    else
    {
    //connnection is dead
    }

    }
    }
    catch (EndOfStreamException ex)
    {
    Console.WriteLine(ex.Message);
    throw;
    }


    }
    }
    }
    catch (Exception ex)
    {

    Console.WriteLine(ex.Message);
    //TODO: Restart the task again
    throw;
    }



    }