Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Configuration;
12 : using System.Threading;
13 : using System.Threading.Tasks;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Bus;
17 : using Cqrs.Configuration;
18 : using Cqrs.Messages;
19 : using Microsoft.ServiceBus;
20 : using Microsoft.ServiceBus.Messaging;
21 :
22 : namespace Cqrs.Azure.ServiceBus
23 : {
24 : public abstract class AzureServiceBus<TAuthenticationToken> : AzureBus<TAuthenticationToken>
25 0 : {
#region Publishers and receivers

/// <summary>
/// The <see cref="TopicClient"/> for the private topic; assigned by <see cref="InstantiatePublishing"/>.
/// </summary>
protected TopicClient PrivateServiceBusPublisher { get; private set; }

/// <summary>
/// The <see cref="TopicClient"/> for the public topic; assigned by <see cref="InstantiatePublishing"/>.
/// </summary>
protected TopicClient PublicServiceBusPublisher { get; private set; }

/// <summary>
/// The <see cref="SubscriptionClient">receivers</see> on the private topic, keyed by receiver index.
/// </summary>
protected IDictionary<int, SubscriptionClient> PrivateServiceBusReceivers { get; private set; }

/// <summary>
/// The <see cref="SubscriptionClient">receivers</see> on the public topic, keyed by receiver index.
/// </summary>
protected IDictionary<int, SubscriptionClient> PublicServiceBusReceivers { get; private set; }

#endregion

#region Resolved topic and subscription names

/// <summary>
/// The private topic name, resolved by <see cref="CheckPrivateEventTopicExists"/>.
/// </summary>
protected string PrivateTopicName { get; private set; }

/// <summary>
/// The public topic name, resolved by <see cref="CheckPublicTopicExists"/>.
/// </summary>
protected string PublicTopicName { get; private set; }

/// <summary>
/// The subscription name on the private topic, resolved by <see cref="CheckPrivateEventTopicExists"/>.
/// </summary>
protected string PrivateTopicSubscriptionName { get; private set; }

/// <summary>
/// The subscription name on the public topic, resolved by <see cref="CheckPublicTopicExists"/>.
/// </summary>
protected string PublicTopicSubscriptionName { get; private set; }

/// <summary>
/// The subscription name used on the private topic when none is configured.
/// </summary>
protected const string DefaultPrivateTopicSubscriptionName = "Root";

/// <summary>
/// The subscription name used on the public topic when none is configured.
/// </summary>
protected const string DefaultPublicTopicSubscriptionName = "Root";

#endregion

#region Values supplied by the concrete bus

/// <summary>
/// The configuration key of the setting holding the message bus connection string.
/// </summary>
protected abstract string MessageBusConnectionStringConfigurationKey { get; }

/// <summary>
/// The configuration key of the setting holding the private topic name.
/// </summary>
protected abstract string PrivateTopicNameConfigurationKey { get; }

/// <summary>
/// The configuration key of the setting holding the public topic name.
/// </summary>
protected abstract string PublicTopicNameConfigurationKey { get; }

/// <summary>
/// The private topic name to use when <see cref="PrivateTopicNameConfigurationKey"/> yields <c>null</c>.
/// </summary>
protected abstract string DefaultPrivateTopicName { get; }

/// <summary>
/// The public topic name to use when <see cref="PublicTopicNameConfigurationKey"/> yields <c>null</c>.
/// </summary>
protected abstract string DefaultPublicTopicName { get; }

/// <summary>
/// The configuration key of the setting holding the subscription name on the private topic.
/// </summary>
protected abstract string PrivateTopicSubscriptionNameConfigurationKey { get; }

/// <summary>
/// The configuration key of the setting holding the subscription name on the public topic.
/// </summary>
protected abstract string PublicTopicSubscriptionNameConfigurationKey { get; }

/// <summary>
/// The configuration key of the setting that feeds <see cref="ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete"/>.
/// </summary>
protected abstract string ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey { get; }

#endregion

#region Runtime state

/// <summary>
/// Refreshed on every settings check; set to <c>true</c> when the setting cannot be read.
/// NOTE(review): the flag is not consumed in this file; presumably derived classes use it when completing a message fails with a lost lock - confirm.
/// </summary>
protected bool ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete { get; private set; }

/// <summary>
/// The callback bound to every receiver by <see cref="ApplyReceiverMessageHandler"/>.
/// </summary>
protected Action<BrokeredMessage> ReceiverMessageHandler { get; set; }

/// <summary>
/// The <see cref="OnMessageOptions"/> passed along with <see cref="ReceiverMessageHandler"/>.
/// </summary>
protected OnMessageOptions ReceiverMessageHandlerOptions { get; set; }

/// <summary>
/// Consulted by the dead-letter clean-up to decide whether a message type is still required.
/// </summary>
protected IBusHelper BusHelper { get; private set; }

/// <summary>
/// Used by the dead-letter clean-up to read dead-lettered events and commands.
/// </summary>
protected IAzureBusHelper<TAuthenticationToken> AzureBusHelper { get; private set; }

/// <summary>
/// Receives exceptions raised during dead-letter clean-up; the constructor installs a <see cref="NullTelemetryHelper"/>.
/// </summary>
protected ITelemetryHelper TelemetryHelper { get; set; }

#endregion
73 :
/// <summary>
/// Passes the shared dependencies to the base bus, keeps the two bus helpers,
/// creates empty receiver collections and installs a <see cref="NullTelemetryHelper"/>.
/// </summary>
protected AzureServiceBus(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper, bool isAPublisher)
    : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, isAPublisher)
{
    // Receiver collections start empty; InstantiateReceiving fills them.
    PrivateServiceBusReceivers = new Dictionary<int, SubscriptionClient>();
    PublicServiceBusReceivers = new Dictionary<int, SubscriptionClient>();

    // Telemetry is a no-op until a derived class or caller replaces it.
    TelemetryHelper = new NullTelemetryHelper();

    BusHelper = busHelper;
    AzureBusHelper = azureBusHelper;
}
83 :
84 : #region Overrides of AzureBus<TAuthenticationToken>
85 :
/// <summary>
/// Reads the connection string from the setting named by <see cref="MessageBusConnectionStringConfigurationKey"/>.
/// </summary>
/// <returns>The configured connection string.</returns>
/// <exception cref="ConfigurationErrorsException">The setting is missing, empty or only white-space.</exception>
protected override string GetConnectionString()
{
    string configuredValue = ConfigurationManager.GetSetting(MessageBusConnectionStringConfigurationKey);

    bool hasUsableValue = !string.IsNullOrWhiteSpace(configuredValue);
    if (hasUsableValue)
        return configuredValue;

    string message = string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and has a valid connection string value.", MessageBusConnectionStringConfigurationKey);
    throw new ConfigurationErrorsException(message);
}
93 :
94 : #endregion
95 :
/// <summary>
/// Ensures both topics (and their subscriptions) exist, creates a <see cref="TopicClient"/> for each
/// and then calls <c>StartSettingsChecking</c>.
/// </summary>
protected override void InstantiatePublishing()
{
    NamespaceManager manager = NamespaceManager.CreateFromConnectionString(ConnectionString);

    // These calls also resolve PrivateTopicName/PublicTopicName, so they must run before the clients are created.
    CheckPrivateEventTopicExists(manager);
    CheckPublicTopicExists(manager);

    string connectionString = ConnectionString;
    PrivateServiceBusPublisher = TopicClient.CreateFromConnectionString(connectionString, PrivateTopicName);
    PublicServiceBusPublisher = TopicClient.CreateFromConnectionString(connectionString, PublicTopicName);

    StartSettingsChecking();
}
106 :
/// <summary>
/// Cancellation handles of the dead-letter clean-up loops started by the most recent call to
/// <see cref="InstantiateReceiving()"/>, kept so the loops can be stopped on the next call.
/// </summary>
private readonly IList<CancellationTokenSource> _deadLetterCleanUpCancellationTokenSources = new List<CancellationTokenSource>();

/// <summary>
/// Ensures both topics (and their subscriptions) exist, (re)creates the receivers for each topic and,
/// when the <c>Cqrs.Azure.Servicebus.EnableDeadLetterCleanUp</c> setting parses to <c>true</c>,
/// starts a dead-letter clean-up loop per topic.
/// </summary>
/// <remarks>
/// This method is called again on every settings-triggered restart. Any clean-up loops started by a
/// previous call are cancelled first; otherwise each restart would add two more never-ending loops and
/// switching the setting off would have no effect.
/// </remarks>
protected override void InstantiateReceiving()
{
    NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(ConnectionString);

    CheckPrivateEventTopicExists(namespaceManager);
    CheckPublicTopicExists(namespaceManager);

    InstantiateReceiving(namespaceManager, PrivateServiceBusReceivers, PrivateTopicName, PrivateTopicSubscriptionName);
    InstantiateReceiving(namespaceManager, PublicServiceBusReceivers, PublicTopicName, PublicTopicSubscriptionName);

    // Stop loops from an earlier call before deciding whether new ones are wanted.
    StopDeadLetterCleanUp();

    bool enableDeadLetterCleanUp;
    string enableDeadLetterCleanUpValue = ConfigurationManager.GetSetting("Cqrs.Azure.Servicebus.EnableDeadLetterCleanUp");
    if (bool.TryParse(enableDeadLetterCleanUpValue, out enableDeadLetterCleanUp) && enableDeadLetterCleanUp)
    {
        _deadLetterCleanUpCancellationTokenSources.Add(CleanUpDeadLetters(PrivateTopicName, PrivateTopicSubscriptionName));
        _deadLetterCleanUpCancellationTokenSources.Add(CleanUpDeadLetters(PublicTopicName, PublicTopicSubscriptionName));
    }

    // If this is also a publisher, InstantiatePublishing already started the settings check;
    // checking one of the publishers is enough.
    if (PublicServiceBusPublisher != null)
        return;

    StartSettingsChecking();
}

/// <summary>
/// Requests cancellation of every dead-letter clean-up loop recorded so far and forgets them.
/// </summary>
private void StopDeadLetterCleanUp()
{
    foreach (CancellationTokenSource cancellationTokenSource in _deadLetterCleanUpCancellationTokenSources)
    {
        // CleanUpDeadLetters is virtual, so an override may legitimately return null.
        if (cancellationTokenSource == null)
            continue;
        try
        {
            cancellationTokenSource.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // The loop disposes its own token source once it has ended; nothing left to cancel.
        }
    }
    _deadLetterCleanUpCancellationTokenSources.Clear();
}
132 :
/// <summary>
/// Fills <paramref name="serviceBusReceivers"/> with one freshly created <see cref="SubscriptionClient"/>
/// per index in <c>0 .. NumberOfReceiversCount - 1</c> and drops every entry outside that range.
/// </summary>
/// <param name="namespaceManager">Not used by this implementation; available to overrides.</param>
/// <param name="serviceBusReceivers">The receiver collection to (re)populate, keyed by receiver index.</param>
/// <param name="topicName">The topic the receivers subscribe to.</param>
/// <param name="topicSubscriptionName">The subscription on <paramref name="topicName"/> to receive from.</param>
/// <remarks>
/// A client that is replaced or removed is closed first if it is still open, so it does not keep
/// receiving messages next to its replacement. Surplus keys are collected before removal: the previous
/// loop removed key <c>i + 1</c> while the collection's count was shrinking, which left entries behind.
/// </remarks>
protected virtual void InstantiateReceiving(NamespaceManager namespaceManager, IDictionary<int, SubscriptionClient> serviceBusReceivers, string topicName, string topicSubscriptionName)
{
    int receiverCount = NumberOfReceiversCount;

    for (int i = 0; i < receiverCount; i++)
    {
        SubscriptionClient serviceBusReceiver = SubscriptionClient.CreateFromConnectionString(ConnectionString, topicName, topicSubscriptionName);

        SubscriptionClient replacedReceiver;
        if (serviceBusReceivers.TryGetValue(i, out replacedReceiver) && replacedReceiver != null && !replacedReceiver.IsClosed)
            replacedReceiver.Close();

        serviceBusReceivers[i] = serviceBusReceiver;
    }

    // Remove any surplus receivers if the configured number has decreased.
    // Keys are snapshotted because a dictionary must not be modified while it is being enumerated.
    var surplusKeys = new List<int>();
    foreach (int key in serviceBusReceivers.Keys)
    {
        if (key >= receiverCount)
            surplusKeys.Add(key);
    }
    foreach (int key in surplusKeys)
    {
        SubscriptionClient surplusReceiver = serviceBusReceivers[key];
        if (surplusReceiver != null && !surplusReceiver.IsClosed)
            surplusReceiver.Close();
        serviceBusReceivers.Remove(key);
    }
}
147 :
/// <summary>
/// Resolves <see cref="PrivateTopicName"/> and <see cref="PrivateTopicSubscriptionName"/> from configuration
/// (falling back to the defaults when a setting is <c>null</c>) and ensures both exist on the bus.
/// </summary>
/// <param name="namespaceManager">The manager used to look up and create the topic and subscription.</param>
protected virtual void CheckPrivateEventTopicExists(NamespaceManager namespaceManager)
{
    string topicName = ConfigurationManager.GetSetting(PrivateTopicNameConfigurationKey) ?? DefaultPrivateTopicName;
    PrivateTopicName = topicName;

    string subscriptionName = ConfigurationManager.GetSetting(PrivateTopicSubscriptionNameConfigurationKey) ?? DefaultPrivateTopicSubscriptionName;
    PrivateTopicSubscriptionName = subscriptionName;

    CheckTopicExists(namespaceManager, topicName, subscriptionName);
}
152 :
/// <summary>
/// Resolves <see cref="PublicTopicName"/> and <see cref="PublicTopicSubscriptionName"/> from configuration
/// (falling back to the defaults when a setting is <c>null</c>) and ensures both exist on the bus.
/// </summary>
/// <param name="namespaceManager">The manager used to look up and create the topic and subscription.</param>
protected virtual void CheckPublicTopicExists(NamespaceManager namespaceManager)
{
    string topicName = ConfigurationManager.GetSetting(PublicTopicNameConfigurationKey) ?? DefaultPublicTopicName;
    PublicTopicName = topicName;

    string subscriptionName = ConfigurationManager.GetSetting(PublicTopicSubscriptionNameConfigurationKey) ?? DefaultPublicTopicSubscriptionName;
    PublicTopicSubscriptionName = subscriptionName;

    CheckTopicExists(namespaceManager, topicName, subscriptionName);
}
157 :
/// <summary>
/// Creates the topic <paramref name="eventTopicName"/> and its subscription
/// <paramref name="eventSubscriptionNames"/> when they do not exist yet.
/// </summary>
/// <param name="namespaceManager">The manager used to query and create the entities.</param>
/// <param name="eventTopicName">The name of the topic.</param>
/// <param name="eventSubscriptionNames">The name of the (single) subscription on the topic.</param>
protected virtual void CheckTopicExists(NamespaceManager namespaceManager, string eventTopicName, string eventSubscriptionNames)
{
    // Messages live for 25 minutes on both the topic and the subscription.
    TimeSpan timeToLive = TimeSpan.FromMinutes(25);

    TopicDescription topic = new TopicDescription(eventTopicName);
    topic.MaxSizeInMegabytes = 5120;
    topic.DefaultMessageTimeToLive = timeToLive;
    topic.EnablePartitioning = true;
    topic.EnableBatchedOperations = true;

    string topicPath = topic.Path;

    bool topicIsMissing = !namespaceManager.TopicExists(topicPath);
    if (topicIsMissing)
        namespaceManager.CreateTopic(topic);

    if (namespaceManager.SubscriptionExists(topicPath, eventSubscriptionNames))
        return;

    SubscriptionDescription subscription = new SubscriptionDescription(topicPath, eventSubscriptionNames);
    subscription.DefaultMessageTimeToLive = timeToLive;
    subscription.EnableBatchedOperations = true;
    subscription.EnableDeadLetteringOnFilterEvaluationExceptions = true;

    namespaceManager.CreateSubscription(subscription);
}
183 :
/// <summary>
/// Refreshes <see cref="ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete"/> from configuration,
/// then closes and restarts the private and the public side of the bus and, when this bus publishes,
/// re-creates the publishers.
/// </summary>
protected override void TriggerSettingsChecking()
{
    // Refresh the lock-lost behaviour flag; it is set to true when the setting cannot be read.
    bool configuredValue;
    bool settingWasRead = ConfigurationManager.TryGetSetting(ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey, out configuredValue);
    ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete = !settingWasRead || configuredValue;

    TriggerSettingsChecking(PrivateServiceBusPublisher, PrivateServiceBusReceivers);
    TriggerSettingsChecking(PublicServiceBusPublisher, PublicServiceBusReceivers);

    // The publisher restart is deliberately last: if it triggers the cancellation there is nothing else
    // left to process here. Checking one of the publishers is enough.
    if (PublicServiceBusPublisher == null)
        return;

    Logger.LogDebug("Recursively calling into InstantiatePublishing.");
    InstantiatePublishing();
}
203 :
/// <summary>
/// Closes <paramref name="serviceBusPublisher"/> and every receiver in <paramref name="serviceBusReceivers"/>,
/// then, if at least one receiver was closed, re-creates the receivers and re-binds the stored message handler.
/// </summary>
/// <param name="serviceBusPublisher">The publisher to close; may be <c>null</c> when this bus does not publish.</param>
/// <param name="serviceBusReceivers">The receivers to close and replace.</param>
/// <remarks>
/// <see cref="InstantiateReceiving()"/> overwrites the entries of the receiver dictionaries. Calling it from
/// inside a loop over <c>serviceBusReceivers.Values</c> invalidates that enumeration, and would also restart
/// receiving once per receiver while later receivers were still open. The receivers are therefore snapshotted
/// and all closed first, and receiving is restarted exactly once afterwards.
/// </remarks>
protected virtual void TriggerSettingsChecking(TopicClient serviceBusPublisher, IDictionary<int, SubscriptionClient> serviceBusReceivers)
{
    // Let's wrap up using this message bus and start the switch
    if (serviceBusPublisher != null)
    {
        serviceBusPublisher.Close();
        Logger.LogDebug("Publishing service bus closed.");
    }

    // Snapshot: the dictionary is modified by InstantiateReceiving further down.
    var receiversToClose = new List<SubscriptionClient>(serviceBusReceivers.Values);
    bool anyReceiverClosed = false;
    foreach (SubscriptionClient serviceBusReceiver in receiversToClose)
    {
        if (serviceBusReceiver == null)
            continue;

        serviceBusReceiver.Close();
        Logger.LogDebug("Receiving service bus closed.");
        anyReceiverClosed = true;
    }

    // Nothing was receiving, so there is nothing to restart.
    if (!anyReceiverClosed)
        return;

    // Restart configuration, we order this intentionally with the receiver first as if this triggers the cancellation we know this isn't a publisher as well
    Logger.LogDebug("Recursively calling into InstantiateReceiving.");
    InstantiateReceiving();

    // This will be the case of a connection setting change re-connection
    if (ReceiverMessageHandler != null && ReceiverMessageHandlerOptions != null)
    {
        // Callback to handle received messages
        Logger.LogDebug("Re-registering onMessage handler.");
        ApplyReceiverMessageHandler();
    }
    else
        Logger.LogWarning("No onMessage handler was found to re-bind.");
}
238 :
/// <summary>
/// Remembers the handler and its options, then binds them to every receiver.
/// </summary>
/// <param name="receiverMessageHandler">The callback invoked for each received <see cref="BrokeredMessage"/>.</param>
/// <param name="receiverMessageHandlerOptions">The <see cref="OnMessageOptions"/> to bind the callback with.</param>
protected virtual void RegisterReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
{
    // Step 1: keep the pair so it can be re-bound after a settings-triggered restart.
    this.StoreReceiverMessageHandler(receiverMessageHandler, receiverMessageHandlerOptions);

    // Step 2: bind it to the receivers that exist right now.
    this.ApplyReceiverMessageHandler();
}
245 :
/// <summary>
/// Stores the handler and its options in <see cref="ReceiverMessageHandler"/> and
/// <see cref="ReceiverMessageHandlerOptions"/> without binding them to any receiver.
/// </summary>
/// <param name="receiverMessageHandler">The callback to store.</param>
/// <param name="receiverMessageHandlerOptions">The options to store.</param>
protected virtual void StoreReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
{
    this.ReceiverMessageHandler = receiverMessageHandler;
    this.ReceiverMessageHandlerOptions = receiverMessageHandlerOptions;
}
251 :
/// <summary>
/// Binds <see cref="ReceiverMessageHandler"/> with <see cref="ReceiverMessageHandlerOptions"/> to every
/// private receiver and then to every public receiver.
/// </summary>
protected override void ApplyReceiverMessageHandler()
{
    // Private receivers are bound before public ones.
    for (int side = 0; side < 2; side++)
    {
        IDictionary<int, SubscriptionClient> receivers = side == 0
            ? PrivateServiceBusReceivers
            : PublicServiceBusReceivers;

        foreach (KeyValuePair<int, SubscriptionClient> entry in receivers)
            entry.Value.OnMessage(ReceiverMessageHandler, ReceiverMessageHandlerOptions);
    }
}
259 :
/// <summary>
/// Starts a background loop that repeatedly reads the dead-letter queue of the given subscription.
/// Dead letters whose type is no longer required (per <see cref="BusHelper"/>) are completed, i.e. removed;
/// required ones are abandoned so they stay in the queue.
/// </summary>
/// <param name="topicName">The topic whose subscription is cleaned.</param>
/// <param name="topicSubscriptionName">The subscription whose dead-letter queue is read.</param>
/// <returns>
/// The <see cref="CancellationTokenSource"/> that stops the loop. The loop disposes it after it has ended,
/// so callers should only call <see cref="CancellationTokenSource.Cancel()"/> on it.
/// </returns>
/// <remarks>
/// Two defects of the earlier implementation are addressed here. The pacing check used
/// <c>loop++ % 5 == 0</c> followed by <c>loop = 0</c>, which is always true, so the loop never slept and
/// polled the bus continuously. A new <see cref="MessagingFactory"/> was also created on every pass and
/// never closed, and the receiver stayed open when an exception escaped.
/// </remarks>
protected virtual CancellationTokenSource CleanUpDeadLetters(string topicName, string topicSubscriptionName)
{
    var brokeredMessageRenewCancellationTokenSource = new CancellationTokenSource();
    IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
    // Consecutive lock failures; a batch is given up once more than 10 have piled up.
    int lockIssues = 0;

    Action<BrokeredMessage, IMessage> leaveDeadLetterInQueue = (deadLetterBrokeredMessage, deadLetterMessage) =>
    {
        // Release the lock so the message stays in the dead-letter queue
        try
        {
            deadLetterBrokeredMessage.Abandon();
            lockIssues = 0;
        }
        catch (MessageLockLostException)
        {
            lockIssues++;
            Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
        }
        Logger.LogDebug(string.Format("A dead-letter message of type {0} arrived with the id '{1}' but left in the queue due to settings.", deadLetterMessage.GetType().FullName, deadLetterBrokeredMessage.MessageId));
    };
    Action<BrokeredMessage> removeDeadLetterFromQueue = deadLetterBrokeredMessage =>
    {
        // Remove message from queue
        try
        {
            deadLetterBrokeredMessage.Complete();
            lockIssues = 0;
        }
        catch (MessageLockLostException)
        {
            lockIssues++;
            Logger.LogWarning(string.Format("The lock supplied for complete for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
        }
        Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}' but was removed as processing was skipped due to settings.", deadLetterBrokeredMessage.MessageId));
    };

    Task.Factory.StartNewSafely(() =>
    {
        int loop = 0;
        while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
        {
            lockIssues = 0;
            // The factory is created per pass so a changed ConnectionString is picked up; it is closed in the finally below.
            MessagingFactory factory = MessagingFactory.CreateFromConnectionString(ConnectionString);
            try
            {
                string deadLetterPath = SubscriptionClient.FormatDeadLetterPath(topicName, topicSubscriptionName);
                MessageReceiver client = factory.CreateMessageReceiver(deadLetterPath, ReceiveMode.PeekLock);
                try
                {
                    IEnumerable<BrokeredMessage> brokeredMessages = client.ReceiveBatch(1000);

                    foreach (BrokeredMessage brokeredMessage in brokeredMessages)
                    {
                        if (lockIssues > 10)
                            break;
                        try
                        {
                            Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}'.", brokeredMessage.MessageId));
                            string messageBody = brokeredMessage.GetBody<string>();

                            // Closure protection
                            BrokeredMessage message = brokeredMessage;
                            try
                            {
                                // First attempt: treat the body as an event.
                                AzureBusHelper.ReceiveEvent
                                (
                                    messageBody,
                                    @event =>
                                    {
                                        bool isRequired = BusHelper.IsEventRequired(@event.GetType());
                                        if (!isRequired)
                                            removeDeadLetterFromQueue(message);
                                        else
                                            leaveDeadLetterInQueue(message, @event);
                                        return true;
                                    },
                                    string.Format("id '{0}'", brokeredMessage.MessageId),
                                    () =>
                                    {
                                        removeDeadLetterFromQueue(message);
                                    },
                                    () => { }
                                );
                            }
                            catch
                            {
                                // Deliberate fallback: whatever went wrong reading it as an event, retry it as a command.
                                AzureBusHelper.ReceiveCommand
                                (
                                    messageBody,
                                    command =>
                                    {
                                        bool isRequired = BusHelper.IsEventRequired(command.GetType());
                                        if (!isRequired)
                                            removeDeadLetterFromQueue(message);
                                        else
                                            leaveDeadLetterInQueue(message, command);
                                        return true;
                                    },
                                    string.Format("id '{0}'", brokeredMessage.MessageId),
                                    () =>
                                    {
                                        removeDeadLetterFromQueue(message);
                                    },
                                    () => { }
                                );
                            }
                        }
                        catch (Exception exception)
                        {
                            TelemetryHelper.TrackException(exception, null, telemetryProperties);
                            // Indicates a problem, unlock message in queue
                            Logger.LogError(string.Format("A dead-letter message arrived with the id '{0}' but failed to be process.", brokeredMessage.MessageId), exception: exception);
                            try
                            {
                                brokeredMessage.Abandon();
                            }
                            catch (MessageLockLostException)
                            {
                                lockIssues++;
                                Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", brokeredMessage.MessageId));
                            }
                        }
                    }
                }
                finally
                {
                    client.Close();
                }
            }
            finally
            {
                factory.Close();
            }

            // Pace the polling: sleep between passes and merely yield on every fifth one.
            // Pre-increment is essential: with post-increment plus the reset the yield branch was always
            // taken and the sleep was never reached.
            if (++loop % 5 == 0)
            {
                loop = 0;
                Thread.Yield();
            }
            else
                Thread.Sleep(500);
        }
        try
        {
            brokeredMessageRenewCancellationTokenSource.Dispose();
        }
        catch (ObjectDisposedException) { }
    }, brokeredMessageRenewCancellationTokenSource.Token);

    return brokeredMessageRenewCancellationTokenSource;
}
401 : }
402 : }
|