Documentation Coverage Report
Current view: top level - Azure/Cqrs.Azure.ServiceBus - AzureBusHelper.cs Hit Total Coverage
Version: 2.2 Artefacts: 14 14 100.0 %
Date: 2018-08-07 15:04:50

          Line data    Source code
       1             : #region IMPORTANT NOTE
       2             : // This is copied almost exactly into the eventhub except for a string difference. Replicate changes there until a refactor is done.
       3             : #endregion
       4             : 
       5             : #region Copyright
       6             : // // -----------------------------------------------------------------------
       7             : // // <copyright company="Chinchilla Software Limited">
       8             : // //   Copyright Chinchilla Software Limited. All rights reserved.
       9             : // // </copyright>
      10             : // // -----------------------------------------------------------------------
      11             : #endregion
      12             : 
      13             : using System;
      14             : using System.Collections.Generic;
      15             : using System.Diagnostics;
      16             : using System.IO;
      17             : using System.Linq;
      18             : using System.Security.Cryptography;
      19             : using System.Text;
      20             : using System.Text.RegularExpressions;
      21             : using System.Threading;
      22             : using System.Threading.Tasks;
      23             : using cdmdotnet.Logging;
      24             : using Cqrs.Authentication;
      25             : using Cqrs.Bus;
      26             : using Cqrs.Commands;
      27             : using Cqrs.Configuration;
      28             : using Cqrs.Events;
      29             : using Cqrs.Exceptions;
      30             : using Cqrs.Messages;
      31             : using Microsoft.ServiceBus.Messaging;
      32             : using Newtonsoft.Json;
      33             : 
      34             : namespace Cqrs.Azure.ServiceBus
      35             : {
      36             :         /// <summary>
      37             :         /// A helper for Azure Service Bus and Event Hub.
      38             :         /// </summary>
      39             :         /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
      40             :         public class AzureBusHelper<TAuthenticationToken>
      41             :                 : IAzureBusHelper<TAuthenticationToken>
      42           1 :         {
      43             :                 /// <summary>
      44             :                 /// Instantiates a new instance of <see cref="AzureBusHelper{TAuthenticationToken}"/>.
      45             :                 /// </summary>
      46           1 :                 public AzureBusHelper(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IBusHelper busHelper, IHashAlgorithmFactory hashAlgorithmFactory, IConfigurationManager configurationManager, IDependencyResolver dependencyResolver)
      47             :                 {
      48             :                         AuthenticationTokenHelper = authenticationTokenHelper;
      49             :                         CorrelationIdHelper = correlationIdHelper;
      50             :                         Logger = logger;
      51             :                         MessageSerialiser = messageSerialiser;
      52             :                         BusHelper = busHelper;
      53             :                         DependencyResolver = dependencyResolver;
      54             :                         ConfigurationManager = configurationManager;
      55             :                         Signer = hashAlgorithmFactory;
      56             :                 }
      57             : 
      58             :                 /// <summary>
      59             :                 /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>.
      60             :                 /// </summary>
      61             :                 protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
      62             : 
      63             :                 /// <summary>
      64             :                 /// Gets or sets the <see cref="ICorrelationIdHelper"/>.
      65             :                 /// </summary>
      66             :                 protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
      67             : 
      68             :                 /// <summary>
      69             :                 /// Gets or sets the <see cref="ILogger"/>.
      70             :                 /// </summary>
      71             :                 protected ILogger Logger { get; private set; }
      72             : 
      73             :                 /// <summary>
      74             :                 /// Gets or sets the <see cref="IMessageSerialiser{TAuthenticationToken}"/>.
      75             :                 /// </summary>
      76             :                 protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; }
      77             : 
      78             :                 /// <summary>
      79             :                 /// Gets or sets the <see cref="IBusHelper"/>.
      80             :                 /// </summary>
      81             :                 protected IBusHelper BusHelper { get; private set; }
      82             : 
      83             :                 /// <summary>
      84             :                 /// Gets or sets the <see cref="IConfigurationManager"/>.
      85             :                 /// </summary>
      86             :                 protected IConfigurationManager ConfigurationManager { get; private set; }
      87             : 
      88             :                 /// <summary>
      89             :                 /// Gets or sets the <see cref="IDependencyResolver"/>.
      90             :                 /// </summary>
      91             :                 protected IDependencyResolver DependencyResolver { get; private set; }
      92             : 
      93             :                 /// <summary>
      94             :                 /// The configuration key for the default message refreshing setting as used by <see cref="IConfigurationManager"/>.
      95             :                 /// </summary>
      96             :                 protected const string DefaultMessagesShouldRefreshConfigurationKey = "Cqrs.Azure.Messages.ShouldRefresh";
      97             : 
      98             :                 /// <summary>
      99             :                 /// The <see cref="IHashAlgorithmFactory"/> to use to sign messages.
     100             :                 /// </summary>
     101             :                 protected IHashAlgorithmFactory Signer { get; private set; }
     102             : 
     103             :                 /// <summary>
     104             :                 /// Prepares a <see cref="ICommand{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
     105             :                 /// </summary>
     106             :                 /// <typeparam name="TCommand">The <see cref="Type"/> of<see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
     107             :                 /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
     108             :                 /// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
     109           1 :                 public virtual void PrepareCommand<TCommand>(TCommand command, string framework)
     110             :                         where TCommand : ICommand<TAuthenticationToken>
     111             :                 {
     112             :                         if (command.AuthenticationToken == null || command.AuthenticationToken.Equals(default(TAuthenticationToken)))
     113             :                                 command.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
     114             :                         command.CorrelationId = CorrelationIdHelper.GetCorrelationId();
     115             : 
     116             :                         if (string.IsNullOrWhiteSpace(command.OriginatingFramework))
     117             :                                 command.OriginatingFramework = framework;
     118             :                         var frameworks = new List<string>();
     119             :                         if (command.Frameworks != null)
     120             :                                 frameworks.AddRange(command.Frameworks);
     121             :                         frameworks.Add(framework);
     122             :                         command.Frameworks = frameworks;
     123             :                 }
     124             : 
     125             :                 /// <summary>
     126             :                 /// Prepares and validates a <see cref="ICommand{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
     127             :                 /// </summary>
     128             :                 /// <typeparam name="TCommand">The <see cref="Type"/> of<see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
     129             :                 /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
     130             :                 /// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
     131           1 :                 public virtual bool PrepareAndValidateCommand<TCommand>(TCommand command, string framework)
     132             :                         where TCommand : ICommand<TAuthenticationToken>
     133             :                 {
     134             :                         Type commandType = command.GetType();
     135             : 
     136             :                         if (command.Frameworks != null && command.Frameworks.Contains(framework))
     137             :                         {
     138             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     139             :                                 if (command.Frameworks.Count() != 1)
     140             :                                 {
     141             :                                         Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName));
     142             :                                         return false;
     143             :                                 }
     144             :                         }
     145             : 
     146             :                         ICommandValidator<TAuthenticationToken, TCommand> commandValidator = null;
     147             :                         try
     148             :                         {
     149             :                                 commandValidator = DependencyResolver.Resolve<ICommandValidator<TAuthenticationToken, TCommand>>();
     150             :                         }
     151             :                         catch (Exception exception)
     152             :                         {
     153             :                                 Logger.LogDebug("Locating an ICommandValidator failed.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName), exception);
     154             :                         }
     155             : 
     156             :                         if (commandValidator != null && !commandValidator.IsCommandValid(command))
     157             :                         {
     158             :                                 Logger.LogInfo("The provided command is not valid.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName));
     159             :                                 return false;
     160             :                         }
     161             : 
     162             :                         PrepareCommand(command, framework);
     163             :                         return true;
     164             :                 }
     165             : 
     166             :                 /// <summary>
     167             :                 /// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveCommandHandler"/>.
     168             :                 /// </summary>
     169             :                 /// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
     170             :                 /// <param name="receiveCommandHandler">The handler method that will process the <see cref="ICommand{TAuthenticationToken}"/>.</param>
     171             :                 /// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
     172             :                 /// <param name="signature">The signature of the <see cref="IMessage"/>.</param>
     173             :                 /// <param name="signingTokenConfigurationKey">The configuration key for the signing token as used by <see cref="IConfigurationManager"/>.</param>
     174             :                 /// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="ICommand{TAuthenticationToken}"/> is being skipped.</param>
     175             :                 /// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
     176             :                 /// <returns>The <see cref="ICommand{TAuthenticationToken}"/> that was processed.</returns>
     177           1 :                 public virtual ICommand<TAuthenticationToken> ReceiveCommand(string messageBody, Func<ICommand<TAuthenticationToken>, bool?> receiveCommandHandler, string messageId, string signature, string signingTokenConfigurationKey, Action skippedAction = null, Action lockRefreshAction = null)
     178             :                 {
     179             :                         ICommand<TAuthenticationToken> command;
     180             :                         try
     181             :                         {
     182             :                                 command = MessageSerialiser.DeserialiseCommand(messageBody);
     183             :                         }
     184             :                         catch (JsonSerializationException exception)
     185             :                         {
     186             :                                 JsonSerializationException checkException = exception;
     187             :                                 bool safeToExit = false;
     188             :                                 do
     189             :                                 {
     190             :                                         if (checkException.Message.StartsWith("Could not load assembly"))
     191             :                                         {
     192             :                                                 safeToExit = true;
     193             :                                                 break;
     194             :                                         }
     195             :                                 } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
     196             :                                 if (safeToExit)
     197             :                                 {
     198             :                                         const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
     199             :                                         Match match = new Regex(pattern).Match(exception.Message);
     200             :                                         if (match.Success)
     201             :                                         {
     202             :                                                 string[] typeParts = match.Value.Split(',');
     203             :                                                 if (typeParts.Length == 2)
     204             :                                                 {
     205             :                                                         string classType = typeParts[0];
     206             :                                                         bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
     207             : 
     208             :                                                         if (!isRequired)
     209             :                                                         {
     210             :                                                                 if (skippedAction != null)
     211             :                                                                         skippedAction();
     212             :                                                                 return null;
     213             :                                                         }
     214             :                                                 }
     215             :                                         }
     216             :                                 }
     217             :                                 throw;
     218             :                         }
     219             : 
     220             :                         string commandTypeName = command.GetType().FullName;
     221             :                         CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
     222             :                         AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
     223             :                         string identifyMessage = null;
     224             :                         var identifiedEvent = command as ICommandWithIdentity<TAuthenticationToken>;
     225             :                         if (identifiedEvent != null)
     226             :                                 identifyMessage = string.Format(" for aggregate {0}", identifiedEvent.Rsn);
     227             :                         Logger.LogInfo(string.Format("A command message arrived with the {0} was of type {1}{2}.", messageId, commandTypeName, identifyMessage));
     228             : 
     229             :                         VerifySignature(signingTokenConfigurationKey, signature, "A command", messageId, commandTypeName, identifyMessage, messageBody);
     230             :                         bool canRefresh;
     231             :                         if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", commandTypeName), out canRefresh))
     232             :                                 canRefresh = false;
     233             : 
     234             :                         if (canRefresh)
     235             :                         {
     236             :                                 if (lockRefreshAction == null)
     237             :                                         Logger.LogWarning(string.Format("A command message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, commandTypeName));
     238             :                                 else
     239             :                                         lockRefreshAction();
     240             :                         }
     241             : 
     242             :                         // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
     243             :                         bool? result = receiveCommandHandler(command);
     244             :                         if (result != null && !result.Value)
     245             :                                 if (skippedAction != null)
     246             :                                         skippedAction();
     247             : 
     248             :                         return command;
     249             :                 }
     250             : 
     251             :                 /// <summary>
     252             :                 /// The default command handler that
     253             :                 /// check if the <see cref="ICommand{TAuthenticationToken}"/> has already been processed by this framework,
     254             :                 /// checks if the <see cref="ICommand{TAuthenticationToken}"/> is required,
     255             :                 /// finds the handler from the provided <paramref name="routeManager"/>.
     256             :                 /// </summary>
     257             :                 /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to process.</param>
     258             :                 /// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="ICommandHandler{TAuthenticationToken,TCommand}"/> from.</param>
     259             :                 /// <param name="framework">The current framework.</param>
     260             :                 /// <returns>
     261             :                 /// True indicates the <paramref name="command"/> was successfully handled by a handler.
     262             :                 /// False indicates the <paramref name="command"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
     263             :                 /// Null indicates the command<paramref name="command"/> wasn't handled as it was already handled.
     264             :                 /// </returns>
     265           1 :                 public virtual bool? DefaultReceiveCommand(ICommand<TAuthenticationToken> command, RouteManager routeManager, string framework)
     266             :                 {
     267             :                         Type commandType = command.GetType();
     268             : 
     269             :                         if (command.Frameworks != null && command.Frameworks.Contains(framework))
     270             :                         {
     271             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     272             :                                 if (command.Frameworks.Count() != 1)
     273             :                                 {
     274             :                                         Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\DefaultReceiveCommand({1})", GetType().FullName, commandType.FullName));
     275             :                                         return null;
     276             :                                 }
     277             :                         }
     278             : 
     279             :                         CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
     280             :                         AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
     281             : 
     282             :                         bool isRequired = BusHelper.IsEventRequired(commandType);
     283             : 
     284             :                         RouteHandlerDelegate commandHandler = routeManager.GetSingleHandler(command, isRequired);
     285             :                         // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
     286             :                         if (commandHandler == null)
     287             :                         {
     288             :                                 Logger.LogDebug(string.Format("The command handler for '{0}' is not required.", commandType.FullName));
     289             :                                 return false;
     290             :                         }
     291             : 
     292             :                         Action<IMessage> handler = commandHandler.Delegate;
     293             :                         handler(command);
     294             :                         return true;
     295             :                 }
     296             : 
     297             :                 /// <summary>
     298             :                 /// Prepares an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
     299             :                 /// </summary>
     300             :                 /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
     301             :                 /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
     302             :                 /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
     303           1 :                 public virtual void PrepareEvent<TEvent>(TEvent @event, string framework)
     304             :                         where TEvent : IEvent<TAuthenticationToken>
     305             :                 {
     306             :                         if (@event.AuthenticationToken == null || @event.AuthenticationToken.Equals(default(TAuthenticationToken)))
     307             :                                 @event.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
     308             :                         @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
     309             :                         @event.TimeStamp = DateTimeOffset.UtcNow;
     310             : 
     311             :                         if (string.IsNullOrWhiteSpace(@event.OriginatingFramework))
     312             :                                 @event.OriginatingFramework = framework;
     313             :                         var frameworks = new List<string>();
     314             :                         if (@event.Frameworks != null)
     315             :                                 frameworks.AddRange(@event.Frameworks);
     316             :                         frameworks.Add(framework);
     317             :                         @event.Frameworks = frameworks;
     318             :                 }
     319             : 
     320             :                 /// <summary>
     321             :                 /// Prepares and validates an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
     322             :                 /// </summary>
     323             :                 /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
     324             :                 /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
     325             :                 /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
     326           1 :                 public virtual bool PrepareAndValidateEvent<TEvent>(TEvent @event, string framework)
     327             :                         where TEvent : IEvent<TAuthenticationToken>
     328             :                 {
     329             :                         Type eventType = @event.GetType();
     330             : 
     331             :                         if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
     332             :                         {
     333             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     334             :                                 if (@event.Frameworks.Count() != 1)
     335             :                                 {
     336             :                                         Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, eventType.FullName));
     337             :                                         return false;
     338             :                                 }
     339             :                         }
     340             : 
     341             :                         PrepareEvent(@event, framework);
     342             :                         return true;
     343             :                 }
     344             : 
     345             :                 /// <summary>
     346             :                 /// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveEventHandler"/>.
     347             :                 /// </summary>
     348             :                 /// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
     349             :                 /// <param name="receiveEventHandler">The handler method that will process the <see cref="IEvent{TAuthenticationToken}"/>.</param>
     350             :                 /// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
     351             :                 /// <param name="signature">The signature of the <see cref="IMessage"/>.</param>
     352             :                 /// <param name="signingTokenConfigurationKey">The configuration key for the signing token as used by <see cref="IConfigurationManager"/>.</param>
     353             :                 /// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="IEvent{TAuthenticationToken}"/> is being skipped.</param>
     354             :                 /// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
     355             :                 /// <returns>The <see cref="IEvent{TAuthenticationToken}"/> that was processed.</returns>
     356           1 :                 public virtual IEvent<TAuthenticationToken> ReceiveEvent(string messageBody, Func<IEvent<TAuthenticationToken>, bool?> receiveEventHandler, string messageId, string signature, string signingTokenConfigurationKey, Action skippedAction = null, Action lockRefreshAction = null)
     357             :                 {
     358             :                         IEvent<TAuthenticationToken> @event;
     359             :                         try
     360             :                         {
     361             :                                 @event = MessageSerialiser.DeserialiseEvent(messageBody);
     362             :                         }
     363             :                         catch (JsonSerializationException exception)
     364             :                         {
     365             :                                 JsonSerializationException checkException = exception;
     366             :                                 bool safeToExit = false;
     367             :                                 do
     368             :                                 {
     369             :                                         if (checkException.Message.StartsWith("Could not load assembly"))
     370             :                                         {
     371             :                                                 safeToExit = true;
     372             :                                                 break;
     373             :                                         }
     374             :                                 } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
     375             :                                 if (safeToExit)
     376             :                                 {
     377             :                                         const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
     378             :                                         Match match = new Regex(pattern).Match(exception.Message);
     379             :                                         if (match.Success)
     380             :                                         {
     381             :                                                 string[] typeParts = match.Value.Split(',');
     382             :                                                 if (typeParts.Length == 2)
     383             :                                                 {
     384             :                                                         string classType = typeParts[0];
     385             :                                                         bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
     386             : 
     387             :                                                         if (!isRequired)
     388             :                                                         {
     389             :                                                                 if (skippedAction != null)
     390             :                                                                         skippedAction();
     391             :                                                                 return null;
     392             :                                                         }
     393             :                                                 }
     394             :                                         }
     395             :                                 }
     396             :                                 throw;
     397             :                         }
     398             : 
     399             :                         string eventTypeName = @event.GetType().FullName;
     400             :                         CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
     401             :                         AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
     402             :                         object identifyMessage = null;
     403             :                         var identifiedEvent = @event as IEventWithIdentity<TAuthenticationToken>;
     404             :                         if (identifiedEvent != null)
     405             :                                 identifyMessage = string.Format(" for aggregate {0}", identifiedEvent.Rsn);
     406             :                         Logger.LogInfo(string.Format("An event message arrived with the {0} was of type {1}{2}.", messageId, eventTypeName, identifyMessage));
     407             : 
     408             :                         VerifySignature(signingTokenConfigurationKey, signature, "An event", messageId, eventTypeName, identifyMessage, messageBody);
     409             :                         bool canRefresh;
     410             :                         if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", eventTypeName), out canRefresh))
     411             :                                 if (!ConfigurationManager.TryGetSetting(DefaultMessagesShouldRefreshConfigurationKey, out canRefresh))
     412             :                                         canRefresh = false;
     413             : 
     414             :                         if (canRefresh)
     415             :                         {
     416             :                                 if (lockRefreshAction == null)
     417             :                                         Logger.LogWarning(string.Format("An event message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, eventTypeName));
     418             :                                 else
     419             :                                         lockRefreshAction();
     420             :                         }
     421             : 
     422             :                         // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
     423             :                         bool? result = receiveEventHandler(@event);
     424             :                         if (result != null && !result.Value)
     425             :                                 if (skippedAction != null)
     426             :                                         skippedAction();
     427             : 
     428             :                         return @event;
     429             :                 }
     430             : 
     431             :                 /// <summary>
     432             :                 /// Refreshes the network lock.
     433             :                 /// </summary>
     434           1 :                 public virtual void RefreshLock(CancellationTokenSource brokeredMessageRenewCancellationTokenSource, BrokeredMessage message, string type = "message")
     435             :                 {
     436             :                         Task.Factory.StartNewSafely(() =>
     437             :                         {
     438             :                                 // The capturing of ObjectDisposedException is because even the properties can throw it.
     439             :                                 try
     440             :                                 {
     441             :                                         object value;
     442             :                                         string typeName = null;
     443             :                                         if (message.Properties.TryGetValue("Type", out value))
     444             :                                                 typeName = value.ToString();
     445             : 
     446             :                                         long loop = long.MinValue;
     447             :                                         while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
     448             :                                         {
     449             :                                                 // Based on LockedUntilUtc property to determine if the lock expires soon
     450             :                                                 // We lock for 45 seconds to ensure any thread based issues are mitigated.
     451             :                                                 if (DateTime.UtcNow > message.LockedUntilUtc.AddSeconds(-45))
     452             :                                                 {
     453             :                                                         // If so, renew the lock
     454             :                                                         for (int i = 0; i < 10; i++)
     455             :                                                         {
     456             :                                                                 try
     457             :                                                                 {
     458             :                                                                         if (brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
     459             :                                                                                 return;
     460             :                                                                         message.RenewLock();
     461             :                                                                         try
     462             :                                                                         {
     463             :                                                                                 Logger.LogDebug(string.Format("Renewed the {2} lock on {1} '{0}'.", message.MessageId, type, typeName));
     464             :                                                                         }
     465             :                                                                         catch
     466             :                                                                         {
     467             :                                                                                 Trace.TraceError("Renewed the {2} lock on {1} '{0}'.", message.MessageId, type, typeName);
     468             :                                                                         }
     469             : 
     470             :                                                                         break;
     471             :                                                                 }
     472             :                                                                 catch (ObjectDisposedException)
     473             :                                                                 {
     474             :                                                                         return;
     475             :                                                                 }
     476             :                                                                 catch (MessageLockLostException exception)
     477             :                                                                 {
     478             :                                                                         try
     479             :                                                                         {
     480             :                                                                                 Logger.LogWarning(string.Format("Renewing the {2} lock on {1} '{0}' failed as the message lock was lost.", message.MessageId, type, typeName), exception: exception);
     481             :                                                                         }
     482             :                                                                         catch
     483             :                                                                         {
     484             :                                                                                 Trace.TraceError("Renewing the {2} lock on {1} '{0}' failed as the message lock was lost.\r\n{3}", message.MessageId, type, typeName, exception.Message);
     485             :                                                                         }
     486             :                                                                         return;
     487             :                                                                 }
     488             :                                                                 catch (Exception exception)
     489             :                                                                 {
     490             :                                                                         try
     491             :                                                                         {
     492             :                                                                                 Logger.LogWarning(string.Format("Renewing the {2} lock on {1} '{0}' failed.", message.MessageId, type, typeName), exception: exception);
     493             :                                                                         }
     494             :                                                                         catch
     495             :                                                                         {
     496             :                                                                                 Trace.TraceError("Renewing the {2} lock on {1} '{0}' failed.\r\n{3}", message.MessageId, type, typeName, exception.Message);
     497             :                                                                         }
     498             :                                                                         if (i == 9)
     499             :                                                                                 return;
     500             :                                                                 }
     501             :                                                         }
     502             :                                                 }
     503             : 
     504             :                                                 if (loop++ % 5 == 0)
     505             :                                                         Thread.Yield();
     506             :                                                 else
     507             :                                                         Thread.Sleep(500);
     508             :                                                 if (loop == long.MaxValue)
     509             :                                                         loop = long.MinValue;
     510             :                                         }
     511             :                                         try
     512             :                                         {
     513             :                                                 brokeredMessageRenewCancellationTokenSource.Dispose();
     514             :                                         }
     515             :                                         catch (ObjectDisposedException) { }
     516             :                                 }
     517             :                                 catch (ObjectDisposedException) { }
     518             :                         }, brokeredMessageRenewCancellationTokenSource.Token);
     519             :                 }
     520             : 
     521             :                 /// <summary>
     522             :                 /// The default event handler that
     523             :                 /// check if the <see cref="IEvent{TAuthenticationToken}"/> has already been processed by this framework,
     524             :                 /// checks if the <see cref="IEvent{TAuthenticationToken}"/> is required,
     525             :                 /// finds the handler from the provided <paramref name="routeManager"/>.
     526             :                 /// </summary>
     527             :                 /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to process.</param>
     528             :                 /// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="IEventHandler{TAuthenticationToken,TCommand}"/> from.</param>
     529             :                 /// <param name="framework">The current framework.</param>
     530             :                 /// <returns>
     531             :                 /// True indicates the <paramref name="event"/> was successfully handled by a handler.
     532             :                 /// False indicates the <paramref name="event"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
     533             :                 /// Null indicates the <paramref name="event"/> wasn't handled as it was already handled.
     534             :                 /// </returns>
     535           1 :                 public virtual bool? DefaultReceiveEvent(IEvent<TAuthenticationToken> @event, RouteManager routeManager, string framework)
     536             :                 {
     537             :                         Type eventType = @event.GetType();
     538             : 
     539             :                         if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
     540             :                         {
     541             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     542             :                                 if (@event.Frameworks.Count() != 1)
     543             :                                 {
     544             :                                         Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\DefaultReceiveEvent({1})", GetType().FullName, eventType.FullName));
     545             :                                         return null;
     546             :                                 }
     547             :                         }
     548             : 
     549             :                         CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
     550             :                         AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
     551             : 
     552             :                         bool isRequired = BusHelper.IsEventRequired(eventType);
     553             : 
     554             :                         IEnumerable<Action<IMessage>> handlers = routeManager.GetHandlers(@event, isRequired).Select(x => x.Delegate).ToList();
     555             :                         // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
     556             :                         if (!handlers.Any())
     557             :                         {
     558             :                                 Logger.LogDebug(string.Format("The event handler for '{0}' is not required.", eventType.FullName));
     559             :                                 return false;
     560             :                         }
     561             : 
     562             :                         foreach (Action<IMessage> handler in handlers)
     563             :                                 handler(@event);
     564             :                         return true;
     565             :                 }
     566             : 
     567             :                 /// <summary>
     568             :                 /// Verifies that the signature is authorised.
     569             :                 /// </summary>
     570           1 :                 protected virtual void VerifySignature(string signingTokenConfigurationKey, string signature, string messagetype, string messageId, string typeName, object identifyMessage, string messageBody)
     571             :                 {
     572             :                         if (string.IsNullOrWhiteSpace(signature))
     573             :                                 Logger.LogWarning(string.Format("{3} message arrived with the {0} was of type {1}{2} and had no signature.", messageId, typeName, identifyMessage, messagetype));
     574             :                         else
     575             :                         {
     576             :                                 bool messageIsValid = false;
     577             :                                 // see https://github.com/Chinchilla-Software-Com/CQRS/wiki/Inter-process-function-security</remarks>
     578             :                                 string configurationKey = string.Format("{0}.SigningToken", typeName);
     579             :                                 string signingToken;
     580             :                                 HashAlgorithm signer = Signer.Create();
     581             :                                 if (ConfigurationManager.TryGetSetting(configurationKey, out signingToken) && !string.IsNullOrWhiteSpace(signingToken))
     582             :                                         using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", signingToken, messageBody))))
     583             :                                                 messageIsValid = signature == Convert.ToBase64String(signer.ComputeHash(hashStream));
     584             :                                 if (!messageIsValid && ConfigurationManager.TryGetSetting(signingTokenConfigurationKey, out signingToken) && !string.IsNullOrWhiteSpace(signingToken))
     585             :                                         using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", signingToken, messageBody))))
     586             :                                                 messageIsValid = signature == Convert.ToBase64String(signer.ComputeHash(hashStream));
     587             :                                 if (!messageIsValid)
     588             :                                         using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", Guid.Empty.ToString("N"), messageBody))))
     589             :                                                 messageIsValid = signature == Convert.ToBase64String(signer.ComputeHash(hashStream));
     590             :                                 if (!messageIsValid)
     591             :                                         throw new UnAuthorisedMessageReceivedException(typeName, messageId, identifyMessage);
     592             :                         }
     593             :                 }
     594             : 
     595             :                 /// <summary>
     596             :                 /// Manually registers the provided <paramref name="handler"/> 
     597             :                 /// on the provided <paramref name="routeManger"/>
     598             :                 /// </summary>
     599             :                 /// <typeparam name="TMessage">The <see cref="Type"/> of <see cref="IMessage"/> the <paramref name="handler"/> can handle.</typeparam>
     600           1 :                 public virtual void RegisterHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
     601             :                         where TMessage : IMessage
     602             :                 {
     603             :                         Action<TMessage> registerableHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
     604             : 
     605             :                         routeManger.RegisterHandler(registerableHandler, targetedType);
     606             : 
     607             :                         telemetryHelper.TrackEvent(string.Format("Cqrs/RegisterHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "Azure/Bus" } });
     608             :                         telemetryHelper.Flush();
     609             :                 }
     610             : 
     611             :                 /// <summary>
     612             :                 /// Register an event handler that will listen and respond to all events.
     613             :                 /// </summary>
     614           1 :                 public virtual void RegisterGlobalEventHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler, bool holdMessageLock = true)
     615             :                         where TMessage : IMessage
     616             :                 {
     617             :                         Action<TMessage> registerableHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
     618             : 
     619             :                         routeManger.RegisterGlobalEventHandler(registerableHandler);
     620             : 
     621             :                         telemetryHelper.TrackEvent(string.Format("Cqrs/RegisterGlobalEventHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "Azure/Bus" } });
     622             :                         telemetryHelper.Flush();
     623             :                 }
     624             :         }
     625             : }

Generated by: LCOV version 1.12