Line data Source code
1 : #region IMPORTANT NOTE
2 : // This is copied almost exactly into the eventhub except for a string difference. Replicate changes there until a refactor is done.
3 : #endregion
4 :
5 : #region Copyright
6 : // // -----------------------------------------------------------------------
7 : // // <copyright company="Chinchilla Software Limited">
8 : // // Copyright Chinchilla Software Limited. All rights reserved.
9 : // // </copyright>
10 : // // -----------------------------------------------------------------------
11 : #endregion
12 :
13 : using System;
14 : using System.Collections.Generic;
15 : using System.Diagnostics;
16 : using System.IO;
17 : using System.Linq;
18 : using System.Security.Cryptography;
19 : using System.Text;
20 : using System.Text.RegularExpressions;
21 : using System.Threading;
22 : using System.Threading.Tasks;
23 : using Chinchilla.Logging;
24 : using Cqrs.Authentication;
25 : using Cqrs.Bus;
26 : using Cqrs.Commands;
27 : using Cqrs.Configuration;
28 : using Cqrs.Events;
29 : using Cqrs.Exceptions;
30 : using Cqrs.Messages;
31 : #if NET452
32 : using Microsoft.ServiceBus.Messaging;
33 : #endif
34 : #if NETCOREAPP3_0
35 : using Microsoft.Azure.ServiceBus;
36 : using Microsoft.Azure.ServiceBus.Core;
37 : using BrokeredMessage = Microsoft.Azure.ServiceBus.Message;
38 : #endif
39 : using Newtonsoft.Json;
40 :
41 : namespace Cqrs.Azure.ServiceBus
42 : {
43 : /// <summary>
44 : /// A helper for Azure Service Bus and Event Hub.
45 : /// </summary>
46 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
47 : public class AzureBusHelper<TAuthenticationToken>
48 : : IAzureBusHelper<TAuthenticationToken>
49 1 : {
50 : /// <summary>
51 : /// Instantiates a new instance of <see cref="AzureBusHelper{TAuthenticationToken}"/>.
52 : /// </summary>
53 1 : public AzureBusHelper(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IBusHelper busHelper, IHashAlgorithmFactory hashAlgorithmFactory, IConfigurationManager configurationManager, IDependencyResolver dependencyResolver)
54 : {
55 : AuthenticationTokenHelper = authenticationTokenHelper;
56 : CorrelationIdHelper = correlationIdHelper;
57 : Logger = logger;
58 : MessageSerialiser = messageSerialiser;
59 : BusHelper = busHelper;
60 : DependencyResolver = dependencyResolver;
61 : ConfigurationManager = configurationManager;
62 : Signer = hashAlgorithmFactory;
63 : }
64 :
65 : /// <summary>
66 : /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>.
67 : /// </summary>
68 : protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
69 :
70 : /// <summary>
71 : /// Gets or sets the <see cref="ICorrelationIdHelper"/>.
72 : /// </summary>
73 : protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
74 :
75 : /// <summary>
76 : /// Gets or sets the <see cref="ILogger"/>.
77 : /// </summary>
78 : protected ILogger Logger { get; private set; }
79 :
80 : /// <summary>
81 : /// Gets or sets the <see cref="IMessageSerialiser{TAuthenticationToken}"/>.
82 : /// </summary>
83 : protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; }
84 :
85 : /// <summary>
86 : /// Gets or sets the <see cref="IBusHelper"/>.
87 : /// </summary>
88 : protected IBusHelper BusHelper { get; private set; }
89 :
90 : /// <summary>
91 : /// Gets or sets the <see cref="IConfigurationManager"/>.
92 : /// </summary>
93 : protected IConfigurationManager ConfigurationManager { get; private set; }
94 :
95 : /// <summary>
96 : /// Gets or sets the <see cref="IDependencyResolver"/>.
97 : /// </summary>
98 : protected IDependencyResolver DependencyResolver { get; private set; }
99 :
100 : /// <summary>
101 : /// The configuration key for the default message refreshing setting as used by <see cref="IConfigurationManager"/>.
102 : /// </summary>
103 : protected const string DefaultMessagesShouldRefreshConfigurationKey = "Cqrs.Azure.Messages.ShouldRefresh";
104 :
105 : /// <summary>
106 : /// The <see cref="IHashAlgorithmFactory"/> to use to sign messages.
107 : /// </summary>
108 : protected IHashAlgorithmFactory Signer { get; private set; }
109 :
110 : /// <summary>
111 : /// Prepares a <see cref="ICommand{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
112 : /// </summary>
113 : /// <typeparam name="TCommand">The <see cref="Type"/> of<see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
114 : /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
115 : /// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
116 1 : public virtual void PrepareCommand<TCommand>(TCommand command, string framework)
117 : where TCommand : ICommand<TAuthenticationToken>
118 : {
119 : if (command.AuthenticationToken == null || command.AuthenticationToken.Equals(default(TAuthenticationToken)))
120 : command.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
121 : command.CorrelationId = CorrelationIdHelper.GetCorrelationId();
122 :
123 : if (string.IsNullOrWhiteSpace(command.OriginatingFramework))
124 : command.OriginatingFramework = framework;
125 : var frameworks = new List<string>();
126 : if (command.Frameworks != null)
127 : frameworks.AddRange(command.Frameworks);
128 : frameworks.Add(framework);
129 : command.Frameworks = frameworks;
130 : }
131 :
132 : /// <summary>
133 : /// Prepares and validates a <see cref="ICommand{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
134 : /// </summary>
135 : /// <typeparam name="TCommand">The <see cref="Type"/> of<see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
136 : /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
137 : /// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
138 1 : public virtual bool PrepareAndValidateCommand<TCommand>(TCommand command, string framework)
139 : where TCommand : ICommand<TAuthenticationToken>
140 : {
141 : Type commandType = command.GetType();
142 :
143 : if (command.Frameworks != null && command.Frameworks.Contains(framework))
144 : {
145 : // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
146 : if (command.Frameworks.Count() != 1)
147 : {
148 : Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName));
149 : return false;
150 : }
151 : }
152 :
153 : ICommandValidator<TAuthenticationToken, TCommand> commandValidator = null;
154 : try
155 : {
156 : commandValidator = DependencyResolver.Resolve<ICommandValidator<TAuthenticationToken, TCommand>>();
157 : }
158 : catch (Exception exception)
159 : {
160 : Logger.LogDebug("Locating an ICommandValidator failed.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName), exception);
161 : }
162 :
163 : if (commandValidator != null && !commandValidator.IsCommandValid(command))
164 : {
165 : Logger.LogInfo("The provided command is not valid.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName));
166 : return false;
167 : }
168 :
169 : PrepareCommand(command, framework);
170 : return true;
171 : }
172 :
173 : /// <summary>
174 : /// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveCommandHandler"/>.
175 : /// </summary>
176 : /// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
177 : /// <param name="receiveCommandHandler">The handler method that will process the <see cref="ICommand{TAuthenticationToken}"/>.</param>
178 : /// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
179 : /// <param name="signature">The signature of the <see cref="IMessage"/>.</param>
180 : /// <param name="signingTokenConfigurationKey">The configuration key for the signing token as used by <see cref="IConfigurationManager"/>.</param>
181 : /// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="ICommand{TAuthenticationToken}"/> is being skipped.</param>
182 : /// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
183 : /// <returns>The <see cref="ICommand{TAuthenticationToken}"/> that was processed.</returns>
184 1 : public virtual ICommand<TAuthenticationToken> ReceiveCommand(string messageBody, Func<ICommand<TAuthenticationToken>, bool?> receiveCommandHandler, string messageId, string signature, string signingTokenConfigurationKey, Action skippedAction = null, Action lockRefreshAction = null)
185 : {
186 : ICommand<TAuthenticationToken> command;
187 : try
188 : {
189 : command = MessageSerialiser.DeserialiseCommand(messageBody);
190 : }
191 : catch (JsonSerializationException exception)
192 : {
193 : JsonSerializationException checkException = exception;
194 : bool safeToExit = false;
195 : do
196 : {
197 : if (checkException.Message.StartsWith("Could not load assembly"))
198 : {
199 : safeToExit = true;
200 : break;
201 : }
202 : } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
203 : if (safeToExit)
204 : {
205 : const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
206 : Match match = new Regex(pattern).Match(exception.Message);
207 : if (match.Success)
208 : {
209 : string[] typeParts = match.Value.Split(',');
210 : if (typeParts.Length == 2)
211 : {
212 : string classType = typeParts[0];
213 : bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
214 :
215 : if (!isRequired)
216 : {
217 : if (skippedAction != null)
218 : skippedAction();
219 : return null;
220 : }
221 : }
222 : }
223 : }
224 : throw;
225 : }
226 :
227 : string commandTypeName = command.GetType().FullName;
228 : CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
229 : AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
230 : string identifyMessage = null;
231 : var identifiedEvent = command as ICommandWithIdentity<TAuthenticationToken>;
232 : if (identifiedEvent != null)
233 : identifyMessage = string.Format(" for aggregate {0}", identifiedEvent.Rsn);
234 : Logger.LogInfo(string.Format("A command message arrived with the {0} was of type {1}{2}.", messageId, commandTypeName, identifyMessage));
235 :
236 : VerifySignature(signingTokenConfigurationKey, signature, "A command", messageId, commandTypeName, identifyMessage, messageBody);
237 : bool canRefresh;
238 : if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", commandTypeName), out canRefresh))
239 : canRefresh = false;
240 :
241 : if (canRefresh)
242 : {
243 : if (lockRefreshAction == null)
244 : Logger.LogWarning(string.Format("A command message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, commandTypeName));
245 : else
246 : lockRefreshAction();
247 : }
248 :
249 : // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
250 : bool? result = receiveCommandHandler(command);
251 : if (result != null && !result.Value)
252 : if (skippedAction != null)
253 : skippedAction();
254 :
255 : return command;
256 : }
257 :
258 : /// <summary>
259 : /// The default command handler that
260 : /// check if the <see cref="ICommand{TAuthenticationToken}"/> has already been processed by this framework,
261 : /// checks if the <see cref="ICommand{TAuthenticationToken}"/> is required,
262 : /// finds the handler from the provided <paramref name="routeManager"/>.
263 : /// </summary>
264 : /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to process.</param>
265 : /// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="ICommandHandler{TAuthenticationToken,TCommand}"/> from.</param>
266 : /// <param name="framework">The current framework.</param>
267 : /// <returns>
268 : /// True indicates the <paramref name="command"/> was successfully handled by a handler.
269 : /// False indicates the <paramref name="command"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
270 : /// Null indicates the command<paramref name="command"/> wasn't handled as it was already handled.
271 : /// </returns>
272 1 : public virtual bool? DefaultReceiveCommand(ICommand<TAuthenticationToken> command, RouteManager routeManager, string framework)
273 : {
274 : Type commandType = command.GetType();
275 :
276 : if (command.Frameworks != null && command.Frameworks.Contains(framework))
277 : {
278 : // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
279 : if (command.Frameworks.Count() != 1)
280 : {
281 : Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\DefaultReceiveCommand({1})", GetType().FullName, commandType.FullName));
282 : return null;
283 : }
284 : }
285 :
286 : CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
287 : AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
288 :
289 : bool isRequired = BusHelper.IsEventRequired(commandType);
290 :
291 : RouteHandlerDelegate commandHandler = routeManager.GetSingleHandler(command, isRequired);
292 : // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
293 : if (commandHandler == null)
294 : {
295 : Logger.LogDebug(string.Format("The command handler for '{0}' is not required.", commandType.FullName));
296 : return false;
297 : }
298 :
299 : Action<IMessage> handler = commandHandler.Delegate;
300 : handler(command);
301 : return true;
302 : }
303 :
304 : /// <summary>
305 : /// Prepares an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
306 : /// </summary>
307 : /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
308 : /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
309 : /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
310 1 : public virtual void PrepareEvent<TEvent>(TEvent @event, string framework)
311 : where TEvent : IEvent<TAuthenticationToken>
312 : {
313 : if (@event.AuthenticationToken == null || @event.AuthenticationToken.Equals(default(TAuthenticationToken)))
314 : @event.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
315 : @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
316 : @event.TimeStamp = DateTimeOffset.UtcNow;
317 :
318 : if (string.IsNullOrWhiteSpace(@event.OriginatingFramework))
319 : @event.OriginatingFramework = framework;
320 : var frameworks = new List<string>();
321 : if (@event.Frameworks != null)
322 : frameworks.AddRange(@event.Frameworks);
323 : frameworks.Add(framework);
324 : @event.Frameworks = frameworks;
325 : }
326 :
327 : /// <summary>
328 : /// Prepares and validates an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
329 : /// </summary>
330 : /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
331 : /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
332 : /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
333 1 : public virtual bool PrepareAndValidateEvent<TEvent>(TEvent @event, string framework)
334 : where TEvent : IEvent<TAuthenticationToken>
335 : {
336 : Type eventType = @event.GetType();
337 :
338 : if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
339 : {
340 : // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
341 : if (@event.Frameworks.Count() != 1)
342 : {
343 : Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, eventType.FullName));
344 : return false;
345 : }
346 : }
347 :
348 : PrepareEvent(@event, framework);
349 : return true;
350 : }
351 :
352 : /// <summary>
353 : /// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveEventHandler"/>.
354 : /// </summary>
355 : /// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
356 : /// <param name="receiveEventHandler">The handler method that will process the <see cref="IEvent{TAuthenticationToken}"/>.</param>
357 : /// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
358 : /// <param name="signature">The signature of the <see cref="IMessage"/>.</param>
359 : /// <param name="signingTokenConfigurationKey">The configuration key for the signing token as used by <see cref="IConfigurationManager"/>.</param>
360 : /// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="IEvent{TAuthenticationToken}"/> is being skipped.</param>
361 : /// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
362 : /// <returns>The <see cref="IEvent{TAuthenticationToken}"/> that was processed.</returns>
363 1 : public virtual IEvent<TAuthenticationToken> ReceiveEvent(string messageBody, Func<IEvent<TAuthenticationToken>, bool?> receiveEventHandler, string messageId, string signature, string signingTokenConfigurationKey, Action skippedAction = null, Action lockRefreshAction = null)
364 : {
365 : IEvent<TAuthenticationToken> @event;
366 : try
367 : {
368 : @event = MessageSerialiser.DeserialiseEvent(messageBody);
369 : }
370 : catch (JsonSerializationException exception)
371 : {
372 : JsonSerializationException checkException = exception;
373 : bool safeToExit = false;
374 : do
375 : {
376 : if (checkException.Message.StartsWith("Could not load assembly"))
377 : {
378 : safeToExit = true;
379 : break;
380 : }
381 : } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
382 : if (safeToExit)
383 : {
384 : const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
385 : Match match = new Regex(pattern).Match(exception.Message);
386 : if (match.Success)
387 : {
388 : string[] typeParts = match.Value.Split(',');
389 : if (typeParts.Length == 2)
390 : {
391 : string classType = typeParts[0];
392 : bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
393 :
394 : if (!isRequired)
395 : {
396 : if (skippedAction != null)
397 : skippedAction();
398 : return null;
399 : }
400 : }
401 : }
402 : }
403 : throw;
404 : }
405 :
406 : string eventTypeName = @event.GetType().FullName;
407 : CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
408 : AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
409 : object identifyMessage = null;
410 : var identifiedEvent = @event as IEventWithIdentity<TAuthenticationToken>;
411 : if (identifiedEvent != null)
412 : identifyMessage = string.Format(" for aggregate {0}", identifiedEvent.Rsn);
413 : Logger.LogInfo(string.Format("An event message arrived with the {0} was of type {1}{2}.", messageId, eventTypeName, identifyMessage));
414 :
415 : VerifySignature(signingTokenConfigurationKey, signature, "An event", messageId, eventTypeName, identifyMessage, messageBody);
416 : bool canRefresh;
417 : if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", eventTypeName), out canRefresh))
418 : if (!ConfigurationManager.TryGetSetting(DefaultMessagesShouldRefreshConfigurationKey, out canRefresh))
419 : canRefresh = false;
420 :
421 : if (canRefresh)
422 : {
423 : if (lockRefreshAction == null)
424 : Logger.LogWarning(string.Format("An event message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, eventTypeName));
425 : else
426 : lockRefreshAction();
427 : }
428 :
429 : // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
430 : bool? result = receiveEventHandler(@event);
431 : if (result != null && !result.Value)
432 : if (skippedAction != null)
433 : skippedAction();
434 :
435 : return @event;
436 : }
437 :
438 : /// <summary>
439 : /// Refreshes the network lock.
440 : /// </summary>
441 : #if NET452
442 : public virtual void RefreshLock(CancellationTokenSource brokeredMessageRenewCancellationTokenSource, BrokeredMessage message, string type = "message")
443 : #endif
444 : #if NETCOREAPP3_0
445 : public virtual void RefreshLock(IMessageReceiver client, CancellationTokenSource brokeredMessageRenewCancellationTokenSource, BrokeredMessage message, string type = "message")
446 : #endif
447 : {
448 : Task.Factory.StartNewSafely(() =>
449 : {
450 : // The capturing of ObjectDisposedException is because even the properties can throw it.
451 : try
452 : {
453 : object value;
454 : string typeName = null;
455 : if (message.TryGetUserPropertyValue("Type", out value))
456 : typeName = value.ToString();
457 :
458 : long loop = long.MinValue;
459 : while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
460 : {
461 : // Based on LockedUntilUtc property to determine if the lock expires soon
462 : // We lock for 45 seconds to ensure any thread based issues are mitigated.
463 : #if NET452
464 : if (DateTime.UtcNow > message.LockedUntilUtc.AddSeconds(-45))
465 : #endif
466 : #if NETCOREAPP3_0
467 : if (DateTime.UtcNow > message.ExpiresAtUtc.AddSeconds(-45))
468 : #endif
469 : {
470 : // If so, renew the lock
471 : for (int i = 0; i < 10; i++)
472 : {
473 : try
474 : {
475 : if (brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
476 : return;
477 : #if NET452
478 : message.RenewLock();
479 : #endif
480 : #if NETCOREAPP3_0
481 : client.RenewLockAsync(message).Wait(1500);
482 : #endif
483 : try
484 : {
485 : Logger.LogDebug(string.Format("Renewed the {2} lock on {1} '{0}'.", message.MessageId, type, typeName));
486 : }
487 : catch
488 : {
489 : Trace.TraceError("Renewed the {2} lock on {1} '{0}'.", message.MessageId, type, typeName);
490 : }
491 :
492 : break;
493 : }
494 : catch (ObjectDisposedException)
495 : {
496 : return;
497 : }
498 : catch (MessageLockLostException exception)
499 : {
500 : try
501 : {
502 : Logger.LogWarning(string.Format("Renewing the {2} lock on {1} '{0}' failed as the message lock was lost.", message.MessageId, type, typeName), exception: exception);
503 : }
504 : catch
505 : {
506 : Trace.TraceError("Renewing the {2} lock on {1} '{0}' failed as the message lock was lost.\r\n{3}", message.MessageId, type, typeName, exception.Message);
507 : }
508 : return;
509 : }
510 : catch (Exception exception)
511 : {
512 : try
513 : {
514 : Logger.LogWarning(string.Format("Renewing the {2} lock on {1} '{0}' failed.", message.MessageId, type, typeName), exception: exception);
515 : }
516 : catch
517 : {
518 : Trace.TraceError("Renewing the {2} lock on {1} '{0}' failed.\r\n{3}", message.MessageId, type, typeName, exception.Message);
519 : }
520 : if (i == 9)
521 : return;
522 : }
523 : }
524 : }
525 :
526 : if (loop++ % 5 == 0)
527 : Thread.Yield();
528 : else
529 : Thread.Sleep(500);
530 : if (loop == long.MaxValue)
531 : loop = long.MinValue;
532 : }
533 : try
534 : {
535 : brokeredMessageRenewCancellationTokenSource.Dispose();
536 : }
537 : catch (ObjectDisposedException) { }
538 : }
539 : catch (ObjectDisposedException) { }
540 : }, brokeredMessageRenewCancellationTokenSource.Token);
541 : }
542 :
543 : /// <summary>
544 : /// The default event handler that
545 : /// check if the <see cref="IEvent{TAuthenticationToken}"/> has already been processed by this framework,
546 : /// checks if the <see cref="IEvent{TAuthenticationToken}"/> is required,
547 : /// finds the handler from the provided <paramref name="routeManager"/>.
548 : /// </summary>
549 : /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to process.</param>
550 : /// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="IEventHandler{TAuthenticationToken,TCommand}"/> from.</param>
551 : /// <param name="framework">The current framework.</param>
552 : /// <returns>
553 : /// True indicates the <paramref name="event"/> was successfully handled by a handler.
554 : /// False indicates the <paramref name="event"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
555 : /// Null indicates the <paramref name="event"/> wasn't handled as it was already handled.
556 : /// </returns>
557 1 : public virtual bool? DefaultReceiveEvent(IEvent<TAuthenticationToken> @event, RouteManager routeManager, string framework)
558 : {
559 : Type eventType = @event.GetType();
560 :
561 : if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
562 : {
563 : // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
564 : if (@event.Frameworks.Count() != 1)
565 : {
566 : Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\DefaultReceiveEvent({1})", GetType().FullName, eventType.FullName));
567 : return null;
568 : }
569 : }
570 :
571 : CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
572 : AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
573 :
574 : bool isRequired = BusHelper.IsEventRequired(eventType);
575 :
576 : IEnumerable<Action<IMessage>> handlers = routeManager.GetHandlers(@event, isRequired).Select(x => x.Delegate).ToList();
577 : // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
578 : if (!handlers.Any())
579 : {
580 : Logger.LogDebug(string.Format("The event handler for '{0}' is not required.", eventType.FullName));
581 : return false;
582 : }
583 :
584 : foreach (Action<IMessage> handler in handlers)
585 : handler(@event);
586 : return true;
587 : }
588 :
589 : /// <summary>
590 : /// Verifies that the signature is authorised.
591 : /// </summary>
592 1 : protected virtual void VerifySignature(string signingTokenConfigurationKey, string signature, string messagetype, string messageId, string typeName, object identifyMessage, string messageBody)
593 : {
594 : if (string.IsNullOrWhiteSpace(signature))
595 : Logger.LogWarning(string.Format("{3} message arrived with the {0} was of type {1}{2} and had no signature.", messageId, typeName, identifyMessage, messagetype));
596 : else
597 : {
598 : bool messageIsValid = false;
599 : // see https://github.com/Chinchilla-Software-Com/CQRS/wiki/Inter-process-function-security</remarks>
600 : string configurationKey = string.Format("{0}.SigningToken", typeName);
601 : string signingToken;
602 : HashAlgorithm signer = Signer.Create();
603 : if (ConfigurationManager.TryGetSetting(configurationKey, out signingToken) && !string.IsNullOrWhiteSpace(signingToken))
604 : using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", signingToken, messageBody))))
605 : messageIsValid = signature == Convert.ToBase64String(signer.ComputeHash(hashStream));
606 : if (!messageIsValid && ConfigurationManager.TryGetSetting(signingTokenConfigurationKey, out signingToken) && !string.IsNullOrWhiteSpace(signingToken))
607 : using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", signingToken, messageBody))))
608 : messageIsValid = signature == Convert.ToBase64String(signer.ComputeHash(hashStream));
609 : if (!messageIsValid)
610 : using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", Guid.Empty.ToString("N"), messageBody))))
611 : messageIsValid = signature == Convert.ToBase64String(signer.ComputeHash(hashStream));
612 : if (!messageIsValid)
613 : throw new UnAuthorisedMessageReceivedException(typeName, messageId, identifyMessage);
614 : }
615 : }
616 :
617 : /// <summary>
618 : /// Manually registers the provided <paramref name="handler"/>
619 : /// on the provided <paramref name="routeManger"/>
620 : /// </summary>
621 : /// <typeparam name="TMessage">The <see cref="Type"/> of <see cref="IMessage"/> the <paramref name="handler"/> can handle.</typeparam>
622 1 : public virtual void RegisterHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
623 : where TMessage : IMessage
624 : {
625 : Action<TMessage> registerableHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
626 :
627 : routeManger.RegisterHandler(registerableHandler, targetedType);
628 :
629 : telemetryHelper.TrackEvent(string.Format("Cqrs/RegisterHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "Azure/Bus" } });
630 : telemetryHelper.Flush();
631 : }
632 :
633 : /// <summary>
634 : /// Register an event handler that will listen and respond to all events.
635 : /// </summary>
636 1 : public virtual void RegisterGlobalEventHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler, bool holdMessageLock = true)
637 : where TMessage : IMessage
638 : {
639 : Action<TMessage> registerableHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
640 :
641 : routeManger.RegisterGlobalEventHandler(registerableHandler);
642 :
643 : telemetryHelper.TrackEvent(string.Format("Cqrs/RegisterGlobalEventHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "Azure/Bus" } });
644 : telemetryHelper.Flush();
645 : }
646 : }
647 : }
|