Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Configuration;
12 : using System.Threading;
13 : using System.Threading.Tasks;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Bus;
17 : using Cqrs.Configuration;
18 : using Cqrs.Exceptions;
19 : using Cqrs.Messages;
20 : using Microsoft.ServiceBus;
21 : using Microsoft.ServiceBus.Messaging;
22 :
23 : namespace Cqrs.Azure.ServiceBus
24 : {
25 : /// <summary>
26 : /// An <see cref="AzureBus{TAuthenticationToken}"/> that uses Azure Service Bus.
27 : /// </summary>
28 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
29 : public abstract class AzureServiceBus<TAuthenticationToken> : AzureBus<TAuthenticationToken>
30 1 : {
/// <summary>
/// Gets the <see cref="TopicClient"/> used to publish to the private topic.
/// Assigned by <see cref="InstantiatePublishing"/>; it is <c>null</c> until that method has run.
/// </summary>
protected TopicClient PrivateServiceBusPublisher { get; private set; }

/// <summary>
/// Gets the <see cref="TopicClient"/> used to publish to the public topic.
/// Assigned by <see cref="InstantiatePublishing"/>; it is <c>null</c> until that method has run.
/// A non-null value is what this class uses to decide that it is acting as a publisher.
/// </summary>
protected TopicClient PublicServiceBusPublisher { get; private set; }

/// <summary>
/// Gets the <see cref="SubscriptionClient"/> receivers for the private topic,
/// keyed by the zero-based receiver index they were created with.
/// </summary>
protected IDictionary<int, SubscriptionClient> PrivateServiceBusReceivers { get; private set; }

/// <summary>
/// Gets the <see cref="SubscriptionClient"/> receivers for the public topic,
/// keyed by the zero-based receiver index they were created with.
/// </summary>
protected IDictionary<int, SubscriptionClient> PublicServiceBusReceivers { get; private set; }

/// <summary>
/// The name of the private topic.
/// Set by <see cref="CheckPrivateTopicExists"/> from configuration, falling back to <see cref="DefaultPrivateTopicName"/>.
/// </summary>
protected string PrivateTopicName { get; private set; }

/// <summary>
/// The name of the public topic.
/// Set by <see cref="CheckPublicTopicExists"/> from configuration, falling back to <see cref="DefaultPublicTopicName"/>.
/// </summary>
protected string PublicTopicName { get; private set; }

/// <summary>
/// The name of the subscription in the private topic.
/// Set by <see cref="CheckPrivateTopicExists"/> from configuration, falling back to <see cref="DefaultPrivateTopicSubscriptionName"/>.
/// </summary>
protected string PrivateTopicSubscriptionName { get; private set; }

/// <summary>
/// The name of the subscription in the public topic.
/// Set by <see cref="CheckPublicTopicExists"/> from configuration, falling back to <see cref="DefaultPublicTopicSubscriptionName"/>.
/// </summary>
protected string PublicTopicSubscriptionName { get; private set; }

/// <summary>
/// The configuration key for the message bus connection string as used by <see cref="IConfigurationManager"/>.
/// Read by <see cref="GetConnectionString"/>.
/// </summary>
protected abstract string MessageBusConnectionStringConfigurationKey { get; }

/// <summary>
/// The configuration key for the name of the private topic as used by <see cref="IConfigurationManager"/>.
/// </summary>
protected abstract string PrivateTopicNameConfigurationKey { get; }

/// <summary>
/// The configuration key for the name of the public topic as used by <see cref="IConfigurationManager"/>.
/// </summary>
protected abstract string PublicTopicNameConfigurationKey { get; }

/// <summary>
/// The default name of the private topic if no <see cref="IConfigurationManager"/> value is set.
/// </summary>
protected abstract string DefaultPrivateTopicName { get; }

/// <summary>
/// The default name of the public topic if no <see cref="IConfigurationManager"/> value is set.
/// </summary>
protected abstract string DefaultPublicTopicName { get; }

/// <summary>
/// The configuration key for the name of the subscription in the private topic as used by <see cref="IConfigurationManager"/>.
/// </summary>
protected abstract string PrivateTopicSubscriptionNameConfigurationKey { get; }

/// <summary>
/// The configuration key for the name of the subscription in the public topic as used by <see cref="IConfigurationManager"/>.
/// </summary>
protected abstract string PublicTopicSubscriptionNameConfigurationKey { get; }

/// <summary>
/// The configuration key that
/// specifies if an <see cref="Exception"/> is thrown if the network lock is lost
/// as used by <see cref="IConfigurationManager"/>.
/// </summary>
protected abstract string ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey { get; }

/// <summary>
/// Specifies if an <see cref="Exception"/> is thrown if the network lock is lost.
/// Refreshed by <see cref="TriggerSettingsChecking()"/>, which uses <c>true</c> when the setting cannot be read.
/// </summary>
protected bool ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete { get; private set; }

/// <summary>
/// The default name of the subscription in the private topic if no <see cref="IConfigurationManager"/> value is set.
/// </summary>
protected const string DefaultPrivateTopicSubscriptionName = "Root";

/// <summary>
/// The default name of the subscription in the public topic if no <see cref="IConfigurationManager"/> value is set.
/// </summary>
protected const string DefaultPublicTopicSubscriptionName = "Root";

/// <summary>
/// The <see cref="Action{TBrokeredMessage}">handler</see> used for <see cref="SubscriptionClient.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage}, OnMessageOptions)"/> on each receiver.
/// Stored by <see cref="StoreReceiverMessageHandler"/> so it can be re-applied after receivers are re-created.
/// </summary>
protected Action<BrokeredMessage> ReceiverMessageHandler { get; set; }

/// <summary>
/// The <see cref="OnMessageOptions" /> used for <see cref="SubscriptionClient.OnMessage(System.Action{Microsoft.ServiceBus.Messaging.BrokeredMessage}, OnMessageOptions)"/> on each receiver.
/// Stored by <see cref="StoreReceiverMessageHandler"/> so it can be re-applied after receivers are re-created.
/// </summary>
protected OnMessageOptions ReceiverMessageHandlerOptions { get; set; }

/// <summary>
/// Gets the <see cref="IBusHelper"/>.
/// </summary>
protected IBusHelper BusHelper { get; private set; }

/// <summary>
/// Gets the <see cref="IAzureBusHelper{TAuthenticationToken}"/>.
/// </summary>
protected IAzureBusHelper<TAuthenticationToken> AzureBusHelper { get; private set; }

/// <summary>
/// Gets or sets the <see cref="ITelemetryHelper"/>.
/// The constructor initialises this to a <see cref="NullTelemetryHelper"/>.
/// </summary>
protected ITelemetryHelper TelemetryHelper { get; set; }
152 :
/// <summary>
/// Instantiates a new instance of <see cref="AzureServiceBus{TAuthenticationToken}"/>.
/// The receiver collections start empty and telemetry defaults to a <see cref="NullTelemetryHelper"/>.
/// </summary>
/// <param name="configurationManager">Forwarded to the base <see cref="AzureBus{TAuthenticationToken}"/>.</param>
/// <param name="messageSerialiser">Forwarded to the base <see cref="AzureBus{TAuthenticationToken}"/>.</param>
/// <param name="authenticationTokenHelper">Forwarded to the base <see cref="AzureBus{TAuthenticationToken}"/>.</param>
/// <param name="correlationIdHelper">Forwarded to the base <see cref="AzureBus{TAuthenticationToken}"/>.</param>
/// <param name="logger">Forwarded to the base <see cref="AzureBus{TAuthenticationToken}"/>.</param>
/// <param name="azureBusHelper">Stored in <see cref="AzureBusHelper"/>.</param>
/// <param name="busHelper">Stored in <see cref="BusHelper"/>.</param>
/// <param name="isAPublisher">Forwarded to the base <see cref="AzureBus{TAuthenticationToken}"/>.</param>
protected AzureServiceBus(
    IConfigurationManager configurationManager,
    IMessageSerialiser<TAuthenticationToken> messageSerialiser,
    IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper,
    ICorrelationIdHelper correlationIdHelper,
    ILogger logger,
    IAzureBusHelper<TAuthenticationToken> azureBusHelper,
    IBusHelper busHelper,
    bool isAPublisher)
    : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, isAPublisher)
{
    // Receiver collections first, so they are never observed as null.
    PublicServiceBusReceivers = new Dictionary<int, SubscriptionClient>();
    PrivateServiceBusReceivers = new Dictionary<int, SubscriptionClient>();

    // Collaborators supplied by the caller.
    BusHelper = busHelper;
    AzureBusHelper = azureBusHelper;

    // No-op telemetry until something replaces it.
    TelemetryHelper = new NullTelemetryHelper();
}
165 :
166 : #region Overrides of AzureBus<TAuthenticationToken>
167 :
/// <summary>
/// Reads the bus connection string from <see cref="AzureBus{TAuthenticationToken}.ConfigurationManager"/>
/// using <see cref="MessageBusConnectionStringConfigurationKey"/>.
/// </summary>
/// <returns>The configured, non-blank connection string.</returns>
/// <exception cref="ConfigurationErrorsException">The setting is missing, empty or only white-space.</exception>
protected override string GetConnectionString()
{
    string configuredValue = ConfigurationManager.GetSetting(MessageBusConnectionStringConfigurationKey);

    // Happy path: a usable value was configured.
    if (!string.IsNullOrWhiteSpace(configuredValue))
        return configuredValue;

    string errorMessage = string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and has a valid connection string value.", MessageBusConnectionStringConfigurationKey);
    throw new ConfigurationErrorsException(errorMessage);
}
178 :
179 : #endregion
180 :
/// <summary>
/// Prepares this bus for publishing:
/// ensures the private and public topics exist via <see cref="CheckPrivateTopicExists"/> and <see cref="CheckPublicTopicExists"/>,
/// creates a <see cref="TopicClient"/> for each topic,
/// then calls <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/>.
/// </summary>
protected override void InstantiatePublishing()
{
    var manager = NamespaceManager.CreateFromConnectionString(ConnectionString);

    // The checks also populate PrivateTopicName / PublicTopicName, which are needed below.
    CheckPrivateTopicExists(manager);
    CheckPublicTopicExists(manager);

    PrivateServiceBusPublisher = TopicClient.CreateFromConnectionString(ConnectionString, PrivateTopicName);
    PublicServiceBusPublisher = TopicClient.CreateFromConnectionString(ConnectionString, PublicTopicName);

    StartSettingsChecking();
}
196 :
/// <summary>
/// Prepares this bus for receiving:
/// ensures the private and public topics exist via <see cref="CheckPrivateTopicExists"/> and <see cref="CheckPublicTopicExists"/>,
/// creates the receivers for the private and then the public topic,
/// optionally starts <see cref="CleanUpDeadLetters"/> for both topics,
/// then calls <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/> unless a publisher already exists.
/// </summary>
/// <exception cref="InvalidConfigurationException">Creating a receiver raised a <see cref="UriFormatException"/>.</exception>
protected override void InstantiateReceiving()
{
    var manager = NamespaceManager.CreateFromConnectionString(ConnectionString);

    // The checks also populate the topic and subscription name properties used below.
    CheckPrivateTopicExists(manager);
    CheckPublicTopicExists(manager);

    try
    {
        InstantiateReceiving(manager, PrivateServiceBusReceivers, PrivateTopicName, PrivateTopicSubscriptionName);
    }
    catch (UriFormatException uriFormatException)
    {
        throw new InvalidConfigurationException("The connection string for one of the private Service Bus receivers may be invalid.", uriFormatException);
    }

    try
    {
        InstantiateReceiving(manager, PublicServiceBusReceivers, PublicTopicName, PublicTopicSubscriptionName);
    }
    catch (UriFormatException uriFormatException)
    {
        throw new InvalidConfigurationException("The connection string for one of the public Service Bus receivers may be invalid.", uriFormatException);
    }

    // Dead-letter clean-up is opt-in: anything other than a parsable "true" leaves it off.
    string cleanUpSetting = ConfigurationManager.GetSetting("Cqrs.Azure.Servicebus.EnableDeadLetterCleanUp");
    bool cleanUpEnabled;
    if (bool.TryParse(cleanUpSetting, out cleanUpEnabled))
    {
        if (cleanUpEnabled)
        {
            // NOTE(review): the CancellationTokenSource returned by each call is discarded here,
            // so these clean-up tasks cannot be cancelled from this method.
            CleanUpDeadLetters(PrivateTopicName, PrivateTopicSubscriptionName);
            CleanUpDeadLetters(PublicTopicName, PublicTopicSubscriptionName);
        }
    }

    // When this bus is also a publisher, InstantiatePublishing starts the settings check itself,
    // so it is only started here when no publisher exists. Checking one publisher is sufficient.
    if (PublicServiceBusPublisher == null)
        StartSettingsChecking();
}
243 :
/// <summary>
/// Creates <see cref="AzureBus{TAuthenticationToken}.NumberOfReceiversCount"/> <see cref="SubscriptionClient"/> instances
/// and stores them in <paramref name="serviceBusReceivers"/> under the keys <c>0</c> to <c>NumberOfReceiversCount - 1</c>.
/// Any receiver that is replaced, and any receiver whose key is no longer needed,
/// has <see cref="ClientEntity.Close()"/> called on it (unless it is already closed) before it is dropped.
/// </summary>
/// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
/// <param name="serviceBusReceivers">The receivers collection to place <see cref="SubscriptionClient"/> instances into.</param>
/// <param name="topicName">The topic name.</param>
/// <param name="topicSubscriptionName">The topic subscription name.</param>
protected virtual void InstantiateReceiving(NamespaceManager namespaceManager, IDictionary<int, SubscriptionClient> serviceBusReceivers, string topicName, string topicSubscriptionName)
{
    // Read once so the create and the trim phases agree on the same target size.
    int requiredReceivers = NumberOfReceiversCount;

    for (int i = 0; i < requiredReceivers; i++)
    {
        // Create the replacement first: if this throws, the existing receiver is left untouched.
        SubscriptionClient replacement = SubscriptionClient.CreateFromConnectionString(ConnectionString, topicName, topicSubscriptionName);

        // Close the receiver being replaced so it does not keep running after it leaves the collection.
        SubscriptionClient previous;
        if (serviceBusReceivers.TryGetValue(i, out previous) && previous != null && !previous.IsClosed)
            previous.Close();

        serviceBusReceivers[i] = replacement;
    }

    // Remove any surplus receivers if the number has decreased.
    // The keys are collected up front: comparing a running index against Count while removing
    // stops early (Count shrinks with every removal) and would leave trailing receivers open.
    var surplusKeys = new List<int>();
    foreach (int key in serviceBusReceivers.Keys)
    {
        if (key >= requiredReceivers)
            surplusKeys.Add(key);
    }

    foreach (int key in surplusKeys)
    {
        SubscriptionClient surplus = serviceBusReceivers[key];
        if (surplus != null && !surplus.IsClosed)
            surplus.Close();
        serviceBusReceivers.Remove(key);
    }
}
271 :
/// <summary>
/// Resolves <see cref="PrivateTopicName"/> and <see cref="PrivateTopicSubscriptionName"/> from configuration
/// (falling back to <see cref="DefaultPrivateTopicName"/> and <see cref="DefaultPrivateTopicSubscriptionName"/> when the setting is <c>null</c>),
/// then ensures both exist via <see cref="CheckTopicExists"/>.
/// </summary>
/// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
protected virtual void CheckPrivateTopicExists(NamespaceManager namespaceManager)
{
    string configuredTopicName = ConfigurationManager.GetSetting(PrivateTopicNameConfigurationKey);
    PrivateTopicName = configuredTopicName ?? DefaultPrivateTopicName;

    string configuredSubscriptionName = ConfigurationManager.GetSetting(PrivateTopicSubscriptionNameConfigurationKey);
    PrivateTopicSubscriptionName = configuredSubscriptionName ?? DefaultPrivateTopicSubscriptionName;

    CheckTopicExists(namespaceManager, PrivateTopicName, PrivateTopicSubscriptionName);
}
280 :
/// <summary>
/// Resolves <see cref="PublicTopicName"/> and <see cref="PublicTopicSubscriptionName"/> from configuration
/// (falling back to <see cref="DefaultPublicTopicName"/> and <see cref="DefaultPublicTopicSubscriptionName"/> when the setting is <c>null</c>),
/// then ensures both exist via <see cref="CheckTopicExists"/>.
/// </summary>
/// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
protected virtual void CheckPublicTopicExists(NamespaceManager namespaceManager)
{
    string configuredTopicName = ConfigurationManager.GetSetting(PublicTopicNameConfigurationKey);
    PublicTopicName = configuredTopicName ?? DefaultPublicTopicName;

    string configuredSubscriptionName = ConfigurationManager.GetSetting(PublicTopicSubscriptionNameConfigurationKey);
    PublicTopicSubscriptionName = configuredSubscriptionName ?? DefaultPublicTopicSubscriptionName;

    CheckTopicExists(namespaceManager, PublicTopicName, PublicTopicSubscriptionName);
}
289 :
/// <summary>
/// Ensures a topic named <paramref name="topicName"/> exists, creating it when it does not,
/// then ensures a subscription named <paramref name="subscriptionName"/> exists on that topic, creating it when it does not.
/// </summary>
/// <param name="namespaceManager">The <see cref="NamespaceManager"/> used to query and create entities.</param>
/// <param name="topicName">The name of the topic.</param>
/// <param name="subscriptionName">The name of the subscription within the topic.</param>
protected virtual void CheckTopicExists(NamespaceManager namespaceManager, string topicName, string subscriptionName)
{
    // Settings applied only when the topic has to be created.
    var topicDescription = new TopicDescription(topicName);
    topicDescription.MaxSizeInMegabytes = 5120;
    topicDescription.DefaultMessageTimeToLive = new TimeSpan(0, 25, 0);
    topicDescription.EnablePartitioning = true;
    topicDescription.EnableBatchedOperations = true;

    bool topicIsMissing = !namespaceManager.TopicExists(topicDescription.Path);
    if (topicIsMissing)
        namespaceManager.CreateTopic(topicDescription);

    // Nothing more to do when the subscription is already there.
    if (namespaceManager.SubscriptionExists(topicDescription.Path, subscriptionName))
        return;

    var subscriptionDescription = new SubscriptionDescription(topicDescription.Path, subscriptionName);
    subscriptionDescription.DefaultMessageTimeToLive = new TimeSpan(0, 25, 0);
    subscriptionDescription.EnableBatchedOperations = true;
    subscriptionDescription.EnableDeadLetteringOnFilterEvaluationExceptions = true;
    namespaceManager.CreateSubscription(subscriptionDescription);
}
319 :
/// <summary>
/// Refreshes <see cref="ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete"/> from configuration,
/// triggers settings checking on the private and then the public publisher and receivers,
/// then calls <see cref="InstantiatePublishing"/> if <see cref="PublicServiceBusPublisher"/> is not null.
/// </summary>
protected override void TriggerSettingsChecking()
{
    // Refresh the lock-lost behaviour first; it is treated as enabled when the setting cannot be read.
    bool configuredValue;
    bool settingWasFound = ConfigurationManager.TryGetSetting(ThrowExceptionOnReceiverMessageLockLostExceptionDuringCompleteConfigurationKey, out configuredValue);
    ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete = !settingWasFound || configuredValue;

    TriggerSettingsChecking(PrivateServiceBusPublisher, PrivateServiceBusReceivers);
    TriggerSettingsChecking(PublicServiceBusPublisher, PublicServiceBusReceivers);

    // Publishing is restarted last on purpose: if this triggers the cancellation there is nothing left to process here.
    // Checking one publisher is enough to know whether this bus publishes.
    if (PublicServiceBusPublisher == null)
        return;

    Logger.LogDebug("Recursively calling into InstantiatePublishing.");
    InstantiatePublishing();
}
343 :
/// <summary>
/// Closes the provided <paramref name="serviceBusPublisher"/> and every receiver in <paramref name="serviceBusReceivers"/>.
/// If at least one receiver was closed, calls <see cref="InstantiateReceiving()"/> once and
/// re-applies the stored message handler to the newly created receivers.
/// </summary>
/// <param name="serviceBusPublisher">The publisher to close; may be <c>null</c> when this bus is not publishing.</param>
/// <param name="serviceBusReceivers">The receivers to close and re-create.</param>
protected virtual void TriggerSettingsChecking(TopicClient serviceBusPublisher, IDictionary<int, SubscriptionClient> serviceBusReceivers)
{
    // Let's wrap up using this message bus and start the switch
    if (serviceBusPublisher != null)
    {
        serviceBusPublisher.Close();
        Logger.LogDebug("Publishing service bus closed.");
    }

    // Work on a snapshot: InstantiateReceiving() below rewrites the receiver dictionaries,
    // and modifying a dictionary while enumerating it throws an InvalidOperationException.
    var receiversToClose = new List<SubscriptionClient>(serviceBusReceivers.Values);
    bool closedAnyReceiver = false;
    foreach (SubscriptionClient serviceBusReceiver in receiversToClose)
    {
        if (serviceBusReceiver == null)
            continue;

        serviceBusReceiver.Close();
        Logger.LogDebug("Receiving service bus closed.");
        closedAnyReceiver = true;
    }

    // No receivers means this bus is not receiving, so there is nothing to restart.
    if (!closedAnyReceiver)
        return;

    // Restart receiving exactly once, after every old receiver is closed. Doing this per receiver
    // would rebuild all receivers repeatedly and close the ones that had only just been created.
    Logger.LogDebug("Recursively calling into InstantiateReceiving.");
    InstantiateReceiving();

    // This will be the case of a connection setting change re-connection
    if (ReceiverMessageHandler != null && ReceiverMessageHandlerOptions != null)
    {
        // Callback to handle received messages
        Logger.LogDebug("Re-registering onMessage handler.");
        ApplyReceiverMessageHandler();
    }
    else
        Logger.LogWarning("No onMessage handler was found to re-bind.");
}
382 :
/// <summary>
/// Remembers the provided <paramref name="receiverMessageHandler"/> and <paramref name="receiverMessageHandlerOptions"/>
/// via <see cref="StoreReceiverMessageHandler"/>, then attaches them to the receivers via <see cref="ApplyReceiverMessageHandler"/>.
/// </summary>
/// <param name="receiverMessageHandler">The callback to run for each received <see cref="BrokeredMessage"/>.</param>
/// <param name="receiverMessageHandlerOptions">The <see cref="OnMessageOptions"/> to register the callback with.</param>
protected virtual void RegisterReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
{
    // Store first: ApplyReceiverMessageHandler reads the stored values rather than taking arguments.
    StoreReceiverMessageHandler(receiverMessageHandler, receiverMessageHandlerOptions);
    ApplyReceiverMessageHandler();
}
392 :
/// <summary>
/// Saves the provided <paramref name="receiverMessageHandler"/> and <paramref name="receiverMessageHandlerOptions"/>
/// into <see cref="ReceiverMessageHandler"/> and <see cref="ReceiverMessageHandlerOptions"/> without attaching them to any receiver.
/// </summary>
/// <param name="receiverMessageHandler">The callback to keep.</param>
/// <param name="receiverMessageHandlerOptions">The <see cref="OnMessageOptions"/> to keep.</param>
protected virtual void StoreReceiverMessageHandler(Action<BrokeredMessage> receiverMessageHandler, OnMessageOptions receiverMessageHandlerOptions)
{
    this.ReceiverMessageHandler = receiverMessageHandler;
    this.ReceiverMessageHandlerOptions = receiverMessageHandlerOptions;
}
401 :
/// <summary>
/// Registers the stored <see cref="ReceiverMessageHandler"/> with the stored <see cref="ReceiverMessageHandlerOptions"/>
/// on every receiver in <see cref="PrivateServiceBusReceivers"/> and then on every receiver in <see cref="PublicServiceBusReceivers"/>.
/// </summary>
protected override void ApplyReceiverMessageHandler()
{
    // Private receivers are handled before public ones.
    IDictionary<int, SubscriptionClient>[] receiverGroups = { PrivateServiceBusReceivers, PublicServiceBusReceivers };

    foreach (IDictionary<int, SubscriptionClient> receiverGroup in receiverGroups)
    {
        foreach (SubscriptionClient receiver in receiverGroup.Values)
        {
            receiver.OnMessage(ReceiverMessageHandler, ReceiverMessageHandlerOptions);
        }
    }
}
413 :
/// <summary>
/// Using a <see cref="Task"/>, repeatedly drains the dead-letter queue of the subscription identified by the
/// provided <paramref name="topicName"/> and <paramref name="topicSubscriptionName"/> until cancellation is requested.
/// Each dead letter is first read as an event and, if that throws, as a command.
/// Messages whose type <see cref="IBusHelper.IsEventRequired(Type)"/> reports as not required are completed (removed);
/// required messages are abandoned so they stay in the dead-letter queue.
/// </summary>
/// <param name="topicName">The name of the topic.</param>
/// <param name="topicSubscriptionName">The name of the subscription.</param>
/// <returns>
/// The <see cref="CancellationTokenSource"/> that stops the background loop when cancelled.
/// The loop disposes this source itself once it has stopped.
/// </returns>
protected virtual CancellationTokenSource CleanUpDeadLetters(string topicName, string topicSubscriptionName)
{
    var brokeredMessageRenewCancellationTokenSource = new CancellationTokenSource();
    // Capture the token once: reading .Token from the source after it has been disposed throws.
    CancellationToken cancellationToken = brokeredMessageRenewCancellationTokenSource.Token;
    IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
    // Consecutive lock failures within the current batch; shared with the two lambdas below.
    int lockIssues = 0;

    Action<BrokeredMessage, IMessage> leaveDeadLetterInQueue = (deadLetterBrokeredMessage, deadLetterMessage) =>
    {
        // Release the lock so the message stays in the dead-letter queue
        try
        {
            deadLetterBrokeredMessage.Abandon();
            lockIssues = 0;
        }
        catch (MessageLockLostException)
        {
            lockIssues++;
            Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
        }
        Logger.LogDebug(string.Format("A dead-letter message of type {0} arrived with the id '{1}' but left in the queue due to settings.", deadLetterMessage.GetType().FullName, deadLetterBrokeredMessage.MessageId));
    };
    Action<BrokeredMessage> removeDeadLetterFromQueue = deadLetterBrokeredMessage =>
    {
        // Remove message from queue
        try
        {
            deadLetterBrokeredMessage.Complete();
            lockIssues = 0;
        }
        catch (MessageLockLostException)
        {
            lockIssues++;
            Logger.LogWarning(string.Format("The lock supplied for complete for the skipped dead-letter message '{0}' is invalid.", deadLetterBrokeredMessage.MessageId));
        }
        Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}' but was removed as processing was skipped due to settings.", deadLetterBrokeredMessage.MessageId));
    };

    Task.Factory.StartNewSafely(() =>
    {
        int loop = 0;
        while (!cancellationToken.IsCancellationRequested)
        {
            lockIssues = 0;
            MessagingFactory factory = null;
            MessageReceiver client = null;
            try
            {
                // Created per pass so a changed ConnectionString is picked up; both are closed in the finally below.
                factory = MessagingFactory.CreateFromConnectionString(ConnectionString);
                string deadLetterPath = SubscriptionClient.FormatDeadLetterPath(topicName, topicSubscriptionName);
                client = factory.CreateMessageReceiver(deadLetterPath, ReceiveMode.PeekLock);

                IEnumerable<BrokeredMessage> brokeredMessages = client.ReceiveBatch(1000);

                foreach (BrokeredMessage brokeredMessage in brokeredMessages)
                {
                    // Too many consecutive lock failures: give up on this batch and fetch a fresh one.
                    if (lockIssues > 10)
                        break;
                    try
                    {
                        Logger.LogDebug(string.Format("A dead-letter message arrived with the id '{0}'.", brokeredMessage.MessageId));
                        string messageBody = brokeredMessage.GetBody<string>();

                        // Closure protection
                        BrokeredMessage message = brokeredMessage;
                        try
                        {
                            AzureBusHelper.ReceiveEvent
                            (
                                messageBody,
                                @event =>
                                {
                                    bool isRequired = BusHelper.IsEventRequired(@event.GetType());
                                    if (!isRequired)
                                        removeDeadLetterFromQueue(message);
                                    else
                                        leaveDeadLetterInQueue(message, @event);
                                    return true;
                                },
                                string.Format("id '{0}'", brokeredMessage.MessageId),
                                () =>
                                {
                                    removeDeadLetterFromQueue(message);
                                },
                                () => { }
                            );
                        }
                        catch
                        {
                            // Deliberate fallback: the body could not be handled as an event, so try it as a command.
                            AzureBusHelper.ReceiveCommand
                            (
                                messageBody,
                                command =>
                                {
                                    bool isRequired = BusHelper.IsEventRequired(command.GetType());
                                    if (!isRequired)
                                        removeDeadLetterFromQueue(message);
                                    else
                                        leaveDeadLetterInQueue(message, command);
                                    return true;
                                },
                                string.Format("id '{0}'", brokeredMessage.MessageId),
                                () =>
                                {
                                    removeDeadLetterFromQueue(message);
                                },
                                () => { }
                            );
                        }
                    }
                    catch (Exception exception)
                    {
                        TelemetryHelper.TrackException(exception, null, telemetryProperties);
                        // Indicates a problem, unlock message in queue
                        Logger.LogError(string.Format("A dead-letter message arrived with the id '{0}' but failed to be process.", brokeredMessage.MessageId), exception: exception);
                        try
                        {
                            brokeredMessage.Abandon();
                        }
                        catch (MessageLockLostException)
                        {
                            lockIssues++;
                            Logger.LogWarning(string.Format("The lock supplied for abandon for the skipped dead-letter message '{0}' is invalid.", brokeredMessage.MessageId));
                        }
                    }
                }
            }
            finally
            {
                // Always release the receiver and its factory, including when an exception escapes the pass;
                // otherwise every pass would leave a factory (and its connection) open.
                if (client != null)
                    client.Close();
                if (factory != null)
                    factory.Close();
            }

            // Pause between passes: sleep on four passes out of five and only yield on the fifth.
            // The counter is advanced before the test so that resetting it does not make every pass the "fifth".
            loop++;
            if (loop % 5 == 0)
            {
                loop = 0;
                Thread.Yield();
            }
            else
                Thread.Sleep(500);
        }
        try
        {
            brokeredMessageRenewCancellationTokenSource.Dispose();
        }
        catch (ObjectDisposedException) { }
    }, cancellationToken);

    return brokeredMessageRenewCancellationTokenSource;
}
562 : }
563 : }
|