Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Configuration;
12 : using System.Diagnostics;
13 : using System.IO;
14 : using System.Linq;
15 : using System.Reflection;
16 : using System.Security.Cryptography;
17 : using System.Text;
18 : using System.Threading;
19 : using System.Threading.Tasks;
20 : using cdmdotnet.Logging;
21 : using Cqrs.Authentication;
22 : using Cqrs.Bus;
23 : using Cqrs.Configuration;
24 : using Cqrs.Exceptions;
25 : using Cqrs.Messages;
26 : using Microsoft.ServiceBus;
27 : using Microsoft.ServiceBus.Messaging;
28 :
29 : namespace Cqrs.Azure.ServiceBus
30 : {
31 : /// <summary>
32 : /// An <see cref="AzureBus{TAuthenticationToken}"/> that uses Azure Service Bus.
33 : /// </summary>
34 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
35 : public abstract class AzureServiceBus<TAuthenticationToken>
36 : : AzureBus<TAuthenticationToken>
37 1 : {
38 : /// <summary>
39 : /// Gets the private <see cref="TopicClient"/> publisher.
40 : /// </summary>
41 : protected TopicClient PrivateServiceBusPublisher { get; private set; }
42 :
43 : /// <summary>
44 : /// Gets the public <see cref="TopicClient"/> publisher.
45 : /// </summary>
46 : protected TopicClient PublicServiceBusPublisher { get; private set; }
47 :
48 : /// <summary>
49 : /// Gets the private <see cref="SubscriptionClient"/> receivers.
50 : /// </summary>
51 : protected IDictionary<int, SubscriptionClient> PrivateServiceBusReceivers { get; private set; }
52 :
53 : /// <summary>
54 : /// Gets the public <see cref="SubscriptionClient"/> receivers.
55 : /// </summary>
56 : protected IDictionary<int, SubscriptionClient> PublicServiceBusReceivers { get; private set; }
57 :
58 : /// <summary>
59 : /// The name of the private topic.
60 : /// </summary>
61 : protected string PrivateTopicName { get; private set; }
62 :
63 : /// <summary>
64 : /// The name of the public topic.
65 : /// </summary>
66 : protected string PublicTopicName { get; private set; }
67 :
68 : /// <summary>
69 : /// The name of the subscription in the private topic.
70 : /// </summary>
71 : protected string PrivateTopicSubscriptionName { get; private set; }
72 :
73 : /// <summary>
74 : /// The name of the subscription in the public topic.
75 : /// </summary>
76 : protected string PublicTopicSubscriptionName { get; private set; }
77 :
78 : /// <summary>
79 : /// The configuration key for the message bus connection string as used by <see cref="IConfigurationManager"/>.
80 : /// </summary>
81 : protected abstract string MessageBusConnectionStringConfigurationKey { get; }
82 :
83 : /// <summary>
84 : /// The configuration key for the signing token as used by <see cref="IConfigurationManager"/>.
85 : /// </summary>
86 : protected abstract string SigningTokenConfigurationKey { get; }
87 :
88 : /// <summary>
89 : /// The configuration key for the name of the private topic as used by <see cref="IConfigurationManager"/>.
90 : /// </summary>
91 : protected abstract string PrivateTopicNameConfigurationKey { get; }
92 :
93 : /// <summary>
94 : /// The configuration key for the name of the public topic as used by <see cref="IConfigurationManager"/>.
95 : /// </summary>
96 : protected abstract string PublicTopicNameConfigurationKey { get; }
97 :
98 : /// <summary>
99 : /// The default name of the private topic if no <see cref="IConfigurationManager"/> value is set.
100 : /// </summary>
101 : protected abstract string DefaultPrivateTopicName { get; }
102 :
103 : /// <summary>
104 : /// The default name of the public topic if no <see cref="IConfigurationManager"/> value is set.
105 : /// </summary>
106 : protected abstract string DefaultPublicTopicName { get; }
107 :
108 : /// <summary>
109 : /// The configuration key for the name of the subscription in the private topic as used by <see cref="IConfigurationManager"/>.
110 : /// </summary>
111 : protected abstract string PrivateTopicSubscriptionNameConfigurationKey { get; }
112 :
113 : /// <summary>
114 : /// The configuration key for the name of the subscription in the public topic as used by <see cref="IConfigurationManager"/>.
115 : /// </summary>
116 : protected abstract string PublicTopicSubscriptionNameConfigurationKey { get; }
117 :
118 : /// <summary>
119 : /// The configuration key that
120 : /// specifies if an <see cref="Exception"/> is thrown if the network lock is lost
121 : /// as used by <see cref="IConfigurationManager"/>.
122 : /// </summary>
123 : protected abstract string ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey { get; }
124 :
125 : /// <summary>
126 : /// Specifies if an <see cref="Exception"/> is thrown if the network lock is lost.
127 : /// </summary>
128 : protected bool ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete { get; private set; }
129 :
130 : /// <summary>
131 : /// The default name of the subscription in the private topic if no <see cref="IConfigurationManager"/> value is set.
132 : /// </summary>
133 : protected const string DefaultPrivateTopicSubscriptionName = "Root";
134 :
135 : /// <summary>
136 : /// The default name of the subscription in the public topic if no <see cref="IConfigurationManager"/> value is set.
137 : /// </summary>
138 : protected const string DefaultPublicTopicSubscriptionName = "Root";
139 :
140 : /// <summary>
141 : /// The <see cref="Action{TBrokeredMessage}">handler</see> used for <see cref="SubscriptionClient.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage}, OnMessageOptions)"/> on each receiver.
142 : /// </summary>
143 : protected Action<BrokeredMessage> ReceiverMessageHandler { get; set; }
144 :
145 : /// <summary>
146 : /// The <see cref="OnMessageOptions" /> used for <see cref="SubscriptionClient.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage}, OnMessageOptions)"/> on each receiver.
147 : /// </summary>
148 : protected OnMessageOptions ReceiverMessageHandlerOptions { get; set; }
149 :
150 : /// <summary>
151 : /// Gets the <see cref="IBusHelper"/>.
152 : /// </summary>
153 : protected IBusHelper BusHelper { get; private set; }
154 :
155 : /// <summary>
156 : /// Gets the <see cref="IAzureBusHelper{TAuthenticationToken}"/>.
157 : /// </summary>
158 : protected IAzureBusHelper<TAuthenticationToken> AzureBusHelper { get; private set; }
159 :
160 : /// <summary>
161 : /// Gets the <see cref="ITelemetryHelper"/>.
162 : /// </summary>
163 : protected ITelemetryHelper TelemetryHelper { get; set; }
164 :
165 : /// <summary>
166 : /// The maximum number of time a retry is tried if a <see cref="System.TimeoutException"/> is thrown while sending messages.
167 : /// </summary>
168 : protected short TimeoutOnSendRetryMaximumCount { get; private set; }
169 :
170 : /// <summary>
171 : /// The <see cref="IHashAlgorithmFactory"/> to use to sign messages.
172 : /// </summary>
173 : protected IHashAlgorithmFactory Signer { get; private set; }
174 :
175 : /// <summary>
176 : /// A list of namespaces to exclude when trying to automatically determine the container.
177 : /// </summary>
178 : protected IList<string> ExclusionNamespaces { get; private set; }
179 :
/// <summary>
/// Instantiates a new instance of <see cref="AzureServiceBus{TAuthenticationToken}"/>,
/// storing the supplied helpers, creating empty receiver collections and reading the
/// send-timeout retry limit from configuration.
/// </summary>
protected AzureServiceBus(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper, IHashAlgorithmFactory hashAlgorithmFactory, bool isAPublisher)
    : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, isAPublisher)
{
    // Collaborators handed in by the caller.
    AzureBusHelper = azureBusHelper;
    BusHelper = busHelper;

    // Telemetry is a no-op until a real helper is assigned.
    TelemetryHelper = new NullTelemetryHelper();

    // Receivers are keyed by their zero-based index.
    PrivateServiceBusReceivers = new Dictionary<int, SubscriptionClient>();
    PublicServiceBusReceivers = new Dictionary<int, SubscriptionClient>();

    // Retry once by default; a parsable, non-blank configuration value overrides that.
    short retryMaximum = 1;
    string rawRetryMaximum;
    if (ConfigurationManager.TryGetSetting("Cqrs.Azure.Servicebus.TimeoutOnSendRetryMaximumCount", out rawRetryMaximum))
    {
        short parsedRetryMaximum;
        if (!string.IsNullOrWhiteSpace(rawRetryMaximum) && short.TryParse(rawRetryMaximum, out parsedRetryMaximum))
            retryMaximum = parsedRetryMaximum;
    }
    TimeoutOnSendRetryMaximumCount = retryMaximum;

    ExclusionNamespaces = new SynchronizedCollection<string> { "Cqrs", "System" };
    Signer = hashAlgorithmFactory;
}
199 :
200 : #region Overrides of AzureBus<TAuthenticationToken>
201 :
/// <summary>
/// Reads the bus connection string from <see cref="AzureBus{TAuthenticationToken}.ConfigurationManager"/>
/// using <see cref="MessageBusConnectionStringConfigurationKey"/>.
/// </summary>
/// <returns>The configured, non-blank connection string.</returns>
/// <exception cref="ConfigurationErrorsException">The setting is missing, empty or whitespace.</exception>
protected override string GetConnectionString()
{
    string configuredValue = ConfigurationManager.GetSetting(MessageBusConnectionStringConfigurationKey);
    if (!string.IsNullOrWhiteSpace(configuredValue))
        return configuredValue;

    string errorMessage = string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and has a valid connection string value.", MessageBusConnectionStringConfigurationKey);
    throw new ConfigurationErrorsException(errorMessage);
}
212 :
213 : #endregion
214 :
/// <summary>
/// Prepares this bus for publishing: ensures the private and public topics exist via
/// <see cref="CheckPrivateTopicExists"/> and <see cref="CheckPublicTopicExists"/>,
/// creates a <see cref="TopicClient"/> for each topic,
/// then calls <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/>.
/// </summary>
protected override void InstantiatePublishing()
{
    NamespaceManager manager = NamespaceManager.CreateFromConnectionString(ConnectionString);

    // The checks also populate PrivateTopicName and PublicTopicName, which the publishers below rely on.
    CheckPrivateTopicExists(manager);
    CheckPublicTopicExists(manager);

    PrivateServiceBusPublisher = TopicClient.CreateFromConnectionString(ConnectionString, PrivateTopicName);
    PublicServiceBusPublisher = TopicClient.CreateFromConnectionString(ConnectionString, PublicTopicName);

    StartSettingsChecking();
}
230 :
/// <summary>
/// Prepares this bus for receiving: ensures the private and public topics exist via
/// <see cref="CheckPrivateTopicExists"/> and <see cref="CheckPublicTopicExists"/>,
/// creates the receivers for both topics,
/// optionally starts <see cref="CleanUpDeadLetters"/> for both topics when the
/// "Cqrs.Azure.Servicebus.EnableDeadLetterCleanUp" setting parses as <c>true</c>,
/// then calls <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/> unless
/// <see cref="PublicServiceBusPublisher"/> has already been created.
/// </summary>
/// <exception cref="InvalidConfigurationException">Creating a receiver raised a <see cref="UriFormatException"/>.</exception>
protected override void InstantiateReceiving()
{
    NamespaceManager manager = NamespaceManager.CreateFromConnectionString(ConnectionString);

    CheckPrivateTopicExists(manager);
    CheckPublicTopicExists(manager);

    // Private receivers first ...
    try
    {
        InstantiateReceiving(manager, PrivateServiceBusReceivers, PrivateTopicName, PrivateTopicSubscriptionName);
    }
    catch (UriFormatException uriFormatException)
    {
        throw new InvalidConfigurationException("The connection string for one of the private Service Bus receivers may be invalid.", uriFormatException);
    }

    // ... then the public ones.
    try
    {
        InstantiateReceiving(manager, PublicServiceBusReceivers, PublicTopicName, PublicTopicSubscriptionName);
    }
    catch (UriFormatException uriFormatException)
    {
        throw new InvalidConfigurationException("The connection string for one of the public Service Bus receivers may be invalid.", uriFormatException);
    }

    // Dead-letter clean-up is opt-in; anything that does not parse as a boolean counts as disabled.
    string rawCleanUpFlag = ConfigurationManager.GetSetting("Cqrs.Azure.Servicebus.EnableDeadLetterCleanUp");
    bool cleanUpEnabled;
    if (!bool.TryParse(rawCleanUpFlag, out cleanUpEnabled))
        cleanUpEnabled = false;
    if (cleanUpEnabled)
    {
        CleanUpDeadLetters(PrivateTopicName, PrivateTopicSubscriptionName);
        CleanUpDeadLetters(PublicTopicName, PublicTopicSubscriptionName);
    }

    // When a publisher exists, InstantiatePublishing starts the settings check itself,
    // so it is only started from here for receive-only buses. Checking one publisher is enough.
    if (PublicServiceBusPublisher == null)
        StartSettingsChecking();
}
277 :
/// <summary>
/// Creates <see cref="AzureBus{TAuthenticationToken}.NumberOfReceiversCount"/> <see cref="SubscriptionClient"/> instances
/// and stores them in <paramref name="serviceBusReceivers"/> under the keys <c>0..count-1</c>.
/// Any receiver that is replaced, or whose key is no longer needed because the count has decreased,
/// has <see cref="ClientEntity.Close()"/> called on it before it is dropped.
/// </summary>
/// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
/// <param name="serviceBusReceivers">The receivers collection to place <see cref="SubscriptionClient"/> instances into.</param>
/// <param name="topicName">The topic name.</param>
/// <param name="topicSubscriptionName">The topic subscription name.</param>
protected virtual void InstantiateReceiving(NamespaceManager namespaceManager, IDictionary<int, SubscriptionClient> serviceBusReceivers, string topicName, string topicSubscriptionName)
{
    for (int i = 0; i < NumberOfReceiversCount; i++)
    {
        // Create the replacement first so a failure here leaves the existing receiver untouched.
        SubscriptionClient serviceBusReceiver = SubscriptionClient.CreateFromConnectionString(ConnectionString, topicName, topicSubscriptionName);

        // Close whatever is being replaced so it does not keep a connection (and message pump) alive.
        SubscriptionClient replacedReceiver;
        if (serviceBusReceivers.TryGetValue(i, out replacedReceiver) && replacedReceiver != null && !replacedReceiver.IsClosed)
            replacedReceiver.Close();

        serviceBusReceivers[i] = serviceBusReceiver;
    }

    // Remove any surplus receivers if the number has decreased.
    // The keys are snapshotted first: removing while comparing against the live Count
    // shrinks the bound as entries are removed and leaves trailing receivers behind.
    List<int> surplusKeys = serviceBusReceivers.Keys.Where(key => key >= NumberOfReceiversCount).ToList();
    foreach (int surplusKey in surplusKeys)
    {
        SubscriptionClient surplusReceiver = serviceBusReceivers[surplusKey];
        if (surplusReceiver != null && !surplusReceiver.IsClosed)
            surplusReceiver.Close();
        serviceBusReceivers.Remove(surplusKey);
    }
}
305 :
/// <summary>
/// Resolves <see cref="PrivateTopicName"/> and <see cref="PrivateTopicSubscriptionName"/> from configuration
/// (falling back to the defaults when the setting is <c>null</c>), then ensures both exist via <see cref="CheckTopicExists"/>.
/// </summary>
/// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
protected virtual void CheckPrivateTopicExists(NamespaceManager namespaceManager)
{
    string configuredTopicName = ConfigurationManager.GetSetting(PrivateTopicNameConfigurationKey);
    PrivateTopicName = configuredTopicName ?? DefaultPrivateTopicName;

    string configuredSubscriptionName = ConfigurationManager.GetSetting(PrivateTopicSubscriptionNameConfigurationKey);
    PrivateTopicSubscriptionName = configuredSubscriptionName ?? DefaultPrivateTopicSubscriptionName;

    CheckTopicExists(namespaceManager, PrivateTopicName, PrivateTopicSubscriptionName);
}
314 :
/// <summary>
/// Resolves <see cref="PublicTopicName"/> and <see cref="PublicTopicSubscriptionName"/> from configuration
/// (falling back to the defaults when the setting is <c>null</c>), then ensures both exist via <see cref="CheckTopicExists"/>.
/// </summary>
/// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
protected virtual void CheckPublicTopicExists(NamespaceManager namespaceManager)
{
    string configuredTopicName = ConfigurationManager.GetSetting(PublicTopicNameConfigurationKey);
    PublicTopicName = configuredTopicName ?? DefaultPublicTopicName;

    string configuredSubscriptionName = ConfigurationManager.GetSetting(PublicTopicSubscriptionNameConfigurationKey);
    PublicTopicSubscriptionName = configuredSubscriptionName ?? DefaultPublicTopicSubscriptionName;

    CheckTopicExists(namespaceManager, PublicTopicName, PublicTopicSubscriptionName);
}
323 :
/// <summary>
/// Ensures a topic named <paramref name="topicName"/> exists, creating it when missing, and
/// ensures a subscription named <paramref name="subscriptionName"/> exists on that topic, creating it when missing.
/// </summary>
/// <param name="namespaceManager">The <see cref="NamespaceManager"/> used to query and create entities.</param>
/// <param name="topicName">The name of the topic.</param>
/// <param name="subscriptionName">The name of the subscription within the topic.</param>
protected virtual void CheckTopicExists(NamespaceManager namespaceManager, string topicName, string subscriptionName)
{
    // Settings applied only when the topic has to be created.
    var topicDescription = new TopicDescription(topicName);
    topicDescription.MaxSizeInMegabytes = 5120;
    topicDescription.DefaultMessageTimeToLive = new TimeSpan(0, 25, 0);
    topicDescription.EnablePartitioning = true;
    topicDescription.EnableBatchedOperations = true;

    string topicPath = topicDescription.Path;

    bool topicIsMissing = !namespaceManager.TopicExists(topicPath);
    if (topicIsMissing)
        namespaceManager.CreateTopic(topicDescription);

    if (namespaceManager.SubscriptionExists(topicPath, subscriptionName))
        return;

    // Settings applied only when the subscription has to be created.
    var subscriptionDescription = new SubscriptionDescription(topicPath, subscriptionName);
    subscriptionDescription.DefaultMessageTimeToLive = new TimeSpan(0, 25, 0);
    subscriptionDescription.EnableBatchedOperations = true;
    subscriptionDescription.EnableDeadLetteringOnFilterEvaluationExceptions = true;
    namespaceManager.CreateSubscription(subscriptionDescription);
}
353 :
/// <summary>
/// Refreshes <see cref="ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete"/> from configuration,
/// triggers settings checking on both the private and public publishers and receivers,
/// then calls <see cref="InstantiatePublishing"/> if <see cref="PublicServiceBusPublisher"/> is not null.
/// </summary>
protected override void TriggerSettingsChecking()
{
    // A missing setting means "throw"; otherwise the configured value wins.
    bool configuredThrowOnLockLost;
    bool settingWasFound = ConfigurationManager.TryGetSetting(ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey, out configuredThrowOnLockLost);
    ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete = !settingWasFound || configuredThrowOnLockLost;

    TriggerSettingsChecking(PrivateServiceBusPublisher, PrivateServiceBusReceivers);
    TriggerSettingsChecking(PublicServiceBusPublisher, PublicServiceBusReceivers);

    // Publishing is restarted last on purpose: if this triggers the cancellation there is nothing
    // left to process here. Only one of the publishers needs to be checked.
    if (PublicServiceBusPublisher == null)
        return;

    Logger.LogDebug("Recursively calling into InstantiatePublishing.");
    InstantiatePublishing();
}
377 :
/// <summary>
/// Closes the provided <paramref name="serviceBusPublisher"/> and every receiver in <paramref name="serviceBusReceivers"/>,
/// then, if at least one receiver was closed, calls <see cref="InstantiateReceiving()"/> once and
/// re-applies the stored message handler.
/// </summary>
/// <param name="serviceBusPublisher">The publisher to close; may be <c>null</c> when this bus does not publish.</param>
/// <param name="serviceBusReceivers">The receivers to close and re-create.</param>
protected virtual void TriggerSettingsChecking(TopicClient serviceBusPublisher, IDictionary<int, SubscriptionClient> serviceBusReceivers)
{
    // Let's wrap up using this message bus and start the switch
    if (serviceBusPublisher != null)
    {
        serviceBusPublisher.Close();
        Logger.LogDebug("Publishing service bus closed.");
    }

    // Snapshot the receivers before touching anything: InstantiateReceiving() writes into this same
    // dictionary, and modifying a dictionary while enumerating its Values can invalidate the enumerator.
    List<SubscriptionClient> receiversToClose = serviceBusReceivers.Values
        .Where(serviceBusReceiver => serviceBusReceiver != null)
        .ToList();

    foreach (SubscriptionClient serviceBusReceiver in receiversToClose)
    {
        serviceBusReceiver.Close();
        Logger.LogDebug("Receiving service bus closed.");
    }

    // Nothing was receiving, so there is nothing to restart.
    if (receiversToClose.Count == 0)
        return;

    // Restart configuration once for all receivers; InstantiateReceiving() re-creates every receiver,
    // so repeating it per receiver would only create and discard extra clients.
    Logger.LogDebug("Recursively calling into InstantiateReceiving.");
    InstantiateReceiving();

    // This will be the case of a connection setting change re-connection
    if (ReceiverMessageHandler != null && ReceiverMessageHandlerOptions != null)
    {
        // Callback to handle received messages
        Logger.LogDebug("Re-registering onMessage handler.");
        ApplyReceiverMessageHandler();
    }
    else
        Logger.LogWarning("No onMessage handler was found to re-bind.");
}
416 :
/// <summary>
/// Remembers the provided <paramref name="receiverMessageHandler"/> and <paramref name="receiverMessageHandlerOptions"/>
/// via <see cref="StoreReceiverMessageHandler"/>, then attaches them to the receivers via <see cref="ApplyReceiverMessageHandler"/>.
/// </summary>
/// <param name="receiverMessageHandler">The callback invoked for each received <see cref="BrokeredMessage"/>.</param>
/// <param name="receiverMessageHandlerOptions">The <see cref="OnMessageOptions"/> to register the callback with.</param>
protected virtual void RegisterReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
{
    // Store first: ApplyReceiverMessageHandler reads the stored values, not these arguments.
    StoreReceiverMessageHandler(receiverMessageHandler, receiverMessageHandlerOptions);
    ApplyReceiverMessageHandler();
}
426 :
/// <summary>
/// Saves the provided <paramref name="receiverMessageHandler"/> and <paramref name="receiverMessageHandlerOptions"/>
/// into <see cref="ReceiverMessageHandler"/> and <see cref="ReceiverMessageHandlerOptions"/> without attaching them to any receiver.
/// </summary>
/// <param name="receiverMessageHandler">The callback to remember.</param>
/// <param name="receiverMessageHandlerOptions">The <see cref="OnMessageOptions"/> to remember.</param>
protected virtual void StoreReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
{
    ReceiverMessageHandlerOptions = receiverMessageHandlerOptions;
    ReceiverMessageHandler = receiverMessageHandler;
}
435 :
/// <summary>
/// Registers the stored <see cref="ReceiverMessageHandler"/> with the stored <see cref="ReceiverMessageHandlerOptions"/>
/// on every receiver in <see cref="PrivateServiceBusReceivers"/> and <see cref="PublicServiceBusReceivers"/>.
/// Each registration first records on <see cref="BusHelper"/> whether the message arrived on the private bus.
/// </summary>
protected override void ApplyReceiverMessageHandler()
{
    // Messages from the private topic are flagged as such before being handed to the handler.
    Action<BrokeredMessage> privateBusCallback = brokeredMessage =>
    {
        BusHelper.SetWasPrivateBusUsed(true);
        ReceiverMessageHandler(brokeredMessage);
    };
    foreach (SubscriptionClient privateReceiver in PrivateServiceBusReceivers.Values)
        privateReceiver.OnMessage(privateBusCallback, ReceiverMessageHandlerOptions);

    // Messages from the public topic clear the flag before being handed to the handler.
    Action<BrokeredMessage> publicBusCallback = brokeredMessage =>
    {
        BusHelper.SetWasPrivateBusUsed(false);
        ReceiverMessageHandler(brokeredMessage);
    };
    foreach (SubscriptionClient publicReceiver in PublicServiceBusReceivers.Values)
        publicReceiver.OnMessage(publicBusCallback, ReceiverMessageHandlerOptions);
}
465 :
/// <summary>
/// Using a <see cref="Task"/>, repeatedly reads batches from the dead-letter queue of the
/// provided <paramref name="topicName"/> and <paramref name="topicSubscriptionName"/>.
/// Each dead letter is completed (removed) when <see cref="IBusHelper.IsEventRequired(Type)"/> reports
/// its type is not required, and abandoned (left in the queue) otherwise.
/// </summary>
/// <param name="topicName">The name of the topic.</param>
/// <param name="topicSubscriptionName">The name of the subscription.</param>
/// <returns>
/// The <see cref="CancellationTokenSource"/> that stops the background loop when cancelled.
/// The loop disposes it when it exits.
/// </returns>
protected virtual CancellationTokenSource CleanUpDeadLetters(string topicName, string topicSubscriptionName)
{
    var brokeredMessageRenewCancellationTokenSource = new CancellationTokenSource();
    IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
    // Count of consecutive lock failures; shared with the lambdas below via closure.
    int lockIssues = 0;

    Action<BrokeredMessage, IMessage> leaveDeadlLetterInQueue = (deadLetterBrokeredMessage, deadLetterMessage) =>
    {
        // Release the lock so the message stays in the queue
        try
        {
            deadLetterBrokeredMessage.Abandon();
            lockIssues = 0;
        }
        catch (MessageLockLostException)
        {
            lockIssues++;
            Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
        }
        Logger.LogDebug(string.Format("A dead-letter message of type {0} arrived with the id '{1}' but left in the queue due to settings.", deadLetterMessage.GetType().FullName, deadLetterBrokeredMessage.MessageId));
    };
    Action<BrokeredMessage> removeDeadlLetterFromQueue = deadLetterBrokeredMessage =>
    {
        // Remove message from queue
        try
        {
            deadLetterBrokeredMessage.Complete();
            lockIssues = 0;
        }
        catch (MessageLockLostException)
        {
            lockIssues++;
            Logger.LogWarning(string.Format("The lock supplied for complete for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
        }
        Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}' but was removed as processing was skipped due to settings.", deadLetterBrokeredMessage.MessageId));
    };

    Task.Factory.StartNewSafely(() =>
    {
        int loop = 0;
        // The path depends only on the arguments, so compute it once.
        string deadLetterPath = SubscriptionClient.FormatDeadLetterPath(topicName, topicSubscriptionName);

        while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
        {
            lockIssues = 0;
            MessagingFactory factory = null;
            MessageReceiver client = null;
            try
            {
                factory = MessagingFactory.CreateFromConnectionString(ConnectionString);
                client = factory.CreateMessageReceiver(deadLetterPath, ReceiveMode.PeekLock);

                // Defensive: treat a null batch as "nothing to process".
                IEnumerable<BrokeredMessage> brokeredMessages = client.ReceiveBatch(1000) ?? Enumerable.Empty<BrokeredMessage>();

                foreach (BrokeredMessage brokeredMessage in brokeredMessages)
                {
                    // Too many consecutive lock failures: give up on this batch and fetch a fresh one.
                    if (lockIssues > 10)
                        break;
                    try
                    {
                        Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}'.", brokeredMessage.MessageId));
                        string messageBody = brokeredMessage.GetBody<string>();

                        // Closure protection
                        BrokeredMessage message = brokeredMessage;
                        try
                        {
                            AzureBusHelper.ReceiveEvent
                            (
                                messageBody,
                                @event =>
                                {
                                    bool isRequired = BusHelper.IsEventRequired(@event.GetType());
                                    if (!isRequired)
                                        removeDeadlLetterFromQueue(message);
                                    else
                                        leaveDeadlLetterInQueue(message, @event);
                                    return true;
                                },
                                string.Format("id '{0}'", brokeredMessage.MessageId),
                                ExtractSignature(message),
                                SigningTokenConfigurationKey,
                                () =>
                                {
                                    removeDeadlLetterFromQueue(message);
                                },
                                () => { }
                            );
                        }
                        catch
                        {
                            // Deliberate fallback: anything that fails as an event is retried as a command.
                            AzureBusHelper.ReceiveCommand
                            (
                                messageBody,
                                command =>
                                {
                                    bool isRequired = BusHelper.IsEventRequired(command.GetType());
                                    if (!isRequired)
                                        removeDeadlLetterFromQueue(message);
                                    else
                                        leaveDeadlLetterInQueue(message, command);
                                    return true;
                                },
                                string.Format("id '{0}'", brokeredMessage.MessageId),
                                ExtractSignature(message),
                                SigningTokenConfigurationKey,
                                () =>
                                {
                                    removeDeadlLetterFromQueue(message);
                                },
                                () => { }
                            );
                        }
                    }
                    catch (Exception exception)
                    {
                        TelemetryHelper.TrackException(exception, null, telemetryProperties);
                        // Indicates a problem, unlock message in queue
                        Logger.LogError(string.Format("A dead-letter message arrived with the id '{0}' but failed to be process.", brokeredMessage.MessageId), exception: exception);
                        try
                        {
                            brokeredMessage.Abandon();
                        }
                        catch (MessageLockLostException)
                        {
                            lockIssues++;
                            Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", brokeredMessage.MessageId));
                        }
                    }
                }
            }
            finally
            {
                // Always release the receiver and the factory created for this pass,
                // including when receiving the batch throws.
                if (client != null)
                    client.Close();
                if (factory != null)
                    factory.Close();
            }

            // Pause between passes: sleep on four passes out of five and merely yield on the fifth.
            // The counter is pre-incremented; a post-increment followed by the reset below would
            // always test zero and the sleep branch would never run.
            if (++loop % 5 == 0)
            {
                loop = 0;
                Thread.Yield();
            }
            else
                Thread.Sleep(500);
        }
        try
        {
            brokeredMessageRenewCancellationTokenSource.Dispose();
        }
        catch (ObjectDisposedException) { }
    }, brokeredMessageRenewCancellationTokenSource.Token);

    return brokeredMessageRenewCancellationTokenSource;
}
618 :
/// <summary>
/// Create a <see cref="BrokeredMessage"/> with additional properties to aid routing and tracing:
/// "CorrelationId", "Type", "Source", "Signature" and, when a calling method outside
/// <see cref="ExclusionNamespaces"/> can be found on the stack, "Source-Method".
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of the message being sent.</typeparam>
/// <param name="serialiserFunction">Serialises <paramref name="message"/> into the message body.</param>
/// <param name="messageType">The <see cref="Type"/> recorded in the "Type" property and used to look up a type-specific signing token.</param>
/// <param name="message">The message to wrap.</param>
/// <returns>The populated <see cref="BrokeredMessage"/>.</returns>
protected virtual BrokeredMessage CreateBrokeredMessage<TMessage>(Func<TMessage, string> serialiserFunction, Type messageType, TMessage message)
{
    string messageBody = serialiserFunction(message);
    var brokeredMessage = new BrokeredMessage(messageBody)
    {
        CorrelationId = CorrelationIdHelper.GetCorrelationId().ToString("N")
    };
    brokeredMessage.Properties.Add("CorrelationId", brokeredMessage.CorrelationId);
    brokeredMessage.Properties.Add("Type", messageType.FullName);
    brokeredMessage.Properties.Add("Source", string.Format("{0}/{1}/{2}/{3}", Logger.LoggerSettings.ModuleName, Logger.LoggerSettings.Instance, Logger.LoggerSettings.Environment, Logger.LoggerSettings.EnvironmentInstance));

    // see https://github.com/Chinchilla-Software-Com/CQRS/wiki/Inter-process-function-security
    // Token lookup order: type-specific setting, then the bus-wide setting, then an empty Guid.
    string configurationKey = string.Format("{0}.SigningToken", messageType.FullName);
    string signingToken;
    if (!ConfigurationManager.TryGetSetting(configurationKey, out signingToken) || string.IsNullOrWhiteSpace(signingToken))
        if (!ConfigurationManager.TryGetSetting(SigningTokenConfigurationKey, out signingToken) || string.IsNullOrWhiteSpace(signingToken))
            signingToken = Guid.Empty.ToString("N");
    if (!string.IsNullOrWhiteSpace(signingToken))
    {
        // NOTE(review): string.Concat does not expand "{0}{1}"; the literal text is hashed together with the
        // token and body. Left unchanged on purpose, because whatever verifies the signature must hash the
        // exact same bytes - confirm against the receiving side before changing.
        // The hash algorithm is disposed once the signature is computed.
        // NOTE(review): assumes IHashAlgorithmFactory.Create() returns a fresh instance per call - confirm.
        using (HashAlgorithm signer = Signer.Create())
        using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", signingToken, messageBody))))
            brokeredMessage.Properties.Add("Signature", Convert.ToBase64String(signer.ComputeHash(hashStream)));
    }

    // Best effort only: record the first calling method that lives outside the excluded namespaces.
    // Failing to determine it must never prevent the message from being created.
    try
    {
        var stackTrace = new StackTrace();
        StackFrame[] stackFrames = stackTrace.GetFrames();
        if (stackFrames != null)
        {
            foreach (StackFrame frame in stackFrames)
            {
                MethodBase method = frame.GetMethod();
                if (method == null || method.ReflectedType == null)
                    continue;

                // FullName can be null for some types (e.g. open generic parameters); skip those frames.
                string reflectedTypeName = method.ReflectedType.FullName;
                if (reflectedTypeName == null)
                    continue;

                try
                {
                    // Namespaces are identifiers, so compare ordinally instead of with the current culture.
                    bool isExcluded = ExclusionNamespaces.Any(@namespace => reflectedTypeName.StartsWith(@namespace, StringComparison.Ordinal));
                    if (!isExcluded)
                    {
                        brokeredMessage.Properties.Add("Source-Method", string.Format("{0}.{1}", reflectedTypeName, method.Name));
                        break;
                    }
                }
                catch
                {
                    // Just move on to the next frame
                }
            }
        }
    }
    catch
    {
        // Just move on; the "Source-Method" property is optional
    }

    return brokeredMessage;
}
678 :
/// <summary>
/// Builds the telemetry properties for the provided <paramref name="message"/>:
/// "Type" is set to <paramref name="baseCommunicationType"/>, and the message properties
/// "Type", "Source", "Source-Method" and "CorrelationId" are copied, when present, to
/// "MessageType", "MessageSource", "MessageSourceMethod" and "CorrelationId" respectively.
/// </summary>
/// <param name="message">The <see cref="BrokeredMessage"/> to read properties from.</param>
/// <param name="baseCommunicationType">The value stored under the "Type" telemetry key.</param>
/// <returns>A new dictionary holding the extracted telemetry properties.</returns>
protected virtual IDictionary<string, string> ExtractTelemetryProperties(BrokeredMessage message, string baseCommunicationType)
{
    IDictionary<string, string> telemetryProperties = new Dictionary<string, string>();
    telemetryProperties.Add("Type", baseCommunicationType);

    // Each entry is { message property name, telemetry property name }.
    string[][] propertyMappings =
    {
        new[] { "Type", "MessageType" },
        new[] { "Source", "MessageSource" },
        new[] { "Source-Method", "MessageSourceMethod" },
        new[] { "CorrelationId", "CorrelationId" }
    };

    foreach (string[] propertyMapping in propertyMappings)
    {
        object rawValue;
        if (!message.Properties.TryGetValue(propertyMapping[0], out rawValue))
            continue;
        // The target names are all distinct, so this only guards against overwriting an existing entry.
        if (telemetryProperties.ContainsKey(propertyMapping[1]))
            continue;
        telemetryProperties.Add(propertyMapping[1], rawValue.ToString());
    }

    return telemetryProperties;
}
697 :
/// <summary>
/// Reads the "Signature" property from the provided <paramref name="message"/>.
/// </summary>
/// <param name="message">The <see cref="BrokeredMessage"/> to read the signature from.</param>
/// <returns>The signature as a string, or <c>null</c> when the message has no "Signature" property.</returns>
protected virtual string ExtractSignature(BrokeredMessage message)
{
    object signature;
    bool hasSignature = message.Properties.TryGetValue("Signature", out signature);
    return hasSignature ? signature.ToString() : null;
}
708 : }
709 : }
|