Documentation Coverage Report
Current view: top level - Azure/Cqrs.Azure.ServiceBus - AzureBus.cs Hit Total Coverage
Version: 4.0 Artefacts: 14 15 93.3 %
Date: 2019-11-24 03:15:41

          Line data    Source code
       1             : #region Copyright
       2             : // // -----------------------------------------------------------------------
       3             : // // <copyright company="Chinchilla Software Limited">
       4             : // //   Copyright Chinchilla Software Limited. All rights reserved.
       5             : // // </copyright>
       6             : // // -----------------------------------------------------------------------
       7             : #endregion
       8             : 
       9             : using System;
      10             : using System.Collections.Concurrent;
      11             : using System.Collections.Generic;
      12             : using System.Threading.Tasks;
      13             : using Cqrs.Authentication;
      14             : using Cqrs.Configuration;
      15             : using Chinchilla.Logging;
      16             : using Cqrs.Commands;
      17             : using Cqrs.Events;
      18             : using Cqrs.Infrastructure;
      19             : #if NET452
      20             : using Microsoft.ServiceBus;
      21             : using Microsoft.ServiceBus.Messaging;
      22             : using Microsoft.Practices.EnterpriseLibrary.Common.Configuration;
      23             : using Microsoft.Practices.EnterpriseLibrary.WindowsAzure.TransientFaultHandling;
      24             : using RetryPolicy = Microsoft.Practices.TransientFaultHandling.RetryPolicy;
      25             : using IMessageReceiver = Microsoft.ServiceBus.Messaging.SubscriptionClient;
      26             : #endif
      27             : #if NETCOREAPP3_0
      28             : using Microsoft.Azure.ServiceBus;
      29             : using Microsoft.Azure.ServiceBus.Core;
      30             : using Microsoft.Azure.ServiceBus.Management;
      31             : using Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling;
      32             : using RetryPolicy = Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling.RetryPolicy;
      33             : #endif
      34             : 
      35             : namespace Cqrs.Azure.ServiceBus
      36             : {
      37             :         /// <summary>
      38             :         /// An Azure Bus such as a Service Bus or Event Hub.
      39             :         /// </summary>
      40             :         /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
      41             :         /// <remarks>
      42             :         /// https://markheath.net/post/migrating-to-new-servicebus-sdk
      43             :         /// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions#receive-messages-from-the-subscription
      44             :         /// https://stackoverflow.com/questions/47427361/azure-service-bus-read-messages-sent-by-net-core-2-with-brokeredmessage-getbo
      45             :         /// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
      46             :         /// </remarks>
      47             :         public abstract class AzureBus<TAuthenticationToken>
      48           1 :         {
      49             :                 /// <summary>
      50             :                 /// Gets or sets the connection string to the bus.
      51             :                 /// </summary>
      52             :                 protected string ConnectionString { get; set; }
      53             : 
      54             :                 /// <summary>
      55             :                 /// Gets or sets the <see cref="IMessageSerialiser{TAuthenticationToken}"/>.
      56             :                 /// </summary>
      57             :                 protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; }
      58             : 
      59             :                 /// <summary>
      60             :                 /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>.
      61             :                 /// </summary>
      62             :                 protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
      63             : 
      64             :                 /// <summary>
      65             :                 /// Gets or sets the <see cref="ICorrelationIdHelper"/>.
      66             :                 /// </summary>
      67             :                 protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
      68             : 
      69             :                 /// <summary>
      70             :                 /// Gets or sets the <see cref="ILogger"/>.
      71             :                 /// </summary>
      72             :                 protected ILogger Logger { get; private set; }
      73             : 
      74             :                 /// <summary>
      75             :                 /// Gets or sets the <see cref="IConfigurationManager"/>.
      76             :                 /// </summary>
      77             :                 protected IConfigurationManager ConfigurationManager { get; private set; }
      78             : 
      79             :                 /// <summary>
      80             :                 /// Gets or sets the <see cref="IEvent{TAuthenticationToken}">events</see> to wait for before responding to the caller
      81             :                 /// keyed by the <see cref="ICommand{TAuthenticationToken}.Id"/>
      82             :                 /// </summary>
      83             :                 protected IDictionary<Guid, IList<IEvent<TAuthenticationToken>>> EventWaits { get; private set; }
      84             : 
      85             :                 /// <summary>
      86             :                 /// The default number of receivers to start and run.
      87             :                 /// </summary>
      88             :                 protected const int DefaultNumberOfReceiversCount = 1;
      89             : 
      90             :                 /// <summary>
      91             :                 /// The number of receivers to start and run.
      92             :                 /// </summary>
      93             :                 protected int NumberOfReceiversCount { get; set; }
      94             : 
      95             :                 /// <summary>
      96             :                 /// The default number for <see cref="MaximumConcurrentReceiverProcessesCount"/>.
      97             :                 /// </summary>
      98             :                 protected const int DefaultMaximumConcurrentReceiverProcessesCount = 1;
      99             : 
     100             : #if NET452
     101             :                 /// <summary>
     102             :                 /// The <see cref="OnMessageOptions.MaxConcurrentCalls"/> value.
     103             :                 /// </summary>
     104             : #endif
     105             : #if NETCOREAPP3_0
     106             :                 /// <summary>
     107             :                 /// Used by .NET Framework, but not .Net Core
     108             :                 /// </summary>
     109             : #endif
     110             :                 protected int MaximumConcurrentReceiverProcessesCount { get; set; }
     111             : 
     112             :                 /// <summary>
     113             :                 /// Indicates if the <typeparamref name="TAuthenticationToken"/> is a <see cref="Guid"/>.
     114             :                 /// </summary>
     115             :                 protected bool AuthenticationTokenIsGuid { get; private set; }
     116             : 
     117             :                 /// <summary>
     118             :                 /// Indicates if the <typeparamref name="TAuthenticationToken"/> is an <see cref="int"/>.
     119             :                 /// </summary>
     120             :                 protected bool AuthenticationTokenIsInt { get; private set; }
     121             : 
     122             :                 /// <summary>
     123             :                 /// Indicates if the <typeparamref name="TAuthenticationToken"/> is a <see cref="string"/>.
     124             :                 /// </summary>
     125             :                 protected bool AuthenticationTokenIsString { get; private set; }
     126             : 
     127             :                 /// <summary>
     128             :                 /// Instantiates a new instance of <see cref="AzureBus{TAuthenticationToken}"/>
     129             :                 /// </summary>
     130           1 :                 protected AzureBus(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, bool isAPublisher)
     131             :                 {
     132             :                         AuthenticationTokenIsGuid = typeof(TAuthenticationToken) == typeof(Guid);
     133             :                         AuthenticationTokenIsInt = typeof(TAuthenticationToken) == typeof(int);
     134             :                         AuthenticationTokenIsString = typeof(TAuthenticationToken) == typeof(string);
     135             : 
     136             :                         EventWaits = new ConcurrentDictionary<Guid, IList<IEvent<TAuthenticationToken>>>();
     137             : 
     138             :                         MessageSerialiser = messageSerialiser;
     139             :                         AuthenticationTokenHelper = authenticationTokenHelper;
     140             :                         CorrelationIdHelper = correlationIdHelper;
     141             :                         Logger = logger;
     142             :                         ConfigurationManager = configurationManager;
     143             : 
     144             :                         // ReSharper disable DoNotCallOverridableMethodsInConstructor
     145             :                         UpdateSettings();
     146             :                         if (isAPublisher)
     147             :                                 InstantiatePublishing();
     148             :                         // ReSharper restore DoNotCallOverridableMethodsInConstructor
     149             :                 }
     150             : 
     151             :                 /// <summary>
     152             :                 /// Sets <see cref="ConnectionString"/> from <see cref="GetConnectionString"/>.
     153             :                 /// </summary>
     154           1 :                 protected virtual void SetConnectionStrings()
     155             :                 {
     156             :                         ConnectionString = GetConnectionString();
     157             :                         Logger.LogSensitive(string.Format("Connection string settings set to {0}.", ConnectionString));
     158             :                 }
     159             : 
     160             :                 /// <summary>
     161             :                 /// Sets <see cref="NumberOfReceiversCount"/> from <see cref="GetCurrentNumberOfReceiversCount"/>.
     162             :                 /// </summary>
     163           1 :                 protected virtual void SetNumberOfReceiversCount()
     164             :                 {
     165             :                         NumberOfReceiversCount = GetCurrentNumberOfReceiversCount();
     166             :                         Logger.LogDebug(string.Format("Number of receivers settings set to {0}.", NumberOfReceiversCount));
     167             :                 }
     168             : 
     169             :                 /// <summary>
     170             :                 /// Sets <see cref="MaximumConcurrentReceiverProcessesCount"/> from <see cref="GetCurrentMaximumConcurrentReceiverProcessesCount"/>.
     171             :                 /// </summary>
     172           1 :                 protected virtual void SetMaximumConcurrentReceiverProcessesCount()
     173             :                 {
     174             :                         MaximumConcurrentReceiverProcessesCount = GetCurrentMaximumConcurrentReceiverProcessesCount();
     175             :                         Logger.LogDebug(string.Format("Number of receivers settings set to {0}.", MaximumConcurrentReceiverProcessesCount));
     176             :                 }
     177             : 
     178             :                 /// <summary>
     179             :                 /// Gets the connection string for the bus.
     180             :                 /// </summary>
     181           1 :                 protected abstract string GetConnectionString();
     182             : 
     183             :                 /// <summary>
     184             :                 /// Returns <see cref="DefaultNumberOfReceiversCount"/>.
     185             :                 /// </summary>
     186             :                 /// <returns><see cref="DefaultNumberOfReceiversCount"/>.</returns>
     187           1 :                 protected virtual int GetCurrentNumberOfReceiversCount()
     188             :                 {
     189             :                         return DefaultNumberOfReceiversCount;
     190             :                 }
     191             : 
     192             :                 /// <summary>
     193             :                 /// Returns <see cref="DefaultMaximumConcurrentReceiverProcessesCount"/>.
     194             :                 /// </summary>
     195             :                 /// <returns><see cref="DefaultMaximumConcurrentReceiverProcessesCount"/>.</returns>
     196           1 :                 protected virtual int GetCurrentMaximumConcurrentReceiverProcessesCount()
     197             :                 {
     198             :                         return DefaultMaximumConcurrentReceiverProcessesCount;
     199             :                 }
     200             : 
     201             :                 /// <summary>
     202             :                 /// Instantiate publishing on this bus.
     203             :                 /// </summary>
     204           1 :                 protected abstract void InstantiatePublishing();
     205             : 
     206             :                 /// <summary>
     207             :                 /// Instantiate receiving on this bus.
     208             :                 /// </summary>
     209           1 :                 protected abstract void InstantiateReceiving();
     210             : 
     211             : #if NET452
     212             :                 /// <summary>
     213             :                 /// Creates a new instance of <see cref="NamespaceManager"/> with the <see cref="ConnectionString"/>.
     214             :                 /// </summary>
     215             :                 protected virtual NamespaceManager GetManager()
     216             :                 {
     217             :                         NamespaceManager manager = NamespaceManager.CreateFromConnectionString(ConnectionString);
     218             : #endif
     219             : #if NETCOREAPP3_0
     220             :                 /// <summary>
     221             :                 /// Creates a new instance of <see cref="ManagementClient"/> with the <see cref="ConnectionString"/>.
     222             :                 /// </summary>
     223             :                 protected virtual ManagementClient GetManager()
     224             :                 {
     225             :                         var manager = new ManagementClient(ConnectionString);
     226             : #endif
     227             :                         return manager;
     228             :                 }
     229             : 
     230             :                 /// <summary>
     231             :                 /// Gets the default retry policy dedicated to handling transient conditions with Windows Azure Service Bus.
     232             :                 /// </summary>
     233             :                 protected virtual RetryPolicy AzureServiceBusRetryPolicy
     234             :                 {
     235             :                         get
     236             :                         {
     237             : #if NET452
     238             :                                 RetryManager retryManager = EnterpriseLibraryContainer.Current.GetInstance<RetryManager>();
     239             : #endif
     240             : #if NETCOREAPP3_0
     241             :                                 RetryManager retryManager = RetryManager.Instance;
     242             : #endif
     243             :                                 RetryPolicy retryPolicy = retryManager.GetDefaultAzureServiceBusRetryPolicy();
     244             :                                 retryPolicy.Retrying += (sender, args) =>
     245             :                                 {
     246             :                                         var message = string.Format("Retrying action - Count:{0}, Delay:{1}", args.CurrentRetryCount, args.Delay);
     247             :                                         Logger.LogWarning(message, "AzureServiceBusRetryPolicy", args.LastException);
     248             :                                 };
     249             :                                 return retryPolicy;
     250             :                         }
     251             :                 }
     252             : 
     253             :                 /// <summary>
     254             :                 /// Starts a new <see cref="Task"/> that periodically calls <see cref="ValidateSettingsHaveChanged"/>
     255             :                 /// and if there is a change, calls <see cref="TriggerSettingsChecking"/>.
     256             :                 /// </summary>
     257           1 :                 protected virtual void StartSettingsChecking()
     258             :                 {
     259             :                         Task.Factory.StartNewSafely(() =>
     260             :                         {
     261             :                                 SpinWait.SpinUntil(ValidateSettingsHaveChanged, sleepInMilliseconds: 1000);
     262             : 
     263             :                                 Logger.LogInfo("Connecting string settings for the Azure Service Bus changed and will now refresh.");
     264             : 
     265             :                                 // Update the connection string and trigger a restart;
     266             :                                 if (ValidateSettingsHaveChanged())
     267             :                                         TriggerSettingsChecking();
     268             :                         });
     269             :                 }
     270             : 
     271             :                 /// <summary>
     272             :                 /// Checks if the settings for
     273             :                 /// <see cref="ConnectionString"/>, <see cref="NumberOfReceiversCount"/>
     274             :                 /// or <see cref="MaximumConcurrentReceiverProcessesCount"/> have changed.
     275             :                 /// </summary>
     276             :                 /// <returns></returns>
     277           1 :                 protected virtual bool ValidateSettingsHaveChanged()
     278             :                 {
     279             :                         return ConnectionString != GetConnectionString()
     280             :                                 ||
     281             :                         NumberOfReceiversCount != GetCurrentNumberOfReceiversCount()
     282             :                                 ||
     283             :                         MaximumConcurrentReceiverProcessesCount != GetCurrentMaximumConcurrentReceiverProcessesCount();
     284             :                 }
     285             : 
     286             :                 /// <summary>
     287             :                 /// Calls 
     288             :                 /// <see cref="SetConnectionStrings"/>
     289             :                 /// <see cref="SetNumberOfReceiversCount"/> and 
     290             :                 /// <see cref="SetMaximumConcurrentReceiverProcessesCount"/>
     291             :                 /// </summary>
     292           1 :                 protected virtual void UpdateSettings()
     293             :                 {
     294             :                         SetConnectionStrings();
     295             :                         SetNumberOfReceiversCount();
     296             :                         SetMaximumConcurrentReceiverProcessesCount();
     297             :                 }
     298             : 
     299             :                 /// <summary>
     300             :                 /// Change the settings used by this bus.
     301             :                 /// </summary>
     302           1 :                 protected abstract void TriggerSettingsChecking();
     303             : 
     304             : #if NET452
     305             :                 /// <summary>
     306             :                 /// Sets the handler on <see cref="IMessageReceiver.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage})"/>.
     307             :                 /// </summary>
     308             : #endif
     309             : #if NETCOREAPP3_0
     310             :                 /// <summary>
     311             :                 /// Sets the handler on <see cref="IReceiverClient.RegisterMessageHandler(Func{Message, System.Threading.CancellationToken, Task}, MessageHandlerOptions)"/>.
     312             :                 /// </summary>
     313             : #endif
     314           0 :                 protected abstract void ApplyReceiverMessageHandler();
     315             :         }
     316             : }

Generated by: LCOV version 1.13