Documentation Coverage Report
Current view: top level - Cqrs/Bus - QueuedCommandBusReceiver.cs Hit Total Coverage
Version: 2.2 Artefacts: 7 7 100.0 %
Date: 2017-09-22

          Line data    Source code
       1             : #region Copyright
       2             : // // -----------------------------------------------------------------------
       3             : // // <copyright company="Chinchilla Software Limited">
       4             : // //   Copyright Chinchilla Software Limited. All rights reserved.
       5             : // // </copyright>
       6             : // // -----------------------------------------------------------------------
       7             : #endregion
       8             : 
       9             : using System;
      10             : using System.Collections.Concurrent;
      11             : using System.Collections.Generic;
      12             : using System.Linq;
      13             : using System.Threading;
      14             : using Cqrs.Authentication;
      15             : using Cqrs.Commands;
      16             : using cdmdotnet.Logging;
      17             : using Cqrs.Configuration;
      18             : using Cqrs.Exceptions;
      19             : using Cqrs.Messages;
      20             : 
      21             : namespace Cqrs.Bus
      22             : {
      23             :         /// <summary>
      24             :         /// Receives instances of a <see cref="ICommand{TAuthenticationToken}"/> from the command bus, places them into one of several internal concurrent queues and then processes the commands one at a time per queue.
      25             :         /// </summary>
      26             :         /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of authentication token.</typeparam>
      27             :         public abstract class QueuedCommandBusReceiver<TAuthenticationToken> : ICommandReceiver<TAuthenticationToken>
      28           1 :         {
      29             :                 /// <summary>
      30             :                 /// The queues keyed by an identifier.
      31             :                 /// </summary>
      32             :                 protected static ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>> QueueTracker { get; private set; }
      33             : 
      34             :                 /// <summary>
      35             :                 /// A <see cref="ReaderWriterLockSlim"/> for providing a lock mechanism around the main <see cref="QueueTracker"/>.
      36             :                 /// </summary>
      37             :                 protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }
      38             : 
      39             :                 /// <summary>
      40             :                 /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>
      41             :                 /// </summary>
      42             :                 protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
      43             : 
      44             :                 /// <summary>
      45             :                 /// Gets or sets the <see cref="ICorrelationIdHelper"/>
      46             :                 /// </summary>
      47             :                 protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
      48             : 
      49             :                 /// <summary>
      50             :                 /// Gets or sets the <see cref="ILogger"/>
      51             :                 /// </summary>
      52             :                 protected ILogger Logger { get; private set; }
      53             : 
      54             :                 /// <summary>
      55             :                 /// Gets or sets the <see cref="IConfigurationManager"/>
      56             :                 /// </summary>
      57             :                 protected IConfigurationManager ConfigurationManager { get; private set; }
      58             : 
      59             :                 /// <summary>
      60             :                 /// Gets or sets the <see cref="IBusHelper"/>
      61             :                 /// </summary>
      62             :                 protected IBusHelper BusHelper { get; private set; }
      63             : 
      64             :                 /// <summary>
      65             :                 /// Gets or sets the routes or handlers that will be executed as the commands arrive.
      66             :                 /// </summary>
      67             :                 protected abstract IDictionary<Type, IList<Action<IMessage>>> Routes { get; }
      68             : 
      69             :                 /// <summary>
      70             :                 /// Instantiates a new instance of <see cref="QueuedCommandBusReceiver{TAuthenticationToken}"/>.
      71             :                 /// </summary>
      72           1 :                 protected QueuedCommandBusReceiver(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IConfigurationManager configurationManager, IBusHelper busHelper)
      73             :                 {
      74             :                         QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>>();
      75             :                         QueueTrackerLock = new ReaderWriterLockSlim();
      76             :                         AuthenticationTokenHelper = authenticationTokenHelper;
      77             :                         CorrelationIdHelper = correlationIdHelper;
      78             :                         Logger = logger;
      79             :                         ConfigurationManager = configurationManager;
      80             :                         BusHelper = busHelper;
      81             :                 }
      82             : 
      83             :                 /// <summary>
      84             :                 /// Places the provided <paramref name="command"/> into the appropriate queue in the <see cref="QueueTracker"/>.
      85             :                 /// </summary>
      86             :                 /// <param name="targetQueueName">The name of the target queue to place the command into</param>
      87             :                 /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to handle.</param>
      88           1 :                 protected virtual void EnqueueCommand(string targetQueueName, ICommand<TAuthenticationToken> command)
      89             :                 {
      90             :                         var queue = QueueTracker.GetOrAdd(targetQueueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>());
      91             :                         queue.Enqueue(command);
      92             :                 }
      93             : 
      94             :                 /// <summary>
      95             :                 /// Checks if the queue exists, if it doesn't it creates a new queue in <see cref="QueueTracker"/> and then starts a separate <see cref="Thread"/> running <see cref="DequeuAndProcessCommand"/>.
      96             :                 /// </summary>
      97             :                 /// <param name="queueName">The name of the queue.</param>
      98           1 :                 protected virtual void CreateQueueAndAttachListenerIfNotExist(string queueName)
      99             :                 {
     100             :                         if (!QueueTracker.ContainsKey(queueName))
     101             :                         {
     102             :                                 QueueTrackerLock.EnterWriteLock();
     103             :                                 try
     104             :                                 {
     105             :                                         if (!QueueTracker.ContainsKey(queueName))
     106             :                                         {
     107             :                                                 QueueTracker.TryAdd(queueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>());
     108             :                                                 new Thread(() =>
     109             :                                                 {
     110             :                                                         Thread.CurrentThread.Name = queueName;
     111             :                                                         DequeuAndProcessCommand(queueName);
     112             :                                                 }).Start();
     113             :                                         }
     114             :                                 }
     115             :                                 catch (Exception exception)
     116             :                                 {
     117             :                                         Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: exception);
     118             :                                 }
     119             :                                 finally
     120             :                                 {
     121             :                                         QueueTrackerLock.ExitWriteLock();
     122             :                                 }
     123             :                         }
     124             :                 }
     125             : 
     126             :                 /// <summary>
     127             :                 /// Infinitely runs a loop checking if the queue exists in <see cref="QueueTracker"/>
     128             :                 /// and then dequeues <see cref="ICommand{TAuthenticationToken}"/> one at a time, pausing for 0.1 seconds between loops.
     129             :                 /// </summary>
     130             :                 /// <param name="queueName">The name of the queue.</param>
     131           1 :                 protected virtual void DequeuAndProcessCommand(string queueName)
     132             :                 {
     133             :                         long loop = long.MinValue;
     134             :                         while (true)
     135             :                         {
     136             :                                 try
     137             :                                 {
     138             :                                         ConcurrentQueue<ICommand<TAuthenticationToken>> queue;
     139             :                                         if (QueueTracker.TryGetValue(queueName, out queue))
     140             :                                         {
     141             :                                                 while (!queue.IsEmpty)
     142             :                                                 {
     143             :                                                         ICommand<TAuthenticationToken> command;
     144             :                                                         if (queue.TryDequeue(out command))
     145             :                                                         {
     146             :                                                                 try
     147             :                                                                 {
     148             :                                                                         CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
     149             :                                                                 }
     150             :                                                                 catch (Exception exception)
     151             :                                                                 {
     152             :                                                                         Logger.LogError(string.Format("Trying to set the CorrelationId from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
     153             :                                                                 }
     154             :                                                                 try
     155             :                                                                 {
     156             :                                                                         AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
     157             :                                                                 }
     158             :                                                                 catch (Exception exception)
     159             :                                                                 {
     160             :                                                                         Logger.LogError(string.Format("Trying to set the AuthenticationToken from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
     161             :                                                                 }
     162             :                                                                 try
     163             :                                                                 {
     164             :                                                                         ReceiveCommand(command);
     165             :                                                                 }
     166             :                                                                 catch (Exception exception)
     167             :                                                                 {
     168             :                                                                         Logger.LogError(string.Format("Processing the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
     169             :                                                                         queue.Enqueue(command);
     170             :                                                                 }
     171             :                                                         }
     172             :                                                         else
     173             :                                                                 Logger.LogDebug(string.Format("Trying to dequeue a command from the queue '{0}' failed.", queueName));
     174             :                                                 }
     175             :                                         }
     176             :                                         else
     177             :                                                 Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));
     178             : 
     179             :                                         if (loop++ % 5 == 0)
     180             :                                                 Thread.Yield();
     181             :                                         else
     182             :                                                 Thread.Sleep(100);
     183             :                                         if (loop == long.MaxValue)
     184             :                                                 loop = long.MinValue;
     185             :                                 }
     186             :                                 catch (Exception exception)
     187             :                                 {
     188             :                                         Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
     189             :                                 }
     190             :                         }
     191             :                 }
     192             : 
     193             :                 /// <summary>
     194             :                 /// The current number of queues in <see cref="QueueTracker"/>.
     195             :                 /// </summary>
     196             :                 public int QueueCount
     197             :                 {
     198             :                         get
     199             :                         {
     200             :                                 QueueTrackerLock.EnterReadLock();
     201             :                                 try
     202             :                                 {
     203             :                                         return QueueTracker.Count;
     204             :                                 }
     205             :                                 finally
     206             :                                 {
     207             :                                         QueueTrackerLock.ExitReadLock();
     208             :                                 }
     209             :                         }
     210             :                 }
     211             : 
     212             :                 /// <summary>
     213             :                 /// Gets the names of all queues in <see cref="QueueTracker"/>.
     214             :                 /// </summary>
     215             :                 public ICollection<string> QueueNames
     216             :                 {
     217             :                         get
     218             :                         {
     219             :                                 QueueTrackerLock.EnterReadLock();
     220             :                                 try
     221             :                                 {
     222             :                                         return QueueTracker.Keys;
     223             :                                 }
     224             :                                 finally
     225             :                                 {
     226             :                                         QueueTrackerLock.ExitReadLock();
     227             :                                 }
     228             :                         }
     229             :                 }
     230             : 
     231             :                 #region Implementation of ICommandReceiver<TAuthenticationToken>
     232             : 
     233             :                 /// <summary>
     234             :                 /// Receives a <see cref="ICommand{TAuthenticationToken}"/> from the command bus.
     235             :                 /// </summary>
     236           1 :                 public virtual bool? ReceiveCommand(ICommand<TAuthenticationToken> command)
     237             :                 {
     238             :                         CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
     239             :                         AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
     240             : 
     241             :                         Type commandType = command.GetType();
     242             : 
     243             :                         bool response = true;
     244             :                         bool isRequired = BusHelper.IsEventRequired(commandType);
     245             : 
     246             :                         IList<Action<IMessage>> handlers;
     247             :                         if (Routes.TryGetValue(commandType, out handlers))
     248             :                         {
     249             :                                 if (handlers != null)
     250             :                                 {
     251             :                                         if (handlers.Count != 1)
     252             :                                                 throw new MultipleCommandHandlersRegisteredException(commandType);
     253             :                                         if (handlers.Count == 1)
     254             :                                                 handlers.Single()(command);
     255             :                                         else if (isRequired)
     256             :                                                 throw new NoCommandHandlerRegisteredException(commandType);
     257             :                                         else
     258             :                                                 response = false;
     259             :                                 }
     260             :                                 else if (isRequired)
     261             :                                         throw new NoCommandHandlerRegisteredException(commandType);
     262             :                                 else
     263             :                                         response = false;
     264             :                         }
     265             :                         else if (isRequired)
     266             :                                 throw new NoCommandHandlerRegisteredException(commandType);
     267             :                         else
     268             :                                 response = false;
     269             :                         return response;
     270             :                 }
     271             : 
     272             :                 #endregion
     273             : 
     274             :                 #region Implementation of ICommandReceiver
     275             : 
     276             :                 /// <summary>
     277             :                 /// Starts listening and processing instances of <see cref="ICommand{TAuthenticationToken}"/> from the command bus.
     278             :                 /// </summary>
     279           1 :                 public abstract void Start();
     280             : 
     281             :                 #endregion
     282             :         }
     283             : }

Generated by: LCOV version 1.10