LCOV - code coverage report
Current view: top level - Cqrs/Bus - QueuedCommandBusReceiver.cs Hit Total Coverage
Test: doc-coverage.info Lines: 0 7 0.0 %
Date: 2017-07-26

          Line data    Source code
       1             : #region Copyright
       2             : // // -----------------------------------------------------------------------
       3             : // // <copyright company="cdmdotnet Limited">
       4             : // //   Copyright cdmdotnet Limited. All rights reserved.
       5             : // // </copyright>
       6             : // // -----------------------------------------------------------------------
       7             : #endregion
       8             : 
       9             : using System;
      10             : using System.Collections.Concurrent;
      11             : using System.Collections.Generic;
      12             : using System.Linq;
      13             : using System.Threading;
      14             : using Cqrs.Authentication;
      15             : using Cqrs.Commands;
      16             : using cdmdotnet.Logging;
      17             : using Cqrs.Configuration;
      18             : using Cqrs.Messages;
      19             : 
      20             : namespace Cqrs.Bus
      21             : {
      22             :         public abstract class QueuedCommandBusReceiver<TAuthenticationToken> : ICommandReceiver<TAuthenticationToken>
      23           0 :         {
      24             :                 protected static ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>> QueueTracker { get; private set; }
      25             : 
      26             :                 protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }
      27             : 
      28             :                 protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
      29             : 
      30             :                 protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
      31             : 
      32             :                 protected ILogger Logger { get; private set; }
      33             : 
      34             :                 protected IConfigurationManager ConfigurationManager { get; private set; }
      35             : 
      36             :                 protected IBusHelper BusHelper { get; private set; }
      37             : 
      38             :                 protected abstract IDictionary<Type, IList<Action<IMessage>>> Routes { get; }
      39             : 
      40           0 :                 protected QueuedCommandBusReceiver(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IConfigurationManager configurationManager, IBusHelper busHelper)
      41             :                 {
      42             :                         QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>>();
      43             :                         QueueTrackerLock = new ReaderWriterLockSlim();
      44             :                         AuthenticationTokenHelper = authenticationTokenHelper;
      45             :                         CorrelationIdHelper = correlationIdHelper;
      46             :                         Logger = logger;
      47             :                         ConfigurationManager = configurationManager;
      48             :                         BusHelper = busHelper;
      49             :                 }
      50             : 
      51           0 :                 protected virtual void EnqueueCommand(string targetQueueName, ICommand<TAuthenticationToken> command)
      52             :                 {
      53             :                         var queue = QueueTracker.GetOrAdd(targetQueueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>());
      54             :                         queue.Enqueue(command);
      55             :                 }
      56             : 
      57           0 :                 protected virtual void CreateQueueAndAttachListenerIfNotExist(string queueName)
      58             :                 {
      59             :                         if (!QueueTracker.ContainsKey(queueName))
      60             :                         {
      61             :                                 QueueTrackerLock.EnterWriteLock();
      62             :                                 try
      63             :                                 {
      64             :                                         if (!QueueTracker.ContainsKey(queueName))
      65             :                                         {
      66             :                                                 QueueTracker.TryAdd(queueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>());
      67             :                                                 new Thread(() =>
      68             :                                                 {
      69             :                                                         Thread.CurrentThread.Name = queueName;
      70             :                                                         DequeuAndProcessCommand(queueName);
      71             :                                                 }).Start();
      72             :                                         }
      73             :                                 }
      74             :                                 catch (Exception exception)
      75             :                                 {
      76             :                                         Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: exception);
      77             :                                 }
      78             :                                 finally
      79             :                                 {
      80             :                                         QueueTrackerLock.ExitWriteLock();
      81             :                                 }
      82             :                         }
      83             :                 }
      84             : 
      85           0 :                 protected virtual void DequeuAndProcessCommand(string queueName)
      86             :                 {
      87             :                         long loop = long.MinValue;
      88             :                         while (true)
      89             :                         {
      90             :                                 try
      91             :                                 {
      92             :                                         ConcurrentQueue<ICommand<TAuthenticationToken>> queue;
      93             :                                         if (QueueTracker.TryGetValue(queueName, out queue))
      94             :                                         {
      95             :                                                 while (!queue.IsEmpty)
      96             :                                                 {
      97             :                                                         ICommand<TAuthenticationToken> command;
      98             :                                                         if (queue.TryDequeue(out command))
      99             :                                                         {
     100             :                                                                 try
     101             :                                                                 {
     102             :                                                                         CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
     103             :                                                                 }
     104             :                                                                 catch (Exception exception)
     105             :                                                                 {
     106             :                                                                         Logger.LogError(string.Format("Trying to set the CorrelationId from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
     107             :                                                                 }
     108             :                                                                 try
     109             :                                                                 {
     110             :                                                                         AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
     111             :                                                                 }
     112             :                                                                 catch (Exception exception)
     113             :                                                                 {
     114             :                                                                         Logger.LogError(string.Format("Trying to set the AuthenticationToken from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
     115             :                                                                 }
     116             :                                                                 try
     117             :                                                                 {
     118             :                                                                         ReceiveCommand(command);
     119             :                                                                 }
     120             :                                                                 catch (Exception exception)
     121             :                                                                 {
     122             :                                                                         Logger.LogError(string.Format("Processing the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
     123             :                                                                         queue.Enqueue(command);
     124             :                                                                 }
     125             :                                                         }
     126             :                                                         else
     127             :                                                                 Logger.LogDebug(string.Format("Trying to dequeue a command from the queue '{0}' failed.", queueName));
     128             :                                                 }
     129             :                                         }
     130             :                                         else
     131             :                                                 Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));
     132             : 
     133             :                                         if (loop++ % 5 == 0)
     134             :                                                 Thread.Yield();
     135             :                                         else
     136             :                                                 Thread.Sleep(100);
     137             :                                         if (loop == long.MaxValue)
     138             :                                                 loop = long.MinValue;
     139             :                                 }
     140             :                                 catch (Exception exception)
     141             :                                 {
     142             :                                         Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
     143             :                                 }
     144             :                         }
     145             :                 }
     146             : 
     147             :                 public int QueueCount
     148             :                 {
     149             :                         get
     150             :                         {
     151             :                                 QueueTrackerLock.EnterReadLock();
     152             :                                 try
     153             :                                 {
     154             :                                         return QueueTracker.Count;
     155             :                                 }
     156             :                                 finally
     157             :                                 {
     158             :                                         QueueTrackerLock.ExitReadLock();
     159             :                                 }
     160             :                         }
     161             :                 }
     162             : 
     163             :                 public ICollection<string> QueueNames
     164             :                 {
     165             :                         get
     166             :                         {
     167             :                                 QueueTrackerLock.EnterReadLock();
     168             :                                 try
     169             :                                 {
     170             :                                         return QueueTracker.Keys;
     171             :                                 }
     172             :                                 finally
     173             :                                 {
     174             :                                         QueueTrackerLock.ExitReadLock();
     175             :                                 }
     176             :                         }
     177             :                 }
     178             : 
     179           0 :                 public virtual bool? ReceiveCommand(ICommand<TAuthenticationToken> command)
     180             :                 {
     181             :                         CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
     182             :                         AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
     183             : 
     184             :                         Type commandType = command.GetType();
     185             : 
     186             :                         bool response = true;
     187             :                         bool isRequired = BusHelper.IsEventRequired(commandType);
     188             : 
     189             :                         IList<Action<IMessage>> handlers;
     190             :                         if (Routes.TryGetValue(commandType, out handlers))
     191             :                         {
     192             :                                 if (handlers != null)
     193             :                                 {
     194             :                                         if (handlers.Count != 1)
     195             :                                                 throw new MultipleCommandHandlersRegisteredException(commandType);
     196             :                                         if (handlers.Count == 1)
     197             :                                                 handlers.Single()(command);
     198             :                                         else if (isRequired)
     199             :                                                 throw new NoCommandHandlerRegisteredException(commandType);
     200             :                                         else
     201             :                                                 response = false;
     202             :                                 }
     203             :                                 else if (isRequired)
     204             :                                         throw new NoCommandHandlerRegisteredException(commandType);
     205             :                                 else
     206             :                                         response = false;
     207             :                         }
     208             :                         else if (isRequired)
     209             :                                 throw new NoCommandHandlerRegisteredException(commandType);
     210             :                         else
     211             :                                 response = false;
     212             :                         return response;
     213             :                 }
     214             : 
     215             :                 #region Implementation of ICommandReceiver
     216             : 
     217           0 :                 public abstract void Start();
     218             : 
     219             :                 #endregion
     220             :         }
     221             : }

Generated by: LCOV version 1.10