LCOV - code coverage report
Current view: top level - Azure/Cqrs.Azure.EventHub - AzureBusHelper.cs Hit Total Coverage
Test: doc-coverage.info Lines: 2 11 18.2 %
Date: 2017-07-26

          Line data    Source code
       1             : #region IMPORTANT NOTE
       2             : // This is copied almost exactly into the servicebus except for a string difference. Replicate changes there until a refactor is done.
       3             : #endregion
       4             : 
       5             : #region Copyright
       6             : // // -----------------------------------------------------------------------
       7             : // // <copyright company="cdmdotnet Limited">
       8             : // //   Copyright cdmdotnet Limited. All rights reserved.
       9             : // // </copyright>
      10             : // // -----------------------------------------------------------------------
      11             : #endregion
      12             : 
      13             : using System;
      14             : using System.Collections.Generic;
      15             : using System.Diagnostics;
      16             : using System.Linq;
      17             : using System.Text.RegularExpressions;
      18             : using System.Threading;
      19             : using System.Threading.Tasks;
      20             : using cdmdotnet.Logging;
      21             : using Cqrs.Authentication;
      22             : using Cqrs.Bus;
      23             : using Cqrs.Commands;
      24             : using Cqrs.Configuration;
      25             : using Cqrs.Events;
      26             : using Cqrs.Messages;
      27             : using Microsoft.ServiceBus.Messaging;
      28             : using Newtonsoft.Json;
      29             : 
      30             : namespace Cqrs.Azure.ServiceBus
      31             : {
      32             :         public class AzureBusHelper<TAuthenticationToken> : IAzureBusHelper<TAuthenticationToken>
      33             :         {
      34           0 :                 public AzureBusHelper(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IBusHelper busHelper, IConfigurationManager configurationManager, IDependencyResolver dependencyResolver)
      35             :                 {
                                               // Stores each supplied collaborator in the protected property of the same name.
                                               // Nothing is validated here: a null argument is stored as-is and will only fail when first used.
      36             :                         AuthenticationTokenHelper = authenticationTokenHelper;
      37             :                         CorrelationIdHelper = correlationIdHelper;
      38             :                         Logger = logger;
      39             :                         MessageSerialiser = messageSerialiser;
      40             :                         BusHelper = busHelper;
      41             :                         DependencyResolver = dependencyResolver;
      42             :                         ConfigurationManager = configurationManager;
      43             :                 }
      44             : 
      45             :                 protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; } // Assigned by the constructor; this class calls GetAuthenticationToken() and SetAuthenticationToken() on it.
      46             : 
      47             :                 protected ICorrelationIdHelper CorrelationIdHelper { get; private set; } // Assigned by the constructor; this class calls GetCorrelationId() and SetCorrelationId() on it.
      48             : 
      49             :                 protected ILogger Logger { get; private set; } // Assigned by the constructor; receives the debug, info and warning entries written by this class.
      50             : 
      51             :                 protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; } // Assigned by the constructor; used to deserialise incoming command and event message bodies.
      52             : 
      53             :                 protected IBusHelper BusHelper { get; private set; } // Assigned by the constructor; queried through IsEventRequired(...) for both commands and events.
      54             : 
      55             :                 protected IConfigurationManager ConfigurationManager { get; private set; } // Assigned by the constructor; read for the "<FullTypeName>.ShouldRefresh" settings.
      56             : 
      57             :                 protected IDependencyResolver DependencyResolver { get; private set; } // Assigned by the constructor; used to resolve an optional ICommandValidator.
      58             : 
      59           0 :                 public virtual void PrepareCommand<TCommand>(TCommand command, string framework)
      60             :                         where TCommand : ICommand<TAuthenticationToken>
      61             :                 {
                                               // Stamps the command in place:
                                               //  - AuthenticationToken is filled from AuthenticationTokenHelper only when it is null or the default value;
                                               //  - CorrelationId is always overwritten with the value from CorrelationIdHelper;
                                               //  - OriginatingFramework is set to 'framework' only when it is currently null/blank;
                                               //  - Frameworks is replaced by a new list: the existing entries (if any) followed by 'framework'.
      62             :                         if (command.AuthenticationToken == null || command.AuthenticationToken.Equals(default(TAuthenticationToken)))
      63             :                                 command.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
      64             :                         command.CorrelationId = CorrelationIdHelper.GetCorrelationId();
      65             : 
      66             :                         if (string.IsNullOrWhiteSpace(command.OriginatingFramework))
      67             :                                 command.OriginatingFramework = framework;
                                               // Build a fresh list rather than mutating the existing collection; 'framework' is appended even if already present.
      68             :                         var frameworks = new List<string>();
      69             :                         if (command.Frameworks != null)
      70             :                                 frameworks.AddRange(command.Frameworks);
      71             :                         frameworks.Add(framework);
      72             :                         command.Frameworks = frameworks;
      73             :                 }
      74             : 
      75           0 :                 public virtual bool PrepareAndValidateCommand<TCommand>(TCommand command, string framework)
      76             :                         where TCommand : ICommand<TAuthenticationToken>
      77             :                 {
                                               // Decides whether the command may be sent and, if so, stamps it via PrepareCommand.
                                               // Returns false, leaving the command untouched, when either:
                                               //  - Frameworks already contains 'framework' together with at least one other entry, or
                                               //  - an ICommandValidator could be resolved and it reports the command as invalid.
                                               // Returns true after PrepareCommand has run in every other case.
      78             :                         Type commandType = command.GetType();
      79             : 
      80             :                         if (command.Frameworks != null && command.Frameworks.Contains(framework))
      81             :                         {
      82             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
      83             :                                 if (command.Frameworks.Count() != 1)
      84             :                                 {
      85             :                                         Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateCommand({1})", GetType().FullName, commandType.FullName));
      86             :                                         return false;
      87             :                                 }
      88             :                         }
      89             : 
      90             :                         ICommandValidator<TAuthenticationToken, TCommand> commandValidator = null;
      91             :                         try
      92             :                         {
      93             :                                 commandValidator = DependencyResolver.Resolve<ICommandValidator<TAuthenticationToken, TCommand>>();
      94             :                         }
      95             :                         catch (Exception exception)
      96             :                         {
                                                       // A validator is optional: a failed resolve is logged at debug level and validation is skipped.
      97             :                                 Logger.LogDebug("Locating an ICommandValidator failed.", string.Format("{0}\\PrepareAndValidateCommand({1})", GetType().FullName, commandType.FullName), exception);
      98             :                         }
      99             : 
     100             :                         if (commandValidator != null && !commandValidator.IsCommandValid(command))
     101             :                         {
     102             :                                 Logger.LogInfo("The provided command is not valid.", string.Format("{0}\\PrepareAndValidateCommand({1})", GetType().FullName, commandType.FullName));
     103             :                                 return false;
     104             :                         }
     105             : 
     106             :                         PrepareCommand(command, framework);
     107             :                         return true;
     108             :                 }
     109             : 
     110           0 :                 public virtual ICommand<TAuthenticationToken> ReceiveCommand(string messageBody, Func<ICommand<TAuthenticationToken>, bool?> receiveCommandHandler, string messageId, Action skippedAction = null, Action lockRefreshAction = null)
     111             :                 {
                                               // Deserialises 'messageBody' into a command, optionally invokes 'lockRefreshAction', then passes the command to 'receiveCommandHandler'.
                                               // Returns the deserialised command, or null when deserialisation failed because the type's assembly could not be loaded
                                               // and BusHelper reports that type as not required ('skippedAction', when supplied, is invoked first).
                                               // Any other JsonSerializationException is rethrown unchanged; 'messageId' is only used in log text.
     112             :                         ICommand<TAuthenticationToken> command;
     113             :                         try
     114             :                         {
     115             :                                 command = MessageSerialiser.DeserialiseCommand(messageBody);
     116             :                         }
     117             :                         catch (JsonSerializationException exception)
     118             :                         {
                                                       // Walk the chain of nested JsonSerializationExceptions looking for an assembly-load failure.
     119             :                                 JsonSerializationException checkException = exception;
     120             :                                 bool safeToExit = false;
     121             :                                 do
     122             :                                 {
                                                               // Ordinal comparison: this is a fixed marker text, not culture-dependent prose.
     123             :                                         if (checkException.Message.StartsWith("Could not load assembly", StringComparison.Ordinal))
     124             :                                         {
     125             :                                                 safeToExit = true;
     126             :                                                 break;
     127             :                                         }
     128             :                                 } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
     129             :                                 if (safeToExit)
     130             :                                 {
                                                               // Pulls the quoted type text out of the outermost message; only a two-part, comma-separated value
                                                               // (presumably "TypeName, AssemblyName" - confirm against the serialiser's message format) is considered.
     131             :                                         const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
     132             :                                         Match match = Regex.Match(exception.Message, pattern);
     133             :                                         if (match.Success)
     134             :                                         {
     135             :                                                 string[] typeParts = match.Value.Split(',');
     136             :                                                 if (typeParts.Length == 2)
     137             :                                                 {
     138             :                                                         string classType = typeParts[0];
     139             :                                                         bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
     140             : 
     141             :                                                         if (!isRequired)
     142             :                                                         {
     143             :                                                                 if (skippedAction != null)
     144             :                                                                         skippedAction();
     145             :                                                                 return null;
     146             :                                                         }
     147             :                                                 }
     148             :                                         }
     149             :                                 }
     150             :                                 throw;
     151             :                         }
     152             : 
     153             :                         string commandTypeName = command.GetType().FullName;
     154             :                         CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
     155             :                         Logger.LogInfo(string.Format("A command message arrived with the {0} was of type {1}.", messageId, commandTypeName));
     156             : 
                                               // Lock refresh is opt-in per type through the "<FullTypeName>.ShouldRefresh" setting; a missing setting means no refresh.
     157             :                         bool canRefresh;
     158             :                         if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", commandTypeName), out canRefresh))
     159             :                                 canRefresh = false;
     160             : 
     161             :                         if (canRefresh)
     162             :                         {
     163             :                                 if (lockRefreshAction == null)
     164             :                                         Logger.LogWarning(string.Format("A command message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, commandTypeName));
     165             :                                 else
     166             :                                         lockRefreshAction();
     167             :                         }
     168             : 
     169             :                         // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
     170             :                         bool? result = receiveCommandHandler(command);
     171             :                         if (result != null && !result.Value)
     172             :                                 if (skippedAction != null)
     173             :                                         skippedAction();
     174             : 
     175             :                         return command;
     176             :                 }
     177             : 
     178             :                 /// <returns>
     179             :                 /// True indicates the <paramref name="command"/> was successfully handled by a handler.
     180             :                 /// False indicates the <paramref name="command"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
     181             :                 /// Null indicates the <paramref name="command"/> wasn't handled as it was already handled.
     182             :                 /// </returns>
     183           2 :                 public virtual bool? DefaultReceiveCommand(ICommand<TAuthenticationToken> command, RouteManager routeManager, string framework)
     184             :                 {
                                               // Dispatches the command to the single handler supplied by 'routeManager', after copying the command's
                                               // correlation id and authentication token into the respective helpers.
     185             :                         Type commandType = command.GetType();
     186             : 
     187             :                         if (command.Frameworks != null && command.Frameworks.Contains(framework))
     188             :                         {
     189             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     190             :                                 if (command.Frameworks.Count() != 1)
     191             :                                 {
     192             :                                         Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\DefaultReceiveCommand({1})", GetType().FullName, commandType.FullName));
     193             :                                         return null;
     194             :                                 }
     195             :                         }
     196             : 
     197             :                         CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
     198             :                         AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
     199             : 
                                               // The required flag is looked up by command type and passed through to the handler lookup.
     200             :                         bool isRequired = BusHelper.IsEventRequired(commandType);
     201             : 
     202             :                         RouteHandlerDelegate commandHandler = routeManager.GetSingleHandler(command, isRequired);
     203             :                         // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
     204             :                         if (commandHandler == null)
     205             :                         {
     206             :                                 Logger.LogDebug(string.Format("The command handler for '{0}' is not required.", commandType.FullName));
     207             :                                 return false;
     208             :                         }
     209             : 
     210             :                         Action<IMessage> handler = commandHandler.Delegate;
     211             :                         handler(command);
     212             :                         return true;
     213             :                 }
     214             : 
     215           0 :                 public virtual void PrepareEvent<TEvent>(TEvent @event, string framework)
     216             :                         where TEvent : IEvent<TAuthenticationToken>
     217             :                 {
                                               // Stamps the event in place:
                                               //  - AuthenticationToken is filled from AuthenticationTokenHelper only when it is null or the default value;
                                               //  - CorrelationId is always overwritten with the value from CorrelationIdHelper;
                                               //  - TimeStamp is always overwritten with the current UTC time;
                                               //  - OriginatingFramework is set to 'framework' only when it is currently null/blank;
                                               //  - Frameworks is replaced by a new list: the existing entries (if any) followed by 'framework'.
     218             :                         if (@event.AuthenticationToken == null || @event.AuthenticationToken.Equals(default(TAuthenticationToken)))
     219             :                                 @event.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
     220             :                         @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
     221             :                         @event.TimeStamp = DateTimeOffset.UtcNow;
     222             : 
     223             :                         if (string.IsNullOrWhiteSpace(@event.OriginatingFramework))
     224             :                                 @event.OriginatingFramework = framework;
                                               // Build a fresh list rather than mutating the existing collection; 'framework' is appended even if already present.
     225             :                         var frameworks = new List<string>();
     226             :                         if (@event.Frameworks != null)
     227             :                                 frameworks.AddRange(@event.Frameworks);
     228             :                         frameworks.Add(framework);
     229             :                         @event.Frameworks = frameworks;
     230             :                 }
     231             : 
     232           0 :                 public virtual bool PrepareAndValidateEvent<TEvent>(TEvent @event, string framework)
     233             :                         where TEvent : IEvent<TAuthenticationToken>
     234             :                 {
                                               // Decides whether the event may be sent and, if so, stamps it via PrepareEvent.
                                               // Returns false, leaving the event untouched, when Frameworks already contains 'framework' together with
                                               // at least one other entry; otherwise runs PrepareEvent and returns true.
     235             :                         Type eventType = @event.GetType();
     236             : 
     237             :                         if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
     238             :                         {
     239             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     240             :                                 if (@event.Frameworks.Count() != 1)
     241             :                                 {
     242             :                                         Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, eventType.FullName));
     243             :                                         return false;
     244             :                                 }
     245             :                         }
     246             : 
     247             :                         PrepareEvent(@event, framework);
     248             :                         return true;
     249             :                 }
     250             : 
     251           0 :                 public virtual IEvent<TAuthenticationToken> ReceiveEvent(string messageBody, Func<IEvent<TAuthenticationToken>, bool?> receiveEventHandler, string messageId, Action skippedAction = null, Action lockRefreshAction = null)
     252             :                 {
                                               // Deserialises 'messageBody' into an event, optionally invokes 'lockRefreshAction', then passes the event to 'receiveEventHandler'.
                                               // Returns the deserialised event, or null when deserialisation failed because the type's assembly could not be loaded
                                               // and BusHelper reports that type as not required ('skippedAction', when supplied, is invoked first).
                                               // Any other JsonSerializationException is rethrown unchanged; 'messageId' is only used in log text.
     253             :                         IEvent<TAuthenticationToken> @event;
     254             :                         try
     255             :                         {
     256             :                                 @event = MessageSerialiser.DeserialiseEvent(messageBody);
     257             :                         }
     258             :                         catch (JsonSerializationException exception)
     259             :                         {
                                                       // Walk the chain of nested JsonSerializationExceptions looking for an assembly-load failure.
     260             :                                 JsonSerializationException checkException = exception;
     261             :                                 bool safeToExit = false;
     262             :                                 do
     263             :                                 {
                                                               // Ordinal comparison: this is a fixed marker text, not culture-dependent prose.
     264             :                                         if (checkException.Message.StartsWith("Could not load assembly", StringComparison.Ordinal))
     265             :                                         {
     266             :                                                 safeToExit = true;
     267             :                                                 break;
     268             :                                         }
     269             :                                 } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
     270             :                                 if (safeToExit)
     271             :                                 {
                                                               // Pulls the quoted type text out of the outermost message; only a two-part, comma-separated value
                                                               // (presumably "TypeName, AssemblyName" - confirm against the serialiser's message format) is considered.
     272             :                                         const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
     273             :                                         Match match = Regex.Match(exception.Message, pattern);
     274             :                                         if (match.Success)
     275             :                                         {
     276             :                                                 string[] typeParts = match.Value.Split(',');
     277             :                                                 if (typeParts.Length == 2)
     278             :                                                 {
     279             :                                                         string classType = typeParts[0];
     280             :                                                         bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
     281             : 
     282             :                                                         if (!isRequired)
     283             :                                                         {
     284             :                                                                 if (skippedAction != null)
     285             :                                                                         skippedAction();
     286             :                                                                 return null;
     287             :                                                         }
     288             :                                                 }
     289             :                                         }
     290             :                                 }
     291             :                                 throw;
     292             :                         }
     293             : 
     294             :                         string eventTypeName = @event.GetType().FullName;
     295             :                         CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
     296             :                         Logger.LogInfo(string.Format("An event message arrived with the {0} was of type {1}.", messageId, eventTypeName));
     297             : 
                                               // Lock refresh is opt-in per type through the "<FullTypeName>.ShouldRefresh" setting; a missing setting means no refresh.
     298             :                         bool canRefresh;
     299             :                         if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", eventTypeName), out canRefresh))
     300             :                                 canRefresh = false;
     301             : 
     302             :                         if (canRefresh)
     303             :                         {
     304             :                                 if (lockRefreshAction == null)
     305             :                                         Logger.LogWarning(string.Format("An event message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, eventTypeName));
     306             :                                 else
     307             :                                         lockRefreshAction();
     308             :                         }
     309             : 
     310             :                         // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
     311             :                         bool? result = receiveEventHandler(@event);
     312             :                         if (result != null && !result.Value)
     313             :                                 if (skippedAction != null)
     314             :                                         skippedAction();
     315             : 
     316             :                         return @event;
     317             :                 }
     318             : 
     319           0 :                 public virtual void RefreshLock(CancellationTokenSource brokeredMessageRenewCancellationTokenSource, BrokeredMessage message, string type = "message")
     320             :                 {
     321             :                         Task.Factory.StartNewSafely(() =>
     322             :                         {
     323             :                                 // The capturing of ObjectDisposedException is because even the properties can throw it.
     324             :                                 try
     325             :                                 {
     326             :                                         long loop = long.MinValue;
     327             :                                         while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
     328             :                                         {
     329             :                                                 // Based on LockedUntilUtc property to determine if the lock expires soon
     330             :                                                 // The lock is renewed once it is within 45 seconds of expiring, to ensure any thread based issues are mitigated.
     331             :                                                 if (DateTime.UtcNow > message.LockedUntilUtc.AddSeconds(-45))
     332             :                                                 {
     333             :                                                         // If so, renew the lock
     334             :                                                         for (int i = 0; i < 10; i++)
     335             :                                                         {
     336             :                                                                 try
     337             :                                                                 {
     338             :                                                                         message.RenewLock();
     339             :                                                                         try
     340             :                                                                         {
     341             :                                                                                 Logger.LogDebug(string.Format("Renewed the lock on {1} '{0}'.", message.MessageId, type));
     342             :                                                                         }
     343             :                                                                         catch
     344             :                                                                         {
     345             :                                                                                 Trace.TraceError("Renewed the lock on {1} '{0}'.", message.MessageId, type);
     346             :                                                                         }
     347             : 
     348             :                                                                         break;
     349             :                                                                 }
     350             :                                                                 catch (ObjectDisposedException)
     351             :                                                                 {
     352             :                                                                         return;
     353             :                                                                 }
     354             :                                                                 catch (MessageLockLostException exception)
     355             :                                                                 {
     356             :                                                                         try
     357             :                                                                         {
     358             :                                                                                 Logger.LogWarning(string.Format("Renewing the lock on {1} '{0}' failed as the message lock was lost.", message.MessageId, type), exception: exception);
     359             :                                                                         }
     360             :                                                                         catch
     361             :                                                                         {
     362             :                                                                                 Trace.TraceError("Renewing the lock on {1} '{0}' failed as the message lock was lost.\r\n{2}", message.MessageId, type, exception.Message);
     363             :                                                                         }
     364             :                                                                         return;
     365             :                                                                 }
     366             :                                                                 catch (Exception exception)
     367             :                                                                 {
     368             :                                                                         try
     369             :                                                                         {
     370             :                                                                                 Logger.LogWarning(string.Format("Renewing the lock on {1} '{0}' failed.", message.MessageId, type), exception: exception);
     371             :                                                                         }
     372             :                                                                         catch
     373             :                                                                         {
     374             :                                                                                 Trace.TraceError("Renewing the lock on {1} '{0}' failed.\r\n{2}", message.MessageId, type, exception.Message);
     375             :                                                                         }
     376             :                                                                         if (i == 9)
     377             :                                                                                 return;
     378             :                                                                 }
     379             :                                                         }
     380             :                                                 }
     381             : 
     382             :                                                 if (loop++ % 5 == 0)
     383             :                                                         Thread.Yield();
     384             :                                                 else
     385             :                                                         Thread.Sleep(500);
     386             :                                                 if (loop == long.MaxValue)
     387             :                                                         loop = long.MinValue;
     388             :                                         }
     389             :                                         try
     390             :                                         {
     391             :                                                 brokeredMessageRenewCancellationTokenSource.Dispose();
     392             :                                         }
     393             :                                         catch (ObjectDisposedException) { }
     394             :                                 }
     395             :                                 catch (ObjectDisposedException) { }
     396             :                         }, brokeredMessageRenewCancellationTokenSource.Token);
     397             :                 }
     398             : 
     399             :                 /// <summary>Hands the <paramref name="event"/> to every handler <paramref name="routeManager"/> resolves for it, unless it has already been processed by the given <paramref name="framework"/>.</summary>
     400             :                 /// <returns>True indicates the <paramref name="event"/> was successfully handled by a handler.
     401             :                 /// False indicates the <paramref name="event"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
     402             :                 /// Null indicates the <paramref name="event"/> wasn't handled as it was already handled (its framework list contains <paramref name="framework"/> alongside at least one other entry).
     403             :                 /// </returns>
     404           2 :                 public virtual bool? DefaultReceiveEvent(IEvent<TAuthenticationToken> @event, RouteManager routeManager, string framework)
     405             :                 {
     406             :                         Type eventType = @event.GetType();
     407             :                         // eventType is used for the required-check, the handler lookup and the log messages below.
     408             :                         if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
     409             :                         {
     410             :                                 // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
     411             :                                 if (@event.Frameworks.Count() != 1)
     412             :                                 {
     413             :                                         Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\DefaultReceiveEvent({1})", GetType().FullName, eventType.FullName));
     414             :                                         return null;
     415             :                                 }
     416             :                         }
     417             :                         // Hand the event's correlation id and authentication token to the respective helpers before any handler runs.
     418             :                         CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
     419             :                         AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
     420             : 
     421             :                         bool isRequired = BusHelper.IsEventRequired(eventType);
     422             :                         // The handler delegates are materialised once with ToList() so the Any() check and the loop below share one enumeration.
     423             :                         IEnumerable<Action<IMessage>> handlers = routeManager.GetHandlers(@event, isRequired).Select(x => x.Delegate).ToList();
     424             :                         // No separate isRequired check is needed here: per the original note, GetHandlers above raises an exception in that case. NOTE(review): nothing in this method catches it, so it presumably propagates to the caller - confirm.
     425             :                         if (!handlers.Any())
     426             :                         {
     427             :                                 Logger.LogDebug(string.Format("The event handler for '{0}' is not required.", eventType.FullName));
     428             :                                 return false;
     429             :                         }
     430             :                         // Invoke every resolved handler in order; there is no try/catch here, so an exception from a handler leaves this method.
     431             :                         foreach (Action<IMessage> handler in handlers)
     432             :                                 handler(@event);
     433             :                         return true;
     434             :                 }
     435             :                 /// <summary>Wraps <paramref name="handler"/> via <c>BusHelper.BuildActionHandler</c>, registers the result with <paramref name="routeManger"/> for <paramref name="targetedType"/>, then tracks and flushes a "Cqrs/RegisterHandler/..." telemetry event on <paramref name="telemetryHelper"/>.</summary>
     436           0 :                 public virtual void RegisterHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
     437             :                         where TMessage : IMessage
     438             :                 {
     439             :                         Action<TMessage> registerableHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
     440             :                         // holdMessageLock is only forwarded to BuildActionHandler above; its effect is defined there (presumably message-lock handling - confirm).
     441             :                         routeManger.RegisterHandler(registerableHandler, targetedType);
     442             :                         // Record the registration under the message type's full name, tagged "Azure/Bus", and flush immediately.
     443             :                         telemetryHelper.TrackEvent(string.Format("Cqrs/RegisterHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "Azure/Bus" } });
     444             :                         telemetryHelper.Flush();
     445             :                 }
     446             :         }
     447             : }

Generated by: LCOV version 1.10