Line data Source code
1 : #region IMPORTANT NOTE
2 : // This is copied almost exactly into the eventhub except for a string difference. Replicate changes there until a refactor is done.
3 : #endregion
4 :
5 : #region Copyright
6 : // // -----------------------------------------------------------------------
7 : // // <copyright company="Chinchilla Software Limited">
8 : // // Copyright Chinchilla Software Limited. All rights reserved.
9 : // // </copyright>
10 : // // -----------------------------------------------------------------------
11 : #endregion
12 :
13 : using System;
14 : using System.Collections.Generic;
15 : using System.Diagnostics;
16 : using System.IO;
17 : using System.Linq;
18 : using System.Security.Cryptography;
19 : using System.Text;
20 : using System.Text.RegularExpressions;
21 : using System.Threading;
22 : using System.Threading.Tasks;
23 : using cdmdotnet.Logging;
24 : using Cqrs.Authentication;
25 : using Cqrs.Bus;
26 : using Cqrs.Commands;
27 : using Cqrs.Configuration;
28 : using Cqrs.Events;
29 : using Cqrs.Exceptions;
30 : using Cqrs.Messages;
31 : using Microsoft.ServiceBus.Messaging;
32 : using Newtonsoft.Json;
33 :
34 : namespace Cqrs.Azure.ServiceBus
35 : {
36 : /// <summary>
37 : /// A helper for Azure Service Bus and Event Hub.
38 : /// </summary>
39 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
40 : public class AzureBusHelper<TAuthenticationToken>
41 : : IAzureBusHelper<TAuthenticationToken>
42 1 : {
/// <summary>
/// Instantiates a new instance of <see cref="AzureBusHelper{TAuthenticationToken}"/>,
/// storing each supplied dependency on the matching protected property.
/// </summary>
/// <param name="authenticationTokenHelper">Stored as <see cref="AuthenticationTokenHelper"/>.</param>
/// <param name="correlationIdHelper">Stored as <see cref="CorrelationIdHelper"/>.</param>
/// <param name="logger">Stored as <see cref="Logger"/>.</param>
/// <param name="messageSerialiser">Stored as <see cref="MessageSerialiser"/>.</param>
/// <param name="busHelper">Stored as <see cref="BusHelper"/>.</param>
/// <param name="hashAlgorithmFactory">Stored as <see cref="Signer"/>.</param>
/// <param name="configurationManager">Stored as <see cref="ConfigurationManager"/>.</param>
/// <param name="dependencyResolver">Stored as <see cref="DependencyResolver"/>.</param>
public AzureBusHelper(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IBusHelper busHelper, IHashAlgorithmFactory hashAlgorithmFactory, IConfigurationManager configurationManager, IDependencyResolver dependencyResolver)
{
    // Infrastructure services.
    Logger = logger;
    ConfigurationManager = configurationManager;
    DependencyResolver = dependencyResolver;

    // Ambient context helpers.
    CorrelationIdHelper = correlationIdHelper;
    AuthenticationTokenHelper = authenticationTokenHelper;

    // Messaging services.
    BusHelper = busHelper;
    MessageSerialiser = messageSerialiser;
    Signer = hashAlgorithmFactory;
}
57 :
58 : /// <summary>
59 : /// Gets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/> used to read and set the current authentication token; assigned by the constructor.
60 : /// </summary>
61 : protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
62 :
63 : /// <summary>
64 : /// Gets the <see cref="ICorrelationIdHelper"/> used to read and set the current correlation id; assigned by the constructor.
65 : /// </summary>
66 : protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
67 :
68 : /// <summary>
69 : /// Gets the <see cref="ILogger"/> that diagnostic messages are written to; assigned by the constructor.
70 : /// </summary>
71 : protected ILogger Logger { get; private set; }
72 :
73 : /// <summary>
74 : /// Gets the <see cref="IMessageSerialiser{TAuthenticationToken}"/> used to deserialise received commands and events; assigned by the constructor.
75 : /// </summary>
76 : protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; }
77 :
78 : /// <summary>
79 : /// Gets the <see cref="IBusHelper"/> used to check whether a message is required and to build action handlers; assigned by the constructor.
80 : /// </summary>
81 : protected IBusHelper BusHelper { get; private set; }
82 :
83 : /// <summary>
84 : /// Gets the <see cref="IConfigurationManager"/> that signing tokens and lock-refresh settings are read from; assigned by the constructor.
85 : /// </summary>
86 : protected IConfigurationManager ConfigurationManager { get; private set; }
87 :
88 : /// <summary>
89 : /// Gets the <see cref="IDependencyResolver"/> used to resolve command validators; assigned by the constructor.
90 : /// </summary>
91 : protected IDependencyResolver DependencyResolver { get; private set; }
92 :
93 : /// <summary>
94 : /// The configuration key, as used by <see cref="IConfigurationManager"/>, for the default lock-refresh setting consulted when a received event has no type-specific "ShouldRefresh" setting.
95 : /// </summary>
96 : protected const string DefaultMessagesShouldRefreshConfigurationKey = "Cqrs.Azure.Messages.ShouldRefresh";
97 :
98 : /// <summary>
99 : /// Gets the <see cref="IHashAlgorithmFactory"/> that creates the hash algorithm used when verifying message signatures; assigned by the constructor.
100 : /// </summary>
101 : protected IHashAlgorithmFactory Signer { get; private set; }
102 :
/// <summary>
/// Stamps a <see cref="ICommand{TAuthenticationToken}"/> with the ambient authentication token (when it has none),
/// the current correlation id and the <paramref name="framework"/> it is being sent via.
/// </summary>
/// <typeparam name="TCommand">The <see cref="Type"/> of <see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
/// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
/// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
public virtual void PrepareCommand<TCommand>(TCommand command, string framework)
    where TCommand : ICommand<TAuthenticationToken>
{
    // Only fill in the token when the command carries none (null or the type's default value).
    bool tokenIsMissing = command.AuthenticationToken == null || command.AuthenticationToken.Equals(default(TAuthenticationToken));
    if (tokenIsMissing)
    {
        command.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
    }

    // The correlation id is always overwritten with the current one.
    command.CorrelationId = CorrelationIdHelper.GetCorrelationId();

    // The originating framework is only set the first time the command is prepared.
    if (string.IsNullOrWhiteSpace(command.OriginatingFramework))
    {
        command.OriginatingFramework = framework;
    }

    // Append this framework to a fresh copy of the frameworks already recorded on the command.
    List<string> stampedFrameworks = command.Frameworks == null
        ? new List<string>()
        : new List<string>(command.Frameworks);
    stampedFrameworks.Add(framework);
    command.Frameworks = stampedFrameworks;
}
124 :
/// <summary>
/// Prepares and validates a <see cref="ICommand{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
/// </summary>
/// <typeparam name="TCommand">The <see cref="Type"/> of <see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
/// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
/// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
/// <returns>
/// False if the <paramref name="command"/> has already been stamped by this <paramref name="framework"/> along with at least one other,
/// or if a resolved <see cref="ICommandValidator{TAuthenticationToken,TCommand}"/> reports it as invalid;
/// otherwise true, after the <paramref name="command"/> has been prepared via <see cref="PrepareCommand{TCommand}"/>.
/// </returns>
public virtual bool PrepareAndValidateCommand<TCommand>(TCommand command, string framework)
    where TCommand : ICommand<TAuthenticationToken>
{
    Type commandType = command.GetType();
    // The log container names this method. It previously read "PrepareAndValidateEvent" (a copy/paste slip),
    // which attributed command log entries to the event code path.
    string logContainer = string.Format("{0}\\PrepareAndValidateCommand({1})", GetType().FullName, commandType.FullName);

    if (command.Frameworks != null && command.Frameworks.Contains(framework))
    {
        // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
        if (command.Frameworks.Count() != 1)
        {
            Logger.LogInfo("The provided command has already been processed in Azure.", logContainer);
            return false;
        }
    }

    // A validator is optional: failing to resolve one is logged at debug level and the command is treated as valid.
    ICommandValidator<TAuthenticationToken, TCommand> commandValidator = null;
    try
    {
        commandValidator = DependencyResolver.Resolve<ICommandValidator<TAuthenticationToken, TCommand>>();
    }
    catch (Exception exception)
    {
        Logger.LogDebug("Locating an ICommandValidator failed.", logContainer, exception);
    }

    if (commandValidator != null && !commandValidator.IsCommandValid(command))
    {
        Logger.LogInfo("The provided command is not valid.", logContainer);
        return false;
    }

    PrepareCommand(command, framework);
    return true;
}
165 :
/// <summary>
/// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveCommandHandler"/>.
/// </summary>
/// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
/// <param name="receiveCommandHandler">The handler method that will process the <see cref="ICommand{TAuthenticationToken}"/>.</param>
/// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
/// <param name="signature">The signature of the <see cref="IMessage"/>.</param>
/// <param name="signingTokenConfigurationKey">The configuration key for the signing token as used by <see cref="IConfigurationManager"/>.</param>
/// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="ICommand{TAuthenticationToken}"/> is being skipped.</param>
/// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
/// <returns>
/// The <see cref="ICommand{TAuthenticationToken}"/> that was processed, or null when the message could not be deserialised
/// because its type's assembly could not be loaded and that type is configured as not required.
/// </returns>
/// <exception cref="JsonSerializationException">The message could not be deserialised and could not safely be skipped.</exception>
public virtual ICommand<TAuthenticationToken> ReceiveCommand(string messageBody, Func<ICommand<TAuthenticationToken>, bool?> receiveCommandHandler, string messageId, string signature, string signingTokenConfigurationKey, Action skippedAction = null, Action lockRefreshAction = null)
{
    ICommand<TAuthenticationToken> command;
    try
    {
        command = MessageSerialiser.DeserialiseCommand(messageBody);
    }
    catch (JsonSerializationException exception)
    {
        // Walk the chain of JsonSerializationExceptions looking for an assembly-load failure;
        // only that failure is a candidate for skipping the message.
        // The marker is fixed ASCII text, so it is compared ordinally rather than with the current culture.
        bool assemblyCouldNotBeLoaded = false;
        for (JsonSerializationException current = exception; current != null; current = current.InnerException as JsonSerializationException)
        {
            if (current.Message.StartsWith("Could not load assembly", StringComparison.Ordinal))
            {
                assemblyCouldNotBeLoaded = true;
                break;
            }
        }

        if (assemblyCouldNotBeLoaded)
        {
            // Extract the "Type, Assembly" name from the outermost exception's message.
            // The static Regex.Match uses the framework's regex cache instead of building a new Regex per failure.
            const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
            Match match = Regex.Match(exception.Message, pattern);
            if (match.Success)
            {
                string[] typeParts = match.Value.Split(',');
                if (typeParts.Length == 2)
                {
                    string classType = typeParts[0];
                    bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));

                    if (!isRequired)
                    {
                        if (skippedAction != null)
                            skippedAction();
                        return null;
                    }
                }
            }
        }
        throw;
    }

    string commandTypeName = command.GetType().FullName;
    CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
    AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
    string identifyMessage = null;
    var identifiedCommand = command as ICommandWithIdentity<TAuthenticationToken>;
    if (identifiedCommand != null)
        identifyMessage = string.Format(" for aggregate {0}", identifiedCommand.Rsn);
    Logger.LogInfo(string.Format("A command message arrived with the {0} was of type {1}{2}.", messageId, commandTypeName, identifyMessage));

    VerifySignature(signingTokenConfigurationKey, signature, "A command", messageId, commandTypeName, identifyMessage, messageBody);

    // NOTE(review): unlike ReceiveEvent, there is no fallback to DefaultMessagesShouldRefreshConfigurationKey here - confirm whether that is intentional.
    bool canRefresh;
    if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", commandTypeName), out canRefresh))
        canRefresh = false;

    if (canRefresh)
    {
        if (lockRefreshAction == null)
            Logger.LogWarning(string.Format("A command message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, commandTypeName));
        else
            lockRefreshAction();
    }

    // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
    bool? result = receiveCommandHandler(command);
    if (result != null && !result.Value)
        if (skippedAction != null)
            skippedAction();

    return command;
}
250 :
/// <summary>
/// The default command handler. It
/// checks whether the <see cref="ICommand{TAuthenticationToken}"/> has already been processed by this framework,
/// checks whether the <see cref="ICommand{TAuthenticationToken}"/> is required, and
/// invokes the single handler obtained from the provided <paramref name="routeManager"/>.
/// </summary>
/// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to process.</param>
/// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="ICommandHandler{TAuthenticationToken,TCommand}"/> from.</param>
/// <param name="framework">The current framework.</param>
/// <returns>
/// True indicates the <paramref name="command"/> was successfully handled by a handler.
/// False indicates the <paramref name="command"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
/// Null indicates the <paramref name="command"/> wasn't handled as it was already handled.
/// </returns>
public virtual bool? DefaultReceiveCommand(ICommand<TAuthenticationToken> command, RouteManager routeManager, string framework)
{
    Type typeOfCommand = command.GetType();

    // A command stamped only with this framework was merely pre-stamped and is still handled;
    // one stamped with this framework and others has already passed through here.
    bool alreadyProcessedHere = command.Frameworks != null
        && command.Frameworks.Contains(framework)
        && command.Frameworks.Count() != 1;
    if (alreadyProcessedHere)
    {
        string container = string.Format("{0}\\DefaultReceiveCommand({1})", GetType().FullName, typeOfCommand.FullName);
        Logger.LogInfo("The provided command has already been processed in Azure.", container);
        return null;
    }

    // Flow the command's ambient context onto the current execution before handling.
    CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
    AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);

    bool handlerIsRequired = BusHelper.IsEventRequired(typeOfCommand);
    RouteHandlerDelegate route = routeManager.GetSingleHandler(command, handlerIsRequired);

    // No isRequired check is needed here; per the original author, a required-but-missing handler raises an exception in the lookup above.
    if (route != null)
    {
        Action<IMessage> invokeHandler = route.Delegate;
        invokeHandler(command);
        return true;
    }

    Logger.LogDebug(string.Format("The command handler for '{0}' is not required.", typeOfCommand.FullName));
    return false;
}
296 :
/// <summary>
/// Stamps an <see cref="IEvent{TAuthenticationToken}"/> with the ambient authentication token (when it has none),
/// the current correlation id, the current UTC time and the <paramref name="framework"/> it is being sent via.
/// </summary>
/// <typeparam name="TEvent">The <see cref="Type"/> of <see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
/// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
/// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
public virtual void PrepareEvent<TEvent>(TEvent @event, string framework)
    where TEvent : IEvent<TAuthenticationToken>
{
    // Only fill in the token when the event carries none (null or the type's default value).
    bool tokenIsMissing = @event.AuthenticationToken == null || @event.AuthenticationToken.Equals(default(TAuthenticationToken));
    if (tokenIsMissing)
    {
        @event.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
    }

    // Correlation id and timestamp are always overwritten.
    @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
    @event.TimeStamp = DateTimeOffset.UtcNow;

    // The originating framework is only set the first time the event is prepared.
    if (string.IsNullOrWhiteSpace(@event.OriginatingFramework))
    {
        @event.OriginatingFramework = framework;
    }

    // Append this framework to a fresh copy of the frameworks already recorded on the event.
    List<string> stampedFrameworks = @event.Frameworks == null
        ? new List<string>()
        : new List<string>(@event.Frameworks);
    stampedFrameworks.Add(framework);
    @event.Frameworks = stampedFrameworks;
}
319 :
/// <summary>
/// Checks an <see cref="IEvent{TAuthenticationToken}"/> has not already passed through the <paramref name="framework"/>
/// and, if not, prepares it to be sent via <see cref="PrepareEvent{TEvent}"/>.
/// </summary>
/// <typeparam name="TEvent">The <see cref="Type"/> of <see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
/// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
/// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
/// <returns>
/// False if the <paramref name="event"/> is already stamped with this <paramref name="framework"/> along with at least one other;
/// otherwise true, after the <paramref name="event"/> has been prepared.
/// </returns>
public virtual bool PrepareAndValidateEvent<TEvent>(TEvent @event, string framework)
    where TEvent : IEvent<TAuthenticationToken>
{
    Type typeOfEvent = @event.GetType();

    // An event stamped only with this framework was merely pre-stamped and may still be sent;
    // one stamped with this framework and others has already passed through here.
    bool alreadyProcessedHere = @event.Frameworks != null
        && @event.Frameworks.Contains(framework)
        && @event.Frameworks.Count() != 1;
    if (alreadyProcessedHere)
    {
        string container = string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, typeOfEvent.FullName);
        Logger.LogInfo("The provided event has already been processed in Azure.", container);
        return false;
    }

    PrepareEvent(@event, framework);
    return true;
}
344 :
/// <summary>
/// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveEventHandler"/>.
/// </summary>
/// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
/// <param name="receiveEventHandler">The handler method that will process the <see cref="IEvent{TAuthenticationToken}"/>.</param>
/// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
/// <param name="signature">The signature of the <see cref="IMessage"/>.</param>
/// <param name="signingTokenConfigurationKey">The configuration key for the signing token as used by <see cref="IConfigurationManager"/>.</param>
/// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="IEvent{TAuthenticationToken}"/> is being skipped.</param>
/// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
/// <returns>
/// The <see cref="IEvent{TAuthenticationToken}"/> that was processed, or null when the message could not be deserialised
/// because its type's assembly could not be loaded and that type is configured as not required.
/// </returns>
/// <exception cref="JsonSerializationException">The message could not be deserialised and could not safely be skipped.</exception>
public virtual IEvent<TAuthenticationToken> ReceiveEvent(string messageBody, Func<IEvent<TAuthenticationToken>, bool?> receiveEventHandler, string messageId, string signature, string signingTokenConfigurationKey, Action skippedAction = null, Action lockRefreshAction = null)
{
    IEvent<TAuthenticationToken> @event;
    try
    {
        @event = MessageSerialiser.DeserialiseEvent(messageBody);
    }
    catch (JsonSerializationException exception)
    {
        // Walk the chain of JsonSerializationExceptions looking for an assembly-load failure;
        // only that failure is a candidate for skipping the message.
        // The marker is fixed ASCII text, so it is compared ordinally rather than with the current culture.
        bool assemblyCouldNotBeLoaded = false;
        for (JsonSerializationException current = exception; current != null; current = current.InnerException as JsonSerializationException)
        {
            if (current.Message.StartsWith("Could not load assembly", StringComparison.Ordinal))
            {
                assemblyCouldNotBeLoaded = true;
                break;
            }
        }

        if (assemblyCouldNotBeLoaded)
        {
            // Extract the "Type, Assembly" name from the outermost exception's message.
            // The static Regex.Match uses the framework's regex cache instead of building a new Regex per failure.
            const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
            Match match = Regex.Match(exception.Message, pattern);
            if (match.Success)
            {
                string[] typeParts = match.Value.Split(',');
                if (typeParts.Length == 2)
                {
                    string classType = typeParts[0];
                    bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));

                    if (!isRequired)
                    {
                        if (skippedAction != null)
                            skippedAction();
                        return null;
                    }
                }
            }
        }
        throw;
    }

    string eventTypeName = @event.GetType().FullName;
    CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
    AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
    string identifyMessage = null;
    var identifiedEvent = @event as IEventWithIdentity<TAuthenticationToken>;
    if (identifiedEvent != null)
        identifyMessage = string.Format(" for aggregate {0}", identifiedEvent.Rsn);
    Logger.LogInfo(string.Format("An event message arrived with the {0} was of type {1}{2}.", messageId, eventTypeName, identifyMessage));

    VerifySignature(signingTokenConfigurationKey, signature, "An event", messageId, eventTypeName, identifyMessage, messageBody);

    // A type-specific setting wins; otherwise fall back to the default for all messages; otherwise don't refresh.
    bool canRefresh;
    if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", eventTypeName), out canRefresh))
        if (!ConfigurationManager.TryGetSetting(DefaultMessagesShouldRefreshConfigurationKey, out canRefresh))
            canRefresh = false;

    if (canRefresh)
    {
        if (lockRefreshAction == null)
            Logger.LogWarning(string.Format("An event message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, eventTypeName));
        else
            lockRefreshAction();
    }

    // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
    bool? result = receiveEventHandler(@event);
    if (result != null && !result.Value)
        if (skippedAction != null)
            skippedAction();

    return @event;
}
430 :
431 : /// <summary>
432 : /// Refreshes the network lock: starts a task (via <c>Task.Factory.StartNewSafely</c>) that keeps renewing the lock on the <paramref name="message"/> until cancellation is requested.
/// The task also stops without disposing the token source when the message or token source has been disposed, when the message lock is lost, or when ten consecutive renewal attempts fail.
433 : /// </summary>
/// <param name="brokeredMessageRenewCancellationTokenSource">The source whose token signals the renewal task to stop. It is disposed by the task when the loop ends because cancellation was requested.</param>
/// <param name="message">The <see cref="BrokeredMessage"/> whose lock is renewed.</param>
/// <param name="type">A description of the kind of message; only used when building log messages.</param>
434 1 : public virtual void RefreshLock(CancellationTokenSource brokeredMessageRenewCancellationTokenSource, BrokeredMessage message, string type = "message")
435 : {
436 : Task.Factory.StartNewSafely(() =>
437 : {
438 : // The capturing of ObjectDisposedException is because even the properties can throw it.
439 : try
440 : {
// The message's "Type" property (when present) is read once and only used in log messages.
441 : object value;
442 : string typeName = null;
443 : if (message.Properties.TryGetValue("Type", out value))
444 : typeName = value.ToString();
445 :
// Iteration counter, used only to alternate between yielding and sleeping at the bottom of the loop.
446 : long loop = long.MinValue;
447 : while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
448 : {
449 : // LockedUntilUtc is used to determine whether the lock expires soon.
450 : // Renewal is attempted once the lock is within 45 seconds of expiring, to ensure any thread based issues are mitigated.
451 : if (DateTime.UtcNow > message.LockedUntilUtc.AddSeconds(-45))
452 : {
453 : // If so, renew the lock, making up to 10 attempts.
454 : for (int i = 0; i < 10; i++)
455 : {
456 : try
457 : {
458 : if (brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
459 : return;
460 : message.RenewLock();
// Logging is best-effort: if the logger throws, fall back to the trace listeners.
461 : try
462 : {
463 : Logger.LogDebug(string.Format("Renewed the {2} lock on {1} '{0}'.", message.MessageId, type, typeName));
464 : }
465 : catch
466 : {
467 : Trace.TraceError("Renewed the {2} lock on {1} '{0}'.", message.MessageId, type, typeName);
468 : }
469 :
// Renewal succeeded; stop retrying and go back to waiting.
470 : break;
471 : }
472 : catch (ObjectDisposedException)
473 : {
// The message or token source has been disposed; there is nothing left to renew.
474 : return;
475 : }
476 : catch (MessageLockLostException exception)
477 : {
478 : try
479 : {
480 : Logger.LogWarning(string.Format("Renewing the {2} lock on {1} '{0}' failed as the message lock was lost.", message.MessageId, type, typeName), exception: exception);
481 : }
482 : catch
483 : {
484 : Trace.TraceError("Renewing the {2} lock on {1} '{0}' failed as the message lock was lost.\r\n{3}", message.MessageId, type, typeName, exception.Message);
485 : }
// A lost lock ends the renewal task without retrying.
486 : return;
487 : }
488 : catch (Exception exception)
489 : {
490 : try
491 : {
492 : Logger.LogWarning(string.Format("Renewing the {2} lock on {1} '{0}' failed.", message.MessageId, type, typeName), exception: exception);
493 : }
494 : catch
495 : {
496 : Trace.TraceError("Renewing the {2} lock on {1} '{0}' failed.\r\n{3}", message.MessageId, type, typeName, exception.Message);
497 : }
// Any other failure is retried immediately; give up entirely after the tenth attempt.
498 : if (i == 9)
499 : return;
500 : }
501 : }
502 : }
503 :
// Pace the loop: yield when the counter is a multiple of five, otherwise sleep for 500ms. The counter wraps before it can overflow.
504 : if (loop++ % 5 == 0)
505 : Thread.Yield();
506 : else
507 : Thread.Sleep(500);
508 : if (loop == long.MaxValue)
509 : loop = long.MinValue;
510 : }
// Only reached when the loop ends because cancellation was requested; the early returns above skip this disposal.
511 : try
512 : {
513 : brokeredMessageRenewCancellationTokenSource.Dispose();
514 : }
515 : catch (ObjectDisposedException) { }
516 : }
517 : catch (ObjectDisposedException) { }
518 : }, brokeredMessageRenewCancellationTokenSource.Token);
519 : }
520 :
/// <summary>
/// The default event handler. It
/// checks whether the <see cref="IEvent{TAuthenticationToken}"/> has already been processed by this framework,
/// checks whether the <see cref="IEvent{TAuthenticationToken}"/> is required, and
/// invokes every handler obtained from the provided <paramref name="routeManager"/>.
/// </summary>
/// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to process.</param>
/// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="IEventHandler{TAuthenticationToken,TCommand}"/> from.</param>
/// <param name="framework">The current framework.</param>
/// <returns>
/// True indicates the <paramref name="event"/> was successfully handled by a handler.
/// False indicates the <paramref name="event"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
/// Null indicates the <paramref name="event"/> wasn't handled as it was already handled.
/// </returns>
public virtual bool? DefaultReceiveEvent(IEvent<TAuthenticationToken> @event, RouteManager routeManager, string framework)
{
    Type typeOfEvent = @event.GetType();

    // An event stamped only with this framework was merely pre-stamped and is still handled;
    // one stamped with this framework and others has already passed through here.
    bool alreadyProcessedHere = @event.Frameworks != null
        && @event.Frameworks.Contains(framework)
        && @event.Frameworks.Count() != 1;
    if (alreadyProcessedHere)
    {
        string container = string.Format("{0}\\DefaultReceiveEvent({1})", GetType().FullName, typeOfEvent.FullName);
        Logger.LogInfo("The provided event has already been processed in Azure.", container);
        return null;
    }

    // Flow the event's ambient context onto the current execution before handling.
    CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
    AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);

    bool handlerIsRequired = BusHelper.IsEventRequired(typeOfEvent);

    // Materialise the delegates once so the emptiness check and the dispatch loop see the same set.
    IEnumerable<Action<IMessage>> eventHandlers = routeManager
        .GetHandlers(@event, handlerIsRequired)
        .Select(route => route.Delegate)
        .ToList();

    // No isRequired check is needed here; per the original author, a required-but-missing handler raises an exception in the lookup above.
    if (eventHandlers.Any())
    {
        foreach (Action<IMessage> invokeHandler in eventHandlers)
        {
            invokeHandler(@event);
        }
        return true;
    }

    Logger.LogDebug(string.Format("The event handler for '{0}' is not required.", typeOfEvent.FullName));
    return false;
}
566 :
/// <summary>
/// Verifies that the signature is authorised.
/// A missing <paramref name="signature"/> is only logged as a warning. Otherwise the <paramref name="signature"/> is compared,
/// in order, against hashes built from the type-specific signing token ("<paramref name="typeName"/>.SigningToken"),
/// the token stored under <paramref name="signingTokenConfigurationKey"/> and finally an empty <see cref="Guid"/>.
/// </summary>
/// <remarks>See https://github.com/Chinchilla-Software-Com/CQRS/wiki/Inter-process-function-security</remarks>
/// <param name="signingTokenConfigurationKey">The configuration key for the fallback signing token as used by <see cref="IConfigurationManager"/>.</param>
/// <param name="signature">The signature that arrived with the message.</param>
/// <param name="messagetype">A description of the kind of message (e.g. "A command"), only used in the warning that is logged.</param>
/// <param name="messageId">The network id of the message.</param>
/// <param name="typeName">The full type name of the message.</param>
/// <param name="identifyMessage">Optional extra text identifying the message, used in the warning and the exception.</param>
/// <param name="messageBody">The serialised message the <paramref name="signature"/> was computed over.</param>
/// <exception cref="UnAuthorisedMessageReceivedException">A <paramref name="signature"/> was provided but matched none of the candidate signing tokens.</exception>
protected virtual void VerifySignature(string signingTokenConfigurationKey, string signature, string messagetype, string messageId, string typeName, object identifyMessage, string messageBody)
{
    if (string.IsNullOrWhiteSpace(signature))
    {
        // NOTE(review): unsigned messages are accepted with only a warning - confirm this is the intended policy.
        Logger.LogWarning(string.Format("{3} message arrived with the {0} was of type {1}{2} and had no signature.", messageId, typeName, identifyMessage, messagetype));
        return;
    }

    bool messageIsValid = false;
    string configurationKey = string.Format("{0}.SigningToken", typeName);
    string signingToken;

    // HashAlgorithm is IDisposable; dispose it once all candidate tokens have been tried.
    // NOTE(review): assumes Signer.Create() returns a fresh instance on every call - confirm against the factory.
    using (HashAlgorithm signer = Signer.Create())
    {
        // 1. The signing token configured for this specific message type.
        if (ConfigurationManager.TryGetSetting(configurationKey, out signingToken) && !string.IsNullOrWhiteSpace(signingToken))
            messageIsValid = IsSignatureMatch(signer, signature, signingToken, messageBody);
        // 2. The signing token configured under the caller-supplied key.
        if (!messageIsValid && ConfigurationManager.TryGetSetting(signingTokenConfigurationKey, out signingToken) && !string.IsNullOrWhiteSpace(signingToken))
            messageIsValid = IsSignatureMatch(signer, signature, signingToken, messageBody);
        // 3. An empty Guid, tried last when neither configured token matched.
        if (!messageIsValid)
            messageIsValid = IsSignatureMatch(signer, signature, Guid.Empty.ToString("N"), messageBody);
    }

    if (!messageIsValid)
        throw new UnAuthorisedMessageReceivedException(typeName, messageId, identifyMessage);
}

/// <summary>
/// Computes the Base64 encoded hash of the <paramref name="signingToken"/> and <paramref name="messageBody"/> and compares it to the <paramref name="signature"/>.
/// </summary>
private static bool IsSignatureMatch(HashAlgorithm signer, string signature, string signingToken, string messageBody)
{
    // NOTE(review): string.Concat does no formatting, so the literal text "{0}{1}" is part of the hashed payload.
    // It is kept exactly as-is because changing the hashed bytes would invalidate signatures from any sender
    // that builds the payload the same way - confirm against the publishing side before altering.
    byte[] payload = Encoding.UTF8.GetBytes(string.Concat("{0}{1}", signingToken, messageBody));
    return signature == Convert.ToBase64String(signer.ComputeHash(payload));
}
594 :
/// <summary>
/// Manually registers the provided <paramref name="handler"/>
/// on the provided <paramref name="routeManger"/>, then tracks and flushes a telemetry event recording the registration.
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of <see cref="IMessage"/> the <paramref name="handler"/> can handle.</typeparam>
/// <param name="telemetryHelper">The <see cref="ITelemetryHelper"/> the registration is tracked with.</param>
/// <param name="routeManger">The <see cref="RouteManager"/> to register the <paramref name="handler"/> on.</param>
/// <param name="handler">The handler to register.</param>
/// <param name="targetedType">Passed through to the <paramref name="routeManger"/> along with the wrapped handler.</param>
/// <param name="holdMessageLock">Passed through to <see cref="IBusHelper"/> when wrapping the <paramref name="handler"/>.</param>
public virtual void RegisterHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
    where TMessage : IMessage
{
    // Wrap the raw handler before handing it to the route manager.
    Action<TMessage> wrappedHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
    routeManger.RegisterHandler(wrappedHandler, targetedType);

    // Record the registration in telemetry and push it out immediately.
    string telemetryName = string.Format("Cqrs/RegisterHandler/{0}", typeof(TMessage).FullName);
    var telemetryProperties = new Dictionary<string, string>();
    telemetryProperties.Add("Type", "Azure/Bus");
    telemetryHelper.TrackEvent(telemetryName, telemetryProperties);
    telemetryHelper.Flush();
}
610 :
/// <summary>
/// Register an event handler that will listen and respond to all events,
/// then tracks and flushes a telemetry event recording the registration.
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of <see cref="IMessage"/> the <paramref name="handler"/> can handle.</typeparam>
/// <param name="telemetryHelper">The <see cref="ITelemetryHelper"/> the registration is tracked with.</param>
/// <param name="routeManger">The <see cref="RouteManager"/> to register the <paramref name="handler"/> on.</param>
/// <param name="handler">The handler to register.</param>
/// <param name="holdMessageLock">Passed through to <see cref="IBusHelper"/> when wrapping the <paramref name="handler"/>.</param>
public virtual void RegisterGlobalEventHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler, bool holdMessageLock = true)
    where TMessage : IMessage
{
    // Wrap the raw handler before handing it to the route manager.
    Action<TMessage> wrappedHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
    routeManger.RegisterGlobalEventHandler(wrappedHandler);

    // Record the registration in telemetry and push it out immediately.
    string telemetryName = string.Format("Cqrs/RegisterGlobalEventHandler/{0}", typeof(TMessage).FullName);
    var telemetryProperties = new Dictionary<string, string>();
    telemetryProperties.Add("Type", "Azure/Bus");
    telemetryHelper.TrackEvent(telemetryName, telemetryProperties);
    telemetryHelper.Flush();
}
624 : }
625 : }
|