Documentation Coverage Report
Current view: top level - Azure/Cqrs.Azure.ServiceBus - AzureServiceBus.cs Hit Total Coverage
Version: 2.2 Artefacts: 15 15 100.0 %
Date: 2017-09-22

          Line data    Source code
       1             : #region Copyright
       2             : // // -----------------------------------------------------------------------
       3             : // // <copyright company="Chinchilla Software Limited">
       4             : // //   Copyright Chinchilla Software Limited. All rights reserved.
       5             : // // </copyright>
       6             : // // -----------------------------------------------------------------------
       7             : #endregion
       8             : 
       9             : using System;
      10             : using System.Collections.Generic;
      11             : using System.Configuration;
      12             : using System.Threading;
      13             : using System.Threading.Tasks;
      14             : using cdmdotnet.Logging;
      15             : using Cqrs.Authentication;
      16             : using Cqrs.Bus;
      17             : using Cqrs.Configuration;
      18             : using Cqrs.Exceptions;
      19             : using Cqrs.Messages;
      20             : using Microsoft.ServiceBus;
      21             : using Microsoft.ServiceBus.Messaging;
      22             : 
      23             : namespace Cqrs.Azure.ServiceBus
      24             : {
      25             :         /// <summary>
      26             :         /// An <see cref="AzureBus{TAuthenticationToken}"/> that uses Azure Service Bus.
      27             :         /// </summary>
      28             :         /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
      29             :         public abstract class AzureServiceBus<TAuthenticationToken> : AzureBus<TAuthenticationToken>
      30           1 :         {
      31             :                 /// <summary>
      32             :                 /// Gets the private <see cref="TopicClient"/> publisher, created by <see cref="InstantiatePublishing"/> for <see cref="PrivateTopicName"/>.
      33             :                 /// </summary>
      34             :                 protected TopicClient PrivateServiceBusPublisher { get; private set; }
      35             : 
      36             :                 /// <summary>
      37             :                 /// Gets the public <see cref="TopicClient"/> publisher, created by <see cref="InstantiatePublishing"/> for <see cref="PublicTopicName"/>.
      38             :                 /// </summary>
      39             :                 protected TopicClient PublicServiceBusPublisher { get; private set; }
      40             : 
      41             :                 /// <summary>
      42             :                 /// Gets the private <see cref="SubscriptionClient"/> receivers, keyed by a zero-based receiver index.
      43             :                 /// </summary>
      44             :                 protected IDictionary<int, SubscriptionClient> PrivateServiceBusReceivers { get; private set; }
      45             : 
      46             :                 /// <summary>
      47             :                 /// Gets the public <see cref="SubscriptionClient"/> receivers, keyed by a zero-based receiver index.
      48             :                 /// </summary>
      49             :                 protected IDictionary<int, SubscriptionClient> PublicServiceBusReceivers { get; private set; }
      50             : 
      51             :                 /// <summary>
      52             :                 /// The name of the private topic, assigned by <see cref="CheckPrivateTopicExists"/>.
      53             :                 /// </summary>
      54             :                 protected string PrivateTopicName { get; private set; }
      55             : 
      56             :                 /// <summary>
      57             :                 /// The name of the public topic, assigned by <see cref="CheckPublicTopicExists"/>.
      58             :                 /// </summary>
      59             :                 protected string PublicTopicName { get; private set; }
      60             : 
      61             :                 /// <summary>
      62             :                 /// The name of the subscription in the private topic, assigned by <see cref="CheckPrivateTopicExists"/>.
      63             :                 /// </summary>
      64             :                 protected string PrivateTopicSubscriptionName { get; private set; }
      65             : 
      66             :                 /// <summary>
      67             :                 /// The name of the subscription in the public topic, assigned by <see cref="CheckPublicTopicExists"/>.
      68             :                 /// </summary>
      69             :                 protected string PublicTopicSubscriptionName { get; private set; }
      70             : 
      71             :                 /// <summary>
      72             :                 /// The configuration key for the message bus connection string as used by <see cref="IConfigurationManager"/>; read by <see cref="GetConnectionString"/>.
      73             :                 /// </summary>
      74             :                 protected abstract string MessageBusConnectionStringConfigurationKey { get; }
      75             : 
      76             :                 /// <summary>
      77             :                 /// The configuration key for the name of the private topic as used by <see cref="IConfigurationManager"/>.
      78             :                 /// </summary>
      79             :                 protected abstract string PrivateTopicNameConfigurationKey { get; }
      80             : 
      81             :                 /// <summary>
      82             :                 /// The configuration key for the name of the public topic as used by <see cref="IConfigurationManager"/>.
      83             :                 /// </summary>
      84             :                 protected abstract string PublicTopicNameConfigurationKey { get; }
      85             : 
      86             :                 /// <summary>
      87             :                 /// The default name of the private topic, used when the <see cref="IConfigurationManager"/> returns null for <see cref="PrivateTopicNameConfigurationKey"/>.
      88             :                 /// </summary>
      89             :                 protected abstract string DefaultPrivateTopicName { get; }
      90             : 
      91             :                 /// <summary>
      92             :                 /// The default name of the public topic, used when the <see cref="IConfigurationManager"/> returns null for <see cref="PublicTopicNameConfigurationKey"/>.
      93             :                 /// </summary>
      94             :                 protected abstract string DefaultPublicTopicName { get; }
      95             : 
      96             :                 /// <summary>
      97             :                 /// The configuration key for the name of the subscription in the private topic as used by <see cref="IConfigurationManager"/>.
      98             :                 /// </summary>
      99             :                 protected abstract string PrivateTopicSubscriptionNameConfigurationKey { get; }
     100             : 
     101             :                 /// <summary>
     102             :                 /// The configuration key for the name of the subscription in the public topic as used by <see cref="IConfigurationManager"/>.
     103             :                 /// </summary>
     104             :                 protected abstract string PublicTopicSubscriptionNameConfigurationKey { get; }
     105             : 
     106             :                 /// <summary>
     107             :                 /// The configuration key that
     108             :                 /// specifies if an <see cref="Exception"/> is thrown if the network lock is lost
     109             :                 /// as used by <see cref="IConfigurationManager"/>; read by <see cref="TriggerSettingsChecking()"/>.
     110             :                 /// </summary>
     111             :                 protected abstract string ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey { get; }
     112             : 
     113             :                 /// <summary>
     114             :                 /// Specifies if an <see cref="Exception"/> is thrown if the network lock is lost. Refreshed by <see cref="TriggerSettingsChecking()"/>, which uses true when the setting cannot be read.
     115             :                 /// </summary>
     116             :                 protected bool ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete { get; private set; }
     117             : 
     118             :                 /// <summary>
     119             :                 /// The default name of the subscription in the private topic, used when the <see cref="IConfigurationManager"/> returns null for <see cref="PrivateTopicSubscriptionNameConfigurationKey"/>.
     120             :                 /// </summary>
     121             :                 protected const string DefaultPrivateTopicSubscriptionName = "Root";
     122             : 
     123             :                 /// <summary>
     124             :                 /// The default name of the subscription in the public topic, used when the <see cref="IConfigurationManager"/> returns null for <see cref="PublicTopicSubscriptionNameConfigurationKey"/>.
     125             :                 /// </summary>
     126             :                 protected const string DefaultPublicTopicSubscriptionName = "Root";
     127             : 
     128             :                 /// <summary>
     129             :                 /// The <see cref="Action{TBrokeredMessage}">handler</see> used for <see cref="SubscriptionClient.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage}, OnMessageOptions)"/> on each receiver. Set by <see cref="StoreReceiverMessageHandler"/>.
     130             :                 /// </summary>
     131             :                 protected Action<BrokeredMessage> ReceiverMessageHandler { get; set; }
     132             : 
     133             :                 /// <summary>
     134             :                 /// The <see cref="OnMessageOptions" /> used for <see cref="SubscriptionClient.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage}, OnMessageOptions)"/> on each receiver. Set by <see cref="StoreReceiverMessageHandler"/>.
     135             :                 /// </summary>
     136             :                 protected OnMessageOptions ReceiverMessageHandlerOptions { get; set; }
     137             : 
     138             :                 /// <summary>
     139             :                 /// Gets the <see cref="IBusHelper"/> supplied to the constructor.
     140             :                 /// </summary>
     141             :                 protected IBusHelper BusHelper { get; private set; }
     142             : 
     143             :                 /// <summary>
     144             :                 /// Gets the <see cref="IAzureBusHelper{TAuthenticationToken}"/> supplied to the constructor.
     145             :                 /// </summary>
     146             :                 protected IAzureBusHelper<TAuthenticationToken> AzureBusHelper { get; private set; }
     147             : 
     148             :                 /// <summary>
     149             :                 /// Gets or sets the <see cref="ITelemetryHelper"/>; the constructor initialises it to a new <see cref="NullTelemetryHelper"/>.
     150             :                 /// </summary>
     151             :                 protected ITelemetryHelper TelemetryHelper { get; set; }
     152             : 
     153             :                 /// <summary>
     154             :                 /// Instantiates a new instance of <see cref="AzureServiceBus{TAuthenticationToken}"/>, storing the supplied helpers, defaulting <see cref="TelemetryHelper"/> to a <see cref="NullTelemetryHelper"/> and creating empty private and public receiver collections.
     155             :                 /// </summary>
     156           1 :                 protected AzureServiceBus(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper, bool isAPublisher)
     157             :                         : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, isAPublisher)
     158             :                 {
     159             :                         AzureBusHelper = azureBusHelper;
     160             :                         BusHelper = busHelper;
     161             :                         TelemetryHelper = new NullTelemetryHelper();
     162             :                         PrivateServiceBusReceivers = new Dictionary<int, SubscriptionClient>();
     163             :                         PublicServiceBusReceivers = new Dictionary<int, SubscriptionClient>();
     164             :                 }
     165             : 
     166             :                 #region Overrides of AzureBus<TAuthenticationToken>
     167             : 
     168             :                 /// <summary>
     169             :                 /// Gets the connection string for the bus from <see cref="AzureBus{TAuthenticationToken}.ConfigurationManager"/> using <see cref="MessageBusConnectionStringConfigurationKey"/>; throws a <see cref="ConfigurationErrorsException"/> if the value is null, empty or white-space.
     170             :                 /// </summary>
     171           1 :                 protected override string GetConnectionString()
     172             :                 {
     173             :                         string connectionString = ConfigurationManager.GetSetting(MessageBusConnectionStringConfigurationKey);
     174             :                         if (string.IsNullOrWhiteSpace(connectionString))
     175             :                                 throw new ConfigurationErrorsException(string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and has a valid connection string value.", MessageBusConnectionStringConfigurationKey));
     176             :                         return connectionString;
     177             :                 }
     178             : 
     179             :                 #endregion
     180             : 
     181             :                 /// <summary>
     182             :                 /// Instantiate publishing on this bus by
     183             :                 /// calling <see cref="CheckPrivateTopicExists"/> and <see cref="CheckPublicTopicExists"/>,
     184             :                 /// creating <see cref="PrivateServiceBusPublisher"/> and <see cref="PublicServiceBusPublisher"/>, then calling <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/>
     185             :                 /// </summary>
     186           1 :                 protected override void InstantiatePublishing()
     187             :                 {
     188             :                         NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(ConnectionString);
     189             :                         CheckPrivateTopicExists(namespaceManager);
     190             :                         CheckPublicTopicExists(namespaceManager);
     191             : 
     192             :                         PrivateServiceBusPublisher = TopicClient.CreateFromConnectionString(ConnectionString, PrivateTopicName);
     193             :                         PublicServiceBusPublisher = TopicClient.CreateFromConnectionString(ConnectionString, PublicTopicName);
     194             :                         StartSettingsChecking();
     195             :                 }
     196             : 
     197             :                 /// <summary>
     198             :                 /// Instantiate receiving on this bus by
     199             :                 /// calling <see cref="CheckPrivateTopicExists"/> and <see cref="CheckPublicTopicExists"/>
     200             :                 /// then InstantiateReceiving for private and public topics (a <see cref="UriFormatException"/> is rethrown as an <see cref="InvalidConfigurationException"/>),
     201             :                 /// calls <see cref="CleanUpDeadLetters"/> for the private and public topics when the "Cqrs.Azure.Servicebus.EnableDeadLetterCleanUp" setting parses to true,
     202             :                 /// then calling <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/> unless <see cref="PublicServiceBusPublisher"/> is already set.
     203             :                 /// </summary>
     204           1 :                 protected override void InstantiateReceiving()
     205             :                 {
     206             :                         NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(ConnectionString);
     207             : 
     208             :                         CheckPrivateTopicExists(namespaceManager);
     209             :                         CheckPublicTopicExists(namespaceManager);
     210             : 
     211             :                         try
     212             :                         {
     213             :                                 InstantiateReceiving(namespaceManager, PrivateServiceBusReceivers, PrivateTopicName, PrivateTopicSubscriptionName);
     214             :                         }
     215             :                         catch (UriFormatException exception)
     216             :                         {
     217             :                                 throw new InvalidConfigurationException("The connection string for one of the private Service Bus receivers may be invalid.", exception);
     218             :                         }
     219             :                         try
     220             :                         {
     221             :                                 InstantiateReceiving(namespaceManager, PublicServiceBusReceivers, PublicTopicName, PublicTopicSubscriptionName);
     222             :                         }
     223             :                         catch (UriFormatException exception)
     224             :                         {
     225             :                                 throw new InvalidConfigurationException("The connection string for one of the public Service Bus receivers may be invalid.", exception);
     226             :                         }
     227             : 
     228             :                         bool enableDeadLetterCleanUp;
     229             :                         string enableDeadLetterCleanUpValue = ConfigurationManager.GetSetting("Cqrs.Azure.Servicebus.EnableDeadLetterCleanUp");
     230             :                         if (bool.TryParse(enableDeadLetterCleanUpValue, out enableDeadLetterCleanUp) && enableDeadLetterCleanUp)
     231             :                         {
     232             :                                 CleanUpDeadLetters(PrivateTopicName, PrivateTopicSubscriptionName);
     233             :                                 CleanUpDeadLetters(PublicTopicName, PublicTopicSubscriptionName);
     234             :                         }
     235             : 
     236             :                         // If this is also a publisher, InstantiatePublishing starts the settings check and that will cover this receiver too;
     237             :                         // we only need to check one of the publishers to know that.
     238             :                         if (PublicServiceBusPublisher != null)
     239             :                                 return;
     240             : 
     241             :                         StartSettingsChecking();
     242             :                 }
     243             : 
     244             :                 /// <summary>
     245             :                 /// Creates <see cref="AzureBus{TAuthenticationToken}.NumberOfReceiversCount"/> <see cref="SubscriptionClient"/>, storing each under its zero-based index (an existing entry for that index is overwritten without being closed here).
     246             :                 /// If flushing is required, any flushed <see cref="SubscriptionClient"/> has <see cref="ClientEntity.Close()"/> called on it first.
     247             :                 /// </summary>
     248             :                 /// <param name="namespaceManager">The <see cref="NamespaceManager"/>; not referenced by this implementation.</param>
     249             :                 /// <param name="serviceBusReceivers">The receivers collection to place <see cref="SubscriptionClient"/> instances into.</param>
     250             :                 /// <param name="topicName">The topic name.</param>
     251             :                 /// <param name="topicSubscriptionName">The topic subscription name.</param>
     252           1 :                 protected virtual void InstantiateReceiving(NamespaceManager namespaceManager, IDictionary<int, SubscriptionClient> serviceBusReceivers, string topicName, string topicSubscriptionName)
     253             :                 {
     254             :                         for (int i = 0; i < NumberOfReceiversCount; i++)
     255             :                         {
     256             :                                 SubscriptionClient serviceBusReceiver = SubscriptionClient.CreateFromConnectionString(ConnectionString, topicName, topicSubscriptionName);
     257             :                                 if (serviceBusReceivers.ContainsKey(i))
     258             :                                         serviceBusReceivers[i] = serviceBusReceiver;
     259             :                                 else
     260             :                                         serviceBusReceivers.Add(i, serviceBusReceiver);
     261             :                         }
     262             :                         // Remove any if the number has decreased. NOTE(review): the loop bound re-reads Count, which shrinks on every Remove while i grows, so the highest keys look like they can be left open and un-removed (e.g. 5 receivers reduced to 2 leaves key 4) - confirm.
     263             :                         for (int i = NumberOfReceiversCount; i < serviceBusReceivers.Count; i++)
     264             :                         {
     265             :                                 SubscriptionClient serviceBusReceiver;
     266             :                                 if (serviceBusReceivers.TryGetValue(i, out serviceBusReceiver))
     267             :                                         serviceBusReceiver.Close();
     268             :                                 serviceBusReceivers.Remove(i);
     269             :                         }
     270             :                 }
     271             : 
     272             :                 /// <summary>
     273             :                 /// Assigns <see cref="PrivateTopicName"/> and <see cref="PrivateTopicSubscriptionName"/> from configuration (falling back to the defaults when the setting is null) and passes them to <see cref="CheckTopicExists"/>.
     274             :                 /// </summary>
     275             :                 /// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
     276           1 :                 protected virtual void CheckPrivateTopicExists(NamespaceManager namespaceManager)
     277             :                 {
     278             :                         CheckTopicExists(namespaceManager, PrivateTopicName = ConfigurationManager.GetSetting(PrivateTopicNameConfigurationKey) ?? DefaultPrivateTopicName, PrivateTopicSubscriptionName = ConfigurationManager.GetSetting(PrivateTopicSubscriptionNameConfigurationKey) ?? DefaultPrivateTopicSubscriptionName);
     279             :                 }
     280             : 
     281             :                 /// <summary>
     282             :                 /// Assigns <see cref="PublicTopicName"/> and <see cref="PublicTopicSubscriptionName"/> from configuration (falling back to the defaults when the setting is null) and passes them to <see cref="CheckTopicExists"/>.
     283             :                 /// </summary>
     284             :                 /// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
     285           1 :                 protected virtual void CheckPublicTopicExists(NamespaceManager namespaceManager)
     286             :                 {
     287             :                         CheckTopicExists(namespaceManager, PublicTopicName = ConfigurationManager.GetSetting(PublicTopicNameConfigurationKey) ?? DefaultPublicTopicName, PublicTopicSubscriptionName = ConfigurationManager.GetSetting(PublicTopicSubscriptionNameConfigurationKey) ?? DefaultPublicTopicSubscriptionName);
     288             :                 }
     289             : 
     290             :                 /// <summary>
     291             :                 /// Checks if a topic by the provided <paramref name="topicName"/> exists, creating it if it does not, and
     292             :                 /// checks if a subscription by the provided <paramref name="subscriptionName"/> exists on that topic, creating it if it does not.
     293             :                 /// </summary>
     294           1 :                 protected virtual void CheckTopicExists(NamespaceManager namespaceManager, string topicName, string subscriptionName)
     295             :                 {
     296             :                         // Configure the topic settings (5120 MB maximum size, 25 minute message time-to-live, partitioning and batched operations enabled)
     297             :                         var eventTopicDescription = new TopicDescription(topicName)
     298             :                         {
     299             :                                 MaxSizeInMegabytes = 5120,
     300             :                                 DefaultMessageTimeToLive = new TimeSpan(0, 25, 0),
     301             :                                 EnablePartitioning = true,
     302             :                                 EnableBatchedOperations = true
     303             :                         };
     304             :                         // Create the topic if it does not exist already; an existing topic is left as it is
     305             :                         if (!namespaceManager.TopicExists(eventTopicDescription.Path))
     306             :                                 namespaceManager.CreateTopic(eventTopicDescription);
     307             : 
     308             :                         if (!namespaceManager.SubscriptionExists(eventTopicDescription.Path, subscriptionName))
     309             :                                 namespaceManager.CreateSubscription
     310             :                                 (
     311             :                                         new SubscriptionDescription(eventTopicDescription.Path, subscriptionName)
     312             :                                         {
     313             :                                                 DefaultMessageTimeToLive = new TimeSpan(0, 25, 0),
     314             :                                                 EnableBatchedOperations = true,
     315             :                                                 EnableDeadLetteringOnFilterEvaluationExceptions = true
     316             :                                         }
     317             :                                 );
     318             :                 }
     319             : 
     320             :                 /// <summary>
     321             :                 /// Refreshes <see cref="ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete"/>, triggers settings checking on both public and private publishers and receivers,
     322             :                 /// then calls <see cref="InstantiatePublishing"/> if <see cref="PublicServiceBusPublisher"/> is not null.
     323             :                 /// </summary>
     324           1 :                 protected override void TriggerSettingsChecking()
     325             :                 {
     326             :                         // First refresh the ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete property, using true when TryGetSetting reports no value
     327             :                         bool throwExceptionOnReceiverMessageLockLostExceptionDuringComplete;
     328             :                         if (!ConfigurationManager.TryGetSetting(ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey, out throwExceptionOnReceiverMessageLockLostExceptionDuringComplete))
     329             :                                 throwExceptionOnReceiverMessageLockLostExceptionDuringComplete = true;
     330             :                         ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete = throwExceptionOnReceiverMessageLockLostExceptionDuringComplete;
     331             : 
     332             :                         TriggerSettingsChecking(PrivateServiceBusPublisher, PrivateServiceBusReceivers);
     333             :                         TriggerSettingsChecking(PublicServiceBusPublisher, PublicServiceBusReceivers);
     334             : 
     335             :                         // Restart configuration, we order this intentionally with the publisher second as if this triggers the cancellation there's nothing else to process here
     336             :                         // we also only need to check one of the publishers
     337             :                         if (PublicServiceBusPublisher != null)
     338             :                         {
     339             :                                 Logger.LogDebug("Recursively calling into InstantiatePublishing.");
     340             :                                 InstantiatePublishing();
     341             :                         }
     342             :                 }
     343             : 
     344             :                 /// <summary>
     345             :                 /// Closes the provided <paramref name="serviceBusPublisher"/> (when not null) and each non-null receiver in <paramref name="serviceBusReceivers"/>,
     346             :                 /// then, for each non-null receiver, calls <see cref="InstantiateReceiving()"/> and re-applies the stored message handler when one is available.
     347             :                 /// </summary>
     348           1 :                 protected virtual void TriggerSettingsChecking(TopicClient serviceBusPublisher, IDictionary<int, SubscriptionClient> serviceBusReceivers)
     349             :                 {
     350             :                         // Let's wrap up using this message bus and start the switch
     351             :                         if (serviceBusPublisher != null)
     352             :                         {
     353             :                                 serviceBusPublisher.Close();
     354             :                                 Logger.LogDebug("Publishing service bus closed.");
     355             :                         }
     356             :                         foreach (SubscriptionClient serviceBusReceiver in serviceBusReceivers.Values)
     357             :                         {
     358             :                                 // Let's wrap up using this message bus and start the switch
     359             :                                 if (serviceBusReceiver != null)
     360             :                                 {
     361             :                                         serviceBusReceiver.Close();
     362             :                                         Logger.LogDebug("Receiving service bus closed.");
     363             :                                 }
     364             :                                 // Restart configuration, we order this intentionally with the receiver first as if this triggers the cancellation we know this isn't a publisher as well. NOTE(review): InstantiateReceiving() rewrites PrivateServiceBusReceivers/PublicServiceBusReceivers, which may be the collection this foreach is enumerating, and it runs once per non-null receiver - confirm this cannot invalidate the enumeration.
     365             :                                 if (serviceBusReceiver != null)
     366             :                                 {
     367             :                                         Logger.LogDebug("Recursively calling into InstantiateReceiving.");
     368             :                                         InstantiateReceiving();
     369             : 
     370             :                                         // This will be the case of a connection setting change re-connection
     371             :                                         if (ReceiverMessageHandler != null && ReceiverMessageHandlerOptions != null)
     372             :                                         {
     373             :                                                 // Callback to handle received messages
     374             :                                                 Logger.LogDebug("Re-registering onMessage handler.");
     375             :                                                 ApplyReceiverMessageHandler();
     376             :                                         }
     377             :                                         else
     378             :                                                 Logger.LogWarning("No onMessage handler was found to re-bind.");
     379             :                                 }
     380             :                         }
     381             :                 }
     382             : 
     383             :                 /// <summary>
     384             :                 /// Registers the provided <paramref name="receiverMessageHandler"/> with the provided <paramref name="receiverMessageHandlerOptions"/> by storing them via <see cref="StoreReceiverMessageHandler"/> and then calling <see cref="ApplyReceiverMessageHandler"/>.
     385             :                 /// </summary>
     386           1 :                 protected virtual void RegisterReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
     387             :                 {
     388             :                         StoreReceiverMessageHandler(receiverMessageHandler, receiverMessageHandlerOptions);
     389             : 
     390             :                         ApplyReceiverMessageHandler();
     391             :                 }
     392             : 
     393             :                 /// <summary>Stores the provided <paramref name="receiverMessageHandler"/> and <paramref name="receiverMessageHandlerOptions"/> in ReceiverMessageHandler and ReceiverMessageHandlerOptions; nothing is registered on a receiver here.</summary>
     394             :                 /// <param name="receiverMessageHandler">The callback to keep in ReceiverMessageHandler.</param>
     395             :                 /// <param name="receiverMessageHandlerOptions">The <see cref="OnMessageOptions"/> to keep in ReceiverMessageHandlerOptions.</param>
     396           1 :                 protected virtual void StoreReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
     397             :                 {
     398             :                         ReceiverMessageHandler = receiverMessageHandler;
     399             :                         ReceiverMessageHandlerOptions = receiverMessageHandlerOptions;
     400             :                 }
     401             : 
     402             :                 /// <summary>
     403             :                 /// Registers the stored ReceiverMessageHandler, together with the stored ReceiverMessageHandlerOptions, on every
     404             :                 /// <see cref="SubscriptionClient"/> held in <see cref="PrivateServiceBusReceivers"/> and then in <see cref="PublicServiceBusReceivers"/>.
     405             :                 /// </summary>
     406           1 :                 protected override void ApplyReceiverMessageHandler()
     407             :                 {
     408             :                         foreach (SubscriptionClient privateReceiver in PrivateServiceBusReceivers.Values)
     409             :                                 privateReceiver.OnMessage(ReceiverMessageHandler, ReceiverMessageHandlerOptions);
     410             :                         foreach (SubscriptionClient publicReceiver in PublicServiceBusReceivers.Values)
     411             :                                 publicReceiver.OnMessage(ReceiverMessageHandler, ReceiverMessageHandlerOptions);
     412             :                 }
     413             : 
     414             :                 /// <summary>
     415             :                 /// Using a <see cref="Task"/>, clears all dead letters from the topic and subscription of the
     416             :                 /// provided <paramref name="topicName"/> and <paramref name="topicSubscriptionName"/>.
     417             :                 /// Dead letters of a type that is not required are completed (removed); required ones are abandoned and stay in the queue.
     418             :                 /// </summary>
     419             :                 /// <param name="topicName">The name of the topic.</param>
     420             :                 /// <param name="topicSubscriptionName">The name of the subscription.</param>
     421             :                 /// <returns>The <see cref="CancellationTokenSource"/> whose token ends the clean-up loop once cancellation is requested.</returns>
     422           1 :                 protected virtual CancellationTokenSource CleanUpDeadLetters(string topicName, string topicSubscriptionName)
     423             :                 {
     424             :                         var brokeredMessageRenewCancellationTokenSource = new CancellationTokenSource();
     425             :                         IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
     426             :                         // Counts lock failures; reset after every successful settle and at the start of every poll
     427             :                         int lockIssues = 0;
     428             : 
     429             :                         Action<BrokeredMessage, IMessage> leaveDeadLetterInQueue = (deadLetterBrokeredMessage, deadLetterMessage) =>
     430             :                         {
     431             :                                 // Release the lock without completing, so the message stays in the dead-letter queue
     432             :                                 try
     433             :                                 {
     434             :                                         deadLetterBrokeredMessage.Abandon();
     435             :                                         lockIssues = 0;
     436             :                                 }
     437             :                                 catch (MessageLockLostException)
     438             :                                 {
     439             :                                         lockIssues++;
     440             :                                         Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
     441             :                                 }
     442             :                                 Logger.LogDebug(string.Format("A dead-letter message of type {0} arrived with the id '{1}' but left in the queue due to settings.", deadLetterMessage.GetType().FullName, deadLetterBrokeredMessage.MessageId));
     443             :                         };
     444             :                         Action<BrokeredMessage> removeDeadLetterFromQueue = deadLetterBrokeredMessage =>
     445             :                         {
     446             :                                 // Remove message from queue
     447             :                                 try
     448             :                                 {
     449             :                                         deadLetterBrokeredMessage.Complete();
     450             :                                         lockIssues = 0;
     451             :                                 }
     452             :                                 catch (MessageLockLostException)
     453             :                                 {
     454             :                                         lockIssues++;
     455             :                                         Logger.LogWarning(string.Format("The lock supplied for complete for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
     456             :                                 }
     457             :                                 Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}' but was removed as processing was skipped due to settings.", deadLetterBrokeredMessage.MessageId));
     458             :                         };
     459             :                         // Shared by events and commands. NOTE(review): commands go through IsEventRequired too, exactly as before - confirm that is intended
     460             :                         Func<BrokeredMessage, IMessage, bool> settleDeadLetter = (deadLetterBrokeredMessage, deadLetterMessage) =>
     461             :                         {
     462             :                                 if (BusHelper.IsEventRequired(deadLetterMessage.GetType()))
     463             :                                         leaveDeadLetterInQueue(deadLetterBrokeredMessage, deadLetterMessage);
     464             :                                 else
     465             :                                         removeDeadLetterFromQueue(deadLetterBrokeredMessage);
     466             :                                 return true;
     467             :                         };
     468             : 
     469             :                         Task.Factory.StartNewSafely(() =>
     470             :                         {
     471             :                                 int loop = 0;
     472             :                                 while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
     473             :                                 {
     474             :                                         lockIssues = 0;
     475             :                                         MessagingFactory factory = MessagingFactory.CreateFromConnectionString(ConnectionString);
     476             :                                         MessageReceiver client = null;
     477             :                                         try
     478             :                                         {
     479             :                                                 string deadLetterPath = SubscriptionClient.FormatDeadLetterPath(topicName, topicSubscriptionName);
     480             :                                                 client = factory.CreateMessageReceiver(deadLetterPath, ReceiveMode.PeekLock);
     481             :                                                 IEnumerable<BrokeredMessage> brokeredMessages = client.ReceiveBatch(1000);
     482             : 
     483             :                                                 foreach (BrokeredMessage brokeredMessage in brokeredMessages)
     484             :                                                 {
     485             :                                                         // Too many lock failures in this batch: stop and start again with a fresh poll
     486             :                                                         if (lockIssues > 10)
     487             :                                                                 break;
     488             :                                                         try
     489             :                                                         {
     490             :                                                                 Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}'.", brokeredMessage.MessageId));
     491             :                                                                 string messageBody = brokeredMessage.GetBody<string>();
     492             : 
     493             :                                                                 // Closure protection
     494             :                                                                 BrokeredMessage message = brokeredMessage;
     495             :                                                                 try
     496             :                                                                 {
     497             :                                                                         AzureBusHelper.ReceiveEvent
     498             :                                                                         (
     499             :                                                                                 messageBody,
     500             :                                                                                 @event => settleDeadLetter(message, @event),
     501             :                                                                                 string.Format("id '{0}'", brokeredMessage.MessageId),
     502             :                                                                                 () => removeDeadLetterFromQueue(message),
     503             :                                                                                 () => { }
     504             :                                                                         );
     505             :                                                                 }
     506             :                                                                 catch
     507             :                                                                 {
     508             :                                                                         // Handling the body as an event threw; try the same body again as a command
     509             :                                                                         AzureBusHelper.ReceiveCommand
     510             :                                                                         (
     511             :                                                                                 messageBody,
     512             :                                                                                 command => settleDeadLetter(message, command),
     513             :                                                                                 string.Format("id '{0}'", brokeredMessage.MessageId),
     514             :                                                                                 () => removeDeadLetterFromQueue(message),
     515             :                                                                                 () => { }
     516             :                                                                         );
     517             :                                                                 }
     518             :                                                         }
     519             :                                                         catch (Exception exception)
     520             :                                                         {
     521             :                                                                 TelemetryHelper.TrackException(exception, null, telemetryProperties);
     522             :                                                                 // Indicates a problem, unlock message in queue
     523             :                                                                 Logger.LogError(string.Format("A dead-letter message arrived with the id '{0}' but failed to be process.", brokeredMessage.MessageId), exception: exception);
     524             :                                                                 try
     525             :                                                                 {
     526             :                                                                         brokeredMessage.Abandon();
     527             :                                                                 }
     528             :                                                                 catch (MessageLockLostException)
     529             :                                                                 {
     530             :                                                                         lockIssues++;
     531             :                                                                         Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", brokeredMessage.MessageId));
     532             :                                                                 }
     533             :                                                         }
     534             :                                                 }
     535             :                                         }
     536             :                                         finally
     537             :                                         {
     538             :                                                 // A new factory is created on every pass, so always release it and its receiver, even when the poll throws
     539             :                                                 if (client != null)
     540             :                                                         client.Close();
     541             :                                                 factory.Close();
     542             :                                         }
     543             : 
     544             :                                         // Pre-increment so the counter can reach five: yield on every fifth pass, pause half a second on the others
     545             :                                         if (++loop % 5 == 0)
     546             :                                         {
     547             :                                                 loop = 0;
     548             :                                                 Thread.Yield();
     549             :                                         }
     550             :                                         else
     551             :                                                 Thread.Sleep(500);
     552             :                                 }
     553             :                                 try
     554             :                                 {
     555             :                                         brokeredMessageRenewCancellationTokenSource.Dispose();
     556             :                                 }
     557             :                                 catch (ObjectDisposedException) { }
     558             :                         }, brokeredMessageRenewCancellationTokenSource.Token);
     559             : 
     560             :                         return brokeredMessageRenewCancellationTokenSource;
     561             :                 }
     562             :         }
     563             : }

Generated by: LCOV version 1.10