|           Line data    Source code 
       1             : #region Copyright
       2             : // // -----------------------------------------------------------------------
       3             : // // <copyright company="Chinchilla Software Limited">
       4             : // //   Copyright Chinchilla Software Limited. All rights reserved.
       5             : // // </copyright>
       6             : // // -----------------------------------------------------------------------
       7             : #endregion
       8             : 
       9             : using System;
      10             : using System.Collections.Generic;
      11             : using System.Configuration;
      12             : using System.Diagnostics;
      13             : using System.IO;
      14             : using System.Linq;
      15             : using System.Reflection;
      16             : using System.Security.Cryptography;
      17             : using System.Text;
      18             : using System.Threading;
      19             : using System.Threading.Tasks;
      20             : using cdmdotnet.Logging;
      21             : using Cqrs.Authentication;
      22             : using Cqrs.Bus;
      23             : using Cqrs.Configuration;
      24             : using Cqrs.Exceptions;
      25             : using Cqrs.Messages;
      26             : using Microsoft.ServiceBus;
      27             : using Microsoft.ServiceBus.Messaging;
      28             : 
      29             : namespace Cqrs.Azure.ServiceBus
      30             : {
      31             :         /// <summary>
      32             :         /// An <see cref="AzureBus{TAuthenticationToken}"/> that uses Azure Service Bus.
      33             :         /// </summary>
      34             :         /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
      35             :         public abstract class AzureServiceBus<TAuthenticationToken>
      36             :                 : AzureBus<TAuthenticationToken>
      37           1 :         {
      38             :                 /// <summary>
      39             :                 /// Gets the private <see cref="TopicClient"/> publisher.
      40             :                 /// </summary>
      41             :                 protected TopicClient PrivateServiceBusPublisher { get; private set; }
      42             : 
      43             :                 /// <summary>
      44             :                 /// Gets the public <see cref="TopicClient"/> publisher.
      45             :                 /// </summary>
      46             :                 protected TopicClient PublicServiceBusPublisher { get; private set; }
      47             : 
      48             :                 /// <summary>
      49             :                 /// Gets the private <see cref="SubscriptionClient"/> receivers.
      50             :                 /// </summary>
      51             :                 protected IDictionary<int, SubscriptionClient> PrivateServiceBusReceivers { get; private set; }
      52             : 
      53             :                 /// <summary>
      54             :                 /// Gets the public <see cref="SubscriptionClient"/> receivers.
      55             :                 /// </summary>
      56             :                 protected IDictionary<int, SubscriptionClient> PublicServiceBusReceivers { get; private set; }
      57             : 
      58             :                 /// <summary>
      59             :                 /// The name of the private topic.
      60             :                 /// </summary>
      61             :                 protected string PrivateTopicName { get; private set; }
      62             : 
      63             :                 /// <summary>
      64             :                 /// The name of the public topic.
      65             :                 /// </summary>
      66             :                 protected string PublicTopicName { get; private set; }
      67             : 
      68             :                 /// <summary>
      69             :                 /// The name of the subscription in the private topic.
      70             :                 /// </summary>
      71             :                 protected string PrivateTopicSubscriptionName { get; private set; }
      72             : 
      73             :                 /// <summary>
      74             :                 /// The name of the subscription in the public topic.
      75             :                 /// </summary>
      76             :                 protected string PublicTopicSubscriptionName { get; private set; }
      77             : 
      78             :                 /// <summary>
      79             :                 /// The configuration key for the message bus connection string as used by <see cref="IConfigurationManager"/>.
      80             :                 /// </summary>
      81             :                 protected abstract string MessageBusConnectionStringConfigurationKey { get; }
      82             : 
      83             :                 /// <summary>
      84             :                 /// The configuration key for the signing token as used by <see cref="IConfigurationManager"/>.
      85             :                 /// </summary>
      86             :                 protected abstract string SigningTokenConfigurationKey { get; }
      87             : 
      88             :                 /// <summary>
      89             :                 /// The configuration key for the name of the private topic as used by <see cref="IConfigurationManager"/>.
      90             :                 /// </summary>
      91             :                 protected abstract string PrivateTopicNameConfigurationKey { get; }
      92             : 
      93             :                 /// <summary>
      94             :                 /// The configuration key for the name of the public topic as used by <see cref="IConfigurationManager"/>.
      95             :                 /// </summary>
      96             :                 protected abstract string PublicTopicNameConfigurationKey { get; }
      97             : 
      98             :                 /// <summary>
      99             :                 /// The default name of the private topic if no <see cref="IConfigurationManager"/> value is set.
     100             :                 /// </summary>
     101             :                 protected abstract string DefaultPrivateTopicName { get; }
     102             : 
     103             :                 /// <summary>
     104             :                 /// The default name of the public topic if no <see cref="IConfigurationManager"/> value is set.
     105             :                 /// </summary>
     106             :                 protected abstract string DefaultPublicTopicName { get; }
     107             : 
     108             :                 /// <summary>
     109             :                 /// The configuration key for the name of the subscription in the private topic as used by <see cref="IConfigurationManager"/>.
     110             :                 /// </summary>
     111             :                 protected abstract string PrivateTopicSubscriptionNameConfigurationKey { get; }
     112             : 
     113             :                 /// <summary>
     114             :                 /// The configuration key for the name of the subscription in the public topic as used by <see cref="IConfigurationManager"/>.
     115             :                 /// </summary>
     116             :                 protected abstract string PublicTopicSubscriptionNameConfigurationKey { get; }
     117             : 
     118             :                 /// <summary>
     119             :                 /// The configuration key that
     120             :                 /// specifies if an <see cref="Exception"/> is thrown if the network lock is lost
     121             :                 /// as used by <see cref="IConfigurationManager"/>.
     122             :                 /// </summary>
     123             :                 protected abstract string ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey { get; }
     124             : 
     125             :                 /// <summary>
     126             :                 /// Specifies if an <see cref="Exception"/> is thrown if the network lock is lost. Refreshed by <see cref="TriggerSettingsChecking()"/>, which sets it to <c>true</c> when reading the setting via <c>TryGetSetting</c> returns false.
     127             :                 /// </summary>
     128             :                 protected bool ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete { get; private set; }
     129             : 
     130             :                 /// <summary>
     131             :                 /// The default name of the subscription in the private topic if no <see cref="IConfigurationManager"/> value is set.
     132             :                 /// </summary>
     133             :                 protected const string DefaultPrivateTopicSubscriptionName = "Root";
     134             : 
     135             :                 /// <summary>
     136             :                 /// The default name of the subscription in the public topic if no <see cref="IConfigurationManager"/> value is set.
     137             :                 /// </summary>
     138             :                 protected const string DefaultPublicTopicSubscriptionName = "Root";
     139             : 
     140             :                 /// <summary>
     141             :                 /// The <see cref="Action{TBrokeredMessage}">handler</see> used for <see cref="SubscriptionClient.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage}, OnMessageOptions)"/> on each receiver.
     142             :                 /// </summary>
     143             :                 protected Action<BrokeredMessage> ReceiverMessageHandler { get; set; }
     144             : 
     145             :                 /// <summary>
     146             :                 /// The <see cref="OnMessageOptions" /> used for <see cref="SubscriptionClient.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage}, OnMessageOptions)"/> on each receiver.
     147             :                 /// </summary>
     148             :                 protected OnMessageOptions ReceiverMessageHandlerOptions { get; set; }
     149             : 
     150             :                 /// <summary>
     151             :                 /// Gets the <see cref="IBusHelper"/>.
     152             :                 /// </summary>
     153             :                 protected IBusHelper BusHelper { get; private set; }
     154             : 
     155             :                 /// <summary>
     156             :                 /// Gets the <see cref="IAzureBusHelper{TAuthenticationToken}"/>.
     157             :                 /// </summary>
     158             :                 protected IAzureBusHelper<TAuthenticationToken> AzureBusHelper { get; private set; }
     159             : 
     160             :                 /// <summary>
     161             :                 /// Gets the <see cref="ITelemetryHelper"/>.
     162             :                 /// </summary>
     163             :                 protected ITelemetryHelper TelemetryHelper { get; set; }
     164             : 
     165             :                 /// <summary>
     166             :                 /// The maximum number of times a retry is attempted if a <see cref="System.TimeoutException"/> is thrown while sending messages. The constructor sets this to 1 unless the "Cqrs.Azure.Servicebus.TimeoutOnSendRetryMaximumCount" setting holds a value that parses as a <see cref="short"/>.
     167             :                 /// </summary>
     168             :                 protected short TimeoutOnSendRetryMaximumCount { get; private set; }
     169             : 
     170             :                 /// <summary>
     171             :                 /// The <see cref="IHashAlgorithmFactory"/> to use to sign messages.
     172             :                 /// </summary>
     173             :                 protected IHashAlgorithmFactory Signer { get; private set; }
     174             : 
     175             :                 /// <summary>
     176             :                 /// A list of namespaces to exclude when trying to automatically determine the container.
     177             :                 /// </summary>
     178             :                 protected IList<string> ExclusionNamespaces { get; private set; }
     179             : 
     180             :                 /// <summary>
     181             :                 /// Instantiates a new instance of <see cref="AzureServiceBus{TAuthenticationToken}"/>, storing the supplied helpers, creating empty receiver collections and a <see cref="NullTelemetryHelper"/>, and reading the optional "Cqrs.Azure.Servicebus.TimeoutOnSendRetryMaximumCount" setting.
     182             :                 /// </summary>
     183           1 :                 protected AzureServiceBus(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper, IHashAlgorithmFactory hashAlgorithmFactory, bool isAPublisher)
     184             :                         : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, isAPublisher)
     185             :                 {
     186             :                         AzureBusHelper = azureBusHelper;
     187             :                         BusHelper = busHelper;
     188             :                         TelemetryHelper = new NullTelemetryHelper();
     189             :                         PrivateServiceBusReceivers = new Dictionary<int, SubscriptionClient>();
     190             :                         PublicServiceBusReceivers = new Dictionary<int, SubscriptionClient>();
     191             :                         TimeoutOnSendRetryMaximumCount = 1; // Default; only replaced below when the setting exists, is not blank and parses as a short.
     192             :                         string timeoutOnSendRetryMaximumCountValue;
     193             :                         short timeoutOnSendRetryMaximumCount;
     194             :                         if (ConfigurationManager.TryGetSetting("Cqrs.Azure.Servicebus.TimeoutOnSendRetryMaximumCount", out timeoutOnSendRetryMaximumCountValue) && !string.IsNullOrWhiteSpace(timeoutOnSendRetryMaximumCountValue) && short.TryParse(timeoutOnSendRetryMaximumCountValue, out timeoutOnSendRetryMaximumCount))
     195             :                                 TimeoutOnSendRetryMaximumCount = timeoutOnSendRetryMaximumCount;
     196             :                         ExclusionNamespaces = new SynchronizedCollection<string> { "Cqrs", "System" };
     197             :                         Signer = hashAlgorithmFactory;
     198             :                 }
     199             : 
     200             :                 #region Overrides of AzureBus<TAuthenticationToken>
     201             : 
     202             :                 /// <summary>
     203             :                 /// Gets the connection string for the bus from <see cref="AzureBus{TAuthenticationToken}.ConfigurationManager"/> using <see cref="MessageBusConnectionStringConfigurationKey"/>, throwing a <see cref="ConfigurationErrorsException"/> if the value is null, empty or white-space.
     204             :                 /// </summary>
     205           1 :                 protected override string GetConnectionString()
     206             :                 {
     207             :                         string connectionString = ConfigurationManager.GetSetting(MessageBusConnectionStringConfigurationKey);
     208             :                         if (string.IsNullOrWhiteSpace(connectionString)) // A blank value is reported as a configuration error naming the missing key.
     209             :                                 throw new ConfigurationErrorsException(string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and has a valid connection string value.", MessageBusConnectionStringConfigurationKey));
     210             :                         return connectionString;
     211             :                 }
     212             : 
     213             :                 #endregion
     214             : 
     215             :                 /// <summary>
     216             :                 /// Instantiate publishing on this bus by
     217             :                 /// calling <see cref="CheckPrivateTopicExists"/> and <see cref="CheckPublicTopicExists"/>,
     218             :                 /// creating the private and public <see cref="TopicClient"/> publishers, then calling <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/>
     219             :                 /// </summary>
     220           1 :                 protected override void InstantiatePublishing()
     221             :                 {
     222             :                         NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(ConnectionString);
     223             :                         CheckPrivateTopicExists(namespaceManager); // Also assigns PrivateTopicName, which is used below.
     224             :                         CheckPublicTopicExists(namespaceManager); // Also assigns PublicTopicName, which is used below.
     225             : 
     226             :                         PrivateServiceBusPublisher = TopicClient.CreateFromConnectionString(ConnectionString, PrivateTopicName);
     227             :                         PublicServiceBusPublisher = TopicClient.CreateFromConnectionString(ConnectionString, PublicTopicName);
     228             :                         StartSettingsChecking();
     229             :                 }
     230             : 
     231             :                 /// <summary>
     232             :                 /// Instantiate receiving on this bus by
     233             :                 /// calling <see cref="CheckPrivateTopicExists"/> and <see cref="CheckPublicTopicExists"/>
     234             :                 /// then InstantiateReceiving for private and public topics (a <see cref="UriFormatException"/> is re-thrown as an <see cref="InvalidConfigurationException"/>),
     235             :                 /// calls <see cref="CleanUpDeadLetters"/> for the private and public topics when the "Cqrs.Azure.Servicebus.EnableDeadLetterCleanUp" setting parses as true,
     236             :                 /// then calling <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/> unless <see cref="PublicServiceBusPublisher"/> is already set.
     237             :                 /// </summary>
     238           1 :                 protected override void InstantiateReceiving()
     239             :                 {
     240             :                         NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(ConnectionString);
     241             : 
     242             :                         CheckPrivateTopicExists(namespaceManager);
     243             :                         CheckPublicTopicExists(namespaceManager);
     244             : 
     245             :                         try
     246             :                         {
     247             :                                 InstantiateReceiving(namespaceManager, PrivateServiceBusReceivers, PrivateTopicName, PrivateTopicSubscriptionName);
     248             :                         }
     249             :                         catch (UriFormatException exception)
     250             :                         {
     251             :                                 throw new InvalidConfigurationException("The connection string for one of the private Service Bus receivers may be invalid.", exception);
     252             :                         }
     253             :                         try
     254             :                         {
     255             :                                 InstantiateReceiving(namespaceManager, PublicServiceBusReceivers, PublicTopicName, PublicTopicSubscriptionName);
     256             :                         }
     257             :                         catch (UriFormatException exception)
     258             :                         {
     259             :                                 throw new InvalidConfigurationException("The connection string for one of the public Service Bus receivers may be invalid.", exception);
     260             :                         }
     261             : 
     262             :                         bool enableDeadLetterCleanUp;
     263             :                         string enableDeadLetterCleanUpValue = ConfigurationManager.GetSetting("Cqrs.Azure.Servicebus.EnableDeadLetterCleanUp");
     264             :                         if (bool.TryParse(enableDeadLetterCleanUpValue, out enableDeadLetterCleanUp) && enableDeadLetterCleanUp)
     265             :                         {
     266             :                                 CleanUpDeadLetters(PrivateTopicName, PrivateTopicSubscriptionName);
     267             :                                 CleanUpDeadLetters(PublicTopicName, PublicTopicSubscriptionName);
     268             :                         }
     269             : 
     270             :                         // If this is also a publisher, then InstantiatePublishing starts the settings check and that will handle this;
     271             :                         // we only need to check one of the publishers to know that
     272             :                         if (PublicServiceBusPublisher != null)
     273             :                                 return;
     274             : 
     275             :                         StartSettingsChecking();
     276             :                 }
     277             : 
     278             :                 /// <summary>
     279             :                 /// Creates <see cref="AzureBus{TAuthenticationToken}.NumberOfReceiversCount"/> <see cref="SubscriptionClient"/> instances, stored in <paramref name="serviceBusReceivers"/> under the keys 0 to <see cref="AzureBus{TAuthenticationToken}.NumberOfReceiversCount"/> - 1, replacing any entry already held under a key.
     280             :                 /// If the number of receivers has decreased, every surplus <see cref="SubscriptionClient"/> has <see cref="ClientEntity.Close()"/> called on it and is then removed from the collection.
     281             :                 /// </summary>
     282             :                 /// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
     283             :                 /// <param name="serviceBusReceivers">The receivers collection to place <see cref="SubscriptionClient"/> instances into.</param>
     284             :                 /// <param name="topicName">The topic name.</param>
     285             :                 /// <param name="topicSubscriptionName">The topic subscription name.</param>
     286           1 :                 protected virtual void InstantiateReceiving(NamespaceManager namespaceManager, IDictionary<int, SubscriptionClient> serviceBusReceivers, string topicName, string topicSubscriptionName)
     287             :                 {
     288             :                         for (int i = 0; i < NumberOfReceiversCount; i++)
     289             :                         {
     290             :                                 SubscriptionClient serviceBusReceiver = SubscriptionClient.CreateFromConnectionString(ConnectionString, topicName, topicSubscriptionName);
     291             :                                 if (serviceBusReceivers.ContainsKey(i))
     292             :                                         serviceBusReceivers[i] = serviceBusReceiver;
     293             :                                 else
     294             :                                         serviceBusReceivers.Add(i, serviceBusReceiver);
     295             :                         }
     296             :                         // Close and remove every surplus receiver if the number has decreased. The surplus keys are snapshotted with ToList() first: bounding an index loop by Count while removing entries shrinks the bound mid-loop and leaves the highest keys behind.
     297             :                         foreach (int surplusKey in serviceBusReceivers.Keys.Where(key => key >= NumberOfReceiversCount).ToList())
     298             :                         {
     299             :                                 SubscriptionClient serviceBusReceiver;
     300             :                                 if (serviceBusReceivers.TryGetValue(surplusKey, out serviceBusReceiver) && serviceBusReceiver != null)
     301             :                                         serviceBusReceiver.Close();
     302             :                                 serviceBusReceivers.Remove(surplusKey);
     303             :                         }
     304             :                 }
     305             : 
     306             :                 /// <summary>
     307             :                 /// Assigns <see cref="PrivateTopicName"/> and <see cref="PrivateTopicSubscriptionName"/> from the <see cref="IConfigurationManager"/> settings (using <see cref="DefaultPrivateTopicName"/> and <see cref="DefaultPrivateTopicSubscriptionName"/> when a setting is null), then passes both to <see cref="CheckTopicExists"/>.
     308             :                 /// </summary>
     309             :                 /// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
     310           1 :                 protected virtual void CheckPrivateTopicExists(NamespaceManager namespaceManager)
     311             :                 {
     312             :                         CheckTopicExists(namespaceManager, PrivateTopicName = ConfigurationManager.GetSetting(PrivateTopicNameConfigurationKey) ?? DefaultPrivateTopicName, PrivateTopicSubscriptionName = ConfigurationManager.GetSetting(PrivateTopicSubscriptionNameConfigurationKey) ?? DefaultPrivateTopicSubscriptionName);
     313             :                 }
     314             : 
     315             :                 /// <summary>
     316             :                 /// Assigns <see cref="PublicTopicName"/> and <see cref="PublicTopicSubscriptionName"/> from the <see cref="IConfigurationManager"/> settings (using <see cref="DefaultPublicTopicName"/> and <see cref="DefaultPublicTopicSubscriptionName"/> when a setting is null), then passes both to <see cref="CheckTopicExists"/>.
     317             :                 /// </summary>
     318             :                 /// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
     319           1 :                 protected virtual void CheckPublicTopicExists(NamespaceManager namespaceManager)
     320             :                 {
     321             :                         CheckTopicExists(namespaceManager, PublicTopicName = ConfigurationManager.GetSetting(PublicTopicNameConfigurationKey) ?? DefaultPublicTopicName, PublicTopicSubscriptionName = ConfigurationManager.GetSetting(PublicTopicSubscriptionNameConfigurationKey) ?? DefaultPublicTopicSubscriptionName);
     322             :                 }
     323             : 
     324             :                 /// <summary>
     325             :                 /// Checks if a topic by the provided <paramref name="topicName"/> exists, creating it via the <paramref name="namespaceManager"/> if it does not, and
     326             :                 /// checks if a subscription by the provided <paramref name="subscriptionName"/> exists on that topic, creating it if it does not.
     327             :                 /// </summary>
     328           1 :                 protected virtual void CheckTopicExists(NamespaceManager namespaceManager, string topicName, string subscriptionName)
     329             :                 {
     330             :                         // Configure the topic settings; these are only applied when the topic has to be created below
     331             :                         var eventTopicDescription = new TopicDescription(topicName)
     332             :                         {
     333             :                                 MaxSizeInMegabytes = 5120,
     334             :                                 DefaultMessageTimeToLive = new TimeSpan(0, 25, 0),
     335             :                                 EnablePartitioning = true,
     336             :                                 EnableBatchedOperations = true
     337             :                         };
     338             :                         // Create the topic if it does not exist already
     339             :                         if (!namespaceManager.TopicExists(eventTopicDescription.Path))
     340             :                                 namespaceManager.CreateTopic(eventTopicDescription);
     341             : 
     342             :                         if (!namespaceManager.SubscriptionExists(eventTopicDescription.Path, subscriptionName)) // Likewise create the subscription if it does not exist already
     343             :                                 namespaceManager.CreateSubscription
     344             :                                 (
     345             :                                         new SubscriptionDescription(eventTopicDescription.Path, subscriptionName)
     346             :                                         {
     347             :                                                 DefaultMessageTimeToLive = new TimeSpan(0, 25, 0),
     348             :                                                 EnableBatchedOperations = true,
     349             :                                                 EnableDeadLetteringOnFilterEvaluationExceptions = true
     350             :                                         }
     351             :                                 );
     352             :                 }
     353             : 
     354             :                 /// <summary>
     355             :                 /// Refreshes <see cref="ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete"/>, triggers settings checking on both public and private publishers and receivers,
     356             :                 /// then calls <see cref="InstantiatePublishing"/> if <see cref="PublicServiceBusPublisher"/> is not null.
     357             :                 /// </summary>
     358           1 :                 protected override void TriggerSettingsChecking()
     359             :                 {
     360             :                         // First refresh the ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete property, using true when the setting cannot be read
     361             :                         bool throwExceptionOnReceiverMessageLockLostExceptionDuringComplete;
     362             :                         if (!ConfigurationManager.TryGetSetting(ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey, out throwExceptionOnReceiverMessageLockLostExceptionDuringComplete))
     363             :                                 throwExceptionOnReceiverMessageLockLostExceptionDuringComplete = true;
     364             :                         ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete = throwExceptionOnReceiverMessageLockLostExceptionDuringComplete;
     365             : 
     366             :                         TriggerSettingsChecking(PrivateServiceBusPublisher, PrivateServiceBusReceivers);
     367             :                         TriggerSettingsChecking(PublicServiceBusPublisher, PublicServiceBusReceivers);
     368             : 
     369             :                         // Restart configuration, we order this intentionally with the publisher second as if this triggers the cancellation there's nothing else to process here
     370             :                         // we also only need to check one of the publishers
     371             :                         if (PublicServiceBusPublisher != null)
     372             :                         {
     373             :                                 Logger.LogDebug("Recursively calling into InstantiatePublishing.");
     374             :                                 InstantiatePublishing();
     375             :                         }
     376             :                 }
     377             : 
     378             :                 /// <summary>
     379             :                 /// Triggers settings checking on the provided <paramref name="serviceBusPublisher"/> and <paramref name="serviceBusReceivers"/>,
     380             :                 /// then calls <see cref="InstantiateReceiving()"/>.
     381             :                 /// </summary>
     382           1 :                 protected virtual void TriggerSettingsChecking(TopicClient serviceBusPublisher, IDictionary<int, SubscriptionClient> serviceBusReceivers)
     383             :                 {
     384             :                         // Let's wrap up using this message bus and start the switch
     385             :                         if (serviceBusPublisher != null)
     386             :                         {
     387             :                                 serviceBusPublisher.Close();
     388             :                                 Logger.LogDebug("Publishing service bus closed.");
     389             :                         }
     390             :                         foreach (SubscriptionClient serviceBusReceiver in serviceBusReceivers.Values)
     391             :                         {
     392             :                                 // Let's wrap up using this message bus and start the switch
     393             :                                 if (serviceBusReceiver != null)
     394             :                                 {
     395             :                                         serviceBusReceiver.Close();
     396             :                                         Logger.LogDebug("Receiving service bus closed.");
     397             :                                 }
     398             :                                 // Restart configuration, we order this intentionally with the receiver first as if this triggers the cancellation we know this isn't a publisher as well
     399             :                                 if (serviceBusReceiver != null)
     400             :                                 {
     401             :                                         Logger.LogDebug("Recursively calling into InstantiateReceiving.");
     402             :                                         InstantiateReceiving();
     403             : 
     404             :                                         // This will be the case of a connection setting change re-connection
     405             :                                         if (ReceiverMessageHandler != null && ReceiverMessageHandlerOptions != null)
     406             :                                         {
     407             :                                                 // Callback to handle received messages
     408             :                                                 Logger.LogDebug("Re-registering onMessage handler.");
     409             :                                                 ApplyReceiverMessageHandler();
     410             :                                         }
     411             :                                         else
     412             :                                                 Logger.LogWarning("No onMessage handler was found to re-bind.");
     413             :                                 }
     414             :                         }
     415             :                 }
     416             : 
     417             :                 /// <summary>
     418             :                 /// Registers the provided <paramref name="receiverMessageHandler"/> with the provided <paramref name="receiverMessageHandlerOptions"/>.
     419             :                 /// </summary>
     420           1 :                 protected virtual void RegisterReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
     421             :                 {
     422             :                         StoreReceiverMessageHandler(receiverMessageHandler, receiverMessageHandlerOptions);
     423             : 
     424             :                         ApplyReceiverMessageHandler();
     425             :                 }
     426             : 
     427             :                 /// <summary>
     428             :                 /// Stores the provided <paramref name="receiverMessageHandler"/> and <paramref name="receiverMessageHandlerOptions"/>.
     429             :                 /// </summary>
     430           1 :                 protected virtual void StoreReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
     431             :                 {
     432             :                         ReceiverMessageHandler = receiverMessageHandler;
     433             :                         ReceiverMessageHandlerOptions = receiverMessageHandlerOptions;
     434             :                 }
     435             : 
     436             :                 /// <summary>
     437             :                 /// Applies the stored ReceiverMessageHandler and ReceiverMessageHandlerOptions to all receivers in
     438             :                 /// <see cref="PrivateServiceBusReceivers"/> and <see cref="PublicServiceBusReceivers"/>.
     439             :                 /// </summary>
     440           1 :                 protected override void ApplyReceiverMessageHandler()
     441             :                 {
     442             :                         foreach (SubscriptionClient serviceBusReceiver in PrivateServiceBusReceivers.Values)
     443             :                                 serviceBusReceiver
     444             :                                         .OnMessage
     445             :                                         (
     446             :                                                 message =>
     447             :                                                 {
     448             :                                                         BusHelper.SetWasPrivateBusUsed(true);
     449             :                                                         ReceiverMessageHandler(message);
     450             :                                                 },
     451             :                                                 ReceiverMessageHandlerOptions
     452             :                                         );
     453             :                         foreach (SubscriptionClient serviceBusReceiver in PublicServiceBusReceivers.Values)
     454             :                                 serviceBusReceiver
     455             :                                         .OnMessage
     456             :                                         (
     457             :                                                 message =>
     458             :                                                 {
     459             :                                                         BusHelper.SetWasPrivateBusUsed(false);
     460             :                                                         ReceiverMessageHandler(message);
     461             :                                                 },
     462             :                                                 ReceiverMessageHandlerOptions
     463             :                                         );
     464             :                 }
     465             : 
     466             :                 /// <summary>
     467             :                 /// Using a <see cref="Task"/>, clears all dead letters from the topic and subscription of the 
     468             :                 /// provided <paramref name="topicName"/> and <paramref name="topicSubscriptionName"/>.
     469             :                 /// </summary>
     470             :                 /// <param name="topicName">The name of the topic.</param>
     471             :                 /// <param name="topicSubscriptionName">The name of the subscription.</param>
     472             :                 /// <returns></returns>
     473           1 :                 protected virtual CancellationTokenSource CleanUpDeadLetters(string topicName, string topicSubscriptionName)
     474             :                 {
     475             :                         var brokeredMessageRenewCancellationTokenSource = new CancellationTokenSource();
     476             :                         IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
     477             :                         int lockIssues = 0;
     478             : 
     479             :                         Action<BrokeredMessage, IMessage> leaveDeadlLetterInQueue = (deadLetterBrokeredMessage, deadLetterMessage) =>
     480             :                         {
     481             :                                 // Remove message from queue
     482             :                                 try
     483             :                                 {
     484             :                                         deadLetterBrokeredMessage.Abandon();
     485             :                                         lockIssues = 0;
     486             :                                 }
     487             :                                 catch (MessageLockLostException)
     488             :                                 {
     489             :                                         lockIssues++;
     490             :                                         Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
     491             :                                 }
     492             :                                 Logger.LogDebug(string.Format("A dead-letter message of type {0} arrived with the id '{1}' but left in the queue due to settings.", deadLetterMessage.GetType().FullName, deadLetterBrokeredMessage.MessageId));
     493             :                         };
     494             :                         Action<BrokeredMessage> removeDeadlLetterFromQueue = deadLetterBrokeredMessage =>
     495             :                         {
     496             :                                 // Remove message from queue
     497             :                                 try
     498             :                                 {
     499             :                                         deadLetterBrokeredMessage.Complete();
     500             :                                         lockIssues = 0;
     501             :                                 }
     502             :                                 catch (MessageLockLostException)
     503             :                                 {
     504             :                                         lockIssues++;
     505             :                                         Logger.LogWarning(string.Format("The lock supplied for complete for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
     506             :                                 }
     507             :                                 Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}' but was removed as processing was skipped due to settings.", deadLetterBrokeredMessage.MessageId));
     508             :                         };
     509             : 
     510             :                         Task.Factory.StartNewSafely(() =>
     511             :                         {
     512             :                                 int loop = 0;
     513             :                                 while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
     514             :                                 {
     515             :                                         lockIssues = 0;
     516             :                                         MessagingFactory factory = MessagingFactory.CreateFromConnectionString(ConnectionString);
     517             :                                         string deadLetterPath = SubscriptionClient.FormatDeadLetterPath(topicName, topicSubscriptionName);
     518             :                                         MessageReceiver client = factory.CreateMessageReceiver(deadLetterPath, ReceiveMode.PeekLock);
     519             : 
     520             :                                         IEnumerable<BrokeredMessage> brokeredMessages = client.ReceiveBatch(1000);
     521             : 
     522             :                                         foreach (BrokeredMessage brokeredMessage in brokeredMessages)
     523             :                                         {
     524             :                                                 if (lockIssues > 10)
     525             :                                                         break;
     526             :                                                 try
     527             :                                                 {
     528             :                                                         Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}'.", brokeredMessage.MessageId));
     529             :                                                         string messageBody = brokeredMessage.GetBody<string>();
     530             : 
     531             :                                                         // Closure protection
     532             :                                                         BrokeredMessage message = brokeredMessage;
     533             :                                                         try
     534             :                                                         {
     535             :                                                                 AzureBusHelper.ReceiveEvent
     536             :                                                                 (
     537             :                                                                         messageBody,
     538             :                                                                         @event =>
     539             :                                                                         {
     540             :                                                                                 bool isRequired = BusHelper.IsEventRequired(@event.GetType());
     541             :                                                                                 if (!isRequired)
     542             :                                                                                         removeDeadlLetterFromQueue(message);
     543             :                                                                                 else
     544             :                                                                                         leaveDeadlLetterInQueue(message, @event);
     545             :                                                                                 return true;
     546             :                                                                         },
     547             :                                                                         string.Format("id '{0}'", brokeredMessage.MessageId),
     548             :                                                                         ExtractSignature(message),
     549             :                                                                         SigningTokenConfigurationKey,
     550             :                                                                         () =>
     551             :                                                                         {
     552             :                                                                                 removeDeadlLetterFromQueue(message);
     553             :                                                                         },
     554             :                                                                         () => { }
     555             :                                                                 );
     556             :                                                         }
     557             :                                                         catch
     558             :                                                         {
     559             :                                                                 AzureBusHelper.ReceiveCommand
     560             :                                                                 (
     561             :                                                                         messageBody,
     562             :                                                                         command =>
     563             :                                                                         {
     564             :                                                                                 bool isRequired = BusHelper.IsEventRequired(command.GetType());
     565             :                                                                                 if (!isRequired)
     566             :                                                                                         removeDeadlLetterFromQueue(message);
     567             :                                                                                 else
     568             :                                                                                         leaveDeadlLetterInQueue(message, command);
     569             :                                                                                 return true;
     570             :                                                                         },
     571             :                                                                         string.Format("id '{0}'", brokeredMessage.MessageId),
     572             :                                                                         ExtractSignature(message),
     573             :                                                                         SigningTokenConfigurationKey,
     574             :                                                                         () =>
     575             :                                                                         {
     576             :                                                                                 removeDeadlLetterFromQueue(message);
     577             :                                                                         },
     578             :                                                                         () => { }
     579             :                                                                 );
     580             :                                                         }
     581             :                                                 }
     582             :                                                 catch (Exception exception)
     583             :                                                 {
     584             :                                                         TelemetryHelper.TrackException(exception, null, telemetryProperties);
     585             :                                                         // Indicates a problem, unlock message in queue
     586             :                                                         Logger.LogError(string.Format("A dead-letter message arrived with the id '{0}' but failed to be process.", brokeredMessage.MessageId), exception: exception);
     587             :                                                         try
     588             :                                                         {
     589             :                                                                 brokeredMessage.Abandon();
     590             :                                                         }
     591             :                                                         catch (MessageLockLostException)
     592             :                                                         {
     593             :                                                                 lockIssues++;
     594             :                                                                 Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", brokeredMessage.MessageId));
     595             :                                                         }
     596             :                                                 }
     597             :                                         }
     598             : 
     599             :                                         client.Close();
     600             : 
     601             :                                         if (loop++ % 5 == 0)
     602             :                                         {
     603             :                                                 loop = 0;
     604             :                                                 Thread.Yield();
     605             :                                         }
     606             :                                         else
     607             :                                                 Thread.Sleep(500);
     608             :                                 }
     609             :                                 try
     610             :                                 {
     611             :                                         brokeredMessageRenewCancellationTokenSource.Dispose();
     612             :                                 }
     613             :                                 catch (ObjectDisposedException) { }
     614             :                         }, brokeredMessageRenewCancellationTokenSource.Token);
     615             : 
     616             :                         return brokeredMessageRenewCancellationTokenSource;
     617             :                 }
     618             : 
     619             :                 /// <summary>
     620             :                 /// Create a <see cref="BrokeredMessage"/> with additional properties to aid routing and tracing
     621             :                 /// </summary>
     622           1 :                 protected virtual BrokeredMessage CreateBrokeredMessage<TMessage>(Func<TMessage, string> serialiserFunction, Type messageType, TMessage message)
     623             :                 {
     624             :                         string messageBody = serialiserFunction(message);
     625             :                         var brokeredMessage = new BrokeredMessage(messageBody)
     626             :                         {
     627             :                                 CorrelationId = CorrelationIdHelper.GetCorrelationId().ToString("N")
     628             :                         };
     629             :                         brokeredMessage.Properties.Add("CorrelationId", brokeredMessage.CorrelationId);
     630             :                         brokeredMessage.Properties.Add("Type", messageType.FullName);
     631             :                         brokeredMessage.Properties.Add("Source", string.Format("{0}/{1}/{2}/{3}", Logger.LoggerSettings.ModuleName, Logger.LoggerSettings.Instance, Logger.LoggerSettings.Environment, Logger.LoggerSettings.EnvironmentInstance));
     632             : 
     633             :                         // see https://github.com/Chinchilla-Software-Com/CQRS/wiki/Inter-process-function-security</remarks>
     634             :                         string configurationKey = string.Format("{0}.SigningToken", messageType.FullName);
     635             :                         string signingToken;
     636             :                         HashAlgorithm signer = Signer.Create();
     637             :                         if (!ConfigurationManager.TryGetSetting(configurationKey, out signingToken) || string.IsNullOrWhiteSpace(signingToken))
     638             :                                 if (!ConfigurationManager.TryGetSetting(SigningTokenConfigurationKey, out signingToken) || string.IsNullOrWhiteSpace(signingToken))
     639             :                                         signingToken = Guid.Empty.ToString("N");
     640             :                         if (!string.IsNullOrWhiteSpace(signingToken))
     641             :                                 using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", signingToken, messageBody))))
     642             :                                         brokeredMessage.Properties.Add("Signature", Convert.ToBase64String(signer.ComputeHash(hashStream)));
     643             : 
     644             :                         try
     645             :                         {
     646             :                                 var stackTrace = new StackTrace();
     647             :                                 StackFrame[] stackFrames = stackTrace.GetFrames();
     648             :                                 if (stackFrames != null)
     649             :                                 {
     650             :                                         foreach (StackFrame frame in stackFrames)
     651             :                                         {
     652             :                                                 MethodBase method = frame.GetMethod();
     653             :                                                 if (method.ReflectedType == null)
     654             :                                                         continue;
     655             : 
     656             :                                                 try
     657             :                                                 {
     658             :                                                         if (ExclusionNamespaces.All(@namespace => !method.ReflectedType.FullName.StartsWith(@namespace)))
     659             :                                                         {
     660             :                                                                 brokeredMessage.Properties.Add("Source-Method", string.Format("{0}.{1}", method.ReflectedType.FullName, method.Name));
     661             :                                                                 break;
     662             :                                                         }
     663             :                                                 }
     664             :                                                 catch
     665             :                                                 {
     666             :                                                         // Just move on
     667             :                                                 }
     668             :                                         }
     669             :                                 }
     670             :                         }
     671             :                         catch
     672             :                         {
     673             :                                 // Just move on
     674             :                         }
     675             : 
     676             :                         return brokeredMessage;
     677             :                 }
     678             : 
     679             :                 /// <summary>
     680             :                 /// Extract any telemetry properties from the provided <paramref name="message"/>.
     681             :                 /// </summary>
     682           1 :                 protected virtual IDictionary<string, string> ExtractTelemetryProperties(BrokeredMessage message, string baseCommunicationType)
     683             :                 {
     684             :                         IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", baseCommunicationType } };
     685             :                         object value;
     686             :                         if (message.Properties.TryGetValue("Type", out value))
     687             :                                 telemetryProperties.Add("MessageType", value.ToString());
     688             :                         if (message.Properties.TryGetValue("Source", out value))
     689             :                                 telemetryProperties.Add("MessageSource", value.ToString());
     690             :                         if (message.Properties.TryGetValue("Source-Method", out value))
     691             :                                 telemetryProperties.Add("MessageSourceMethod", value.ToString());
     692             :                         if (message.Properties.TryGetValue("CorrelationId", out value) && !telemetryProperties.ContainsKey("CorrelationId"))
     693             :                                 telemetryProperties.Add("CorrelationId", value.ToString());
     694             : 
     695             :                         return telemetryProperties;
     696             :                 }
     697             : 
     698             :                 /// <summary>
     699             :                 /// Extract the signature from the provided <paramref name="message"/>.
     700             :                 /// </summary>
     701           1 :                 protected virtual string ExtractSignature(BrokeredMessage message)
     702             :                 {
     703             :                         object value;
     704             :                         if (message.Properties.TryGetValue("Signature", out value))
     705             :                                 return value.ToString();
     706             :                         return null;
     707             :                 }
     708             :         }
     709             : }
 |