Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Threading.Tasks;
13 : using Cqrs.Authentication;
14 : using Cqrs.Configuration;
15 : using cdmdotnet.Logging;
16 : using Cqrs.Commands;
17 : using Cqrs.Events;
18 : using Cqrs.Infrastructure;
19 : using Microsoft.ServiceBus;
20 : using Microsoft.Practices.EnterpriseLibrary.Common.Configuration;
21 : using Microsoft.Practices.EnterpriseLibrary.WindowsAzure.TransientFaultHandling;
22 : using Microsoft.ServiceBus.Messaging;
23 : using RetryPolicy = Microsoft.Practices.TransientFaultHandling.RetryPolicy;
24 :
25 : namespace Cqrs.Azure.ServiceBus
26 : {
/// <summary>
/// An Azure Bus such as a Service Bus or Event Hub.
/// </summary>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
public abstract class AzureBus<TAuthenticationToken>
{
    /// <summary>
    /// Gets or sets the connection string to the bus.
    /// </summary>
    protected string ConnectionString { get; set; }

    /// <summary>
    /// Gets the <see cref="IMessageSerialiser{TAuthenticationToken}"/> supplied to the constructor.
    /// </summary>
    protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; }

    /// <summary>
    /// Gets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/> supplied to the constructor.
    /// </summary>
    protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }

    /// <summary>
    /// Gets the <see cref="ICorrelationIdHelper"/> supplied to the constructor.
    /// </summary>
    protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }

    /// <summary>
    /// Gets the <see cref="ILogger"/> supplied to the constructor.
    /// </summary>
    protected ILogger Logger { get; private set; }

    /// <summary>
    /// Gets the <see cref="IConfigurationManager"/> supplied to the constructor.
    /// </summary>
    protected IConfigurationManager ConfigurationManager { get; private set; }

    /// <summary>
    /// Gets the <see cref="IEvent{TAuthenticationToken}">events</see> to wait for before responding to the caller
    /// keyed by the <see cref="ICommand{TAuthenticationToken}.Id"/>
    /// </summary>
    /// <remarks>
    /// The constructor initialises this with a <see cref="ConcurrentDictionary{TKey,TValue}"/>.
    /// </remarks>
    protected IDictionary<Guid, IList<IEvent<TAuthenticationToken>>> EventWaits { get; private set; }

    /// <summary>
    /// The default number of receivers to start and run.
    /// </summary>
    protected const int DefaultNumberOfReceiversCount = 1;

    /// <summary>
    /// The number of receivers to start and run.
    /// </summary>
    protected int NumberOfReceiversCount { get; set; }

    /// <summary>
    /// The default number for <see cref="MaximumConcurrentReceiverProcessesCount"/>.
    /// </summary>
    protected const int DefaultMaximumConcurrentReceiverProcessesCount = 1;

    /// <summary>
    /// The <see cref="OnMessageOptions.MaxConcurrentCalls"/> value.
    /// </summary>
    protected int MaximumConcurrentReceiverProcessesCount { get; set; }

    /// <summary>
    /// Instantiates a new instance of <see cref="AzureBus{TAuthenticationToken}"/>,
    /// loads the current settings via <see cref="UpdateSettings"/> and, when requested, sets up publishing.
    /// </summary>
    /// <param name="configurationManager">Stored in <see cref="ConfigurationManager"/>.</param>
    /// <param name="messageSerialiser">Stored in <see cref="MessageSerialiser"/>.</param>
    /// <param name="authenticationTokenHelper">Stored in <see cref="AuthenticationTokenHelper"/>.</param>
    /// <param name="correlationIdHelper">Stored in <see cref="CorrelationIdHelper"/>.</param>
    /// <param name="logger">Stored in <see cref="Logger"/>; used immediately while the settings are loaded.</param>
    /// <param name="isAPublisher">When <c>true</c>, <see cref="InstantiatePublishing"/> is called before the constructor returns.</param>
    protected AzureBus(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, bool isAPublisher)
    {
        EventWaits = new ConcurrentDictionary<Guid, IList<IEvent<TAuthenticationToken>>>();

        // The dependencies must be assigned before UpdateSettings() runs, because the setters below log through Logger.
        MessageSerialiser = messageSerialiser;
        AuthenticationTokenHelper = authenticationTokenHelper;
        CorrelationIdHelper = correlationIdHelper;
        Logger = logger;
        ConfigurationManager = configurationManager;

        // Overridable members are deliberately invoked here; derived classes must not rely on
        // their own constructor body having executed when these run.
        // ReSharper disable DoNotCallOverridableMethodsInConstructor
        UpdateSettings();
        if (isAPublisher)
        {
            InstantiatePublishing();
        }
        // ReSharper restore DoNotCallOverridableMethodsInConstructor
    }

    /// <summary>
    /// Sets <see cref="ConnectionString"/> from <see cref="GetConnectionString"/>.
    /// </summary>
    protected virtual void SetConnectionStrings()
    {
        ConnectionString = GetConnectionString();
        // The connection string is written through LogSensitive rather than a regular log level.
        Logger.LogSensitive(string.Format("Connection string settings set to {0}.", ConnectionString));
    }

    /// <summary>
    /// Sets <see cref="NumberOfReceiversCount"/> from <see cref="GetCurrentNumberOfReceiversCount"/>.
    /// </summary>
    protected virtual void SetNumberOfReceiversCount()
    {
        NumberOfReceiversCount = GetCurrentNumberOfReceiversCount();
        Logger.LogDebug(string.Format("Number of receivers settings set to {0}.", NumberOfReceiversCount));
    }

    /// <summary>
    /// Sets <see cref="MaximumConcurrentReceiverProcessesCount"/> from <see cref="GetCurrentMaximumConcurrentReceiverProcessesCount"/>.
    /// </summary>
    protected virtual void SetMaximumConcurrentReceiverProcessesCount()
    {
        MaximumConcurrentReceiverProcessesCount = GetCurrentMaximumConcurrentReceiverProcessesCount();
        // Distinct wording from SetNumberOfReceiversCount so the two settings can be told apart in the log.
        Logger.LogDebug(string.Format("Maximum concurrent receiver processes settings set to {0}.", MaximumConcurrentReceiverProcessesCount));
    }

    /// <summary>
    /// Gets the connection string for the bus.
    /// </summary>
    protected abstract string GetConnectionString();

    /// <summary>
    /// Returns <see cref="DefaultNumberOfReceiversCount"/>.
    /// </summary>
    /// <returns><see cref="DefaultNumberOfReceiversCount"/>.</returns>
    protected virtual int GetCurrentNumberOfReceiversCount()
    {
        return DefaultNumberOfReceiversCount;
    }

    /// <summary>
    /// Returns <see cref="DefaultMaximumConcurrentReceiverProcessesCount"/>.
    /// </summary>
    /// <returns><see cref="DefaultMaximumConcurrentReceiverProcessesCount"/>.</returns>
    protected virtual int GetCurrentMaximumConcurrentReceiverProcessesCount()
    {
        return DefaultMaximumConcurrentReceiverProcessesCount;
    }

    /// <summary>
    /// Instantiate publishing on this bus.
    /// </summary>
    protected abstract void InstantiatePublishing();

    /// <summary>
    /// Instantiate receiving on this bus.
    /// </summary>
    protected abstract void InstantiateReceiving();

    /// <summary>
    /// Creates a new instance of <see cref="NamespaceManager"/> with the <see cref="ConnectionString"/>.
    /// </summary>
    /// <returns>The <see cref="NamespaceManager"/> created from <see cref="ConnectionString"/>.</returns>
    protected virtual NamespaceManager GetNamespaceManager()
    {
        return NamespaceManager.CreateFromConnectionString(ConnectionString);
    }

    /// <summary>
    /// Gets the default retry policy dedicated to handling transient conditions with Windows Azure Service Bus.
    /// </summary>
    /// <remarks>
    /// Every read resolves a <see cref="RetryManager"/> from <see cref="EnterpriseLibraryContainer.Current"/>,
    /// asks it for the default Azure Service Bus policy and attaches a new <c>Retrying</c> handler that logs a warning.
    /// Callers that use the policy repeatedly should hold on to the returned instance rather than re-reading this property.
    /// </remarks>
    protected virtual RetryPolicy AzureServiceBusRetryPolicy
    {
        get
        {
            RetryManager retryManager = EnterpriseLibraryContainer.Current.GetInstance<RetryManager>();
            RetryPolicy retryPolicy = retryManager.GetDefaultAzureServiceBusRetryPolicy();
            // Surface each retry attempt, together with the exception that caused it, as a warning.
            retryPolicy.Retrying += (sender, args) =>
            {
                var message = string.Format("Retrying action - Count:{0}, Delay:{1}", args.CurrentRetryCount, args.Delay);
                Logger.LogWarning(message, "AzureServiceBusRetryPolicy", args.LastException);
            };
            return retryPolicy;
        }
    }

    /// <summary>
    /// Starts a new <see cref="Task"/> that waits until <see cref="ValidateSettingsHaveChanged"/> reports a change
    /// and, once the change is confirmed, calls <see cref="TriggerSettingsChecking"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the started task handles a single change and then ends; presumably implementations of
    /// <see cref="TriggerSettingsChecking"/> call this method again to keep watching. Confirm against the derived classes.
    /// </remarks>
    protected virtual void StartSettingsChecking()
    {
        Task.Factory.StartNewSafely(() =>
        {
            // Presumably pauses 1000ms between evaluations of the condition; verify against SpinWait.SpinUntil.
            SpinWait.SpinUntil(ValidateSettingsHaveChanged, sleepInMilliseconds: 1000);

            // Re-validate before acting so the refresh (and the log entry announcing it) only
            // happens when the settings still differ.
            if (ValidateSettingsHaveChanged())
            {
                Logger.LogInfo("Connection string settings for the Azure Service Bus changed and will now refresh.");

                // Update the connection string and trigger a restart;
                TriggerSettingsChecking();
            }
        });
    }

    /// <summary>
    /// Checks if the settings for
    /// <see cref="ConnectionString"/>, <see cref="NumberOfReceiversCount"/>
    /// or <see cref="MaximumConcurrentReceiverProcessesCount"/> have changed.
    /// </summary>
    /// <returns>
    /// <c>true</c> if any of the stored values differs from what <see cref="GetConnectionString"/>,
    /// <see cref="GetCurrentNumberOfReceiversCount"/> or <see cref="GetCurrentMaximumConcurrentReceiverProcessesCount"/>
    /// currently return; otherwise <c>false</c>.
    /// </returns>
    protected virtual bool ValidateSettingsHaveChanged()
    {
        return ConnectionString != GetConnectionString()
            ||
            NumberOfReceiversCount != GetCurrentNumberOfReceiversCount()
            ||
            MaximumConcurrentReceiverProcessesCount != GetCurrentMaximumConcurrentReceiverProcessesCount();
    }

    /// <summary>
    /// Calls
    /// <see cref="SetConnectionStrings"/>
    /// <see cref="SetNumberOfReceiversCount"/> and
    /// <see cref="SetMaximumConcurrentReceiverProcessesCount"/>
    /// </summary>
    protected virtual void UpdateSettings()
    {
        SetConnectionStrings();
        SetNumberOfReceiversCount();
        SetMaximumConcurrentReceiverProcessesCount();
    }

    /// <summary>
    /// Change the settings used by this bus.
    /// </summary>
    protected abstract void TriggerSettingsChecking();

    /// <summary>
    /// Sets the handler on <see cref="SubscriptionClient.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage})"/>.
    /// </summary>
    protected abstract void ApplyReceiverMessageHandler();
}
252 : }
|