|           Line data    Source code 
       1             : #region Copyright
       2             : // // -----------------------------------------------------------------------
       3             : // // <copyright company="Chinchilla Software Limited">
       4             : // //   Copyright Chinchilla Software Limited. All rights reserved.
       5             : // // </copyright>
       6             : // // -----------------------------------------------------------------------
       7             : #endregion
       8             : 
       9             : using System;
      10             : using System.Collections.Concurrent;
      11             : using System.Collections.Generic;
      12             : using System.Diagnostics;
      13             : using System.Linq;
      14             : using System.Threading;
      15             : using System.Threading.Tasks;
      16             : using Chinchilla.Logging;
      17             : using Chinchilla.StateManagement;
      18             : using Cqrs.Authentication;
      19             : using Cqrs.Commands;
      20             : using Cqrs.Configuration;
      21             : using Cqrs.Events;
      22             : using Cqrs.Messages;
      23             : 
      24             : namespace Cqrs.Bus
      25             : {
      26             :         /// <summary>
      27             :         /// A helper for command and event buses that also caches <see cref="IConfigurationManager"/> look ups.
      28             :         /// </summary>
      29             :         public class BusHelper : IBusHelper
      30           1 :         {
      31             :                 /// <summary>
      32             :                 /// Instantiates a new instance of <see cref="BusHelper"/>
      33             :                 /// </summary>
      34           1 :                 public BusHelper(IConfigurationManager configurationManager, IContextItemCollectionFactory factory)
      35             :                 {
      36             :                         Cache = factory.GetCurrentContext();
      37             :                         ConfigurationManager = configurationManager;
      38             :                         CachedChecks = new ConcurrentDictionary<string, Tuple<bool, DateTime>>();
      39             :                         NullableCachedChecks = new ConcurrentDictionary<string, Tuple<bool?, DateTime>>();
      40             :                         bool isblackListRequired;
      41             :                         if (!ConfigurationManager.TryGetSetting("Cqrs.MessageBus.BlackListProcessing", out isblackListRequired))
      42             :                                 isblackListRequired = true;
      43             :                         EventBlackListProcessing = isblackListRequired;
      44             :                         StartRefreshCachedChecks();
      45             :                 }
      46             : 
      47             :                 /// <summary>
      48             :                 /// Gets or sets the <see cref="IConfigurationManager"/>.
      49             :                 /// </summary>
      50             :                 protected IConfigurationManager ConfigurationManager { get; private set; }
      51             : 
      52             :                 /// <summary>
      53             :                 /// A collection of <see cref="Tuple{T1, T2}"/> holding the configurations value (always a <see cref="bool"/>) and the <see cref="DateTime"/>
      54             :                 /// The value was last checked, keyed by it's configuration key.
      55             :                 /// </summary>
      56             :                 protected IDictionary<string, Tuple<bool, DateTime>> CachedChecks { get; private set; }
      57             : 
      58             :                 /// <summary>
      59             :                 /// A collection of <see cref="Tuple{T1, T2}"/> holding the configurations value (always a <see cref="bool"/>) and the <see cref="DateTime"/>
      60             :                 /// The value was last checked, keyed by it's configuration key.
      61             :                 /// </summary>
      62             :                 protected IDictionary<string, Tuple<bool?, DateTime>> NullableCachedChecks { get; private set; }
      63             : 
      64             :                 /// <summary>
      65             :                 /// The current value of "Cqrs.MessageBus.BlackListProcessing" from <see cref="ConfigurationManager"/>.
      66             :                 /// </summary>
      67             :                 protected bool EventBlackListProcessing { get; private set; }
      68             : 
      69             :                 /// <summary>
      70             :                 /// Refreshes <see cref="EventBlackListProcessing"/> and every item currently in <see cref="CachedChecks"/>.
      71             :                 /// </summary>
      72           1 :                 protected virtual void RefreshCachedChecks()
      73             :                 {
      74             :                         // First refresh the EventBlackListProcessing property
      75             :                         bool isblackListRequired;
      76             :                         if (!ConfigurationManager.TryGetSetting("Cqrs.MessageBus.BlackListProcessing", out isblackListRequired))
      77             :                                 isblackListRequired = true;
      78             :                         EventBlackListProcessing = isblackListRequired;
      79             : 
      80             :                         // Now in a dictionary safe way check each key for a value.
      81             :                         IList<string> keys = CachedChecks.Keys.ToList();
      82             :                         foreach (string configurationKey in keys)
      83             :                         {
      84             :                                 Tuple<bool, DateTime> pair = CachedChecks[configurationKey];
      85             :                                 bool value;
      86             :                                 // If we can't a value or there is no specific setting, remove it from the cache
      87             :                                 if (!ConfigurationManager.TryGetSetting(configurationKey, out value))
      88             :                                         CachedChecks.Remove(configurationKey);
      89             :                                 // Refresh the value and reset it's expiry if the value has changed
      90             :                                 else if (pair.Item1 != value)
      91             :                                         CachedChecks[configurationKey] = new Tuple<bool, DateTime>(value, DateTime.UtcNow);
      92             :                                 // Check it's age - by adding 20 minutes from being obtained or refreshed and if it's older than now remove it
      93             :                                 else if (pair.Item2.AddMinutes(20) < DateTime.UtcNow)
      94             :                                         CachedChecks.Remove(configurationKey);
      95             :                         }
      96             : 
      97             :                         // Now in a dictionary safe way check each key for a value.
      98             :                         keys = NullableCachedChecks.Keys.ToList();
      99             :                         foreach (string configurationKey in keys)
     100             :                         {
     101             :                                 Tuple<bool?, DateTime> pair = NullableCachedChecks[configurationKey];
     102             :                                 // Check it's age - by adding 20 minutes from being obtained or refreshed and if it's older than now remove it
     103             :                                 if (pair.Item2.AddMinutes(20) < DateTime.UtcNow)
     104             :                                         NullableCachedChecks.Remove(configurationKey);
     105             :                         }
     106             :                 }
     107             : 
     108             :                 /// <summary>
     109             :                 /// Starts <see cref="RefreshCachedChecks"/> in a <see cref="Task"/> on a one second loop.
     110             :                 /// </summary>
     111           1 :                 protected virtual void StartRefreshCachedChecks()
     112             :                 {
     113             :                         Task.Factory.StartNewSafely(() =>
     114             :                         {
     115             :                                 long loop = 0;
     116             :                                 while (true)
     117             :                                 {
     118             :                                         RefreshCachedChecks();
     119             : 
     120             :                                         if (loop++%5 == 0)
     121             :                                                 Thread.Yield();
     122             :                                         else
     123             :                                                 Thread.Sleep(1000);
     124             :                                         if (loop == long.MaxValue)
     125             :                                                 loop = long.MinValue;
     126             :                                 }
     127             :                         });
     128             :                 }
     129             : 
     130             :                 /// <summary>
     131             :                 /// Checks if a white-list or black-list approach is taken, then checks the <see cref="IConfigurationManager"/> to see if a key exists defining if the event is required or not.
     132             :                 /// If the event is required and it cannot be resolved, an error will be raised.
     133             :                 /// Otherwise the event will be marked as processed.
     134             :                 /// </summary>
     135             :                 /// <param name="messageType">The <see cref="Type"/> of the message being processed.</param>
     136           1 :                 public virtual bool IsEventRequired(Type messageType)
     137             :                 {
     138             :                         return IsEventRequired(string.Format("{0}.IsRequired", messageType.FullName));
     139             :                 }
     140             : 
     141             :                 /// <summary>
     142             :                 /// Checks if a white-list or black-list approach is taken, then checks the <see cref="IConfigurationManager"/> to see if a key exists defining if the event is required or not.
     143             :                 /// If the event is required and it cannot be resolved, an error will be raised.
     144             :                 /// Otherwise the event will be marked as processed.
     145             :                 /// </summary>
     146             :                 /// <param name="configurationKey">The configuration key to check.</param>
     147           1 :                 public virtual bool IsEventRequired(string configurationKey)
     148             :                 {
     149             :                         Tuple<bool, DateTime> settings;
     150             :                         bool isRequired;
     151             :                         if (!CachedChecks.TryGetValue(configurationKey, out settings))
     152             :                         {
     153             :                                 // If we can't find a value or there is no specific setting, we default to EventBlackListProcessing
     154             :                                 if (!ConfigurationManager.TryGetSetting(configurationKey, out isRequired))
     155             :                                         isRequired = EventBlackListProcessing;
     156             : 
     157             :                                 // Now cache the response
     158             :                                 try
     159             :                                 {
     160             :                                         CachedChecks.Add(configurationKey, new Tuple<bool, DateTime>(isRequired, DateTime.UtcNow));
     161             :                                 }
     162             :                                 catch (ArgumentException exception)
     163             :                                 {
     164             :                                         if (exception.Message != "The key already existed in the dictionary.")
     165             :                                                 throw;
     166             :                                         // It's been added since we checked... adding locks is slow, so just move on.
     167             :                                 }
     168             :                         }
     169             :                         // Don't refresh the expiry, we'll just update the cache every so often which is faster than constantly changing dictionary values.
     170             :                         else
     171             :                                 isRequired = settings.Item1;
     172             : 
     173             :                         return isRequired;
     174             :                 }
     175             : 
     176             :                 /// <summary>
     177             :                 /// Checks if the private bus is required to send the message. Note, this does not imply the public bus is not required as well.
     178             :                 /// </summary>
     179             :                 /// <param name="messageType">The <see cref="Type"/> of the message being processed.</param>
     180             :                 /// <returns>Null for unconfigured, True for private bus transmission, false otherwise.</returns>
     181           1 :                 public virtual bool? IsPrivateBusRequired(Type messageType)
     182             :                 {
     183             :                         return IsABusRequired(messageType, false);
     184             :                 }
     185             : 
     186             :                 /// <summary>
     187             :                 /// Checks if the public bus is required to send the message. Note, this does not imply the public bus is not required as well.
     188             :                 /// </summary>
     189             :                 /// <param name="messageType">The <see cref="Type"/> of the message being processed.</param>
     190             :                 /// <returns>Null for unconfigured, True for private bus transmission, false otherwise.</returns>
     191           1 :                 public virtual bool? IsPublicBusRequired(Type messageType)
     192             :                 {
     193             :                         return IsABusRequired(messageType, true);
     194             :                 }
     195             : 
     196             :                 /// <summary>
     197             :                 /// Checks if the particular bus is required to send the message. Note, this does not imply the public bus is not required as well.
     198             :                 /// </summary>
     199             :                 /// <param name="messageType">The <see cref="Type"/> of the message being processed.</param>
     200             :                 /// <param name="checkPublic">Check for the public or private bus.</param>
     201             :                 /// <returns>Null for unconfigured, True for a particular bus transmission, false otherwise.</returns>
     202           1 :                 protected virtual bool? IsABusRequired(Type messageType, bool checkPublic)
     203             :                 {
     204             :                         string configurationKey = string.Format(checkPublic ? "{0}.IsPublicBusRequired" : "{0}.IsPrivateBusRequired", messageType.FullName);
     205             :                         Tuple<bool?, DateTime> settings;
     206             :                         bool? isRequired;
     207             :                         if (!NullableCachedChecks.TryGetValue(configurationKey, out settings))
     208             :                         {
     209             :                                 bool isRequired1;
     210             :                                 // Check if there is a cached value
     211             :                                 if (ConfigurationManager.TryGetSetting(configurationKey, out isRequired1))
     212             :                                         isRequired = isRequired1;
     213             :                                 // If not, check the attributes
     214             :                                 else if (checkPublic)
     215             :                                 {
     216             :                                         var eventAttribute = Attribute.GetCustomAttribute(messageType, typeof(PublicEventAttribute)) as PublicEventAttribute;
     217             :                                         isRequired = eventAttribute == null ? (bool?) null : true;
     218             :                                 }
     219             :                                 // If not, check the attributes
     220             :                                 else
     221             :                                 {
     222             :                                         var eventAttribute = Attribute.GetCustomAttribute(messageType, typeof(PrivateEventAttribute)) as PrivateEventAttribute;
     223             :                                         isRequired = eventAttribute == null ? (bool?)null : true;
     224             :                                 }
     225             : 
     226             :                                 // Now cache the response
     227             :                                 try
     228             :                                 {
     229             :                                         NullableCachedChecks.Add(configurationKey, new Tuple<bool?, DateTime>(isRequired, DateTime.UtcNow));
     230             :                                 }
     231             :                                 catch (ArgumentException exception)
     232             :                                 {
     233             :                                         if (exception.Message != "The key already existed in the dictionary.")
     234             :                                                 throw;
     235             :                                         // It's been added since we checked... adding locks is slow, so just move on.
     236             :                                 }
     237             :                         }
     238             :                         // Don't refresh the expiry, we'll just update the cache every so often which is faster than constantly changing dictionary values.
     239             :                         else
     240             :                                 isRequired = settings.Item1;
     241             : 
     242             :                         // If all the above is still not difinitive, react to the bus the originating message was received on, but we only need to check for private.
     243             :                         // We do this here so caching is atleast used, but this cannot be cached as that would be wrong
     244             :                         if (isRequired == null && !checkPublic)
     245             :                                 if (GetWasPrivateBusUsed())
     246             :                                         return true;
     247             : 
     248             :                         return isRequired;
     249             :                 }
     250             : 
     251             :                 /// <summary>
     252             :                 /// Build a message handler that implements telemetry capturing as well as off thread handling.
     253             :                 /// </summary>
     254           1 :                 public virtual Action<TMessage> BuildTelemeteredActionHandler<TMessage, TAuthenticationToken>(ITelemetryHelper telemetryHelper, Action<TMessage> handler, bool holdMessageLock, string source)
     255             :                         where TMessage : IMessage
     256             :                 {
     257             :                         Action<TMessage> registerableMessageHandler = message =>
     258             :                         {
     259             :                                 DateTimeOffset startedAt = DateTimeOffset.UtcNow;
     260             :                                 Stopwatch mainStopWatch = Stopwatch.StartNew();
     261             :                                 string responseCode = "200";
     262             :                                 bool wasSuccessfull = true;
     263             : 
     264             :                                 string telemetryName = message.GetType().FullName;
     265             :                                 var telemeteredMessage = message as ITelemeteredMessage;
     266             :                                 string messagePrefix = null;
     267             :                                 object authenticationToken = null;
     268             :                                 var @event = message as IEvent<TAuthenticationToken>;
     269             :                                 if (@event != null)
     270             :                                 {
     271             :                                         messagePrefix = "Event/";
     272             :                                         telemetryName = string.Format("{0}/{1}/{2}", telemetryName, @event.GetIdentity(), @event.Id);
     273             :                                         authenticationToken = @event.AuthenticationToken;
     274             :                                 }
     275             :                                 else
     276             :                                 {
     277             :                                         var command = message as ICommand<TAuthenticationToken>;
     278             :                                         if (command != null)
     279             :                                         {
     280             :                                                 messagePrefix = "Command/";
     281             :                                                 telemetryName = string.Format("{0}/{1}/{2}", telemetryName, command.GetIdentity(), command.Id);
     282             :                                                 authenticationToken = command.AuthenticationToken;
     283             :                                         }
     284             :                                 }
     285             : 
     286             :                                 if (telemeteredMessage != null)
     287             :                                         telemetryName = telemeteredMessage.TelemetryName;
     288             : 
     289             :                                 telemetryHelper.TrackEvent(string.Format("Cqrs/Handle/{0}{1}/Started", messagePrefix, telemetryName));
     290             : 
     291             :                                 try
     292             :                                 {
     293             :                                         handler(message);
     294             :                                 }
     295             :                                 catch (Exception exception)
     296             :                                 {
     297             :                                         telemetryHelper.TrackException(exception);
     298             :                                         wasSuccessfull = false;
     299             :                                         responseCode = "500";
     300             :                                         throw;
     301             :                                 }
     302             :                                 finally
     303             :                                 {
     304             :                                         telemetryHelper.TrackEvent(string.Format("Cqrs/Handle/{0}{1}/Finished", messagePrefix, telemetryName));
     305             : 
     306             :                                         mainStopWatch.Stop();
     307             :                                         if (authenticationToken is ISingleSignOnToken)
     308             :                                                 telemetryHelper.TrackRequest
     309             :                                                 (
     310             :                                                         string.Format("Cqrs/Handle/{0}{1}", messagePrefix, telemetryName),
     311             :                                                         (ISingleSignOnToken)authenticationToken,
     312             :                                                         startedAt,
     313             :                                                         mainStopWatch.Elapsed,
     314             :                                                         responseCode,
     315             :                                                         wasSuccessfull,
     316             :                                                         new Dictionary<string, string> { { "Type", source } }
     317             :                                                 );
     318             :                                         else if (authenticationToken is Guid)
     319             :                                                 telemetryHelper.TrackRequest
     320             :                                                 (
     321             :                                                         string.Format("Cqrs/Handle/{0}{1}", messagePrefix, telemetryName),
     322             :                                                         (Guid?)authenticationToken,
     323             :                                                         startedAt,
     324             :                                                         mainStopWatch.Elapsed,
     325             :                                                         responseCode,
     326             :                                                         wasSuccessfull,
     327             :                                                         new Dictionary<string, string> { { "Type", source } }
     328             :                                                 );
     329             :                                         else if (authenticationToken is int)
     330             :                                                 telemetryHelper.TrackRequest
     331             :                                                 (
     332             :                                                         string.Format("Cqrs/Handle/{0}{1}", messagePrefix, telemetryName),
     333             :                                                         (int?)authenticationToken,
     334             :                                                         startedAt,
     335             :                                                         mainStopWatch.Elapsed,
     336             :                                                         responseCode,
     337             :                                                         wasSuccessfull,
     338             :                                                         new Dictionary<string, string> { { "Type", source } }
     339             :                                                 );
     340             :                                         else
     341             :                                         {
     342             :                                                 string token = authenticationToken == null ? null : authenticationToken.ToString();
     343             :                                                 telemetryHelper.TrackRequest
     344             :                                                 (
     345             :                                                         string.Format("Cqrs/Handle/{0}{1}", messagePrefix, telemetryName),
     346             :                                                         token,
     347             :                                                         startedAt,
     348             :                                                         mainStopWatch.Elapsed,
     349             :                                                         responseCode,
     350             :                                                         wasSuccessfull,
     351             :                                                         new Dictionary<string, string> { { "Type", source } }
     352             :                                                 );
     353             :                                         }
     354             : 
     355             :                                         telemetryHelper.Flush();
     356             :                                 }
     357             :                         };
     358             : 
     359             :                         return BuildActionHandler(registerableMessageHandler, holdMessageLock);
     360             :                 }
     361             : 
     362             :                 /// <summary>
     363             :                 /// Build a message handler that implements telemetry capturing as well as off thread handling.
     364             :                 /// </summary>
     365           1 :                 public virtual Action<TMessage> BuildActionHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock)
     366             :                         where TMessage : IMessage
     367             :                 {
     368             :                         Action<TMessage> registerableMessageHandler = handler;
     369             : 
     370             :                         Action<TMessage> registerableHandler = registerableMessageHandler;
     371             :                         if (!holdMessageLock)
     372             :                         {
     373             :                                 registerableHandler = message =>
     374             :                                 {
     375             :                                         Task.Factory.StartNewSafely(() =>
     376             :                                         {
     377             :                                                 registerableMessageHandler(message);
     378             :                                         });
     379             :                                 };
     380             :                         }
     381             : 
     382             :                         return registerableHandler;
     383             :                 }
     384             : 
     385             :                 /// <summary>
     386             :                 /// The key used to store the authentication token in the <see cref="Cache"/>.
     387             :                 /// </summary>
     388             :                 protected string CacheKey = "WasPrivateBusUsed";
     389             : 
     390             :                 /// <summary>
     391             :                 /// Get or set the Cache.
     392             :                 /// </summary>
     393             :                 protected IContextItemCollection Cache { get; private set; }
     394             : 
     395             :                 /// <summary>
     396             :                 /// Indicates if the message was received via the private bus or not. If false, this implies the public was use used.
     397             :                 /// </summary>
     398           1 :                 public bool GetWasPrivateBusUsed()
     399             :                 {
     400             :                         try
     401             :                         {
     402             :                                 return Cache.GetData<bool>(CacheKey);
     403             :                         }
     404             :                         catch
     405             :                         {
     406             :                                 return false;
     407             :                         }
     408             :                 }
     409             : 
     410             :                 /// <summary>
     411             :                 /// Set whether the message was received via the private bus or not. If false, this indicates the public was use used.
     412             :                 /// </summary>
     413           1 :                 public bool SetWasPrivateBusUsed(bool wasPrivate)
     414             :                 {
     415             :                         return Cache.SetData(CacheKey, wasPrivate);
     416             :                 }
     417             :         }
     418             : }
 |