Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Threading.Tasks;
13 : using Cqrs.Authentication;
14 : using Cqrs.Configuration;
15 : using Chinchilla.Logging;
16 : using Cqrs.Commands;
17 : using Cqrs.Events;
18 : using Cqrs.Infrastructure;
19 : #if NET452
20 : using Microsoft.ServiceBus;
21 : using Microsoft.ServiceBus.Messaging;
22 : using Microsoft.Practices.EnterpriseLibrary.Common.Configuration;
23 : using Microsoft.Practices.EnterpriseLibrary.WindowsAzure.TransientFaultHandling;
24 : using RetryPolicy = Microsoft.Practices.TransientFaultHandling.RetryPolicy;
25 : using IMessageReceiver = Microsoft.ServiceBus.Messaging.SubscriptionClient;
26 : #endif
27 : #if NETCOREAPP3_0
28 : using Microsoft.Azure.ServiceBus;
29 : using Microsoft.Azure.ServiceBus.Core;
30 : using Microsoft.Azure.ServiceBus.Management;
31 : using Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling;
32 : using RetryPolicy = Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling.RetryPolicy;
33 : #endif
34 :
35 : namespace Cqrs.Azure.ServiceBus
36 : {
37 : /// <summary>
38 : /// An Azure Bus such as a Service Bus or Event Hub.
39 : /// </summary>
40 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
41 : /// <remarks>
42 : /// https://markheath.net/post/migrating-to-new-servicebus-sdk
43 : /// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions#receive-messages-from-the-subscription
44 : /// https://stackoverflow.com/questions/47427361/azure-service-bus-read-messages-sent-by-net-core-2-with-brokeredmessage-getbo
45 : /// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
46 : /// </remarks>
47 : public abstract class AzureBus<TAuthenticationToken>
48 1 : {
49 : /// <summary>
50 : /// Gets or sets the connection string to the bus.
51 : /// </summary>
52 : protected string ConnectionString { get; set; }
53 :
54 : /// <summary>
55 : /// Gets or sets the <see cref="IMessageSerialiser{TAuthenticationToken}"/>.
56 : /// </summary>
57 : protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; }
58 :
59 : /// <summary>
60 : /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>.
61 : /// </summary>
62 : protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
63 :
64 : /// <summary>
65 : /// Gets or sets the <see cref="ICorrelationIdHelper"/>.
66 : /// </summary>
67 : protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
68 :
69 : /// <summary>
70 : /// Gets or sets the <see cref="ILogger"/>.
71 : /// </summary>
72 : protected ILogger Logger { get; private set; }
73 :
74 : /// <summary>
75 : /// Gets or sets the <see cref="IConfigurationManager"/>.
76 : /// </summary>
77 : protected IConfigurationManager ConfigurationManager { get; private set; }
78 :
79 : /// <summary>
80 : /// Gets or sets the <see cref="IEvent{TAuthenticationToken}">events</see> to wait for before responding to the caller
81 : /// keyed by the <see cref="ICommand{TAuthenticationToken}.Id"/>
82 : /// </summary>
83 : protected IDictionary<Guid, IList<IEvent<TAuthenticationToken>>> EventWaits { get; private set; }
84 :
85 : /// <summary>
86 : /// The default number of receivers to start and run.
87 : /// </summary>
88 : protected const int DefaultNumberOfReceiversCount = 1;
89 :
90 : /// <summary>
91 : /// The number of receivers to start and run.
92 : /// </summary>
93 : protected int NumberOfReceiversCount { get; set; }
94 :
95 : /// <summary>
96 : /// The default number for <see cref="MaximumConcurrentReceiverProcessesCount"/>.
97 : /// </summary>
98 : protected const int DefaultMaximumConcurrentReceiverProcessesCount = 1;
99 :
100 : #if NET452
101 : /// <summary>
102 : /// The <see cref="OnMessageOptions.MaxConcurrentCalls"/> value.
103 : /// </summary>
104 : #endif
105 : #if NETCOREAPP3_0
106 : /// <summary>
107 : /// Used by .NET Framework, but not .Net Core
108 : /// </summary>
109 : #endif
110 : protected int MaximumConcurrentReceiverProcessesCount { get; set; }
111 :
112 : /// <summary>
113 : /// Indicates if the <typeparamref name="TAuthenticationToken"/> is a <see cref="Guid"/>.
114 : /// </summary>
115 : protected bool AuthenticationTokenIsGuid { get; private set; }
116 :
117 : /// <summary>
118 : /// Indicates if the <typeparamref name="TAuthenticationToken"/> is an <see cref="int"/>.
119 : /// </summary>
120 : protected bool AuthenticationTokenIsInt { get; private set; }
121 :
122 : /// <summary>
123 : /// Indicates if the <typeparamref name="TAuthenticationToken"/> is a <see cref="string"/>.
124 : /// </summary>
125 : protected bool AuthenticationTokenIsString { get; private set; }
126 :
127 : /// <summary>
128 : /// Instantiates a new instance of <see cref="AzureBus{TAuthenticationToken}"/>
129 : /// </summary>
130 1 : protected AzureBus(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, bool isAPublisher)
131 : {
132 : AuthenticationTokenIsGuid = typeof(TAuthenticationToken) == typeof(Guid);
133 : AuthenticationTokenIsInt = typeof(TAuthenticationToken) == typeof(int);
134 : AuthenticationTokenIsString = typeof(TAuthenticationToken) == typeof(string);
135 :
136 : EventWaits = new ConcurrentDictionary<Guid, IList<IEvent<TAuthenticationToken>>>();
137 :
138 : MessageSerialiser = messageSerialiser;
139 : AuthenticationTokenHelper = authenticationTokenHelper;
140 : CorrelationIdHelper = correlationIdHelper;
141 : Logger = logger;
142 : ConfigurationManager = configurationManager;
143 :
144 : // ReSharper disable DoNotCallOverridableMethodsInConstructor
145 : UpdateSettings();
146 : if (isAPublisher)
147 : InstantiatePublishing();
148 : // ReSharper restore DoNotCallOverridableMethodsInConstructor
149 : }
150 :
151 : /// <summary>
152 : /// Sets <see cref="ConnectionString"/> from <see cref="GetConnectionString"/>.
153 : /// </summary>
154 1 : protected virtual void SetConnectionStrings()
155 : {
156 : ConnectionString = GetConnectionString();
157 : Logger.LogSensitive(string.Format("Connection string settings set to {0}.", ConnectionString));
158 : }
159 :
160 : /// <summary>
161 : /// Sets <see cref="NumberOfReceiversCount"/> from <see cref="GetCurrentNumberOfReceiversCount"/>.
162 : /// </summary>
163 1 : protected virtual void SetNumberOfReceiversCount()
164 : {
165 : NumberOfReceiversCount = GetCurrentNumberOfReceiversCount();
166 : Logger.LogDebug(string.Format("Number of receivers settings set to {0}.", NumberOfReceiversCount));
167 : }
168 :
169 : /// <summary>
170 : /// Sets <see cref="MaximumConcurrentReceiverProcessesCount"/> from <see cref="GetCurrentMaximumConcurrentReceiverProcessesCount"/>.
171 : /// </summary>
172 1 : protected virtual void SetMaximumConcurrentReceiverProcessesCount()
173 : {
174 : MaximumConcurrentReceiverProcessesCount = GetCurrentMaximumConcurrentReceiverProcessesCount();
175 : Logger.LogDebug(string.Format("Number of receivers settings set to {0}.", MaximumConcurrentReceiverProcessesCount));
176 : }
177 :
178 : /// <summary>
179 : /// Gets the connection string for the bus.
180 : /// </summary>
181 1 : protected abstract string GetConnectionString();
182 :
183 : /// <summary>
184 : /// Returns <see cref="DefaultNumberOfReceiversCount"/>.
185 : /// </summary>
186 : /// <returns><see cref="DefaultNumberOfReceiversCount"/>.</returns>
187 1 : protected virtual int GetCurrentNumberOfReceiversCount()
188 : {
189 : return DefaultNumberOfReceiversCount;
190 : }
191 :
192 : /// <summary>
193 : /// Returns <see cref="DefaultMaximumConcurrentReceiverProcessesCount"/>.
194 : /// </summary>
195 : /// <returns><see cref="DefaultMaximumConcurrentReceiverProcessesCount"/>.</returns>
196 1 : protected virtual int GetCurrentMaximumConcurrentReceiverProcessesCount()
197 : {
198 : return DefaultMaximumConcurrentReceiverProcessesCount;
199 : }
200 :
201 : /// <summary>
202 : /// Instantiate publishing on this bus.
203 : /// </summary>
204 1 : protected abstract void InstantiatePublishing();
205 :
206 : /// <summary>
207 : /// Instantiate receiving on this bus.
208 : /// </summary>
209 1 : protected abstract void InstantiateReceiving();
210 :
211 : #if NET452
212 : /// <summary>
213 : /// Creates a new instance of <see cref="NamespaceManager"/> with the <see cref="ConnectionString"/>.
214 : /// </summary>
215 : protected virtual NamespaceManager GetManager()
216 : {
217 : NamespaceManager manager = NamespaceManager.CreateFromConnectionString(ConnectionString);
218 : #endif
219 : #if NETCOREAPP3_0
220 : /// <summary>
221 : /// Creates a new instance of <see cref="ManagementClient"/> with the <see cref="ConnectionString"/>.
222 : /// </summary>
223 : protected virtual ManagementClient GetManager()
224 : {
225 : var manager = new ManagementClient(ConnectionString);
226 : #endif
227 : return manager;
228 : }
229 :
230 : /// <summary>
231 : /// Gets the default retry policy dedicated to handling transient conditions with Windows Azure Service Bus.
232 : /// </summary>
233 : protected virtual RetryPolicy AzureServiceBusRetryPolicy
234 : {
235 : get
236 : {
237 : #if NET452
238 : RetryManager retryManager = EnterpriseLibraryContainer.Current.GetInstance<RetryManager>();
239 : #endif
240 : #if NETCOREAPP3_0
241 : RetryManager retryManager = RetryManager.Instance;
242 : #endif
243 : RetryPolicy retryPolicy = retryManager.GetDefaultAzureServiceBusRetryPolicy();
244 : retryPolicy.Retrying += (sender, args) =>
245 : {
246 : var message = string.Format("Retrying action - Count:{0}, Delay:{1}", args.CurrentRetryCount, args.Delay);
247 : Logger.LogWarning(message, "AzureServiceBusRetryPolicy", args.LastException);
248 : };
249 : return retryPolicy;
250 : }
251 : }
252 :
253 : /// <summary>
254 : /// Starts a new <see cref="Task"/> that periodically calls <see cref="ValidateSettingsHaveChanged"/>
255 : /// and if there is a change, calls <see cref="TriggerSettingsChecking"/>.
256 : /// </summary>
257 1 : protected virtual void StartSettingsChecking()
258 : {
259 : Task.Factory.StartNewSafely(() =>
260 : {
261 : SpinWait.SpinUntil(ValidateSettingsHaveChanged, sleepInMilliseconds: 1000);
262 :
263 : Logger.LogInfo("Connecting string settings for the Azure Service Bus changed and will now refresh.");
264 :
265 : // Update the connection string and trigger a restart;
266 : if (ValidateSettingsHaveChanged())
267 : TriggerSettingsChecking();
268 : });
269 : }
270 :
271 : /// <summary>
272 : /// Checks if the settings for
273 : /// <see cref="ConnectionString"/>, <see cref="NumberOfReceiversCount"/>
274 : /// or <see cref="MaximumConcurrentReceiverProcessesCount"/> have changed.
275 : /// </summary>
276 : /// <returns></returns>
277 1 : protected virtual bool ValidateSettingsHaveChanged()
278 : {
279 : return ConnectionString != GetConnectionString()
280 : ||
281 : NumberOfReceiversCount != GetCurrentNumberOfReceiversCount()
282 : ||
283 : MaximumConcurrentReceiverProcessesCount != GetCurrentMaximumConcurrentReceiverProcessesCount();
284 : }
285 :
286 : /// <summary>
287 : /// Calls
288 : /// <see cref="SetConnectionStrings"/>
289 : /// <see cref="SetNumberOfReceiversCount"/> and
290 : /// <see cref="SetMaximumConcurrentReceiverProcessesCount"/>
291 : /// </summary>
292 1 : protected virtual void UpdateSettings()
293 : {
294 : SetConnectionStrings();
295 : SetNumberOfReceiversCount();
296 : SetMaximumConcurrentReceiverProcessesCount();
297 : }
298 :
299 : /// <summary>
300 : /// Change the settings used by this bus.
301 : /// </summary>
302 1 : protected abstract void TriggerSettingsChecking();
303 :
304 : #if NET452
305 : /// <summary>
306 : /// Sets the handler on <see cref="IMessageReceiver.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage})"/>.
307 : /// </summary>
308 : #endif
309 : #if NETCOREAPP3_0
310 : /// <summary>
311 : /// Sets the handler on <see cref="IReceiverClient.RegisterMessageHandler(Func{Message, System.Threading.CancellationToken, Task}, MessageHandlerOptions)"/>.
312 : /// </summary>
313 : #endif
314 0 : protected abstract void ApplyReceiverMessageHandler();
315 : }
316 : }
|