Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Configuration;
12 : using System.Diagnostics;
13 : using System.IO;
14 : using System.Linq;
15 : using System.Reflection;
16 : using System.Security.Cryptography;
17 : using System.Text;
18 : using System.Threading;
19 : using System.Threading.Tasks;
20 : using Chinchilla.Logging;
21 : using Cqrs.Authentication;
22 : using Cqrs.Bus;
23 : using Cqrs.Configuration;
24 : using Cqrs.Exceptions;
25 : using Cqrs.Messages;
26 : #if NET452
27 : using Microsoft.ServiceBus;
28 : using Microsoft.ServiceBus.Messaging;
29 : using Manager = Microsoft.ServiceBus.NamespaceManager;
30 : using IMessageReceiver = Microsoft.ServiceBus.Messaging.SubscriptionClient;
31 : #endif
32 : #if NETCOREAPP3_0
33 : using System.Runtime.Serialization;
34 : using System.Xml;
35 : using Microsoft.Azure.ServiceBus;
36 : using Microsoft.Azure.ServiceBus.Core;
37 : using Microsoft.Azure.ServiceBus.Management;
38 : using Manager = Microsoft.Azure.ServiceBus.Management.ManagementClient;
39 : using BrokeredMessage = Microsoft.Azure.ServiceBus.Message;
40 : #endif
41 :
42 : namespace Cqrs.Azure.ServiceBus
43 : {
44 : /// <summary>
45 : /// An <see cref="AzureBus{TAuthenticationToken}"/> that uses Azure Service Bus.
46 : /// </summary>
47 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
48 : /// <remarks>
49 : /// https://markheath.net/post/migrating-to-new-servicebus-sdk
50 : /// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions#receive-messages-from-the-subscription
51 : /// https://stackoverflow.com/questions/47427361/azure-service-bus-read-messages-sent-by-net-core-2-with-brokeredmessage-getbo
52 : /// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
53 : /// </remarks>
54 : public abstract class AzureServiceBus<TAuthenticationToken>
55 : : AzureBus<TAuthenticationToken>
56 1 : {
57 : /// <summary>
58 : /// Gets the private <see cref="TopicClient"/> publisher.
59 : /// </summary>
60 : protected TopicClient PrivateServiceBusPublisher { get; private set; }
61 :
62 : /// <summary>
63 : /// Gets the public <see cref="TopicClient"/> publisher.
64 : /// </summary>
65 : protected TopicClient PublicServiceBusPublisher { get; private set; }
66 :
67 : /// <summary>
68 : /// Gets the private <see cref="IMessageReceiver"/> receivers.
69 : /// </summary>
70 : protected IDictionary<int, IMessageReceiver> PrivateServiceBusReceivers { get; private set; }
71 :
72 : /// <summary>
73 : /// Gets the public <see cref="IMessageReceiver"/> receivers.
74 : /// </summary>
75 : protected IDictionary<int, IMessageReceiver> PublicServiceBusReceivers { get; private set; }
76 :
77 : /// <summary>
78 : /// The name of the private topic.
79 : /// </summary>
80 : protected string PrivateTopicName { get; private set; }
81 :
82 : /// <summary>
83 : /// The name of the public topic.
84 : /// </summary>
85 : protected string PublicTopicName { get; private set; }
86 :
87 : /// <summary>
88 : /// The name of the subscription in the private topic.
89 : /// </summary>
90 : protected string PrivateTopicSubscriptionName { get; private set; }
91 :
92 : /// <summary>
93 : /// The name of the subscription in the public topic.
94 : /// </summary>
95 : protected string PublicTopicSubscriptionName { get; private set; }
96 :
97 : /// <summary>
98 : /// The configuration key for the message bus connection string as used by <see cref="IConfigurationManager"/>.
99 : /// </summary>
100 : protected abstract string MessageBusConnectionStringConfigurationKey { get; }
101 :
102 : /// <summary>
103 : /// The configuration key for the signing token as used by <see cref="IConfigurationManager"/>.
104 : /// </summary>
105 : protected abstract string SigningTokenConfigurationKey { get; }
106 :
107 : /// <summary>
108 : /// The configuration key for the name of the private topic as used by <see cref="IConfigurationManager"/>.
109 : /// </summary>
110 : protected abstract string PrivateTopicNameConfigurationKey { get; }
111 :
112 : /// <summary>
113 : /// The configuration key for the name of the public topic as used by <see cref="IConfigurationManager"/>.
114 : /// </summary>
115 : protected abstract string PublicTopicNameConfigurationKey { get; }
116 :
117 : /// <summary>
118 : /// The default name of the private topic if no <see cref="IConfigurationManager"/> value is set.
119 : /// </summary>
120 : protected abstract string DefaultPrivateTopicName { get; }
121 :
122 : /// <summary>
123 : /// The default name of the public topic if no <see cref="IConfigurationManager"/> value is set.
124 : /// </summary>
125 : protected abstract string DefaultPublicTopicName { get; }
126 :
127 : /// <summary>
128 : /// The configuration key for the name of the subscription in the private topic as used by <see cref="IConfigurationManager"/>.
129 : /// </summary>
130 : protected abstract string PrivateTopicSubscriptionNameConfigurationKey { get; }
131 :
132 : /// <summary>
133 : /// The configuration key for the name of the subscription in the public topic as used by <see cref="IConfigurationManager"/>.
134 : /// </summary>
135 : protected abstract string PublicTopicSubscriptionNameConfigurationKey { get; }
136 :
137 : /// <summary>
138 : /// The configuration key that
139 : /// specifies if an <see cref="Exception"/> is thrown if the network lock is lost
140 : /// as used by <see cref="IConfigurationManager"/>.
141 : /// </summary>
142 : protected abstract string ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey { get; }
143 :
144 : /// <summary>
145 : /// Specifies if an <see cref="Exception"/> is thrown if the network lock is lost.
146 : /// </summary>
147 : protected bool ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete { get; private set; }
148 :
149 : /// <summary>
150 : /// The default name of the subscription in the private topic if no <see cref="IConfigurationManager"/> value is set.
151 : /// </summary>
152 : protected const string DefaultPrivateTopicSubscriptionName = "Root";
153 :
154 : /// <summary>
155 : /// The default name of the subscription in the public topic if no <see cref="IConfigurationManager"/> value is set.
156 : /// </summary>
157 : protected const string DefaultPublicTopicSubscriptionName = "Root";
158 :
159 : #if NET452
160 : /// <summary>
161 : /// The <see cref="Action{TBrokeredMessage}">handler</see> used for <see cref="IMessageReceiver.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage}, OnMessageOptions)"/> on each receiver.
162 : /// </summary>
163 : protected Action<BrokeredMessage> ReceiverMessageHandler { get; set; }
164 : #endif
165 : #if NETCOREAPP3_0
166 : /// <summary>
167 : /// The <see cref="Action{IMessageReceiver, TBrokeredMessage}">handler</see> used for <see cref="MessageReceiver.RegisterMessageHandler(Func{BrokeredMessage, CancellationToken, Task}, MessageHandlerOptions)"/> on each receiver.
168 : /// </summary>
169 : protected Action<IMessageReceiver, BrokeredMessage> ReceiverMessageHandler { get; set; }
170 : #endif
171 :
172 : #if NET452
173 : /// <summary>
174 : /// The <see cref="OnMessageOptions" /> used for <see cref="IMessageReceiver.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage}, OnMessageOptions)"/> on each receiver.
175 : /// </summary>
176 : protected OnMessageOptions ReceiverMessageHandlerOptions { get; set; }
177 : #endif
178 : #if NETCOREAPP3_0
179 : /// <summary>
180 : /// The <see cref="MessageHandlerOptions" /> used for <see cref="MessageReceiver.RegisterMessageHandler(Func{BrokeredMessage, CancellationToken, Task}, MessageHandlerOptions)"/> on each receiver.
181 : /// </summary>
182 : protected MessageHandlerOptions ReceiverMessageHandlerOptions { get; set; }
183 : #endif
184 :
185 : /// <summary>
186 : /// Gets the <see cref="IBusHelper"/>.
187 : /// </summary>
188 : protected IBusHelper BusHelper { get; private set; }
189 :
190 : /// <summary>
191 : /// Gets the <see cref="IAzureBusHelper{TAuthenticationToken}"/>.
192 : /// </summary>
193 : protected IAzureBusHelper<TAuthenticationToken> AzureBusHelper { get; private set; }
194 :
195 : /// <summary>
196 : /// Gets the <see cref="ITelemetryHelper"/>.
197 : /// </summary>
198 : protected ITelemetryHelper TelemetryHelper { get; set; }
199 :
200 : /// <summary>
201 : /// The maximum number of time a retry is tried if a <see cref="System.TimeoutException"/> is thrown while sending messages.
202 : /// </summary>
203 : protected short TimeoutOnSendRetryMaximumCount { get; private set; }
204 :
205 : /// <summary>
206 : /// The <see cref="IHashAlgorithmFactory"/> to use to sign messages.
207 : /// </summary>
208 : protected IHashAlgorithmFactory Signer { get; private set; }
209 :
210 : /// <summary>
211 : /// A list of namespaces to exclude when trying to automatically determine the container.
212 : /// </summary>
213 : protected IList<string> ExclusionNamespaces { get; private set; }
214 :
215 : /// <summary>
216 : /// Instantiates a new instance of <see cref="AzureServiceBus{TAuthenticationToken}"/>
217 : /// </summary>
218 1 : protected AzureServiceBus(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper, IHashAlgorithmFactory hashAlgorithmFactory, bool isAPublisher)
219 : : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, isAPublisher)
220 : {
221 : AzureBusHelper = azureBusHelper;
222 : BusHelper = busHelper;
223 : TelemetryHelper = new NullTelemetryHelper();
224 : PrivateServiceBusReceivers = new Dictionary<int, IMessageReceiver>();
225 : PublicServiceBusReceivers = new Dictionary<int, IMessageReceiver>();
226 : TimeoutOnSendRetryMaximumCount = 1;
227 : string timeoutOnSendRetryMaximumCountValue;
228 : short timeoutOnSendRetryMaximumCount;
229 : if (ConfigurationManager.TryGetSetting("Cqrs.Azure.Servicebus.TimeoutOnSendRetryMaximumCount", out timeoutOnSendRetryMaximumCountValue) && !string.IsNullOrWhiteSpace(timeoutOnSendRetryMaximumCountValue) && short.TryParse(timeoutOnSendRetryMaximumCountValue, out timeoutOnSendRetryMaximumCount))
230 : TimeoutOnSendRetryMaximumCount = timeoutOnSendRetryMaximumCount;
231 : ExclusionNamespaces = new SynchronizedCollection<string> { "Cqrs", "System" };
232 : Signer = hashAlgorithmFactory;
233 : }
234 :
235 : #region Overrides of AzureBus<TAuthenticationToken>
236 :
237 : /// <summary>
238 : /// Gets the connection string for the bus from <see cref="AzureBus{TAuthenticationToken}.ConfigurationManager"/>
239 : /// </summary>
240 1 : protected override string GetConnectionString()
241 : {
242 : string connectionString = ConfigurationManager.GetSetting(MessageBusConnectionStringConfigurationKey);
243 : if (string.IsNullOrWhiteSpace(connectionString))
244 : throw new ConfigurationErrorsException(string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and has a valid connection string value.", MessageBusConnectionStringConfigurationKey));
245 : return connectionString;
246 : }
247 :
248 : #endregion
249 :
250 : /// <summary>
251 : /// Instantiate publishing on this bus by
252 : /// calling <see cref="CheckPrivateTopicExists"/> and <see cref="CheckPublicTopicExists"/>
253 : /// then calling <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/>
254 : /// </summary>
255 1 : protected override void InstantiatePublishing()
256 : {
257 : #if NET452
258 : Manager manager = Manager.CreateFromConnectionString(ConnectionString);
259 : #endif
260 : #if NETCOREAPP3_0
261 : var manager = new Manager(ConnectionString);
262 : #endif
263 : CheckPrivateTopicExists(manager);
264 : CheckPublicTopicExists(manager);
265 :
266 : #if NET452
267 : PrivateServiceBusPublisher = TopicClient.CreateFromConnectionString(ConnectionString, PrivateTopicName);
268 : PublicServiceBusPublisher = TopicClient.CreateFromConnectionString(ConnectionString, PublicTopicName);
269 : #endif
270 : #if NETCOREAPP3_0
271 : PrivateServiceBusPublisher = new TopicClient(ConnectionString, PrivateTopicName);
272 : PublicServiceBusPublisher = new TopicClient(ConnectionString, PublicTopicName);
273 : #endif
274 : StartSettingsChecking();
275 : }
276 :
277 : /// <summary>
278 : /// Instantiate receiving on this bus by
279 : /// calling <see cref="CheckPrivateTopicExists"/> and <see cref="CheckPublicTopicExists"/>
280 : /// then InstantiateReceiving for private and public topics,
281 : /// calls <see cref="CleanUpDeadLetters"/> for the private and public topics,
282 : /// then calling <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/>
283 : /// </summary>
284 1 : protected override void InstantiateReceiving()
285 : {
286 : #if NET452
287 : Manager manager = Manager.CreateFromConnectionString(ConnectionString);
288 : #endif
289 : #if NETCOREAPP3_0
290 : var manager = new Manager(ConnectionString);
291 : #endif
292 :
293 : CheckPrivateTopicExists(manager);
294 : CheckPublicTopicExists(manager);
295 :
296 : try
297 : {
298 : InstantiateReceiving(manager, PrivateServiceBusReceivers, PrivateTopicName, PrivateTopicSubscriptionName);
299 : }
300 : catch (UriFormatException exception)
301 : {
302 : throw new InvalidConfigurationException("The connection string for one of the private Service Bus receivers may be invalid.", exception);
303 : }
304 : try
305 : {
306 : InstantiateReceiving(manager, PublicServiceBusReceivers, PublicTopicName, PublicTopicSubscriptionName);
307 : }
308 : catch (UriFormatException exception)
309 : {
310 : throw new InvalidConfigurationException("The connection string for one of the public Service Bus receivers may be invalid.", exception);
311 : }
312 :
313 : bool enableDeadLetterCleanUp;
314 : string enableDeadLetterCleanUpValue = ConfigurationManager.GetSetting("Cqrs.Azure.Servicebus.EnableDeadLetterCleanUp");
315 : if (bool.TryParse(enableDeadLetterCleanUpValue, out enableDeadLetterCleanUp) && enableDeadLetterCleanUp)
316 : {
317 : CleanUpDeadLetters(PrivateTopicName, PrivateTopicSubscriptionName);
318 : CleanUpDeadLetters(PublicTopicName, PublicTopicSubscriptionName);
319 : }
320 :
321 : // If this is also a publisher, then it will the check over there and that will handle this
322 : // we only need to check one of these
323 : if (PublicServiceBusPublisher != null)
324 : return;
325 :
326 : StartSettingsChecking();
327 : }
328 :
329 : #if NET452
330 : /// <summary>
331 : /// Creates <see cref="AzureBus{TAuthenticationToken}.NumberOfReceiversCount"/> <see cref="IMessageReceiver"/>.
332 : /// If flushing is required, any flushed <see cref="IMessageReceiver"/> has <see cref="ClientEntity.Close()"/> called on it first.
333 : /// </summary>
334 : /// <param name="manager">The <see cref="Manager"/>.</param>
335 : /// <param name="serviceBusReceivers">The receivers collection to place <see cref="IMessageReceiver"/> instances into.</param>
336 : /// <param name="topicName">The topic name.</param>
337 : /// <param name="topicSubscriptionName">The topic subscription name.</param>
338 : #endif
339 : #if NETCOREAPP3_0
340 : /// <summary>
341 : /// Creates <see cref="AzureBus{TAuthenticationToken}.NumberOfReceiversCount"/> <see cref="IMessageReceiver"/>.
342 : /// If flushing is required, any flushed <see cref="IMessageReceiver"/> has <see cref="ClientEntity.CloseAsync()"/> called on it first.
343 : /// </summary>
344 : /// <param name="manager">The <see cref="Manager"/>.</param>
345 : /// <param name="serviceBusReceivers">The receivers collection to place <see cref="IMessageReceiver"/> instances into.</param>
346 : /// <param name="topicName">The topic name.</param>
347 : /// <param name="topicSubscriptionName">The topic subscription name.</param>
348 : #endif
349 0 : protected virtual void InstantiateReceiving(Manager manager, IDictionary<int, IMessageReceiver> serviceBusReceivers, string topicName, string topicSubscriptionName)
350 : {
351 : for (int i = 0; i < NumberOfReceiversCount; i++)
352 : {
353 : #if NET452
354 : IMessageReceiver serviceBusReceiver = SubscriptionClient.CreateFromConnectionString(ConnectionString, topicName, topicSubscriptionName);
355 : #endif
356 : #if NETCOREAPP3_0
357 : IMessageReceiver serviceBusReceiver = new MessageReceiver(ConnectionString, EntityNameHelper.FormatSubscriptionPath(topicName, topicSubscriptionName));
358 : #endif
359 : if (serviceBusReceivers.ContainsKey(i))
360 : serviceBusReceivers[i] = serviceBusReceiver;
361 : else
362 : serviceBusReceivers.Add(i, serviceBusReceiver);
363 : }
364 : // Remove any if the number has decreased
365 : for (int i = NumberOfReceiversCount; i < serviceBusReceivers.Count; i++)
366 : {
367 : IMessageReceiver serviceBusReceiver;
368 : if (serviceBusReceivers.TryGetValue(i, out serviceBusReceiver))
369 : {
370 : #if NET452
371 : serviceBusReceiver.Close();
372 : #endif
373 : #if NETCOREAPP3_0
374 : serviceBusReceiver.CloseAsync().Wait(1500);
375 : #endif
376 : }
377 : serviceBusReceivers.Remove(i);
378 : }
379 : }
380 :
381 : /// <summary>
382 : /// Checks if the private topic and subscription name exists as per <see cref="PrivateTopicName"/> and <see cref="PrivateTopicSubscriptionName"/>.
383 : /// </summary>
384 : /// <param name="manager">The <see cref="Manager"/>.</param>
385 1 : protected virtual void CheckPrivateTopicExists(Manager manager)
386 : {
387 : CheckTopicExists(manager, PrivateTopicName = ConfigurationManager.GetSetting(PrivateTopicNameConfigurationKey) ?? DefaultPrivateTopicName, PrivateTopicSubscriptionName = ConfigurationManager.GetSetting(PrivateTopicSubscriptionNameConfigurationKey) ?? DefaultPrivateTopicSubscriptionName);
388 : }
389 :
390 : /// <summary>
391 : /// Checks if the public topic and subscription name exists as per <see cref="PublicTopicName"/> and <see cref="PublicTopicSubscriptionName"/>.
392 : /// </summary>
393 : /// <param name="manager">The <see cref="Manager"/>.</param>
394 1 : protected virtual void CheckPublicTopicExists(Manager manager)
395 : {
396 : CheckTopicExists(manager, PublicTopicName = ConfigurationManager.GetSetting(PublicTopicNameConfigurationKey) ?? DefaultPublicTopicName, PublicTopicSubscriptionName = ConfigurationManager.GetSetting(PublicTopicSubscriptionNameConfigurationKey) ?? DefaultPublicTopicSubscriptionName);
397 : }
398 :
399 : /// <summary>
400 : /// Checks if a topic by the provided <paramref name="topicName"/> exists and
401 : /// Checks if a subscription name by the provided <paramref name="subscriptionName"/> exists.
402 : /// </summary>
403 1 : protected virtual void CheckTopicExists(Manager manager, string topicName, string subscriptionName)
404 : {
405 : // Configure Queue Settings
406 : var eventTopicDescription = new TopicDescription(topicName)
407 : {
408 : #if NET452
409 : MaxSizeInMegabytes = 5120,
410 : #endif
411 : #if NETCOREAPP3_0
412 : MaxSizeInMB = 5120,
413 : #endif
414 : DefaultMessageTimeToLive = new TimeSpan(0, 25, 0),
415 : EnablePartitioning = true,
416 : EnableBatchedOperations = true,
417 : };
418 :
419 : #if NETCOREAPP3_0
420 : Task<bool> checkTask = manager.TopicExistsAsync(topicName);
421 : checkTask.Wait(1500);
422 : if (!checkTask.Result)
423 : {
424 : Task<TopicDescription> createTask = manager.CreateTopicAsync(eventTopicDescription);
425 : createTask.Wait(1500);
426 : }
427 : #endif
428 :
429 : #if NET452
430 : // Create the topic if it does not exist already
431 : if (!manager.TopicExists(eventTopicDescription.Path))
432 : manager.CreateTopic(eventTopicDescription);
433 :
434 : if (!manager.SubscriptionExists(eventTopicDescription.Path, subscriptionName))
435 : manager.CreateSubscription
436 : (
437 : new SubscriptionDescription(eventTopicDescription.Path, subscriptionName)
438 : {
439 : DefaultMessageTimeToLive = new TimeSpan(0, 25, 0),
440 : EnableBatchedOperations = true,
441 : EnableDeadLetteringOnFilterEvaluationExceptions = true
442 : }
443 : );
444 : #endif
445 : }
446 :
447 : /// <summary>
448 : /// Triggers settings checking on both public and private publishers and receivers,
449 : /// then calls <see cref="InstantiatePublishing"/> if <see cref="PublicServiceBusPublisher"/> is not null.
450 : /// </summary>
451 1 : protected override void TriggerSettingsChecking()
452 : {
453 : // First refresh the EventBlackListProcessing property
454 : bool throwExceptionOnReceiverMessageLockLostExceptionDuringComplete;
455 : if (!ConfigurationManager.TryGetSetting(ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey, out throwExceptionOnReceiverMessageLockLostExceptionDuringComplete))
456 : throwExceptionOnReceiverMessageLockLostExceptionDuringComplete = true;
457 : ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete = throwExceptionOnReceiverMessageLockLostExceptionDuringComplete;
458 :
459 : TriggerSettingsChecking(PrivateServiceBusPublisher, PrivateServiceBusReceivers);
460 : TriggerSettingsChecking(PublicServiceBusPublisher, PublicServiceBusReceivers);
461 :
462 : // Restart configuration, we order this intentionally with the publisher second as if this triggers the cancellation there's nothing else to process here
463 : // we also only need to check one of the publishers
464 : if (PublicServiceBusPublisher != null)
465 : {
466 : Logger.LogDebug("Recursively calling into InstantiatePublishing.");
467 : InstantiatePublishing();
468 : }
469 : }
470 :
471 : /// <summary>
472 : /// Triggers settings checking on the provided <paramref name="serviceBusPublisher"/> and <paramref name="serviceBusReceivers"/>,
473 : /// then calls <see cref="InstantiateReceiving()"/>.
474 : /// </summary>
475 1 : protected virtual void TriggerSettingsChecking(TopicClient serviceBusPublisher, IDictionary<int, IMessageReceiver> serviceBusReceivers)
476 : {
477 : // Let's wrap up using this message bus and start the switch
478 : if (serviceBusPublisher != null)
479 : {
480 : #if NET452
481 : serviceBusPublisher.Close();
482 : #endif
483 : #if NETCOREAPP3_0
484 : serviceBusPublisher.CloseAsync().Wait(1500);
485 : #endif
486 : Logger.LogDebug("Publishing service bus closed.");
487 : }
488 : foreach (IMessageReceiver serviceBusReceiver in serviceBusReceivers.Values)
489 : {
490 : // Let's wrap up using this message bus and start the switch
491 : if (serviceBusReceiver != null)
492 : {
493 : #if NET452
494 : serviceBusReceiver.Close();
495 : #endif
496 : #if NETCOREAPP3_0
497 : serviceBusReceiver.CloseAsync().Wait(1500);
498 : #endif
499 : Logger.LogDebug("Receiving service bus closed.");
500 : }
501 : // Restart configuration, we order this intentionally with the receiver first as if this triggers the cancellation we know this isn't a publisher as well
502 : if (serviceBusReceiver != null)
503 : {
504 : Logger.LogDebug("Recursively calling into InstantiateReceiving.");
505 : InstantiateReceiving();
506 :
507 : // This will be the case of a connection setting change re-connection
508 : if (ReceiverMessageHandler != null && ReceiverMessageHandlerOptions != null)
509 : {
510 : // Callback to handle received messages
511 : Logger.LogDebug("Re-registering onMessage handler.");
512 : ApplyReceiverMessageHandler();
513 : }
514 : else
515 : Logger.LogWarning("No onMessage handler was found to re-bind.");
516 : }
517 : }
518 : }
519 :
520 : /// <summary>
521 : /// Registers the provided <paramref name="receiverMessageHandler"/> with the provided <paramref name="receiverMessageHandlerOptions"/>.
522 : /// </summary>
523 : #if NET452
524 : protected virtual void RegisterReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
525 : #endif
526 : #if NETCOREAPP3_0
527 : protected virtual void RegisterReceiverMessageHandler(Action<IMessageReceiver, BrokeredMessage> receiverMessageHandler, MessageHandlerOptions receiverMessageHandlerOptions)
528 : #endif
529 : {
530 : StoreReceiverMessageHandler(receiverMessageHandler, receiverMessageHandlerOptions);
531 :
532 : ApplyReceiverMessageHandler();
533 : }
534 :
535 : /// <summary>
536 : /// Stores the provided <paramref name="receiverMessageHandler"/> and <paramref name="receiverMessageHandlerOptions"/>.
537 : /// </summary>
538 : #if NET452
539 : protected virtual void StoreReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
540 : #endif
541 : #if NETCOREAPP3_0
542 : protected virtual void StoreReceiverMessageHandler(Action<IMessageReceiver, BrokeredMessage> receiverMessageHandler, MessageHandlerOptions receiverMessageHandlerOptions)
543 : #endif
544 : {
545 : ReceiverMessageHandler = receiverMessageHandler;
546 : ReceiverMessageHandlerOptions = receiverMessageHandlerOptions;
547 : }
548 :
549 : /// <summary>
550 : /// Applies the stored ReceiverMessageHandler and ReceiverMessageHandlerOptions to all receivers in
551 : /// <see cref="PrivateServiceBusReceivers"/> and <see cref="PublicServiceBusReceivers"/>.
552 : /// </summary>
553 1 : protected override void ApplyReceiverMessageHandler()
554 : {
555 : foreach (IMessageReceiver serviceBusReceiver in PrivateServiceBusReceivers.Values)
556 : {
557 : #if NET452
558 : serviceBusReceiver
559 : .OnMessage
560 : (
561 : message =>
562 : {
563 : BusHelper.SetWasPrivateBusUsed(true);
564 : ReceiverMessageHandler(message);
565 : },
566 : ReceiverMessageHandlerOptions
567 : );
568 : #endif
569 : #if NETCOREAPP3_0
570 : serviceBusReceiver
571 : .RegisterMessageHandler
572 : (
573 : (message, cancellationToken) =>
574 : {
575 : return Task.Factory.StartNewSafely(() => {
576 : BusHelper.SetWasPrivateBusUsed(true);
577 : ReceiverMessageHandler(serviceBusReceiver, message);
578 : });
579 : },
580 : ReceiverMessageHandlerOptions
581 : );
582 : #endif
583 : }
584 : foreach (IMessageReceiver serviceBusReceiver in PublicServiceBusReceivers.Values)
585 : {
586 : #if NET452
587 : serviceBusReceiver
588 : .OnMessage
589 : (
590 : message =>
591 : {
592 : BusHelper.SetWasPrivateBusUsed(false);
593 : ReceiverMessageHandler(message);
594 : },
595 : ReceiverMessageHandlerOptions
596 : );
597 : #endif
598 : #if NETCOREAPP3_0
599 : serviceBusReceiver
600 : .RegisterMessageHandler
601 : (
602 : (message, cancellationToken) =>
603 : {
604 : return Task.Factory.StartNewSafely(() => {
605 : BusHelper.SetWasPrivateBusUsed(false);
606 : ReceiverMessageHandler(serviceBusReceiver, message);
607 : });
608 : },
609 : ReceiverMessageHandlerOptions
610 : );
611 : #endif
612 : }
613 : }
614 :
615 : /// <summary>
616 : /// Using a <see cref="Task"/>, clears all dead letters from the topic and subscription of the
617 : /// provided <paramref name="topicName"/> and <paramref name="topicSubscriptionName"/>.
618 : /// </summary>
619 : /// <param name="topicName">The name of the topic.</param>
620 : /// <param name="topicSubscriptionName">The name of the subscription.</param>
621 : /// <returns></returns>
622 1 : protected virtual CancellationTokenSource CleanUpDeadLetters(string topicName, string topicSubscriptionName)
623 : {
624 : var brokeredMessageRenewCancellationTokenSource = new CancellationTokenSource();
625 : IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
626 : int lockIssues = 0;
627 :
628 : #if NET452
629 : Action<BrokeredMessage, IMessage> leaveDeadlLetterInQueue = (deadLetterBrokeredMessage, deadLetterMessage) =>
630 : #endif
631 : #if NETCOREAPP3_0
632 : Action<IMessageReceiver, BrokeredMessage, IMessage> leaveDeadlLetterInQueue = (client, deadLetterBrokeredMessage, deadLetterMessage) =>
633 : #endif
634 : {
635 : // Remove message from queue
636 : try
637 : {
638 : #if NET452
639 : deadLetterBrokeredMessage.Abandon();
640 : #endif
641 : #if NETCOREAPP3_0
642 : client.AbandonAsync(deadLetterBrokeredMessage.SystemProperties.LockToken).Wait(1500);
643 : #endif
644 : lockIssues = 0;
645 : }
646 : catch (MessageLockLostException)
647 : {
648 : lockIssues++;
649 : Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
650 : }
651 : Logger.LogDebug(string.Format("A dead-letter message of type {0} arrived with the id '{1}' but left in the queue due to settings.", deadLetterMessage.GetType().FullName, deadLetterBrokeredMessage.MessageId));
652 : };
653 : #if NET452
654 : Action <BrokeredMessage> removeDeadlLetterFromQueue = (deadLetterBrokeredMessage) =>
655 : #endif
656 : #if NETCOREAPP3_0
657 : Action<IMessageReceiver, BrokeredMessage> removeDeadlLetterFromQueue = (client, deadLetterBrokeredMessage) =>
658 : #endif
659 : {
660 : // Remove message from queue
661 : try
662 : {
663 : #if NET452
664 : deadLetterBrokeredMessage.Complete();
665 : #endif
666 : #if NETCOREAPP3_0
667 : client.CompleteAsync(deadLetterBrokeredMessage.SystemProperties.LockToken).Wait(1500);
668 : #endif
669 : lockIssues = 0;
670 : }
671 : catch (MessageLockLostException)
672 : {
673 : lockIssues++;
674 : Logger.LogWarning(string.Format("The lock supplied for complete for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
675 : }
676 : Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}' but was removed as processing was skipped due to settings.", deadLetterBrokeredMessage.MessageId));
677 : };
678 :
679 : Task.Factory.StartNewSafely(() =>
680 : {
681 : int loop = 0;
682 : while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
683 : {
684 : lockIssues = 0;
685 : IEnumerable<BrokeredMessage> brokeredMessages;
686 :
687 : #if NET452
688 : MessagingFactory factory = MessagingFactory.CreateFromConnectionString(ConnectionString);
689 : string deadLetterPath = SubscriptionClient.FormatDeadLetterPath(topicName, topicSubscriptionName);
690 : MessageReceiver client = factory.CreateMessageReceiver(deadLetterPath, ReceiveMode.PeekLock);
691 : brokeredMessages = client.ReceiveBatch(1000);
692 : #endif
693 : #if NETCOREAPP3_0
694 : string deadLetterPath = EntityNameHelper.FormatDeadLetterPath(EntityNameHelper.FormatSubscriptionPath(topicName, topicSubscriptionName));
695 : MessageReceiver client = new MessageReceiver(ConnectionString, deadLetterPath, ReceiveMode.PeekLock);
696 : Task<IList<BrokeredMessage>> receiveTask = client.ReceiveAsync(1000);
697 : receiveTask.Wait(10000);
698 : if (receiveTask.IsCompletedSuccessfully && receiveTask.Result != null)
699 : brokeredMessages = receiveTask.Result;
700 : else
701 : brokeredMessages = Enumerable.Empty<BrokeredMessage>();
702 : #endif
703 :
704 : foreach (BrokeredMessage brokeredMessage in brokeredMessages)
705 : {
706 : if (lockIssues > 10)
707 : break;
708 : try
709 : {
710 : Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}'.", brokeredMessage.MessageId));
711 : string messageBody = brokeredMessage.GetBodyAsString();
712 :
713 : // Closure protection
714 : BrokeredMessage message = brokeredMessage;
715 : try
716 : {
717 : AzureBusHelper.ReceiveEvent
718 : (
719 : messageBody,
720 : @event =>
721 : {
722 : bool isRequired = BusHelper.IsEventRequired(@event.GetType());
723 : if (!isRequired)
724 : {
725 : #if NET452
726 : removeDeadlLetterFromQueue(message);
727 : #endif
728 : #if NETCOREAPP3_0
729 : removeDeadlLetterFromQueue(client, message);
730 : #endif
731 : }
732 : else
733 : {
734 : #if NET452
735 : leaveDeadlLetterInQueue(message, @event);
736 : #endif
737 : #if NETCOREAPP3_0
738 : leaveDeadlLetterInQueue(client, message, @event);
739 : #endif
740 : }
741 : return true;
742 : },
743 : string.Format("id '{0}'", brokeredMessage.MessageId),
744 : ExtractSignature(message),
745 : SigningTokenConfigurationKey,
746 : () =>
747 : {
748 : #if NET452
749 : removeDeadlLetterFromQueue(message);
750 : #endif
751 : #if NETCOREAPP3_0
752 : removeDeadlLetterFromQueue(client, message);
753 : #endif
754 : },
755 : () => { }
756 : );
757 : }
758 : catch
759 : {
760 : AzureBusHelper.ReceiveCommand
761 : (
762 : messageBody,
763 : command =>
764 : {
765 : bool isRequired = BusHelper.IsEventRequired(command.GetType());
766 : if (!isRequired)
767 : {
768 : #if NET452
769 : removeDeadlLetterFromQueue(message);
770 : #endif
771 : #if NETCOREAPP3_0
772 : removeDeadlLetterFromQueue(client, message);
773 : #endif
774 : }
775 : else
776 : {
777 : #if NET452
778 : leaveDeadlLetterInQueue(message, command);
779 : #endif
780 : #if NETCOREAPP3_0
781 : leaveDeadlLetterInQueue(client, message, command);
782 : #endif
783 : }
784 : return true;
785 : },
786 : string.Format("id '{0}'", brokeredMessage.MessageId),
787 : ExtractSignature(message),
788 : SigningTokenConfigurationKey,
789 : () =>
790 : {
791 : #if NET452
792 : removeDeadlLetterFromQueue(message);
793 : #endif
794 : #if NETCOREAPP3_0
795 : removeDeadlLetterFromQueue(client, message);
796 : #endif
797 : },
798 : () => { }
799 : );
800 : }
801 : }
802 : catch (Exception exception)
803 : {
804 : TelemetryHelper.TrackException(exception, null, telemetryProperties);
805 : // Indicates a problem, unlock message in queue
806 : Logger.LogError(string.Format("A dead-letter message arrived with the id '{0}' but failed to be process.", brokeredMessage.MessageId), exception: exception);
807 : try
808 : {
809 : #if NET452
810 : brokeredMessage.Abandon();
811 : #endif
812 : #if NETCOREAPP3_0
813 : client.AbandonAsync(brokeredMessage.SystemProperties.LockToken).Wait(1500);
814 : #endif
815 : }
816 : catch (MessageLockLostException)
817 : {
818 : lockIssues++;
819 : Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", brokeredMessage.MessageId));
820 : }
821 : }
822 : }
823 : #if NET452
824 : client.Close();
825 : #endif
826 : #if NETCOREAPP3_0
827 : client.CloseAsync().Wait(1500);
828 : #endif
829 :
830 : if (loop++ % 5 == 0)
831 : {
832 : loop = 0;
833 : Thread.Yield();
834 : }
835 : else
836 : Thread.Sleep(500);
837 : }
838 : try
839 : {
840 : brokeredMessageRenewCancellationTokenSource.Dispose();
841 : }
842 : catch (ObjectDisposedException) { }
843 : }, brokeredMessageRenewCancellationTokenSource.Token);
844 :
845 : return brokeredMessageRenewCancellationTokenSource;
846 : }
847 :
848 : #if NETCOREAPP3_0
849 : DataContractSerializer brokeredMessageSerialiser = new DataContractSerializer(typeof(string));
850 : #endif
851 : /// <summary>
852 : /// Create a <see cref="BrokeredMessage"/> with additional properties to aid routing and tracing
853 : /// </summary>
854 1 : protected virtual BrokeredMessage CreateBrokeredMessage<TMessage>(Func<TMessage, string> serialiserFunction, Type messageType, TMessage message)
855 : {
856 : string messageBody = serialiserFunction(message);
857 : #if NET452
858 : var brokeredMessage = new BrokeredMessage(messageBody)
859 : #endif
860 : #if NETCOREAPP3_0
861 : byte[] messageBodyData;
862 : using (var stream = new MemoryStream())
863 : {
864 : XmlDictionaryWriter binaryDictionaryWriter = XmlDictionaryWriter.CreateBinaryWriter(stream);
865 : brokeredMessageSerialiser.WriteObject(binaryDictionaryWriter, messageBody);
866 : binaryDictionaryWriter.Flush();
867 : messageBodyData = stream.ToArray();
868 : }
869 :
870 : var brokeredMessage = new BrokeredMessage(messageBodyData)
871 : #endif
872 : {
873 : CorrelationId = CorrelationIdHelper.GetCorrelationId().ToString("N")
874 : };
875 : brokeredMessage.AddUserProperty("CorrelationId", brokeredMessage.CorrelationId);
876 : brokeredMessage.AddUserProperty("Type", messageType.FullName);
877 : brokeredMessage.AddUserProperty("Source", string.Format("{0}/{1}/{2}/{3}", Logger.LoggerSettings.ModuleName, Logger.LoggerSettings.Instance, Logger.LoggerSettings.Environment, Logger.LoggerSettings.EnvironmentInstance));
878 :
879 : // see https://github.com/Chinchilla-Software-Com/CQRS/wiki/Inter-process-function-security</remarks>
880 : string configurationKey = string.Format("{0}.SigningToken", messageType.FullName);
881 : string signingToken;
882 : HashAlgorithm signer = Signer.Create();
883 : if (!ConfigurationManager.TryGetSetting(configurationKey, out signingToken) || string.IsNullOrWhiteSpace(signingToken))
884 : if (!ConfigurationManager.TryGetSetting(SigningTokenConfigurationKey, out signingToken) || string.IsNullOrWhiteSpace(signingToken))
885 : signingToken = Guid.Empty.ToString("N");
886 : if (!string.IsNullOrWhiteSpace(signingToken))
887 : using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", signingToken, messageBody))))
888 : brokeredMessage.AddUserProperty("Signature", Convert.ToBase64String(signer.ComputeHash(hashStream)));
889 :
890 : try
891 : {
892 : var stackTrace = new StackTrace();
893 : StackFrame[] stackFrames = stackTrace.GetFrames();
894 : if (stackFrames != null)
895 : {
896 : foreach (StackFrame frame in stackFrames)
897 : {
898 : MethodBase method = frame.GetMethod();
899 : if (method.ReflectedType == null)
900 : continue;
901 :
902 : try
903 : {
904 : if (ExclusionNamespaces.All(@namespace => !method.ReflectedType.FullName.StartsWith(@namespace)))
905 : {
906 : brokeredMessage.AddUserProperty("Source-Method", string.Format("{0}.{1}", method.ReflectedType.FullName, method.Name));
907 : break;
908 : }
909 : }
910 : catch
911 : {
912 : // Just move on
913 : }
914 : }
915 : }
916 : }
917 : catch
918 : {
919 : // Just move on
920 : }
921 :
922 : return brokeredMessage;
923 : }
924 :
925 : /// <summary>
926 : /// Extract any telemetry properties from the provided <paramref name="message"/>.
927 : /// </summary>
928 1 : protected virtual IDictionary<string, string> ExtractTelemetryProperties(BrokeredMessage message, string baseCommunicationType)
929 : {
930 : IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", baseCommunicationType } };
931 : object value;
932 : if (message.TryGetUserPropertyValue("Type", out value))
933 : telemetryProperties.Add("MessageType", value.ToString());
934 : if (message.TryGetUserPropertyValue("Source", out value))
935 : telemetryProperties.Add("MessageSource", value.ToString());
936 : if (message.TryGetUserPropertyValue("Source-Method", out value))
937 : telemetryProperties.Add("MessageSourceMethod", value.ToString());
938 : if (message.TryGetUserPropertyValue("CorrelationId", out value) && !telemetryProperties.ContainsKey("CorrelationId"))
939 : telemetryProperties.Add("CorrelationId", value.ToString());
940 :
941 : return telemetryProperties;
942 : }
943 :
944 : /// <summary>
945 : /// Extract the signature from the provided <paramref name="message"/>.
946 : /// </summary>
947 1 : protected virtual string ExtractSignature(BrokeredMessage message)
948 : {
949 : object value;
950 : if (message.TryGetUserPropertyValue("Signature", out value))
951 : return value.ToString();
952 : return null;
953 : }
954 : }
955 : }
|