Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Threading.Tasks;
13 : using Cqrs.Authentication;
14 : using Cqrs.Configuration;
15 : using cdmdotnet.Logging;
16 : using Cqrs.Events;
17 : using Cqrs.Infrastructure;
18 : using Microsoft.ServiceBus;
19 : using Microsoft.Practices.EnterpriseLibrary.Common.Configuration;
20 : using Microsoft.Practices.EnterpriseLibrary.WindowsAzure.TransientFaultHandling;
21 : using RetryPolicy = Microsoft.Practices.TransientFaultHandling.RetryPolicy;
22 :
23 : namespace Cqrs.Azure.ServiceBus
24 : {
/// <summary>
/// Base class for buses built on Azure Service Bus. It holds the shared dependencies, the connection string and
/// the receiver-count settings, and declares the hooks deriving buses implement to set up publishing and receiving.
/// </summary>
/// <typeparam name="TAuthenticationToken">The type of authentication token used by the serialiser and the authentication token helper.</typeparam>
public abstract class AzureBus<TAuthenticationToken>
{
    /// <summary>
    /// The connection string last read via <see cref="GetConnectionString"/>; used by <see cref="GetNamespaceManager"/>.
    /// </summary>
    protected string ConnectionString { get; set; }

    /// <summary>
    /// The message serialiser supplied to the constructor.
    /// </summary>
    protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; }

    /// <summary>
    /// The authentication token helper supplied to the constructor.
    /// </summary>
    protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }

    /// <summary>
    /// The correlation id helper supplied to the constructor.
    /// </summary>
    protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }

    /// <summary>
    /// The logger supplied to the constructor; used for all settings and retry logging in this class.
    /// </summary>
    protected ILogger Logger { get; private set; }

    /// <summary>
    /// The configuration manager supplied to the constructor.
    /// </summary>
    protected IConfigurationManager ConfigurationManager { get; private set; }

    /// <summary>
    /// Lists of events keyed by a <see cref="Guid"/>, backed by a <see cref="ConcurrentDictionary{TKey,TValue}"/>.
    /// NOTE(review): the key is presumably a correlation id - confirm against the deriving buses.
    /// </summary>
    protected IDictionary<Guid, IList<IEvent<TAuthenticationToken>>> EventWaits { get; private set; }

    /// <summary>
    /// The value returned by the default implementation of <see cref="GetCurrentNumberOfReceiversCount"/>.
    /// </summary>
    protected const int DefaultNumberOfReceiversCount = 1;

    /// <summary>
    /// The number of receivers last read via <see cref="GetCurrentNumberOfReceiversCount"/>.
    /// </summary>
    protected int NumberOfReceiversCount { get; set; }

    /// <summary>
    /// The value returned by the default implementation of <see cref="GetCurrentMaximumConcurrentReceiverProcessesCount"/>.
    /// </summary>
    protected const int DefaultMaximumConcurrentReceiverProcessesCount = 1;

    /// <summary>
    /// The maximum number of concurrent receiver processes last read via <see cref="GetCurrentMaximumConcurrentReceiverProcessesCount"/>.
    /// </summary>
    protected int MaximumConcurrentReceiverProcessesCount { get; set; }

    /// <summary>
    /// Stores the supplied dependencies, loads the current settings via <see cref="UpdateSettings"/> and,
    /// when <paramref name="isAPublisher"/> is true, calls <see cref="InstantiatePublishing"/>.
    /// </summary>
    /// <param name="configurationManager">Stored in <see cref="ConfigurationManager"/>.</param>
    /// <param name="messageSerialiser">Stored in <see cref="MessageSerialiser"/>.</param>
    /// <param name="authenticationTokenHelper">Stored in <see cref="AuthenticationTokenHelper"/>.</param>
    /// <param name="correlationIdHelper">Stored in <see cref="CorrelationIdHelper"/>.</param>
    /// <param name="logger">Stored in <see cref="Logger"/>; it is used immediately while the settings are loaded.</param>
    /// <param name="isAPublisher">When true, <see cref="InstantiatePublishing"/> is called at the end of construction.</param>
    protected AzureBus(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, bool isAPublisher)
    {
        EventWaits = new ConcurrentDictionary<Guid, IList<IEvent<TAuthenticationToken>>>();

        MessageSerialiser = messageSerialiser;
        AuthenticationTokenHelper = authenticationTokenHelper;
        CorrelationIdHelper = correlationIdHelper;
        Logger = logger;
        ConfigurationManager = configurationManager;

        // The calls below are overridable and run before the body of any derived constructor,
        // so overrides can only rely on the state assigned above.
        // ReSharper disable DoNotCallOverridableMethodsInConstructor
        UpdateSettings();
        if (isAPublisher)
        {
            InstantiatePublishing();
        }
        // ReSharper restore DoNotCallOverridableMethodsInConstructor
    }

    /// <summary>
    /// Reads the connection string via <see cref="GetConnectionString"/> into <see cref="ConnectionString"/>
    /// and records it through the logger's sensitive channel.
    /// </summary>
    protected virtual void SetConnectionStrings()
    {
        ConnectionString = GetConnectionString();
        Logger.LogSensitive(string.Format("Connection string settings set to {0}.", ConnectionString));
    }

    /// <summary>
    /// Reads the receiver count via <see cref="GetCurrentNumberOfReceiversCount"/> into
    /// <see cref="NumberOfReceiversCount"/> and logs the new value.
    /// </summary>
    protected virtual void SetNumberOfReceiversCount()
    {
        NumberOfReceiversCount = GetCurrentNumberOfReceiversCount();
        Logger.LogDebug(string.Format("Number of receivers settings set to {0}.", NumberOfReceiversCount));
    }

    /// <summary>
    /// Reads the concurrency limit via <see cref="GetCurrentMaximumConcurrentReceiverProcessesCount"/> into
    /// <see cref="MaximumConcurrentReceiverProcessesCount"/> and logs the new value.
    /// </summary>
    protected virtual void SetMaximumConcurrentReceiverProcessesCount()
    {
        MaximumConcurrentReceiverProcessesCount = GetCurrentMaximumConcurrentReceiverProcessesCount();
        // The message names this setting explicitly so it cannot be confused with the receiver-count log entry.
        Logger.LogDebug(string.Format("Maximum concurrent receiver processes settings set to {0}.", MaximumConcurrentReceiverProcessesCount));
    }

    /// <summary>
    /// Returns the connection string currently configured for the bus.
    /// </summary>
    protected abstract string GetConnectionString();

    /// <summary>
    /// Returns the currently configured number of receivers. The default implementation returns
    /// <see cref="DefaultNumberOfReceiversCount"/>.
    /// </summary>
    protected virtual int GetCurrentNumberOfReceiversCount()
    {
        return DefaultNumberOfReceiversCount;
    }

    /// <summary>
    /// Returns the currently configured maximum number of concurrent receiver processes. The default
    /// implementation returns <see cref="DefaultMaximumConcurrentReceiverProcessesCount"/>.
    /// </summary>
    protected virtual int GetCurrentMaximumConcurrentReceiverProcessesCount()
    {
        return DefaultMaximumConcurrentReceiverProcessesCount;
    }

    /// <summary>
    /// Sets up whatever the deriving bus needs in order to publish. Called from the constructor when the bus is a publisher.
    /// </summary>
    protected abstract void InstantiatePublishing();

    /// <summary>
    /// Sets up whatever the deriving bus needs in order to receive. Not called from this class.
    /// </summary>
    protected abstract void InstantiateReceiving();

    /// <summary>
    /// Creates a <see cref="NamespaceManager"/> from the current <see cref="ConnectionString"/>.
    /// </summary>
    /// <returns>A new <see cref="NamespaceManager"/> for the current connection string.</returns>
    protected virtual NamespaceManager GetNamespaceManager()
    {
        NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(ConnectionString);
        return namespaceManager;
    }

    /// <summary>
    /// Gets the default retry policy dedicated to handling transient conditions with Windows Azure Service Bus.
    /// </summary>
    /// <remarks>
    /// Every read resolves the <see cref="RetryManager"/> from the Enterprise Library container, asks it for the
    /// default Azure Service Bus policy and attaches a handler that logs each retry as a warning.
    /// </remarks>
    protected virtual RetryPolicy AzureServiceBusRetryPolicy
    {
        get
        {
            RetryManager retryManager = EnterpriseLibraryContainer.Current.GetInstance<RetryManager>();
            RetryPolicy retryPolicy = retryManager.GetDefaultAzureServiceBusRetryPolicy();
            // NOTE(review): a handler is attached on every access; this assumes the retry manager hands back
            // a fresh policy each time - confirm, otherwise handlers would accumulate on a shared instance.
            retryPolicy.Retrying += (sender, args) =>
            {
                var message = string.Format("Retrying action - Count:{0}, Delay:{1}", args.CurrentRetryCount, args.Delay);
                Logger.LogWarning(message, "AzureServiceBusRetryPolicy", args.LastException);
            };
            return retryPolicy;
        }
    }

    /// <summary>
    /// Starts a background task that waits on <see cref="ValidateSettingsHaveChanged"/>, logs that the settings
    /// changed and, if they still differ, calls <see cref="TriggerSettingsChecking"/>.
    /// </summary>
    protected virtual void StartSettingsChecking()
    {
        Task.Factory.StartNewSafely(() =>
        {
            // NOTE(review): 'sleepInMilliseconds' is not a parameter of System.Threading.SpinWait.SpinUntil, so this
            // presumably resolves to a project SpinWait that re-checks the condition every second - confirm.
            SpinWait.SpinUntil(ValidateSettingsHaveChanged, sleepInMilliseconds: 1000);

            Logger.LogInfo("Connecting string settings for the Azure Service Bus changed and will now refresh.");

            // Re-check before acting, then hand over to the deriving bus to apply the new settings.
            if (ValidateSettingsHaveChanged())
            {
                TriggerSettingsChecking();
            }
        });
    }

    /// <summary>
    /// Compares the stored connection string and receiver counts with the currently configured values.
    /// </summary>
    /// <returns>true if any of the three stored settings differs from its current value; otherwise false.</returns>
    protected virtual bool ValidateSettingsHaveChanged()
    {
        return ConnectionString != GetConnectionString()
            ||
        NumberOfReceiversCount != GetCurrentNumberOfReceiversCount()
            ||
        MaximumConcurrentReceiverProcessesCount != GetCurrentMaximumConcurrentReceiverProcessesCount();
    }

    /// <summary>
    /// Reloads the connection string, the number of receivers and the maximum number of concurrent receiver
    /// processes, in that order.
    /// </summary>
    protected virtual void UpdateSettings()
    {
        SetConnectionStrings();
        SetNumberOfReceiversCount();
        SetMaximumConcurrentReceiverProcessesCount();
    }

    /// <summary>
    /// Called by <see cref="StartSettingsChecking"/> once a settings change has been detected.
    /// </summary>
    protected abstract void TriggerSettingsChecking();

    /// <summary>
    /// Applies the message handler to the receiver(s) of the deriving bus. Not called from this class.
    /// </summary>
    protected abstract void ApplyReceiverMessageHandler();
}
158 : }
|