Documentation Coverage Report
Current view: top level - Azure/Cqrs.Azure.EventHub - AzureBusHelper.cs Hit Total Coverage
Version: 2.2 Artefacts: 12 12 100.0 %
Date: 2017-09-22

          Line data    Source code
       1             : #region IMPORTANT NOTE
       2             : // This is copied almost exactly into the servicebus except for a string difference. Replicate changes there until a refactor is done.
       3             : #endregion
       4             : 
       5             : #region Copyright
       6             : // // -----------------------------------------------------------------------
       7             : // // <copyright company="Chinchilla Software Limited">
       8             : // //   Copyright Chinchilla Software Limited. All rights reserved.
       9             : // // </copyright>
      10             : // // -----------------------------------------------------------------------
      11             : #endregion
      12             : 
      13             : using System;
      14             : using System.Collections.Generic;
      15             : using System.Diagnostics;
      16             : using System.Linq;
      17             : using System.Text.RegularExpressions;
      18             : using System.Threading;
      19             : using System.Threading.Tasks;
      20             : using cdmdotnet.Logging;
      21             : using Cqrs.Authentication;
      22             : using Cqrs.Bus;
      23             : using Cqrs.Commands;
      24             : using Cqrs.Configuration;
      25             : using Cqrs.Events;
      26             : using Cqrs.Messages;
      27             : using Microsoft.ServiceBus.Messaging;
      28             : using Newtonsoft.Json;
      29             : 
      30             : namespace Cqrs.Azure.ServiceBus
      31             : {
      32             :         /// <summary>
      33             :         /// A helper for Azure Service Bus and Event Hub.
      34             :         /// </summary>
      35             :         /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
      36             :         public class AzureBusHelper<TAuthenticationToken> : IAzureBusHelper<TAuthenticationToken>
      37             :         {
      38             :                 /// <summary>
      39             :                 /// Instantiates a new instance of <see cref="AzureBusHelper{TAuthenticationToken}"/>.
      40             :                 /// </summary>
      41           2 :                 public AzureBusHelper(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IBusHelper busHelper, IConfigurationManager configurationManager, IDependencyResolver dependencyResolver)
      42             :                 {
      43             :                         AuthenticationTokenHelper = authenticationTokenHelper;
      44             :                         CorrelationIdHelper = correlationIdHelper;
      45             :                         Logger = logger;
      46             :                         MessageSerialiser = messageSerialiser;
      47             :                         BusHelper = busHelper;
      48             :                         DependencyResolver = dependencyResolver;
      49             :                         ConfigurationManager = configurationManager;
      50             :                 }
      51             : 
      52             :                 /// <summary>
      53             :                 /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>.
      54             :                 /// </summary>
      55             :                 protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
      56             : 
      57             :                 /// <summary>
      58             :                 /// Gets or sets the <see cref="ICorrelationIdHelper"/>.
      59             :                 /// </summary>
      60             :                 protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
      61             : 
      62             :                 /// <summary>
      63             :                 /// Gets or sets the <see cref="ILogger"/>.
      64             :                 /// </summary>
      65             :                 protected ILogger Logger { get; private set; }
      66             : 
      67             :                 /// <summary>
      68             :                 /// Gets or sets the <see cref="IMessageSerialiser{TAuthenticationToken}"/>.
      69             :                 /// </summary>
      70             :                 protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; }
      71             : 
      72             :                 /// <summary>
      73             :                 /// Gets or sets the <see cref="IBusHelper"/>.
      74             :                 /// </summary>
      75             :                 protected IBusHelper BusHelper { get; private set; }
      76             : 
      77             :                 /// <summary>
      78             :                 /// Gets or sets the <see cref="IConfigurationManager"/>.
      79             :                 /// </summary>
      80             :                 protected IConfigurationManager ConfigurationManager { get; private set; }
      81             : 
      82             :                 /// <summary>
      83             :                 /// Gets or sets the <see cref="IDependencyResolver"/>.
      84             :                 /// </summary>
      85             :                 protected IDependencyResolver DependencyResolver { get; private set; }
      86             : 
      87             :                 /// <summary>
      88             :                 /// Prepares a <see cref="ICommand{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
      89             :                 /// </summary>
      90             :                 /// <typeparam name="TCommand">The <see cref="Type"/> of<see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
      91             :                 /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
      92             :                 /// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
      93           2 :                 public virtual void PrepareCommand<TCommand>(TCommand command, string framework)
      94             :                         where TCommand : ICommand<TAuthenticationToken>
      95             :                 {
      96             :                         if (command.AuthenticationToken == null || command.AuthenticationToken.Equals(default(TAuthenticationToken)))
      97             :                                 command.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
      98             :                         command.CorrelationId = CorrelationIdHelper.GetCorrelationId();
      99             : 
     100             :                         if (string.IsNullOrWhiteSpace(command.OriginatingFramework))
     101             :                                 command.OriginatingFramework = framework;
     102             :                         var frameworks = new List<string>();
     103             :                         if (command.Frameworks != null)
     104             :                                 frameworks.AddRange(command.Frameworks);
     105             :                         frameworks.Add(framework);
     106             :                         command.Frameworks = frameworks;
     107             :                 }
     108             : 
     109             :                 /// <summary>
     110             :                 /// Prepares and validates a <see cref="ICommand{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
     111             :                 /// </summary>
     112             :                 /// <typeparam name="TCommand">The <see cref="Type"/> of<see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
     113             :                 /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
     114             :                 /// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
     115           2 :                 public virtual bool PrepareAndValidateCommand<TCommand>(TCommand command, string framework)
     116             :                         where TCommand : ICommand<TAuthenticationToken>
     117             :                 {
     118             :                         Type commandType = command.GetType();
     119             : 
     120             :                         if (command.Frameworks != null && command.Frameworks.Contains(framework))
     121             :                         {
     122             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     123             :                                 if (command.Frameworks.Count() != 1)
     124             :                                 {
     125             :                                         Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName));
     126             :                                         return false;
     127             :                                 }
     128             :                         }
     129             : 
     130             :                         ICommandValidator<TAuthenticationToken, TCommand> commandValidator = null;
     131             :                         try
     132             :                         {
     133             :                                 commandValidator = DependencyResolver.Resolve<ICommandValidator<TAuthenticationToken, TCommand>>();
     134             :                         }
     135             :                         catch (Exception exception)
     136             :                         {
     137             :                                 Logger.LogDebug("Locating an ICommandValidator failed.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName), exception);
     138             :                         }
     139             : 
     140             :                         if (commandValidator != null && !commandValidator.IsCommandValid(command))
     141             :                         {
     142             :                                 Logger.LogInfo("The provided command is not valid.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName));
     143             :                                 return false;
     144             :                         }
     145             : 
     146             :                         PrepareCommand(command, framework);
     147             :                         return true;
     148             :                 }
     149             : 
     150             :                 /// <summary>
     151             :                 /// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveCommandHandler"/>.
     152             :                 /// </summary>
     153             :                 /// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
     154             :                 /// <param name="receiveCommandHandler">The handler method that will process the <see cref="ICommand{TAuthenticationToken}"/>.</param>
     155             :                 /// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
     156             :                 /// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="ICommand{TAuthenticationToken}"/> is being skipped.</param>
     157             :                 /// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
     158             :                 /// <returns>The <see cref="ICommand{TAuthenticationToken}"/> that was processed.</returns>
     159           2 :                 public virtual ICommand<TAuthenticationToken> ReceiveCommand(string messageBody, Func<ICommand<TAuthenticationToken>, bool?> receiveCommandHandler, string messageId, Action skippedAction = null, Action lockRefreshAction = null)
     160             :                 {
     161             :                         ICommand<TAuthenticationToken> command;
     162             :                         try
     163             :                         {
     164             :                                 command = MessageSerialiser.DeserialiseCommand(messageBody);
     165             :                         }
     166             :                         catch (JsonSerializationException exception)
     167             :                         {
     168             :                                 JsonSerializationException checkException = exception;
     169             :                                 bool safeToExit = false;
     170             :                                 do
     171             :                                 {
     172             :                                         if (checkException.Message.StartsWith("Could not load assembly"))
     173             :                                         {
     174             :                                                 safeToExit = true;
     175             :                                                 break;
     176             :                                         }
     177             :                                 } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
     178             :                                 if (safeToExit)
     179             :                                 {
     180             :                                         const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
     181             :                                         Match match = new Regex(pattern).Match(exception.Message);
     182             :                                         if (match.Success)
     183             :                                         {
     184             :                                                 string[] typeParts = match.Value.Split(',');
     185             :                                                 if (typeParts.Length == 2)
     186             :                                                 {
     187             :                                                         string classType = typeParts[0];
     188             :                                                         bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
     189             : 
     190             :                                                         if (!isRequired)
     191             :                                                         {
     192             :                                                                 if (skippedAction != null)
     193             :                                                                         skippedAction();
     194             :                                                                 return null;
     195             :                                                         }
     196             :                                                 }
     197             :                                         }
     198             :                                 }
     199             :                                 throw;
     200             :                         }
     201             : 
     202             :                         string commandTypeName = command.GetType().FullName;
     203             :                         CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
     204             :                         AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
     205             :                         Logger.LogInfo(string.Format("A command message arrived with the {0} was of type {1}.", messageId, commandTypeName));
     206             : 
     207             :                         bool canRefresh;
     208             :                         if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", commandTypeName), out canRefresh))
     209             :                                 canRefresh = false;
     210             : 
     211             :                         if (canRefresh)
     212             :                         {
     213             :                                 if (lockRefreshAction == null)
     214             :                                         Logger.LogWarning(string.Format("A command message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, commandTypeName));
     215             :                                 else
     216             :                                         lockRefreshAction();
     217             :                         }
     218             : 
     219             :                         // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
     220             :                         bool? result = receiveCommandHandler(command);
     221             :                         if (result != null && !result.Value)
     222             :                                 if (skippedAction != null)
     223             :                                         skippedAction();
     224             : 
     225             :                         return command;
     226             :                 }
     227             : 
     228             :                 /// <summary>
     229             :                 /// The default command handler that
     230             :                 /// check if the <see cref="ICommand{TAuthenticationToken}"/> has already been processed by this framework,
     231             :                 /// checks if the <see cref="ICommand{TAuthenticationToken}"/> is required,
     232             :                 /// finds the handler from the provided <paramref name="routeManager"/>.
     233             :                 /// </summary>
     234             :                 /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to process.</param>
     235             :                 /// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="ICommandHandler{TAuthenticationToken,TCommand}"/> from.</param>
     236             :                 /// <param name="framework">The current framework.</param>
     237             :                 /// <returns>
     238             :                 /// True indicates the <paramref name="command"/> was successfully handled by a handler.
     239             :                 /// False indicates the <paramref name="command"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
     240             :                 /// Null indicates the command<paramref name="command"/> wasn't handled as it was already handled.
     241             :                 /// </returns>
     242           2 :                 public virtual bool? DefaultReceiveCommand(ICommand<TAuthenticationToken> command, RouteManager routeManager, string framework)
     243             :                 {
     244             :                         Type commandType = command.GetType();
     245             : 
     246             :                         if (command.Frameworks != null && command.Frameworks.Contains(framework))
     247             :                         {
     248             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     249             :                                 if (command.Frameworks.Count() != 1)
     250             :                                 {
     251             :                                         Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\DefaultReceiveCommand({1})", GetType().FullName, commandType.FullName));
     252             :                                         return null;
     253             :                                 }
     254             :                         }
     255             : 
     256             :                         CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
     257             :                         AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
     258             : 
     259             :                         bool isRequired = BusHelper.IsEventRequired(commandType);
     260             : 
     261             :                         RouteHandlerDelegate commandHandler = routeManager.GetSingleHandler(command, isRequired);
     262             :                         // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
     263             :                         if (commandHandler == null)
     264             :                         {
     265             :                                 Logger.LogDebug(string.Format("The command handler for '{0}' is not required.", commandType.FullName));
     266             :                                 return false;
     267             :                         }
     268             : 
     269             :                         Action<IMessage> handler = commandHandler.Delegate;
     270             :                         handler(command);
     271             :                         return true;
     272             :                 }
     273             : 
     274             :                 /// <summary>
     275             :                 /// Prepares an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
     276             :                 /// </summary>
     277             :                 /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
     278             :                 /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
     279             :                 /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
     280           2 :                 public virtual void PrepareEvent<TEvent>(TEvent @event, string framework)
     281             :                         where TEvent : IEvent<TAuthenticationToken>
     282             :                 {
     283             :                         if (@event.AuthenticationToken == null || @event.AuthenticationToken.Equals(default(TAuthenticationToken)))
     284             :                                 @event.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
     285             :                         @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
     286             :                         @event.TimeStamp = DateTimeOffset.UtcNow;
     287             : 
     288             :                         if (string.IsNullOrWhiteSpace(@event.OriginatingFramework))
     289             :                                 @event.OriginatingFramework = framework;
     290             :                         var frameworks = new List<string>();
     291             :                         if (@event.Frameworks != null)
     292             :                                 frameworks.AddRange(@event.Frameworks);
     293             :                         frameworks.Add(framework);
     294             :                         @event.Frameworks = frameworks;
     295             :                 }
     296             : 
     297             :                 /// <summary>
     298             :                 /// Prepares and validates an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
     299             :                 /// </summary>
     300             :                 /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
     301             :                 /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
     302             :                 /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
     303           2 :                 public virtual bool PrepareAndValidateEvent<TEvent>(TEvent @event, string framework)
     304             :                         where TEvent : IEvent<TAuthenticationToken>
     305             :                 {
     306             :                         Type eventType = @event.GetType();
     307             : 
     308             :                         if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
     309             :                         {
     310             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     311             :                                 if (@event.Frameworks.Count() != 1)
     312             :                                 {
     313             :                                         Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, eventType.FullName));
     314             :                                         return false;
     315             :                                 }
     316             :                         }
     317             : 
     318             :                         PrepareEvent(@event, framework);
     319             :                         return true;
     320             :                 }
     321             : 
     322             :                 /// <summary>
     323             :                 /// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveEventHandler"/>.
     324             :                 /// </summary>
     325             :                 /// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
     326             :                 /// <param name="receiveEventHandler">The handler method that will process the <see cref="IEvent{TAuthenticationToken}"/>.</param>
     327             :                 /// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
     328             :                 /// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="IEvent{TAuthenticationToken}"/> is being skipped.</param>
     329             :                 /// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
     330             :                 /// <returns>The <see cref="IEvent{TAuthenticationToken}"/> that was processed.</returns>
     331           2 :                 public virtual IEvent<TAuthenticationToken> ReceiveEvent(string messageBody, Func<IEvent<TAuthenticationToken>, bool?> receiveEventHandler, string messageId, Action skippedAction = null, Action lockRefreshAction = null)
     332             :                 {
     333             :                         IEvent<TAuthenticationToken> @event;
     334             :                         try
     335             :                         {
     336             :                                 @event = MessageSerialiser.DeserialiseEvent(messageBody);
     337             :                         }
     338             :                         catch (JsonSerializationException exception)
     339             :                         {
     340             :                                 JsonSerializationException checkException = exception;
     341             :                                 bool safeToExit = false;
     342             :                                 do
     343             :                                 {
     344             :                                         if (checkException.Message.StartsWith("Could not load assembly"))
     345             :                                         {
     346             :                                                 safeToExit = true;
     347             :                                                 break;
     348             :                                         }
     349             :                                 } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
     350             :                                 if (safeToExit)
     351             :                                 {
     352             :                                         const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
     353             :                                         Match match = new Regex(pattern).Match(exception.Message);
     354             :                                         if (match.Success)
     355             :                                         {
     356             :                                                 string[] typeParts = match.Value.Split(',');
     357             :                                                 if (typeParts.Length == 2)
     358             :                                                 {
     359             :                                                         string classType = typeParts[0];
     360             :                                                         bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
     361             : 
     362             :                                                         if (!isRequired)
     363             :                                                         {
     364             :                                                                 if (skippedAction != null)
     365             :                                                                         skippedAction();
     366             :                                                                 return null;
     367             :                                                         }
     368             :                                                 }
     369             :                                         }
     370             :                                 }
     371             :                                 throw;
     372             :                         }
     373             : 
     374             :                         string eventTypeName = @event.GetType().FullName;
     375             :                         CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
     376             :                         AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
     377             :                         Logger.LogInfo(string.Format("An event message arrived with the {0} was of type {1}.", messageId, eventTypeName));
     378             : 
     379             :                         bool canRefresh;
     380             :                         if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", eventTypeName), out canRefresh))
     381             :                                 canRefresh = false;
     382             : 
     383             :                         if (canRefresh)
     384             :                         {
     385             :                                 if (lockRefreshAction == null)
     386             :                                         Logger.LogWarning(string.Format("An event message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, eventTypeName));
     387             :                                 else
     388             :                                         lockRefreshAction();
     389             :                         }
     390             : 
     391             :                         // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
     392             :                         bool? result = receiveEventHandler(@event);
     393             :                         if (result != null && !result.Value)
     394             :                                 if (skippedAction != null)
     395             :                                         skippedAction();
     396             : 
     397             :                         return @event;
     398             :                 }
     399             : 
     400             :                 /// <summary>
     401             :                 /// Refreshes the network lock.
     402             :                 /// </summary>
     403           2 :                 public virtual void RefreshLock(CancellationTokenSource brokeredMessageRenewCancellationTokenSource, BrokeredMessage message, string type = "message")
     404             :                 {
     405             :                         Task.Factory.StartNewSafely(() =>
     406             :                         {
     407             :                                 // The capturing of ObjectDisposedException is because even the properties can throw it.
     408             :                                 try
     409             :                                 {
     410             :                                         long loop = long.MinValue;
     411             :                                         while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
     412             :                                         {
     413             :                                                 // Based on LockedUntilUtc property to determine if the lock expires soon
     414             :                                                 // We lock for 45 seconds to ensure any thread based issues are mitigated.
     415             :                                                 if (DateTime.UtcNow > message.LockedUntilUtc.AddSeconds(-45))
     416             :                                                 {
     417             :                                                         // If so, renew the lock
     418             :                                                         for (int i = 0; i < 10; i++)
     419             :                                                         {
     420             :                                                                 try
     421             :                                                                 {
     422             :                                                                         message.RenewLock();
     423             :                                                                         try
     424             :                                                                         {
     425             :                                                                                 Logger.LogDebug(string.Format("Renewed the lock on {1} '{0}'.", message.MessageId, type));
     426             :                                                                         }
     427             :                                                                         catch
     428             :                                                                         {
     429             :                                                                                 Trace.TraceError("Renewed the lock on {1} '{0}'.", message.MessageId, type);
     430             :                                                                         }
     431             : 
     432             :                                                                         break;
     433             :                                                                 }
     434             :                                                                 catch (ObjectDisposedException)
     435             :                                                                 {
     436             :                                                                         return;
     437             :                                                                 }
     438             :                                                                 catch (MessageLockLostException exception)
     439             :                                                                 {
     440             :                                                                         try
     441             :                                                                         {
     442             :                                                                                 Logger.LogWarning(string.Format("Renewing the lock on {1} '{0}' failed as the message lock was lost.", message.MessageId, type), exception: exception);
     443             :                                                                         }
     444             :                                                                         catch
     445             :                                                                         {
     446             :                                                                                 Trace.TraceError("Renewing the lock on {1} '{0}' failed as the message lock was lost.\r\n{2}", message.MessageId, type, exception.Message);
     447             :                                                                         }
     448             :                                                                         return;
     449             :                                                                 }
     450             :                                                                 catch (Exception exception)
     451             :                                                                 {
     452             :                                                                         try
     453             :                                                                         {
     454             :                                                                                 Logger.LogWarning(string.Format("Renewing the lock on {1} '{0}' failed.", message.MessageId, type), exception: exception);
     455             :                                                                         }
     456             :                                                                         catch
     457             :                                                                         {
     458             :                                                                                 Trace.TraceError("Renewing the lock on {1} '{0}' failed.\r\n{2}", message.MessageId, type, exception.Message);
     459             :                                                                         }
     460             :                                                                         if (i == 9)
     461             :                                                                                 return;
     462             :                                                                 }
     463             :                                                         }
     464             :                                                 }
     465             : 
     466             :                                                 if (loop++ % 5 == 0)
     467             :                                                         Thread.Yield();
     468             :                                                 else
     469             :                                                         Thread.Sleep(500);
     470             :                                                 if (loop == long.MaxValue)
     471             :                                                         loop = long.MinValue;
     472             :                                         }
     473             :                                         try
     474             :                                         {
     475             :                                                 brokeredMessageRenewCancellationTokenSource.Dispose();
     476             :                                         }
     477             :                                         catch (ObjectDisposedException) { }
     478             :                                 }
     479             :                                 catch (ObjectDisposedException) { }
     480             :                         }, brokeredMessageRenewCancellationTokenSource.Token);
     481             :                 }
     482             : 
     483             :                 /// <summary>
     484             :                 /// The default event handler that
     485             :                 /// check if the <see cref="IEvent{TAuthenticationToken}"/> has already been processed by this framework,
     486             :                 /// checks if the <see cref="IEvent{TAuthenticationToken}"/> is required,
     487             :                 /// finds the handler from the provided <paramref name="routeManager"/>.
     488             :                 /// </summary>
     489             :                 /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to process.</param>
     490             :                 /// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="IEventHandler{TAuthenticationToken,TCommand}"/> from.</param>
     491             :                 /// <param name="framework">The current framework.</param>
     492             :                 /// <returns>
     493             :                 /// True indicates the <paramref name="event"/> was successfully handled by a handler.
     494             :                 /// False indicates the <paramref name="event"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
     495             :                 /// Null indicates the <paramref name="event"/> wasn't handled as it was already handled.
     496             :                 /// </returns>
     497           2 :                 public virtual bool? DefaultReceiveEvent(IEvent<TAuthenticationToken> @event, RouteManager routeManager, string framework)
     498             :                 {
     499             :                         Type eventType = @event.GetType();
     500             : 
     501             :                         if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
     502             :                         {
     503             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     504             :                                 if (@event.Frameworks.Count() != 1)
     505             :                                 {
     506             :                                         Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\DefaultReceiveEvent({1})", GetType().FullName, eventType.FullName));
     507             :                                         return null;
     508             :                                 }
     509             :                         }
     510             : 
     511             :                         CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
     512             :                         AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
     513             : 
     514             :                         bool isRequired = BusHelper.IsEventRequired(eventType);
     515             : 
     516             :                         IEnumerable<Action<IMessage>> handlers = routeManager.GetHandlers(@event, isRequired).Select(x => x.Delegate).ToList();
     517             :                         // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
     518             :                         if (!handlers.Any())
     519             :                         {
     520             :                                 Logger.LogDebug(string.Format("The event handler for '{0}' is not required.", eventType.FullName));
     521             :                                 return false;
     522             :                         }
     523             : 
     524             :                         foreach (Action<IMessage> handler in handlers)
     525             :                                 handler(@event);
     526             :                         return true;
     527             :                 }
     528             : 
     529             :                 /// <summary>
     530             :                 /// Manually registers the provided <paramref name="handler"/> 
     531             :                 /// on the provided <paramref name="routeManger"/>
     532             :                 /// </summary>
     533             :                 /// <typeparam name="TMessage">The <see cref="Type"/> of <see cref="IMessage"/> the <paramref name="handler"/> can handle.</typeparam>
     534           2 :                 public virtual void RegisterHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
     535             :                         where TMessage : IMessage
     536             :                 {
     537             :                         Action<TMessage> registerableHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
     538             : 
     539             :                         routeManger.RegisterHandler(registerableHandler, targetedType);
     540             : 
     541             :                         telemetryHelper.TrackEvent(string.Format("Cqrs/RegisterHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "Azure/Bus" } });
     542             :                         telemetryHelper.Flush();
     543             :                 }
     544             : 
     545             :                 /// <summary>
     546             :                 /// Register an event handler that will listen and respond to all events.
     547             :                 /// </summary>
     548           2 :                 public void RegisterGlobalEventHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler,
     549             :                         bool holdMessageLock = true) where TMessage : IMessage
     550             :                 {
     551             :                         Action<TMessage> registerableHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
     552             : 
     553             :                         routeManger.RegisterGlobalEventHandler(registerableHandler);
     554             : 
     555             :                         telemetryHelper.TrackEvent(string.Format("Cqrs/RegisterGlobalEventHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "Azure/Bus" } });
     556             :                         telemetryHelper.Flush();
     557             :                 }
     558             :         }
     559             : }

Generated by: LCOV version 1.10