Documentation Coverage Report
Current view: top level - Azure/Cqrs.Azure.ServiceBus - AzureBusHelper.cs Hit Total Coverage
Version: 4.0 Artefacts: 13 13 100.0 %
Date: 2019-11-24 03:15:41

          Line data    Source code
       1             : #region IMPORTANT NOTE
       2             : // This is copied almost exactly into the eventhub except for a string difference. Replicate changes there until a refactor is done.
       3             : #endregion
       4             : 
       5             : #region Copyright
       6             : // // -----------------------------------------------------------------------
       7             : // // <copyright company="Chinchilla Software Limited">
       8             : // //   Copyright Chinchilla Software Limited. All rights reserved.
       9             : // // </copyright>
      10             : // // -----------------------------------------------------------------------
      11             : #endregion
      12             : 
      13             : using System;
      14             : using System.Collections.Generic;
      15             : using System.Diagnostics;
      16             : using System.IO;
      17             : using System.Linq;
      18             : using System.Security.Cryptography;
      19             : using System.Text;
      20             : using System.Text.RegularExpressions;
      21             : using System.Threading;
      22             : using System.Threading.Tasks;
      23             : using Chinchilla.Logging;
      24             : using Cqrs.Authentication;
      25             : using Cqrs.Bus;
      26             : using Cqrs.Commands;
      27             : using Cqrs.Configuration;
      28             : using Cqrs.Events;
      29             : using Cqrs.Exceptions;
      30             : using Cqrs.Messages;
      31             : #if NET452
      32             : using Microsoft.ServiceBus.Messaging;
      33             : #endif
      34             : #if NETCOREAPP3_0
      35             : using Microsoft.Azure.ServiceBus;
      36             : using Microsoft.Azure.ServiceBus.Core;
      37             : using BrokeredMessage = Microsoft.Azure.ServiceBus.Message;
      38             : #endif
      39             : using Newtonsoft.Json;
      40             : 
      41             : namespace Cqrs.Azure.ServiceBus
      42             : {
      43             :         /// <summary>
      44             :         /// A helper for Azure Service Bus and Event Hub.
      45             :         /// </summary>
      46             :         /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
      47             :         public class AzureBusHelper<TAuthenticationToken>
      48             :                 : IAzureBusHelper<TAuthenticationToken>
      49           1 :         {
      50             :                 /// <summary>
      51             :                 /// Instantiates a new instance of <see cref="AzureBusHelper{TAuthenticationToken}"/>.
      52             :                 /// </summary>
      53           1 :                 public AzureBusHelper(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IBusHelper busHelper, IHashAlgorithmFactory hashAlgorithmFactory, IConfigurationManager configurationManager, IDependencyResolver dependencyResolver)
      54             :                 {
      55             :                         AuthenticationTokenHelper = authenticationTokenHelper;
      56             :                         CorrelationIdHelper = correlationIdHelper;
      57             :                         Logger = logger;
      58             :                         MessageSerialiser = messageSerialiser;
      59             :                         BusHelper = busHelper;
      60             :                         DependencyResolver = dependencyResolver;
      61             :                         ConfigurationManager = configurationManager;
      62             :                         Signer = hashAlgorithmFactory;
      63             :                 }
      64             : 
      65             :                 /// <summary>
      66             :                 /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>.
      67             :                 /// </summary>
      68             :                 protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
      69             : 
      70             :                 /// <summary>
      71             :                 /// Gets or sets the <see cref="ICorrelationIdHelper"/>.
      72             :                 /// </summary>
      73             :                 protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
      74             : 
      75             :                 /// <summary>
      76             :                 /// Gets or sets the <see cref="ILogger"/>.
      77             :                 /// </summary>
      78             :                 protected ILogger Logger { get; private set; }
      79             : 
      80             :                 /// <summary>
      81             :                 /// Gets or sets the <see cref="IMessageSerialiser{TAuthenticationToken}"/>.
      82             :                 /// </summary>
      83             :                 protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; }
      84             : 
      85             :                 /// <summary>
      86             :                 /// Gets or sets the <see cref="IBusHelper"/>.
      87             :                 /// </summary>
      88             :                 protected IBusHelper BusHelper { get; private set; }
      89             : 
      90             :                 /// <summary>
      91             :                 /// Gets or sets the <see cref="IConfigurationManager"/>.
      92             :                 /// </summary>
      93             :                 protected IConfigurationManager ConfigurationManager { get; private set; }
      94             : 
      95             :                 /// <summary>
      96             :                 /// Gets or sets the <see cref="IDependencyResolver"/>.
      97             :                 /// </summary>
      98             :                 protected IDependencyResolver DependencyResolver { get; private set; }
      99             : 
     100             :                 /// <summary>
     101             :                 /// The configuration key for the default message refreshing setting as used by <see cref="IConfigurationManager"/>.
     102             :                 /// </summary>
     103             :                 protected const string DefaultMessagesShouldRefreshConfigurationKey = "Cqrs.Azure.Messages.ShouldRefresh";
     104             : 
     105             :                 /// <summary>
     106             :                 /// The <see cref="IHashAlgorithmFactory"/> to use to sign messages.
     107             :                 /// </summary>
     108             :                 protected IHashAlgorithmFactory Signer { get; private set; }
     109             : 
     110             :                 /// <summary>
     111             :                 /// Prepares a <see cref="ICommand{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
     112             :                 /// </summary>
     113             :                 /// <typeparam name="TCommand">The <see cref="Type"/> of<see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
     114             :                 /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
     115             :                 /// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
     116           1 :                 public virtual void PrepareCommand<TCommand>(TCommand command, string framework)
     117             :                         where TCommand : ICommand<TAuthenticationToken>
     118             :                 {
     119             :                         if (command.AuthenticationToken == null || command.AuthenticationToken.Equals(default(TAuthenticationToken)))
     120             :                                 command.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
     121             :                         command.CorrelationId = CorrelationIdHelper.GetCorrelationId();
     122             : 
     123             :                         if (string.IsNullOrWhiteSpace(command.OriginatingFramework))
     124             :                                 command.OriginatingFramework = framework;
     125             :                         var frameworks = new List<string>();
     126             :                         if (command.Frameworks != null)
     127             :                                 frameworks.AddRange(command.Frameworks);
     128             :                         frameworks.Add(framework);
     129             :                         command.Frameworks = frameworks;
     130             :                 }
     131             : 
     132             :                 /// <summary>
     133             :                 /// Prepares and validates a <see cref="ICommand{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
     134             :                 /// </summary>
     135             :                 /// <typeparam name="TCommand">The <see cref="Type"/> of<see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
     136             :                 /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
     137             :                 /// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
     138           1 :                 public virtual bool PrepareAndValidateCommand<TCommand>(TCommand command, string framework)
     139             :                         where TCommand : ICommand<TAuthenticationToken>
     140             :                 {
     141             :                         Type commandType = command.GetType();
     142             : 
     143             :                         if (command.Frameworks != null && command.Frameworks.Contains(framework))
     144             :                         {
     145             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     146             :                                 if (command.Frameworks.Count() != 1)
     147             :                                 {
     148             :                                         Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName));
     149             :                                         return false;
     150             :                                 }
     151             :                         }
     152             : 
     153             :                         ICommandValidator<TAuthenticationToken, TCommand> commandValidator = null;
     154             :                         try
     155             :                         {
     156             :                                 commandValidator = DependencyResolver.Resolve<ICommandValidator<TAuthenticationToken, TCommand>>();
     157             :                         }
     158             :                         catch (Exception exception)
     159             :                         {
     160             :                                 Logger.LogDebug("Locating an ICommandValidator failed.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName), exception);
     161             :                         }
     162             : 
     163             :                         if (commandValidator != null && !commandValidator.IsCommandValid(command))
     164             :                         {
     165             :                                 Logger.LogInfo("The provided command is not valid.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName));
     166             :                                 return false;
     167             :                         }
     168             : 
     169             :                         PrepareCommand(command, framework);
     170             :                         return true;
     171             :                 }
     172             : 
     173             :                 /// <summary>
     174             :                 /// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveCommandHandler"/>.
     175             :                 /// </summary>
     176             :                 /// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
     177             :                 /// <param name="receiveCommandHandler">The handler method that will process the <see cref="ICommand{TAuthenticationToken}"/>.</param>
     178             :                 /// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
     179             :                 /// <param name="signature">The signature of the <see cref="IMessage"/>.</param>
     180             :                 /// <param name="signingTokenConfigurationKey">The configuration key for the signing token as used by <see cref="IConfigurationManager"/>.</param>
     181             :                 /// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="ICommand{TAuthenticationToken}"/> is being skipped.</param>
     182             :                 /// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
     183             :                 /// <returns>The <see cref="ICommand{TAuthenticationToken}"/> that was processed.</returns>
     184           1 :                 public virtual ICommand<TAuthenticationToken> ReceiveCommand(string messageBody, Func<ICommand<TAuthenticationToken>, bool?> receiveCommandHandler, string messageId, string signature, string signingTokenConfigurationKey, Action skippedAction = null, Action lockRefreshAction = null)
     185             :                 {
     186             :                         ICommand<TAuthenticationToken> command;
     187             :                         try
     188             :                         {
     189             :                                 command = MessageSerialiser.DeserialiseCommand(messageBody);
     190             :                         }
     191             :                         catch (JsonSerializationException exception)
     192             :                         {
     193             :                                 JsonSerializationException checkException = exception;
     194             :                                 bool safeToExit = false;
     195             :                                 do
     196             :                                 {
     197             :                                         if (checkException.Message.StartsWith("Could not load assembly"))
     198             :                                         {
     199             :                                                 safeToExit = true;
     200             :                                                 break;
     201             :                                         }
     202             :                                 } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
     203             :                                 if (safeToExit)
     204             :                                 {
     205             :                                         const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
     206             :                                         Match match = new Regex(pattern).Match(exception.Message);
     207             :                                         if (match.Success)
     208             :                                         {
     209             :                                                 string[] typeParts = match.Value.Split(',');
     210             :                                                 if (typeParts.Length == 2)
     211             :                                                 {
     212             :                                                         string classType = typeParts[0];
     213             :                                                         bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
     214             : 
     215             :                                                         if (!isRequired)
     216             :                                                         {
     217             :                                                                 if (skippedAction != null)
     218             :                                                                         skippedAction();
     219             :                                                                 return null;
     220             :                                                         }
     221             :                                                 }
     222             :                                         }
     223             :                                 }
     224             :                                 throw;
     225             :                         }
     226             : 
     227             :                         string commandTypeName = command.GetType().FullName;
     228             :                         CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
     229             :                         AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
     230             :                         string identifyMessage = null;
     231             :                         var identifiedEvent = command as ICommandWithIdentity<TAuthenticationToken>;
     232             :                         if (identifiedEvent != null)
     233             :                                 identifyMessage = string.Format(" for aggregate {0}", identifiedEvent.Rsn);
     234             :                         Logger.LogInfo(string.Format("A command message arrived with the {0} was of type {1}{2}.", messageId, commandTypeName, identifyMessage));
     235             : 
     236             :                         VerifySignature(signingTokenConfigurationKey, signature, "A command", messageId, commandTypeName, identifyMessage, messageBody);
     237             :                         bool canRefresh;
     238             :                         if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", commandTypeName), out canRefresh))
     239             :                                 canRefresh = false;
     240             : 
     241             :                         if (canRefresh)
     242             :                         {
     243             :                                 if (lockRefreshAction == null)
     244             :                                         Logger.LogWarning(string.Format("A command message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, commandTypeName));
     245             :                                 else
     246             :                                         lockRefreshAction();
     247             :                         }
     248             : 
     249             :                         // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
     250             :                         bool? result = receiveCommandHandler(command);
     251             :                         if (result != null && !result.Value)
     252             :                                 if (skippedAction != null)
     253             :                                         skippedAction();
     254             : 
     255             :                         return command;
     256             :                 }
     257             : 
     258             :                 /// <summary>
     259             :                 /// The default command handler that
     260             :                 /// check if the <see cref="ICommand{TAuthenticationToken}"/> has already been processed by this framework,
     261             :                 /// checks if the <see cref="ICommand{TAuthenticationToken}"/> is required,
     262             :                 /// finds the handler from the provided <paramref name="routeManager"/>.
     263             :                 /// </summary>
     264             :                 /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to process.</param>
     265             :                 /// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="ICommandHandler{TAuthenticationToken,TCommand}"/> from.</param>
     266             :                 /// <param name="framework">The current framework.</param>
     267             :                 /// <returns>
     268             :                 /// True indicates the <paramref name="command"/> was successfully handled by a handler.
     269             :                 /// False indicates the <paramref name="command"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
     270             :                 /// Null indicates the command<paramref name="command"/> wasn't handled as it was already handled.
     271             :                 /// </returns>
     272           1 :                 public virtual bool? DefaultReceiveCommand(ICommand<TAuthenticationToken> command, RouteManager routeManager, string framework)
     273             :                 {
     274             :                         Type commandType = command.GetType();
     275             : 
     276             :                         if (command.Frameworks != null && command.Frameworks.Contains(framework))
     277             :                         {
     278             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     279             :                                 if (command.Frameworks.Count() != 1)
     280             :                                 {
     281             :                                         Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\DefaultReceiveCommand({1})", GetType().FullName, commandType.FullName));
     282             :                                         return null;
     283             :                                 }
     284             :                         }
     285             : 
     286             :                         CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
     287             :                         AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
     288             : 
     289             :                         bool isRequired = BusHelper.IsEventRequired(commandType);
     290             : 
     291             :                         RouteHandlerDelegate commandHandler = routeManager.GetSingleHandler(command, isRequired);
     292             :                         // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
     293             :                         if (commandHandler == null)
     294             :                         {
     295             :                                 Logger.LogDebug(string.Format("The command handler for '{0}' is not required.", commandType.FullName));
     296             :                                 return false;
     297             :                         }
     298             : 
     299             :                         Action<IMessage> handler = commandHandler.Delegate;
     300             :                         handler(command);
     301             :                         return true;
     302             :                 }
     303             : 
     304             :                 /// <summary>
     305             :                 /// Prepares an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
     306             :                 /// </summary>
     307             :                 /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
     308             :                 /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
     309             :                 /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
     310           1 :                 public virtual void PrepareEvent<TEvent>(TEvent @event, string framework)
     311             :                         where TEvent : IEvent<TAuthenticationToken>
     312             :                 {
     313             :                         if (@event.AuthenticationToken == null || @event.AuthenticationToken.Equals(default(TAuthenticationToken)))
     314             :                                 @event.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
     315             :                         @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
     316             :                         @event.TimeStamp = DateTimeOffset.UtcNow;
     317             : 
     318             :                         if (string.IsNullOrWhiteSpace(@event.OriginatingFramework))
     319             :                                 @event.OriginatingFramework = framework;
     320             :                         var frameworks = new List<string>();
     321             :                         if (@event.Frameworks != null)
     322             :                                 frameworks.AddRange(@event.Frameworks);
     323             :                         frameworks.Add(framework);
     324             :                         @event.Frameworks = frameworks;
     325             :                 }
     326             : 
     327             :                 /// <summary>
     328             :                 /// Prepares and validates an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
     329             :                 /// </summary>
     330             :                 /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
     331             :                 /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
     332             :                 /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
     333           1 :                 public virtual bool PrepareAndValidateEvent<TEvent>(TEvent @event, string framework)
     334             :                         where TEvent : IEvent<TAuthenticationToken>
     335             :                 {
     336             :                         Type eventType = @event.GetType();
     337             : 
     338             :                         if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
     339             :                         {
     340             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     341             :                                 if (@event.Frameworks.Count() != 1)
     342             :                                 {
     343             :                                         Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, eventType.FullName));
     344             :                                         return false;
     345             :                                 }
     346             :                         }
     347             : 
     348             :                         PrepareEvent(@event, framework);
     349             :                         return true;
     350             :                 }
     351             : 
     352             :                 /// <summary>
     353             :                 /// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveEventHandler"/>.
     354             :                 /// </summary>
     355             :                 /// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
     356             :                 /// <param name="receiveEventHandler">The handler method that will process the <see cref="IEvent{TAuthenticationToken}"/>.</param>
     357             :                 /// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
     358             :                 /// <param name="signature">The signature of the <see cref="IMessage"/>.</param>
     359             :                 /// <param name="signingTokenConfigurationKey">The configuration key for the signing token as used by <see cref="IConfigurationManager"/>.</param>
     360             :                 /// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="IEvent{TAuthenticationToken}"/> is being skipped.</param>
     361             :                 /// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
     362             :                 /// <returns>The <see cref="IEvent{TAuthenticationToken}"/> that was processed.</returns>
     363           1 :                 public virtual IEvent<TAuthenticationToken> ReceiveEvent(string messageBody, Func<IEvent<TAuthenticationToken>, bool?> receiveEventHandler, string messageId, string signature, string signingTokenConfigurationKey, Action skippedAction = null, Action lockRefreshAction = null)
     364             :                 {
     365             :                         IEvent<TAuthenticationToken> @event;
     366             :                         try
     367             :                         {
     368             :                                 @event = MessageSerialiser.DeserialiseEvent(messageBody);
     369             :                         }
     370             :                         catch (JsonSerializationException exception)
     371             :                         {
     372             :                                 JsonSerializationException checkException = exception;
     373             :                                 bool safeToExit = false;
     374             :                                 do
     375             :                                 {
     376             :                                         if (checkException.Message.StartsWith("Could not load assembly"))
     377             :                                         {
     378             :                                                 safeToExit = true;
     379             :                                                 break;
     380             :                                         }
     381             :                                 } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
     382             :                                 if (safeToExit)
     383             :                                 {
     384             :                                         const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
     385             :                                         Match match = new Regex(pattern).Match(exception.Message);
     386             :                                         if (match.Success)
     387             :                                         {
     388             :                                                 string[] typeParts = match.Value.Split(',');
     389             :                                                 if (typeParts.Length == 2)
     390             :                                                 {
     391             :                                                         string classType = typeParts[0];
     392             :                                                         bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
     393             : 
     394             :                                                         if (!isRequired)
     395             :                                                         {
     396             :                                                                 if (skippedAction != null)
     397             :                                                                         skippedAction();
     398             :                                                                 return null;
     399             :                                                         }
     400             :                                                 }
     401             :                                         }
     402             :                                 }
     403             :                                 throw;
     404             :                         }
     405             : 
     406             :                         string eventTypeName = @event.GetType().FullName;
     407             :                         CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
     408             :                         AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
     409             :                         object identifyMessage = null;
     410             :                         var identifiedEvent = @event as IEventWithIdentity<TAuthenticationToken>;
     411             :                         if (identifiedEvent != null)
     412             :                                 identifyMessage = string.Format(" for aggregate {0}", identifiedEvent.Rsn);
     413             :                         Logger.LogInfo(string.Format("An event message arrived with the {0} was of type {1}{2}.", messageId, eventTypeName, identifyMessage));
     414             : 
     415             :                         VerifySignature(signingTokenConfigurationKey, signature, "An event", messageId, eventTypeName, identifyMessage, messageBody);
     416             :                         bool canRefresh;
     417             :                         if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", eventTypeName), out canRefresh))
     418             :                                 if (!ConfigurationManager.TryGetSetting(DefaultMessagesShouldRefreshConfigurationKey, out canRefresh))
     419             :                                         canRefresh = false;
     420             : 
     421             :                         if (canRefresh)
     422             :                         {
     423             :                                 if (lockRefreshAction == null)
     424             :                                         Logger.LogWarning(string.Format("An event message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, eventTypeName));
     425             :                                 else
     426             :                                         lockRefreshAction();
     427             :                         }
     428             : 
     429             :                         // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
     430             :                         bool? result = receiveEventHandler(@event);
     431             :                         if (result != null && !result.Value)
     432             :                                 if (skippedAction != null)
     433             :                                         skippedAction();
     434             : 
     435             :                         return @event;
     436             :                 }
     437             : 
     438             :                 /// <summary>
     439             :                 /// Refreshes the network lock.
     440             :                 /// </summary>
     441             : #if NET452
     442             :                 public virtual void RefreshLock(CancellationTokenSource brokeredMessageRenewCancellationTokenSource, BrokeredMessage message, string type = "message")
     443             : #endif
     444             : #if NETCOREAPP3_0
     445             :                 public virtual void RefreshLock(IMessageReceiver client, CancellationTokenSource brokeredMessageRenewCancellationTokenSource, BrokeredMessage message, string type = "message")
     446             : #endif
     447             :                 {
     448             :                         Task.Factory.StartNewSafely(() =>
     449             :                         {
     450             :                                 // The capturing of ObjectDisposedException is because even the properties can throw it.
     451             :                                 try
     452             :                                 {
     453             :                                         object value;
     454             :                                         string typeName = null;
     455             :                                         if (message.TryGetUserPropertyValue("Type", out value))
     456             :                                                 typeName = value.ToString();
     457             : 
     458             :                                         long loop = long.MinValue;
     459             :                                         while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
     460             :                                         {
     461             :                                                 // Based on LockedUntilUtc property to determine if the lock expires soon
     462             :                                                 // We lock for 45 seconds to ensure any thread based issues are mitigated.
     463             : #if NET452
     464             :                                                 if (DateTime.UtcNow > message.LockedUntilUtc.AddSeconds(-45))
     465             : #endif
     466             : #if NETCOREAPP3_0
     467             :                                                 if (DateTime.UtcNow > message.ExpiresAtUtc.AddSeconds(-45))
     468             : #endif
     469             :                                                 {
     470             :                                                         // If so, renew the lock
     471             :                                                         for (int i = 0; i < 10; i++)
     472             :                                                         {
     473             :                                                                 try
     474             :                                                                 {
     475             :                                                                         if (brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
     476             :                                                                                 return;
     477             : #if NET452
     478             :                                                                         message.RenewLock();
     479             : #endif
     480             : #if NETCOREAPP3_0
     481             :                                                                         client.RenewLockAsync(message).Wait(1500);
     482             : #endif
     483             :                                                                         try
     484             :                                                                         {
     485             :                                                                                 Logger.LogDebug(string.Format("Renewed the {2} lock on {1} '{0}'.", message.MessageId, type, typeName));
     486             :                                                                         }
     487             :                                                                         catch
     488             :                                                                         {
     489             :                                                                                 Trace.TraceError("Renewed the {2} lock on {1} '{0}'.", message.MessageId, type, typeName);
     490             :                                                                         }
     491             : 
     492             :                                                                         break;
     493             :                                                                 }
     494             :                                                                 catch (ObjectDisposedException)
     495             :                                                                 {
     496             :                                                                         return;
     497             :                                                                 }
     498             :                                                                 catch (MessageLockLostException exception)
     499             :                                                                 {
     500             :                                                                         try
     501             :                                                                         {
     502             :                                                                                 Logger.LogWarning(string.Format("Renewing the {2} lock on {1} '{0}' failed as the message lock was lost.", message.MessageId, type, typeName), exception: exception);
     503             :                                                                         }
     504             :                                                                         catch
     505             :                                                                         {
     506             :                                                                                 Trace.TraceError("Renewing the {2} lock on {1} '{0}' failed as the message lock was lost.\r\n{3}", message.MessageId, type, typeName, exception.Message);
     507             :                                                                         }
     508             :                                                                         return;
     509             :                                                                 }
     510             :                                                                 catch (Exception exception)
     511             :                                                                 {
     512             :                                                                         try
     513             :                                                                         {
     514             :                                                                                 Logger.LogWarning(string.Format("Renewing the {2} lock on {1} '{0}' failed.", message.MessageId, type, typeName), exception: exception);
     515             :                                                                         }
     516             :                                                                         catch
     517             :                                                                         {
     518             :                                                                                 Trace.TraceError("Renewing the {2} lock on {1} '{0}' failed.\r\n{3}", message.MessageId, type, typeName, exception.Message);
     519             :                                                                         }
     520             :                                                                         if (i == 9)
     521             :                                                                                 return;
     522             :                                                                 }
     523             :                                                         }
     524             :                                                 }
     525             : 
     526             :                                                 if (loop++ % 5 == 0)
     527             :                                                         Thread.Yield();
     528             :                                                 else
     529             :                                                         Thread.Sleep(500);
     530             :                                                 if (loop == long.MaxValue)
     531             :                                                         loop = long.MinValue;
     532             :                                         }
     533             :                                         try
     534             :                                         {
     535             :                                                 brokeredMessageRenewCancellationTokenSource.Dispose();
     536             :                                         }
     537             :                                         catch (ObjectDisposedException) { }
     538             :                                 }
     539             :                                 catch (ObjectDisposedException) { }
     540             :                         }, brokeredMessageRenewCancellationTokenSource.Token);
     541             :                 }
     542             : 
     543             :                 /// <summary>
     544             :                 /// The default event handler that
     545             :                 /// check if the <see cref="IEvent{TAuthenticationToken}"/> has already been processed by this framework,
     546             :                 /// checks if the <see cref="IEvent{TAuthenticationToken}"/> is required,
     547             :                 /// finds the handler from the provided <paramref name="routeManager"/>.
     548             :                 /// </summary>
     549             :                 /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to process.</param>
     550             :                 /// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="IEventHandler{TAuthenticationToken,TCommand}"/> from.</param>
     551             :                 /// <param name="framework">The current framework.</param>
     552             :                 /// <returns>
     553             :                 /// True indicates the <paramref name="event"/> was successfully handled by a handler.
     554             :                 /// False indicates the <paramref name="event"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
     555             :                 /// Null indicates the <paramref name="event"/> wasn't handled as it was already handled.
     556             :                 /// </returns>
     557           1 :                 public virtual bool? DefaultReceiveEvent(IEvent<TAuthenticationToken> @event, RouteManager routeManager, string framework)
     558             :                 {
     559             :                         Type eventType = @event.GetType();
     560             : 
     561             :                         if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
     562             :                         {
     563             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     564             :                                 if (@event.Frameworks.Count() != 1)
     565             :                                 {
     566             :                                         Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\DefaultReceiveEvent({1})", GetType().FullName, eventType.FullName));
     567             :                                         return null;
     568             :                                 }
     569             :                         }
     570             : 
     571             :                         CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
     572             :                         AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
     573             : 
     574             :                         bool isRequired = BusHelper.IsEventRequired(eventType);
     575             : 
     576             :                         IEnumerable<Action<IMessage>> handlers = routeManager.GetHandlers(@event, isRequired).Select(x => x.Delegate).ToList();
     577             :                         // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
     578             :                         if (!handlers.Any())
     579             :                         {
     580             :                                 Logger.LogDebug(string.Format("The event handler for '{0}' is not required.", eventType.FullName));
     581             :                                 return false;
     582             :                         }
     583             : 
     584             :                         foreach (Action<IMessage> handler in handlers)
     585             :                                 handler(@event);
     586             :                         return true;
     587             :                 }
     588             : 
     589             :                 /// <summary>
     590             :                 /// Verifies that the signature is authorised.
     591             :                 /// </summary>
     592           1 :                 protected virtual void VerifySignature(string signingTokenConfigurationKey, string signature, string messagetype, string messageId, string typeName, object identifyMessage, string messageBody)
     593             :                 {
     594             :                         if (string.IsNullOrWhiteSpace(signature))
     595             :                                 Logger.LogWarning(string.Format("{3} message arrived with the {0} was of type {1}{2} and had no signature.", messageId, typeName, identifyMessage, messagetype));
     596             :                         else
     597             :                         {
     598             :                                 bool messageIsValid = false;
     599             :                                 // see https://github.com/Chinchilla-Software-Com/CQRS/wiki/Inter-process-function-security</remarks>
     600             :                                 string configurationKey = string.Format("{0}.SigningToken", typeName);
     601             :                                 string signingToken;
     602             :                                 HashAlgorithm signer = Signer.Create();
     603             :                                 if (ConfigurationManager.TryGetSetting(configurationKey, out signingToken) && !string.IsNullOrWhiteSpace(signingToken))
     604             :                                         using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", signingToken, messageBody))))
     605             :                                                 messageIsValid = signature == Convert.ToBase64String(signer.ComputeHash(hashStream));
     606             :                                 if (!messageIsValid && ConfigurationManager.TryGetSetting(signingTokenConfigurationKey, out signingToken) && !string.IsNullOrWhiteSpace(signingToken))
     607             :                                         using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", signingToken, messageBody))))
     608             :                                                 messageIsValid = signature == Convert.ToBase64String(signer.ComputeHash(hashStream));
     609             :                                 if (!messageIsValid)
     610             :                                         using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", Guid.Empty.ToString("N"), messageBody))))
     611             :                                                 messageIsValid = signature == Convert.ToBase64String(signer.ComputeHash(hashStream));
     612             :                                 if (!messageIsValid)
     613             :                                         throw new UnAuthorisedMessageReceivedException(typeName, messageId, identifyMessage);
     614             :                         }
     615             :                 }
     616             : 
     617             :                 /// <summary>
     618             :                 /// Manually registers the provided <paramref name="handler"/> 
     619             :                 /// on the provided <paramref name="routeManger"/>
     620             :                 /// </summary>
     621             :                 /// <typeparam name="TMessage">The <see cref="Type"/> of <see cref="IMessage"/> the <paramref name="handler"/> can handle.</typeparam>
     622           1 :                 public virtual void RegisterHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
     623             :                         where TMessage : IMessage
     624             :                 {
     625             :                         Action<TMessage> registerableHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
     626             : 
     627             :                         routeManger.RegisterHandler(registerableHandler, targetedType);
     628             : 
     629             :                         telemetryHelper.TrackEvent(string.Format("Cqrs/RegisterHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "Azure/Bus" } });
     630             :                         telemetryHelper.Flush();
     631             :                 }
     632             : 
     633             :                 /// <summary>
     634             :                 /// Register an event handler that will listen and respond to all events.
     635             :                 /// </summary>
     636           1 :                 public virtual void RegisterGlobalEventHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler, bool holdMessageLock = true)
     637             :                         where TMessage : IMessage
     638             :                 {
     639             :                         Action<TMessage> registerableHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
     640             : 
     641             :                         routeManger.RegisterGlobalEventHandler(registerableHandler);
     642             : 
     643             :                         telemetryHelper.TrackEvent(string.Format("Cqrs/RegisterGlobalEventHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "Azure/Bus" } });
     644             :                         telemetryHelper.Flush();
     645             :                 }
     646             :         }
     647             : }

Generated by: LCOV version 1.13