Line data Source code
1 : #region IMPORTANT NOTE
2 : // This is copied almost exactly into the servicebus except for a string difference. Replicate changes there until a refactor is done.
3 : #endregion
4 :
5 : #region Copyright
6 : // // -----------------------------------------------------------------------
7 : // // <copyright company="Chinchilla Software Limited">
8 : // // Copyright Chinchilla Software Limited. All rights reserved.
9 : // // </copyright>
10 : // // -----------------------------------------------------------------------
11 : #endregion
12 :
13 : using System;
14 : using System.Collections.Generic;
15 : using System.Diagnostics;
16 : using System.Linq;
17 : using System.Text.RegularExpressions;
18 : using System.Threading;
19 : using System.Threading.Tasks;
20 : using cdmdotnet.Logging;
21 : using Cqrs.Authentication;
22 : using Cqrs.Bus;
23 : using Cqrs.Commands;
24 : using Cqrs.Configuration;
25 : using Cqrs.Events;
26 : using Cqrs.Messages;
27 : using Microsoft.ServiceBus.Messaging;
28 : using Newtonsoft.Json;
29 :
30 : namespace Cqrs.Azure.ServiceBus
31 : {
32 : /// <summary>
33 : /// A helper for Azure Service Bus and Event Hub.
34 : /// </summary>
35 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
36 : public class AzureBusHelper<TAuthenticationToken> : IAzureBusHelper<TAuthenticationToken>
37 : {
38 : /// <summary>
39 : /// Instantiates a new instance of <see cref="AzureBusHelper{TAuthenticationToken}"/>.
40 : /// </summary>
41 2 : public AzureBusHelper(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IBusHelper busHelper, IConfigurationManager configurationManager, IDependencyResolver dependencyResolver)
42 : {
43 : AuthenticationTokenHelper = authenticationTokenHelper;
44 : CorrelationIdHelper = correlationIdHelper;
45 : Logger = logger;
46 : MessageSerialiser = messageSerialiser;
47 : BusHelper = busHelper;
48 : DependencyResolver = dependencyResolver;
49 : ConfigurationManager = configurationManager;
50 : }
51 :
52 : /// <summary>
53 : /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>.
54 : /// </summary>
55 : protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
56 :
57 : /// <summary>
58 : /// Gets or sets the <see cref="ICorrelationIdHelper"/>.
59 : /// </summary>
60 : protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
61 :
62 : /// <summary>
63 : /// Gets or sets the <see cref="ILogger"/>.
64 : /// </summary>
65 : protected ILogger Logger { get; private set; }
66 :
67 : /// <summary>
68 : /// Gets or sets the <see cref="IMessageSerialiser{TAuthenticationToken}"/>.
69 : /// </summary>
70 : protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; }
71 :
72 : /// <summary>
73 : /// Gets or sets the <see cref="IBusHelper"/>.
74 : /// </summary>
75 : protected IBusHelper BusHelper { get; private set; }
76 :
77 : /// <summary>
78 : /// Gets or sets the <see cref="IConfigurationManager"/>.
79 : /// </summary>
80 : protected IConfigurationManager ConfigurationManager { get; private set; }
81 :
82 : /// <summary>
83 : /// Gets or sets the <see cref="IDependencyResolver"/>.
84 : /// </summary>
85 : protected IDependencyResolver DependencyResolver { get; private set; }
86 :
87 : /// <summary>
88 : /// Prepares a <see cref="ICommand{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
89 : /// </summary>
90 : /// <typeparam name="TCommand">The <see cref="Type"/> of<see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
91 : /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
92 : /// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
93 2 : public virtual void PrepareCommand<TCommand>(TCommand command, string framework)
94 : where TCommand : ICommand<TAuthenticationToken>
95 : {
96 : if (command.AuthenticationToken == null || command.AuthenticationToken.Equals(default(TAuthenticationToken)))
97 : command.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
98 : command.CorrelationId = CorrelationIdHelper.GetCorrelationId();
99 :
100 : if (string.IsNullOrWhiteSpace(command.OriginatingFramework))
101 : command.OriginatingFramework = framework;
102 : var frameworks = new List<string>();
103 : if (command.Frameworks != null)
104 : frameworks.AddRange(command.Frameworks);
105 : frameworks.Add(framework);
106 : command.Frameworks = frameworks;
107 : }
108 :
109 : /// <summary>
110 : /// Prepares and validates a <see cref="ICommand{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
111 : /// </summary>
112 : /// <typeparam name="TCommand">The <see cref="Type"/> of<see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
113 : /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
114 : /// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
115 2 : public virtual bool PrepareAndValidateCommand<TCommand>(TCommand command, string framework)
116 : where TCommand : ICommand<TAuthenticationToken>
117 : {
118 : Type commandType = command.GetType();
119 :
120 : if (command.Frameworks != null && command.Frameworks.Contains(framework))
121 : {
122 : // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
123 : if (command.Frameworks.Count() != 1)
124 : {
125 : Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName));
126 : return false;
127 : }
128 : }
129 :
130 : ICommandValidator<TAuthenticationToken, TCommand> commandValidator = null;
131 : try
132 : {
133 : commandValidator = DependencyResolver.Resolve<ICommandValidator<TAuthenticationToken, TCommand>>();
134 : }
135 : catch (Exception exception)
136 : {
137 : Logger.LogDebug("Locating an ICommandValidator failed.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName), exception);
138 : }
139 :
140 : if (commandValidator != null && !commandValidator.IsCommandValid(command))
141 : {
142 : Logger.LogInfo("The provided command is not valid.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName));
143 : return false;
144 : }
145 :
146 : PrepareCommand(command, framework);
147 : return true;
148 : }
149 :
150 : /// <summary>
151 : /// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveCommandHandler"/>.
152 : /// </summary>
153 : /// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
154 : /// <param name="receiveCommandHandler">The handler method that will process the <see cref="ICommand{TAuthenticationToken}"/>.</param>
155 : /// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
156 : /// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="ICommand{TAuthenticationToken}"/> is being skipped.</param>
157 : /// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
158 : /// <returns>The <see cref="ICommand{TAuthenticationToken}"/> that was processed.</returns>
159 2 : public virtual ICommand<TAuthenticationToken> ReceiveCommand(string messageBody, Func<ICommand<TAuthenticationToken>, bool?> receiveCommandHandler, string messageId, Action skippedAction = null, Action lockRefreshAction = null)
160 : {
161 : ICommand<TAuthenticationToken> command;
162 : try
163 : {
164 : command = MessageSerialiser.DeserialiseCommand(messageBody);
165 : }
166 : catch (JsonSerializationException exception)
167 : {
168 : JsonSerializationException checkException = exception;
169 : bool safeToExit = false;
170 : do
171 : {
172 : if (checkException.Message.StartsWith("Could not load assembly"))
173 : {
174 : safeToExit = true;
175 : break;
176 : }
177 : } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
178 : if (safeToExit)
179 : {
180 : const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
181 : Match match = new Regex(pattern).Match(exception.Message);
182 : if (match.Success)
183 : {
184 : string[] typeParts = match.Value.Split(',');
185 : if (typeParts.Length == 2)
186 : {
187 : string classType = typeParts[0];
188 : bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
189 :
190 : if (!isRequired)
191 : {
192 : if (skippedAction != null)
193 : skippedAction();
194 : return null;
195 : }
196 : }
197 : }
198 : }
199 : throw;
200 : }
201 :
202 : string commandTypeName = command.GetType().FullName;
203 : CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
204 : AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
205 : Logger.LogInfo(string.Format("A command message arrived with the {0} was of type {1}.", messageId, commandTypeName));
206 :
207 : bool canRefresh;
208 : if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", commandTypeName), out canRefresh))
209 : canRefresh = false;
210 :
211 : if (canRefresh)
212 : {
213 : if (lockRefreshAction == null)
214 : Logger.LogWarning(string.Format("A command message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, commandTypeName));
215 : else
216 : lockRefreshAction();
217 : }
218 :
219 : // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
220 : bool? result = receiveCommandHandler(command);
221 : if (result != null && !result.Value)
222 : if (skippedAction != null)
223 : skippedAction();
224 :
225 : return command;
226 : }
227 :
228 : /// <summary>
229 : /// The default command handler that
230 : /// check if the <see cref="ICommand{TAuthenticationToken}"/> has already been processed by this framework,
231 : /// checks if the <see cref="ICommand{TAuthenticationToken}"/> is required,
232 : /// finds the handler from the provided <paramref name="routeManager"/>.
233 : /// </summary>
234 : /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to process.</param>
235 : /// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="ICommandHandler{TAuthenticationToken,TCommand}"/> from.</param>
236 : /// <param name="framework">The current framework.</param>
237 : /// <returns>
238 : /// True indicates the <paramref name="command"/> was successfully handled by a handler.
239 : /// False indicates the <paramref name="command"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
240 : /// Null indicates the command<paramref name="command"/> wasn't handled as it was already handled.
241 : /// </returns>
242 2 : public virtual bool? DefaultReceiveCommand(ICommand<TAuthenticationToken> command, RouteManager routeManager, string framework)
243 : {
244 : Type commandType = command.GetType();
245 :
246 : if (command.Frameworks != null && command.Frameworks.Contains(framework))
247 : {
248 : // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
249 : if (command.Frameworks.Count() != 1)
250 : {
251 : Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\DefaultReceiveCommand({1})", GetType().FullName, commandType.FullName));
252 : return null;
253 : }
254 : }
255 :
256 : CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
257 : AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
258 :
259 : bool isRequired = BusHelper.IsEventRequired(commandType);
260 :
261 : RouteHandlerDelegate commandHandler = routeManager.GetSingleHandler(command, isRequired);
262 : // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
263 : if (commandHandler == null)
264 : {
265 : Logger.LogDebug(string.Format("The command handler for '{0}' is not required.", commandType.FullName));
266 : return false;
267 : }
268 :
269 : Action<IMessage> handler = commandHandler.Delegate;
270 : handler(command);
271 : return true;
272 : }
273 :
274 : /// <summary>
275 : /// Prepares an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
276 : /// </summary>
277 : /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
278 : /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
279 : /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
280 2 : public virtual void PrepareEvent<TEvent>(TEvent @event, string framework)
281 : where TEvent : IEvent<TAuthenticationToken>
282 : {
283 : if (@event.AuthenticationToken == null || @event.AuthenticationToken.Equals(default(TAuthenticationToken)))
284 : @event.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
285 : @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
286 : @event.TimeStamp = DateTimeOffset.UtcNow;
287 :
288 : if (string.IsNullOrWhiteSpace(@event.OriginatingFramework))
289 : @event.OriginatingFramework = framework;
290 : var frameworks = new List<string>();
291 : if (@event.Frameworks != null)
292 : frameworks.AddRange(@event.Frameworks);
293 : frameworks.Add(framework);
294 : @event.Frameworks = frameworks;
295 : }
296 :
297 : /// <summary>
298 : /// Prepares and validates an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
299 : /// </summary>
300 : /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
301 : /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
302 : /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
303 2 : public virtual bool PrepareAndValidateEvent<TEvent>(TEvent @event, string framework)
304 : where TEvent : IEvent<TAuthenticationToken>
305 : {
306 : Type eventType = @event.GetType();
307 :
308 : if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
309 : {
310 : // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
311 : if (@event.Frameworks.Count() != 1)
312 : {
313 : Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, eventType.FullName));
314 : return false;
315 : }
316 : }
317 :
318 : PrepareEvent(@event, framework);
319 : return true;
320 : }
321 :
322 : /// <summary>
323 : /// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveEventHandler"/>.
324 : /// </summary>
325 : /// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
326 : /// <param name="receiveEventHandler">The handler method that will process the <see cref="IEvent{TAuthenticationToken}"/>.</param>
327 : /// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
328 : /// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="IEvent{TAuthenticationToken}"/> is being skipped.</param>
329 : /// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
330 : /// <returns>The <see cref="IEvent{TAuthenticationToken}"/> that was processed.</returns>
331 2 : public virtual IEvent<TAuthenticationToken> ReceiveEvent(string messageBody, Func<IEvent<TAuthenticationToken>, bool?> receiveEventHandler, string messageId, Action skippedAction = null, Action lockRefreshAction = null)
332 : {
333 : IEvent<TAuthenticationToken> @event;
334 : try
335 : {
336 : @event = MessageSerialiser.DeserialiseEvent(messageBody);
337 : }
338 : catch (JsonSerializationException exception)
339 : {
340 : JsonSerializationException checkException = exception;
341 : bool safeToExit = false;
342 : do
343 : {
344 : if (checkException.Message.StartsWith("Could not load assembly"))
345 : {
346 : safeToExit = true;
347 : break;
348 : }
349 : } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
350 : if (safeToExit)
351 : {
352 : const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
353 : Match match = new Regex(pattern).Match(exception.Message);
354 : if (match.Success)
355 : {
356 : string[] typeParts = match.Value.Split(',');
357 : if (typeParts.Length == 2)
358 : {
359 : string classType = typeParts[0];
360 : bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
361 :
362 : if (!isRequired)
363 : {
364 : if (skippedAction != null)
365 : skippedAction();
366 : return null;
367 : }
368 : }
369 : }
370 : }
371 : throw;
372 : }
373 :
374 : string eventTypeName = @event.GetType().FullName;
375 : CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
376 : AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
377 : Logger.LogInfo(string.Format("An event message arrived with the {0} was of type {1}.", messageId, eventTypeName));
378 :
379 : bool canRefresh;
380 : if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", eventTypeName), out canRefresh))
381 : canRefresh = false;
382 :
383 : if (canRefresh)
384 : {
385 : if (lockRefreshAction == null)
386 : Logger.LogWarning(string.Format("An event message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, eventTypeName));
387 : else
388 : lockRefreshAction();
389 : }
390 :
391 : // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
392 : bool? result = receiveEventHandler(@event);
393 : if (result != null && !result.Value)
394 : if (skippedAction != null)
395 : skippedAction();
396 :
397 : return @event;
398 : }
399 :
400 : /// <summary>
401 : /// Refreshes the network lock.
402 : /// </summary>
403 2 : public virtual void RefreshLock(CancellationTokenSource brokeredMessageRenewCancellationTokenSource, BrokeredMessage message, string type = "message")
404 : {
405 : Task.Factory.StartNewSafely(() =>
406 : {
407 : // The capturing of ObjectDisposedException is because even the properties can throw it.
408 : try
409 : {
410 : long loop = long.MinValue;
411 : while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
412 : {
413 : // Based on LockedUntilUtc property to determine if the lock expires soon
414 : // We lock for 45 seconds to ensure any thread based issues are mitigated.
415 : if (DateTime.UtcNow > message.LockedUntilUtc.AddSeconds(-45))
416 : {
417 : // If so, renew the lock
418 : for (int i = 0; i < 10; i++)
419 : {
420 : try
421 : {
422 : message.RenewLock();
423 : try
424 : {
425 : Logger.LogDebug(string.Format("Renewed the lock on {1} '{0}'.", message.MessageId, type));
426 : }
427 : catch
428 : {
429 : Trace.TraceError("Renewed the lock on {1} '{0}'.", message.MessageId, type);
430 : }
431 :
432 : break;
433 : }
434 : catch (ObjectDisposedException)
435 : {
436 : return;
437 : }
438 : catch (MessageLockLostException exception)
439 : {
440 : try
441 : {
442 : Logger.LogWarning(string.Format("Renewing the lock on {1} '{0}' failed as the message lock was lost.", message.MessageId, type), exception: exception);
443 : }
444 : catch
445 : {
446 : Trace.TraceError("Renewing the lock on {1} '{0}' failed as the message lock was lost.\r\n{2}", message.MessageId, type, exception.Message);
447 : }
448 : return;
449 : }
450 : catch (Exception exception)
451 : {
452 : try
453 : {
454 : Logger.LogWarning(string.Format("Renewing the lock on {1} '{0}' failed.", message.MessageId, type), exception: exception);
455 : }
456 : catch
457 : {
458 : Trace.TraceError("Renewing the lock on {1} '{0}' failed.\r\n{2}", message.MessageId, type, exception.Message);
459 : }
460 : if (i == 9)
461 : return;
462 : }
463 : }
464 : }
465 :
466 : if (loop++ % 5 == 0)
467 : Thread.Yield();
468 : else
469 : Thread.Sleep(500);
470 : if (loop == long.MaxValue)
471 : loop = long.MinValue;
472 : }
473 : try
474 : {
475 : brokeredMessageRenewCancellationTokenSource.Dispose();
476 : }
477 : catch (ObjectDisposedException) { }
478 : }
479 : catch (ObjectDisposedException) { }
480 : }, brokeredMessageRenewCancellationTokenSource.Token);
481 : }
482 :
483 : /// <summary>
484 : /// The default event handler that
485 : /// check if the <see cref="IEvent{TAuthenticationToken}"/> has already been processed by this framework,
486 : /// checks if the <see cref="IEvent{TAuthenticationToken}"/> is required,
487 : /// finds the handler from the provided <paramref name="routeManager"/>.
488 : /// </summary>
489 : /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to process.</param>
490 : /// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="IEventHandler{TAuthenticationToken,TCommand}"/> from.</param>
491 : /// <param name="framework">The current framework.</param>
492 : /// <returns>
493 : /// True indicates the <paramref name="event"/> was successfully handled by a handler.
494 : /// False indicates the <paramref name="event"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
495 : /// Null indicates the <paramref name="event"/> wasn't handled as it was already handled.
496 : /// </returns>
497 2 : public virtual bool? DefaultReceiveEvent(IEvent<TAuthenticationToken> @event, RouteManager routeManager, string framework)
498 : {
499 : Type eventType = @event.GetType();
500 :
501 : if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
502 : {
503 : // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
504 : if (@event.Frameworks.Count() != 1)
505 : {
506 : Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\DefaultReceiveEvent({1})", GetType().FullName, eventType.FullName));
507 : return null;
508 : }
509 : }
510 :
511 : CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
512 : AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
513 :
514 : bool isRequired = BusHelper.IsEventRequired(eventType);
515 :
516 : IEnumerable<Action<IMessage>> handlers = routeManager.GetHandlers(@event, isRequired).Select(x => x.Delegate).ToList();
517 : // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
518 : if (!handlers.Any())
519 : {
520 : Logger.LogDebug(string.Format("The event handler for '{0}' is not required.", eventType.FullName));
521 : return false;
522 : }
523 :
524 : foreach (Action<IMessage> handler in handlers)
525 : handler(@event);
526 : return true;
527 : }
528 :
529 : /// <summary>
530 : /// Manually registers the provided <paramref name="handler"/>
531 : /// on the provided <paramref name="routeManger"/>
532 : /// </summary>
533 : /// <typeparam name="TMessage">The <see cref="Type"/> of <see cref="IMessage"/> the <paramref name="handler"/> can handle.</typeparam>
534 2 : public virtual void RegisterHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
535 : where TMessage : IMessage
536 : {
537 : Action<TMessage> registerableHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
538 :
539 : routeManger.RegisterHandler(registerableHandler, targetedType);
540 :
541 : telemetryHelper.TrackEvent(string.Format("Cqrs/RegisterHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "Azure/Bus" } });
542 : telemetryHelper.Flush();
543 : }
544 :
545 : /// <summary>
546 : /// Register an event handler that will listen and respond to all events.
547 : /// </summary>
548 2 : public void RegisterGlobalEventHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler,
549 : bool holdMessageLock = true) where TMessage : IMessage
550 : {
551 : Action<TMessage> registerableHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
552 :
553 : routeManger.RegisterGlobalEventHandler(registerableHandler);
554 :
555 : telemetryHelper.TrackEvent(string.Format("Cqrs/RegisterGlobalEventHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "Azure/Bus" } });
556 : telemetryHelper.Flush();
557 : }
558 : }
559 : }
|