Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Threading.Tasks;
13 : using Cqrs.Authentication;
14 : using Cqrs.Configuration;
15 : using cdmdotnet.Logging;
16 : using Cqrs.Commands;
17 : using Cqrs.Events;
18 : using Cqrs.Infrastructure;
19 : using Microsoft.ServiceBus;
20 : using Microsoft.Practices.EnterpriseLibrary.Common.Configuration;
21 : using Microsoft.Practices.EnterpriseLibrary.WindowsAzure.TransientFaultHandling;
22 : using Microsoft.ServiceBus.Messaging;
23 : using RetryPolicy = Microsoft.Practices.TransientFaultHandling.RetryPolicy;
24 :
25 : namespace Cqrs.Azure.ServiceBus
26 : {
/// <summary>
/// An Azure Bus such as a Service Bus or Event Hub.
/// </summary>
/// <remarks>
/// Holds the settings (connection string, receiver counts) and helper services shared by concrete buses,
/// and provides the hooks (<see cref="UpdateSettings"/>, <see cref="StartSettingsChecking"/>,
/// <see cref="TriggerSettingsChecking"/>) used to detect and apply changed settings.
/// </remarks>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
public abstract class AzureBus<TAuthenticationToken>
{
    /// <summary>
    /// Gets or sets the connection string to the bus.
    /// </summary>
    protected string ConnectionString { get; set; }

    /// <summary>
    /// Gets the <see cref="IMessageSerialiser{TAuthenticationToken}"/>. Set once by the constructor.
    /// </summary>
    protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; }

    /// <summary>
    /// Gets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>. Set once by the constructor.
    /// </summary>
    protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }

    /// <summary>
    /// Gets the <see cref="ICorrelationIdHelper"/>. Set once by the constructor.
    /// </summary>
    protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }

    /// <summary>
    /// Gets the <see cref="ILogger"/>. Set once by the constructor.
    /// </summary>
    protected ILogger Logger { get; private set; }

    /// <summary>
    /// Gets the <see cref="IConfigurationManager"/>. Set once by the constructor.
    /// </summary>
    protected IConfigurationManager ConfigurationManager { get; private set; }

    /// <summary>
    /// Gets the <see cref="IEvent{TAuthenticationToken}">events</see> to wait for before responding to the caller
    /// keyed by the <see cref="ICommand{TAuthenticationToken}.Id"/>
    /// </summary>
    /// <remarks>
    /// The constructor initialises this with a <see cref="ConcurrentDictionary{TKey,TValue}"/>;
    /// the <see cref="IList{T}"/> values stored in it are supplied by callers.
    /// </remarks>
    protected IDictionary<Guid, IList<IEvent<TAuthenticationToken>>> EventWaits { get; private set; }

    /// <summary>
    /// The default number of receivers to start and run.
    /// </summary>
    protected const int DefaultNumberOfReceiversCount = 1;

    /// <summary>
    /// The number of receivers to start and run.
    /// </summary>
    protected int NumberOfReceiversCount { get; set; }

    /// <summary>
    /// The default number for <see cref="MaximumConcurrentReceiverProcessesCount"/>.
    /// </summary>
    protected const int DefaultMaximumConcurrentReceiverProcessesCount = 1;

    /// <summary>
    /// The <see cref="OnMessageOptions.MaxConcurrentCalls"/> value.
    /// </summary>
    protected int MaximumConcurrentReceiverProcessesCount { get; set; }

    /// <summary>
    /// Indicates if the <typeparamref name="TAuthenticationToken"/> is a <see cref="Guid"/>.
    /// </summary>
    protected bool AuthenticationTokenIsGuid { get; private set; }

    /// <summary>
    /// Indicates if the <typeparamref name="TAuthenticationToken"/> is an <see cref="int"/>.
    /// </summary>
    protected bool AuthenticationTokenIsInt { get; private set; }

    /// <summary>
    /// Indicates if the <typeparamref name="TAuthenticationToken"/> is a <see cref="string"/>.
    /// </summary>
    protected bool AuthenticationTokenIsString { get; private set; }

    /// <summary>
    /// Instantiates a new instance of <see cref="AzureBus{TAuthenticationToken}"/>
    /// </summary>
    /// <remarks>
    /// The constructor calls the overridable <see cref="UpdateSettings"/> and, for publishers,
    /// <see cref="InstantiatePublishing"/>. Overrides of those members therefore run before the
    /// body of the derived class constructor has executed.
    /// </remarks>
    /// <param name="configurationManager">Stored in <see cref="ConfigurationManager"/>.</param>
    /// <param name="messageSerialiser">Stored in <see cref="MessageSerialiser"/>.</param>
    /// <param name="authenticationTokenHelper">Stored in <see cref="AuthenticationTokenHelper"/>.</param>
    /// <param name="correlationIdHelper">Stored in <see cref="CorrelationIdHelper"/>.</param>
    /// <param name="logger">Stored in <see cref="Logger"/>; used while the settings are first read.</param>
    /// <param name="isAPublisher">When <c>true</c>, <see cref="InstantiatePublishing"/> is called during construction.</param>
    protected AzureBus(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, bool isAPublisher)
    {
        Type authenticationTokenType = typeof(TAuthenticationToken);
        AuthenticationTokenIsGuid = authenticationTokenType == typeof(Guid);
        AuthenticationTokenIsInt = authenticationTokenType == typeof(int);
        AuthenticationTokenIsString = authenticationTokenType == typeof(string);

        EventWaits = new ConcurrentDictionary<Guid, IList<IEvent<TAuthenticationToken>>>();

        MessageSerialiser = messageSerialiser;
        AuthenticationTokenHelper = authenticationTokenHelper;
        CorrelationIdHelper = correlationIdHelper;
        Logger = logger;
        ConfigurationManager = configurationManager;

        // The services above must be assigned before the overridable calls below,
        // as the settings methods log through Logger.
        // ReSharper disable DoNotCallOverridableMethodsInConstructor
        UpdateSettings();
        if (isAPublisher)
        {
            InstantiatePublishing();
        }
        // ReSharper restore DoNotCallOverridableMethodsInConstructor
    }

    /// <summary>
    /// Sets <see cref="ConnectionString"/> from <see cref="GetConnectionString"/>.
    /// </summary>
    protected virtual void SetConnectionStrings()
    {
        ConnectionString = GetConnectionString();
        // The connection string is written via LogSensitive rather than a regular log level.
        Logger.LogSensitive(string.Format("Connection string settings set to {0}.", ConnectionString));
    }

    /// <summary>
    /// Sets <see cref="NumberOfReceiversCount"/> from <see cref="GetCurrentNumberOfReceiversCount"/>.
    /// </summary>
    protected virtual void SetNumberOfReceiversCount()
    {
        NumberOfReceiversCount = GetCurrentNumberOfReceiversCount();
        Logger.LogDebug(string.Format("Number of receivers settings set to {0}.", NumberOfReceiversCount));
    }

    /// <summary>
    /// Sets <see cref="MaximumConcurrentReceiverProcessesCount"/> from <see cref="GetCurrentMaximumConcurrentReceiverProcessesCount"/>.
    /// </summary>
    protected virtual void SetMaximumConcurrentReceiverProcessesCount()
    {
        MaximumConcurrentReceiverProcessesCount = GetCurrentMaximumConcurrentReceiverProcessesCount();
        // Message names the setting actually being reported so it is distinguishable
        // from the one written by SetNumberOfReceiversCount.
        Logger.LogDebug(string.Format("Maximum concurrent receiver processes settings set to {0}.", MaximumConcurrentReceiverProcessesCount));
    }

    /// <summary>
    /// Gets the connection string for the bus.
    /// </summary>
    /// <returns>The connection string currently configured for the bus.</returns>
    protected abstract string GetConnectionString();

    /// <summary>
    /// Returns <see cref="DefaultNumberOfReceiversCount"/>.
    /// </summary>
    /// <returns><see cref="DefaultNumberOfReceiversCount"/>.</returns>
    protected virtual int GetCurrentNumberOfReceiversCount()
    {
        return DefaultNumberOfReceiversCount;
    }

    /// <summary>
    /// Returns <see cref="DefaultMaximumConcurrentReceiverProcessesCount"/>.
    /// </summary>
    /// <returns><see cref="DefaultMaximumConcurrentReceiverProcessesCount"/>.</returns>
    protected virtual int GetCurrentMaximumConcurrentReceiverProcessesCount()
    {
        return DefaultMaximumConcurrentReceiverProcessesCount;
    }

    /// <summary>
    /// Instantiate publishing on this bus.
    /// </summary>
    protected abstract void InstantiatePublishing();

    /// <summary>
    /// Instantiate receiving on this bus.
    /// </summary>
    protected abstract void InstantiateReceiving();

    /// <summary>
    /// Creates a new instance of <see cref="NamespaceManager"/> with the <see cref="ConnectionString"/>.
    /// </summary>
    /// <returns>The <see cref="NamespaceManager"/> created from <see cref="ConnectionString"/>.</returns>
    protected virtual NamespaceManager GetNamespaceManager()
    {
        return NamespaceManager.CreateFromConnectionString(ConnectionString);
    }

    /// <summary>
    /// Gets the default retry policy dedicated to handling transient conditions with Windows Azure Service Bus.
    /// </summary>
    /// <remarks>
    /// Every read resolves the <see cref="RetryManager"/> from <see cref="EnterpriseLibraryContainer.Current"/>
    /// and subscribes a handler to the policy's Retrying event that logs each retry as a warning.
    /// </remarks>
    protected virtual RetryPolicy AzureServiceBusRetryPolicy
    {
        get
        {
            RetryManager retryManager = EnterpriseLibraryContainer.Current.GetInstance<RetryManager>();
            RetryPolicy retryPolicy = retryManager.GetDefaultAzureServiceBusRetryPolicy();
            retryPolicy.Retrying += (sender, args) =>
            {
                string message = string.Format("Retrying action - Count:{0}, Delay:{1}", args.CurrentRetryCount, args.Delay);
                Logger.LogWarning(message, "AzureServiceBusRetryPolicy", args.LastException);
            };
            return retryPolicy;
        }
    }

    /// <summary>
    /// Starts a new <see cref="Task"/> that periodically calls <see cref="ValidateSettingsHaveChanged"/>
    /// and if there is a change, calls <see cref="TriggerSettingsChecking"/>.
    /// </summary>
    protected virtual void StartSettingsChecking()
    {
        Task.Factory.StartNewSafely(() =>
        {
            // Wait until a settings change is detected, polling with a 1000ms sleep between checks.
            SpinWait.SpinUntil(ValidateSettingsHaveChanged, sleepInMilliseconds: 1000);

            // NOTE(review): this message mentions only the connection string, but ValidateSettingsHaveChanged
            // also reports changes to the receiver counts. Left unchanged in case log consumers match on it.
            Logger.LogInfo("Connecting string settings for the Azure Service Bus changed and will now refresh.");

            // Update the connection string and trigger a restart;
            // the settings are re-checked so nothing is triggered if they no longer differ.
            if (ValidateSettingsHaveChanged())
            {
                TriggerSettingsChecking();
            }
        });
    }

    /// <summary>
    /// Checks if the settings for
    /// <see cref="ConnectionString"/>, <see cref="NumberOfReceiversCount"/>
    /// or <see cref="MaximumConcurrentReceiverProcessesCount"/> have changed.
    /// </summary>
    /// <returns>
    /// <c>true</c> if any of the stored values differs from the value currently returned by
    /// <see cref="GetConnectionString"/>, <see cref="GetCurrentNumberOfReceiversCount"/> or
    /// <see cref="GetCurrentMaximumConcurrentReceiverProcessesCount"/>; otherwise <c>false</c>.
    /// </returns>
    protected virtual bool ValidateSettingsHaveChanged()
    {
        return ConnectionString != GetConnectionString()
            ||
            NumberOfReceiversCount != GetCurrentNumberOfReceiversCount()
            ||
            MaximumConcurrentReceiverProcessesCount != GetCurrentMaximumConcurrentReceiverProcessesCount();
    }

    /// <summary>
    /// Calls
    /// <see cref="SetConnectionStrings"/>
    /// <see cref="SetNumberOfReceiversCount"/> and
    /// <see cref="SetMaximumConcurrentReceiverProcessesCount"/>
    /// </summary>
    protected virtual void UpdateSettings()
    {
        SetConnectionStrings();
        SetNumberOfReceiversCount();
        SetMaximumConcurrentReceiverProcessesCount();
    }

    /// <summary>
    /// Change the settings used by this bus.
    /// </summary>
    protected abstract void TriggerSettingsChecking();

    /// <summary>
    /// Sets the handler on <see cref="SubscriptionClient.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage})"/>.
    /// </summary>
    protected abstract void ApplyReceiverMessageHandler();
}
271 : }
|