LCOV - code coverage report
Current view: top level - Azure/Cqrs.Azure.ServiceBus - AzureServiceBus.cs Hit Total Coverage
Test: doc-coverage.info Lines: 0 15 0.0 %
Date: 2017-07-26

          Line data    Source code
       1             : #region Copyright
       2             : // // -----------------------------------------------------------------------
       3             : // // <copyright company="cdmdotnet Limited">
       4             : // //   Copyright cdmdotnet Limited. All rights reserved.
       5             : // // </copyright>
       6             : // // -----------------------------------------------------------------------
       7             : #endregion
       8             : 
       9             : using System;
      10             : using System.Collections.Generic;
      11             : using System.Configuration;
      12             : using System.Threading;
      13             : using System.Threading.Tasks;
      14             : using cdmdotnet.Logging;
      15             : using Cqrs.Authentication;
      16             : using Cqrs.Bus;
      17             : using Cqrs.Configuration;
      18             : using Cqrs.Messages;
      19             : using Microsoft.ServiceBus;
      20             : using Microsoft.ServiceBus.Messaging;
      21             : 
      22             : namespace Cqrs.Azure.ServiceBus
      23             : {
      24             :         public abstract class AzureServiceBus<TAuthenticationToken> : AzureBus<TAuthenticationToken>
      25           0 :         {
                                       /// <summary>Topic client for the private topic; created by InstantiatePublishing from ConnectionString and <see cref="PrivateTopicName"/>.</summary>
      26             :                 protected TopicClient PrivateServiceBusPublisher { get; private set; }
      27             : 
                                       /// <summary>Topic client for the public topic; created by InstantiatePublishing. A non-null value is also what the visible code uses to decide that this instance is a publisher.</summary>
      28             :                 protected TopicClient PublicServiceBusPublisher { get; private set; }
      29             : 
                                       /// <summary>Subscription clients for the private topic, keyed by a zero-based receiver index; filled by InstantiateReceiving.</summary>
      30             :                 protected IDictionary<int, SubscriptionClient> PrivateServiceBusReceivers { get; private set; }
      31             : 
                                       /// <summary>Subscription clients for the public topic, keyed by a zero-based receiver index; filled by InstantiateReceiving.</summary>
      32             :                 protected IDictionary<int, SubscriptionClient> PublicServiceBusReceivers { get; private set; }
      33             : 
                                       /// <summary>Name of the private topic; set by CheckPrivateEventTopicExists from the setting named by <see cref="PrivateTopicNameConfigurationKey"/>, or <see cref="DefaultPrivateTopicName"/> when that setting is null.</summary>
      34             :                 protected string PrivateTopicName { get; private set; }
      35             : 
                                       /// <summary>Name of the public topic; set by CheckPublicTopicExists from the setting named by <see cref="PublicTopicNameConfigurationKey"/>, or <see cref="DefaultPublicTopicName"/> when that setting is null.</summary>
      36             :                 protected string PublicTopicName { get; private set; }
      37             : 
                                       /// <summary>Subscription name used on the private topic; set by CheckPrivateEventTopicExists, falling back to <see cref="DefaultPrivateTopicSubscriptionName"/> when the setting is null.</summary>
      38             :                 protected string PrivateTopicSubscriptionName { get; private set; }
      39             : 
                                       /// <summary>Subscription name used on the public topic; set by CheckPublicTopicExists, falling back to <see cref="DefaultPublicTopicSubscriptionName"/> when the setting is null.</summary>
      40             :                 protected string PublicTopicSubscriptionName { get; private set; }
      41             : 
                                       /// <summary>Setting key that GetConnectionString reads the Service Bus connection string from.</summary>
      42             :                 protected abstract string MessageBusConnectionStringConfigurationKey { get; }
      43             : 
                                       /// <summary>Setting key that holds the private topic name.</summary>
      44             :                 protected abstract string PrivateTopicNameConfigurationKey { get; }
      45             : 
                                       /// <summary>Setting key that holds the public topic name.</summary>
      46             :                 protected abstract string PublicTopicNameConfigurationKey { get; }
      47             : 
                                       /// <summary>Private topic name used when the configured setting is null.</summary>
      48             :                 protected abstract string DefaultPrivateTopicName { get; }
      49             : 
                                       /// <summary>Public topic name used when the configured setting is null.</summary>
      50             :                 protected abstract string DefaultPublicTopicName { get; }
      51             : 
                                       /// <summary>Setting key that holds the private topic's subscription name.</summary>
      52             :                 protected abstract string PrivateTopicSubscriptionNameConfigurationKey { get; }
      53             : 
                                       /// <summary>Setting key that holds the public topic's subscription name.</summary>
      54             :                 protected abstract string PublicTopicSubscriptionNameConfigurationKey { get; }
      55             : 
                                       /// <summary>Setting key read by TriggerSettingsChecking to refresh <see cref="ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete"/>.</summary>
      56             :                 protected abstract string ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey { get; }
      57             : 
                                       /// <summary>Refreshed by TriggerSettingsChecking; set to true when the setting cannot be read. The code that consumes this flag is not part of this class.</summary>
      58             :                 protected bool ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete { get; private set; }
      59             : 
                                       /// <summary>Subscription name used on the private topic when none is configured.</summary>
      60             :                 protected const string DefaultPrivateTopicSubscriptionName = "Root";
      61             : 
                                       /// <summary>Subscription name used on the public topic when none is configured.</summary>
      62             :                 protected const string DefaultPublicTopicSubscriptionName = "Root";
      63             : 
                                       /// <summary>Callback handed to OnMessage on every receiver by ApplyReceiverMessageHandler; stored by StoreReceiverMessageHandler.</summary>
      64             :                 protected Action<BrokeredMessage> ReceiverMessageHandler { get; set; }
      65             : 
                                       /// <summary>Options handed to OnMessage together with <see cref="ReceiverMessageHandler"/>.</summary>
      66             :                 protected OnMessageOptions ReceiverMessageHandlerOptions { get; set; }
      67             : 
                                       /// <summary>Supplied through the constructor; CleanUpDeadLetters uses it to ask whether a dead-lettered message type is required.</summary>
      68             :                 protected IBusHelper BusHelper { get; private set; }
      69             : 
                                       /// <summary>Supplied through the constructor; CleanUpDeadLetters uses it to interpret dead-lettered message bodies as events or commands.</summary>
      70             :                 protected IAzureBusHelper<TAuthenticationToken> AzureBusHelper { get; private set; }
      71             : 
                                       /// <summary>Telemetry sink; the constructor sets it to a NullTelemetryHelper and CleanUpDeadLetters reports exceptions to it.</summary>
      72             :                 protected ITelemetryHelper TelemetryHelper { get; set; }
      73             : 
      74           0 :                 protected AzureServiceBus(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper, bool isAPublisher)
      75             :                         : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, isAPublisher)
      76             :                 {
      77             :                         AzureBusHelper = azureBusHelper;
      78             :                         BusHelper = busHelper;
      79             :                         TelemetryHelper = new NullTelemetryHelper();
      80             :                         PrivateServiceBusReceivers = new Dictionary<int, SubscriptionClient>();
      81             :                         PublicServiceBusReceivers = new Dictionary<int, SubscriptionClient>();
      82             :                 }
      83             : 
      84             :                 #region Overrides of AzureBus<TAuthenticationToken>
      85             : 
      86           0 :                 protected override string GetConnectionString()
      87             :                 {
      88             :                         string connectionString = ConfigurationManager.GetSetting(MessageBusConnectionStringConfigurationKey);
      89             :                         if (string.IsNullOrWhiteSpace(connectionString))
      90             :                                 throw new ConfigurationErrorsException(string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and has a valid connection string value.", MessageBusConnectionStringConfigurationKey));
      91             :                         return connectionString;
      92             :                 }
      93             : 
      94             :                 #endregion
      95             : 
      96           0 :                 protected override void InstantiatePublishing()
      97             :                 {
      98             :                         NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(ConnectionString);
      99             :                         CheckPrivateEventTopicExists(namespaceManager);
     100             :                         CheckPublicTopicExists(namespaceManager);
     101             : 
     102             :                         PrivateServiceBusPublisher = TopicClient.CreateFromConnectionString(ConnectionString, PrivateTopicName);
     103             :                         PublicServiceBusPublisher = TopicClient.CreateFromConnectionString(ConnectionString, PublicTopicName);
     104             :                         StartSettingsChecking();
     105             :                 }
     106             : 
     107           0 :                 protected override void InstantiateReceiving()
     108             :                 {
     109             :                         NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(ConnectionString);
     110             : 
     111             :                         CheckPrivateEventTopicExists(namespaceManager);
     112             :                         CheckPublicTopicExists(namespaceManager);
     113             : 
     114             :                         InstantiateReceiving(namespaceManager, PrivateServiceBusReceivers, PrivateTopicName, PrivateTopicSubscriptionName);
     115             :                         InstantiateReceiving(namespaceManager, PublicServiceBusReceivers, PublicTopicName, PublicTopicSubscriptionName);
     116             : 
     117             :                         bool enableDeadLetterCleanUp;
     118             :                         string enableDeadLetterCleanUpValue = ConfigurationManager.GetSetting("Cqrs.Azure.Servicebus.EnableDeadLetterCleanUp");
     119             :                         if (bool.TryParse(enableDeadLetterCleanUpValue, out enableDeadLetterCleanUp) && enableDeadLetterCleanUp)
     120             :                         {
     121             :                                 CleanUpDeadLetters(PrivateTopicName, PrivateTopicSubscriptionName);
     122             :                                 CleanUpDeadLetters(PublicTopicName, PublicTopicSubscriptionName);
     123             :                         }
     124             : 
     125             :                         // If this is also a publisher, then it will the check over there and that will handle this
     126             :                         // we only need to check one of these
     127             :                         if (PublicServiceBusPublisher != null)
     128             :                                 return;
     129             : 
     130             :                         StartSettingsChecking();
     131             :                 }
     132             : 
     133           0 :                 protected virtual void InstantiateReceiving(NamespaceManager namespaceManager, IDictionary<int, SubscriptionClient> serviceBusReceivers, string topicName, string topicSubscriptionName)
     134             :                 {
     135             :                         for (int i = 0; i < NumberOfReceiversCount; i++)
     136             :                         {
     137             :                                 SubscriptionClient serviceBusReceiver = SubscriptionClient.CreateFromConnectionString(ConnectionString, topicName, topicSubscriptionName);
     138             :                                 if (serviceBusReceivers.ContainsKey(i))
     139             :                                         serviceBusReceivers[i] = serviceBusReceiver;
     140             :                                 else
     141             :                                         serviceBusReceivers.Add(i, serviceBusReceiver);
     142             :                         }
     143             :                         // Remove any if the number has decreased
     144             :                         for (int i = NumberOfReceiversCount; i < serviceBusReceivers.Count; i++)
     145             :                                 serviceBusReceivers.Remove(i + 1);
     146             :                 }
     147             : 
     148           0 :                 protected virtual void CheckPrivateEventTopicExists(NamespaceManager namespaceManager)
     149             :                 {
     150             :                         CheckTopicExists(namespaceManager, PrivateTopicName = ConfigurationManager.GetSetting(PrivateTopicNameConfigurationKey) ?? DefaultPrivateTopicName, PrivateTopicSubscriptionName = ConfigurationManager.GetSetting(PrivateTopicSubscriptionNameConfigurationKey) ?? DefaultPrivateTopicSubscriptionName);
     151             :                 }
     152             : 
     153           0 :                 protected virtual void CheckPublicTopicExists(NamespaceManager namespaceManager)
     154             :                 {
     155             :                         CheckTopicExists(namespaceManager, PublicTopicName = ConfigurationManager.GetSetting(PublicTopicNameConfigurationKey) ?? DefaultPublicTopicName, PublicTopicSubscriptionName = ConfigurationManager.GetSetting(PublicTopicSubscriptionNameConfigurationKey) ?? DefaultPublicTopicSubscriptionName);
     156             :                 }
     157             : 
     158           0 :                 protected virtual void CheckTopicExists(NamespaceManager namespaceManager, string eventTopicName, string eventSubscriptionNames)
     159             :                 {
     160             :                         // Configure Queue Settings
     161             :                         var eventTopicDescription = new TopicDescription(eventTopicName)
     162             :                         {
     163             :                                 MaxSizeInMegabytes = 5120,
     164             :                                 DefaultMessageTimeToLive = new TimeSpan(0, 25, 0),
     165             :                                 EnablePartitioning = true,
     166             :                                 EnableBatchedOperations = true
     167             :                         };
     168             :                         // Create the topic if it does not exist already
     169             :                         if (!namespaceManager.TopicExists(eventTopicDescription.Path))
     170             :                                 namespaceManager.CreateTopic(eventTopicDescription);
     171             : 
     172             :                         if (!namespaceManager.SubscriptionExists(eventTopicDescription.Path, eventSubscriptionNames))
     173             :                                 namespaceManager.CreateSubscription
     174             :                                 (
     175             :                                         new SubscriptionDescription(eventTopicDescription.Path, eventSubscriptionNames)
     176             :                                         {
     177             :                                                 DefaultMessageTimeToLive = new TimeSpan(0, 25, 0),
     178             :                                                 EnableBatchedOperations = true,
     179             :                                                 EnableDeadLetteringOnFilterEvaluationExceptions = true
     180             :                                         }
     181             :                                 );
     182             :                 }
     183             : 
     184           0 :                 protected override void TriggerSettingsChecking()
     185             :                 {
     186             :                         // First refresh the EventBlackListProcessing property
     187             :                         bool throwExceptionOnReceiverMessageLockLostExceptionDuringComplete;
     188             :                         if (!ConfigurationManager.TryGetSetting(ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey, out throwExceptionOnReceiverMessageLockLostExceptionDuringComplete))
     189             :                                 throwExceptionOnReceiverMessageLockLostExceptionDuringComplete = true;
     190             :                         ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete = throwExceptionOnReceiverMessageLockLostExceptionDuringComplete;
     191             : 
     192             :                         TriggerSettingsChecking(PrivateServiceBusPublisher, PrivateServiceBusReceivers);
     193             :                         TriggerSettingsChecking(PublicServiceBusPublisher, PublicServiceBusReceivers);
     194             : 
     195             :                         // Restart configuration, we order this intentionally with the publisher second as if this triggers the cancellation there's nothing else to process here
     196             :                         // we also only need to check one of the publishers
     197             :                         if (PublicServiceBusPublisher != null)
     198             :                         {
     199             :                                 Logger.LogDebug("Recursively calling into InstantiatePublishing.");
     200             :                                 InstantiatePublishing();
     201             :                         }
     202             :                 }
     203             : 
     204           0 :                 protected virtual void TriggerSettingsChecking(TopicClient serviceBusPublisher, IDictionary<int, SubscriptionClient> serviceBusReceivers)
     205             :                 {
     206             :                         // Let's wrap up using this message bus and start the switch
     207             :                         if (serviceBusPublisher != null)
     208             :                         {
     209             :                                 serviceBusPublisher.Close();
     210             :                                 Logger.LogDebug("Publishing service bus closed.");
     211             :                         }
     212             :                         foreach (SubscriptionClient serviceBusReceiver in serviceBusReceivers.Values)
     213             :                         {
     214             :                                 // Let's wrap up using this message bus and start the switch
     215             :                                 if (serviceBusReceiver != null)
     216             :                                 {
     217             :                                         serviceBusReceiver.Close();
     218             :                                         Logger.LogDebug("Receiving service bus closed.");
     219             :                                 }
     220             :                                 // Restart configuration, we order this intentionally with the receiver first as if this triggers the cancellation we know this isn't a publisher as well
     221             :                                 if (serviceBusReceiver != null)
     222             :                                 {
     223             :                                         Logger.LogDebug("Recursively calling into InstantiateReceiving.");
     224             :                                         InstantiateReceiving();
     225             : 
     226             :                                         // This will be the case of a connection setting change re-connection
     227             :                                         if (ReceiverMessageHandler != null && ReceiverMessageHandlerOptions != null)
     228             :                                         {
     229             :                                                 // Callback to handle received messages
     230             :                                                 Logger.LogDebug("Re-registering onMessage handler.");
     231             :                                                 ApplyReceiverMessageHandler();
     232             :                                         }
     233             :                                         else
     234             :                                                 Logger.LogWarning("No onMessage handler was found to re-bind.");
     235             :                                 }
     236             :                         }
     237             :                 }
     238             : 
     239           0 :                 protected virtual void RegisterReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
     240             :                 {
     241             :                         StoreReceiverMessageHandler(receiverMessageHandler, receiverMessageHandlerOptions);
     242             : 
     243             :                         ApplyReceiverMessageHandler();
     244             :                 }
     245             : 
     246           0 :                 protected virtual void StoreReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
     247             :                 {
     248             :                         ReceiverMessageHandler = receiverMessageHandler;
     249             :                         ReceiverMessageHandlerOptions = receiverMessageHandlerOptions;
     250             :                 }
     251             : 
     252           0 :                 protected override void ApplyReceiverMessageHandler()
     253             :                 {
     254             :                         foreach (SubscriptionClient serviceBusReceiver in PrivateServiceBusReceivers.Values)
     255             :                                 serviceBusReceiver.OnMessage(ReceiverMessageHandler, ReceiverMessageHandlerOptions);
     256             :                         foreach (SubscriptionClient serviceBusReceiver in PublicServiceBusReceivers.Values)
     257             :                                 serviceBusReceiver.OnMessage(ReceiverMessageHandler, ReceiverMessageHandlerOptions);
     258             :                 }
     259             : 
     260           0 :                 protected virtual CancellationTokenSource CleanUpDeadLetters(string topicName, string topicSubscriptionName)
     261             :                 {
     262             :                         var brokeredMessageRenewCancellationTokenSource = new CancellationTokenSource();
     263             :                         IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
     264             :                         int lockIssues = 0;
     265             : 
     266             :                         Action<BrokeredMessage, IMessage> leaveDeadlLetterInQueue = (deadLetterBrokeredMessage, deadLetterMessage) =>
     267             :                         {
     268             :                                 // Remove message from queue
     269             :                                 try
     270             :                                 {
     271             :                                         deadLetterBrokeredMessage.Abandon();
     272             :                                         lockIssues = 0;
     273             :                                 }
     274             :                                 catch (MessageLockLostException)
     275             :                                 {
     276             :                                         lockIssues++;
     277             :                                         Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
     278             :                                 }
     279             :                                 Logger.LogDebug(string.Format("A dead-letter message of type {0} arrived with the id '{1}' but left in the queue due to settings.", deadLetterMessage.GetType().FullName, deadLetterBrokeredMessage.MessageId));
     280             :                         };
     281             :                         Action<BrokeredMessage> removeDeadlLetterFromQueue = deadLetterBrokeredMessage =>
     282             :                         {
     283             :                                 // Remove message from queue
     284             :                                 try
     285             :                                 {
     286             :                                         deadLetterBrokeredMessage.Complete();
     287             :                                         lockIssues = 0;
     288             :                                 }
     289             :                                 catch (MessageLockLostException)
     290             :                                 {
     291             :                                         lockIssues++;
     292             :                                         Logger.LogWarning(string.Format("The lock supplied for complete for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
     293             :                                 }
     294             :                                 Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}' but was removed as processing was skipped due to settings.", deadLetterBrokeredMessage.MessageId));
     295             :                         };
     296             : 
     297             :                         Task.Factory.StartNewSafely(() =>
     298             :                         {
     299             :                                 int loop = 0;
     300             :                                 while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
     301             :                                 {
     302             :                                         lockIssues = 0;
     303             :                                         MessagingFactory factory = MessagingFactory.CreateFromConnectionString(ConnectionString);
     304             :                                         string deadLetterPath = SubscriptionClient.FormatDeadLetterPath(topicName, topicSubscriptionName);
     305             :                                         MessageReceiver client = factory.CreateMessageReceiver(deadLetterPath, ReceiveMode.PeekLock);
     306             : 
     307             :                                         IEnumerable<BrokeredMessage> brokeredMessages = client.ReceiveBatch(1000);
     308             : 
     309             :                                         foreach (BrokeredMessage brokeredMessage in brokeredMessages)
     310             :                                         {
     311             :                                                 if (lockIssues > 10)
     312             :                                                         break;
     313             :                                                 try
     314             :                                                 {
     315             :                                                         Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}'.", brokeredMessage.MessageId));
     316             :                                                         string messageBody = brokeredMessage.GetBody<string>();
     317             : 
     318             :                                                         // Closure protection
     319             :                                                         BrokeredMessage message = brokeredMessage;
     320             :                                                         try
     321             :                                                         {
     322             :                                                                 AzureBusHelper.ReceiveEvent
     323             :                                                                 (
     324             :                                                                         messageBody,
     325             :                                                                         @event =>
     326             :                                                                         {
     327             :                                                                                 bool isRequired = BusHelper.IsEventRequired(@event.GetType());
     328             :                                                                                 if (!isRequired)
     329             :                                                                                         removeDeadlLetterFromQueue(message);
     330             :                                                                                 else
     331             :                                                                                         leaveDeadlLetterInQueue(message, @event);
     332             :                                                                                 return true;
     333             :                                                                         },
     334             :                                                                         string.Format("id '{0}'", brokeredMessage.MessageId),
     335             :                                                                         () =>
     336             :                                                                         {
     337             :                                                                                 removeDeadlLetterFromQueue(message);
     338             :                                                                         },
     339             :                                                                         () => { }
     340             :                                                                 );
     341             :                                                         }
     342             :                                                         catch
     343             :                                                         {
     344             :                                                                 AzureBusHelper.ReceiveCommand
     345             :                                                                 (
     346             :                                                                         messageBody,
     347             :                                                                         command =>
     348             :                                                                         {
     349             :                                                                                 bool isRequired = BusHelper.IsEventRequired(command.GetType());
     350             :                                                                                 if (!isRequired)
     351             :                                                                                         removeDeadlLetterFromQueue(message);
     352             :                                                                                 else
     353             :                                                                                         leaveDeadlLetterInQueue(message, command);
     354             :                                                                                 return true;
     355             :                                                                         },
     356             :                                                                         string.Format("id '{0}'", brokeredMessage.MessageId),
     357             :                                                                         () =>
     358             :                                                                         {
     359             :                                                                                 removeDeadlLetterFromQueue(message);
     360             :                                                                         },
     361             :                                                                         () => { }
     362             :                                                                 );
     363             :                                                         }
     364             :                                                 }
     365             :                                                 catch (Exception exception)
     366             :                                                 {
     367             :                                                         TelemetryHelper.TrackException(exception, null, telemetryProperties);
     368             :                                                         // Indicates a problem, unlock message in queue
     369             :                                                         Logger.LogError(string.Format("A dead-letter message arrived with the id '{0}' but failed to be process.", brokeredMessage.MessageId), exception: exception);
     370             :                                                         try
     371             :                                                         {
     372             :                                                                 brokeredMessage.Abandon();
     373             :                                                         }
     374             :                                                         catch (MessageLockLostException)
     375             :                                                         {
     376             :                                                                 lockIssues++;
     377             :                                                                 Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", brokeredMessage.MessageId));
     378             :                                                         }
     379             :                                                 }
     380             :                                         }
     381             : 
     382             :                                         client.Close();
     383             : 
     384             :                                         if (loop++ % 5 == 0)
     385             :                                         {
     386             :                                                 loop = 0;
     387             :                                                 Thread.Yield();
     388             :                                         }
     389             :                                         else
     390             :                                                 Thread.Sleep(500);
     391             :                                 }
     392             :                                 try
     393             :                                 {
     394             :                                         brokeredMessageRenewCancellationTokenSource.Dispose();
     395             :                                 }
     396             :                                 catch (ObjectDisposedException) { }
     397             :                         }, brokeredMessageRenewCancellationTokenSource.Token);
     398             : 
     399             :                         return brokeredMessageRenewCancellationTokenSource;
     400             :                 }
     401             :         }
     402             : }

Generated by: LCOV version 1.10