Line data Source code
1 : #region IMPORTANT NOTE
2 : // This is copied almost exactly into the eventhub except for a string difference. Replicate changes there until a refactor is done.
3 : #endregion
4 :
5 : #region Copyright
6 : // // -----------------------------------------------------------------------
7 : // // <copyright company="Chinchilla Software Limited">
8 : // // Copyright Chinchilla Software Limited. All rights reserved.
9 : // // </copyright>
10 : // // -----------------------------------------------------------------------
11 : #endregion
12 :
13 : using System;
14 : using System.Collections.Generic;
15 : using System.Diagnostics;
16 : using System.Linq;
17 : using System.Text.RegularExpressions;
18 : using System.Threading;
19 : using System.Threading.Tasks;
20 : using cdmdotnet.Logging;
21 : using Cqrs.Authentication;
22 : using Cqrs.Bus;
23 : using Cqrs.Commands;
24 : using Cqrs.Configuration;
25 : using Cqrs.Events;
26 : using Cqrs.Messages;
27 : using Microsoft.ServiceBus.Messaging;
28 : using Newtonsoft.Json;
29 :
30 : namespace Cqrs.Azure.ServiceBus
31 : {
32 : /// <summary>
33 : /// A helper for Azure Service Bus and Event Hub.
34 : /// </summary>
35 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
36 : public class AzureBusHelper<TAuthenticationToken> : IAzureBusHelper<TAuthenticationToken>
37 2 : {
38 : /// <summary>
39 : /// Instantiates a new instance of <see cref="AzureBusHelper{TAuthenticationToken}"/>.
40 : /// </summary>
41 2 : public AzureBusHelper(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IBusHelper busHelper, IConfigurationManager configurationManager, IDependencyResolver dependencyResolver)
42 : {
43 : AuthenticationTokenHelper = authenticationTokenHelper;
44 : CorrelationIdHelper = correlationIdHelper;
45 : Logger = logger;
46 : MessageSerialiser = messageSerialiser;
47 : BusHelper = busHelper;
48 : DependencyResolver = dependencyResolver;
49 : ConfigurationManager = configurationManager;
50 : }
51 :
52 : /// <summary>
53 : /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>.
54 : /// </summary>
55 : protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
56 :
57 : /// <summary>
58 : /// Gets or sets the <see cref="ICorrelationIdHelper"/>.
59 : /// </summary>
60 : protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
61 :
62 : /// <summary>
63 : /// Gets or sets the <see cref="ILogger"/>.
64 : /// </summary>
65 : protected ILogger Logger { get; private set; }
66 :
67 : /// <summary>
68 : /// Gets or sets the <see cref="IMessageSerialiser{TAuthenticationToken}"/>.
69 : /// </summary>
70 : protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; }
71 :
72 : /// <summary>
73 : /// Gets or sets the <see cref="IBusHelper"/>.
74 : /// </summary>
75 : protected IBusHelper BusHelper { get; private set; }
76 :
77 : /// <summary>
78 : /// Gets or sets the <see cref="IConfigurationManager"/>.
79 : /// </summary>
80 : protected IConfigurationManager ConfigurationManager { get; private set; }
81 :
82 : /// <summary>
83 : /// Gets or sets the <see cref="IDependencyResolver"/>.
84 : /// </summary>
85 : protected IDependencyResolver DependencyResolver { get; private set; }
86 :
87 : /// <summary>
88 : /// Prepares a <see cref="ICommand{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
89 : /// </summary>
90 : /// <typeparam name="TCommand">The <see cref="Type"/> of<see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
91 : /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
92 : /// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
93 2 : public virtual void PrepareCommand<TCommand>(TCommand command, string framework)
94 : where TCommand : ICommand<TAuthenticationToken>
95 : {
96 : if (command.AuthenticationToken == null || command.AuthenticationToken.Equals(default(TAuthenticationToken)))
97 : command.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
98 : command.CorrelationId = CorrelationIdHelper.GetCorrelationId();
99 :
100 : if (string.IsNullOrWhiteSpace(command.OriginatingFramework))
101 : command.OriginatingFramework = framework;
102 : var frameworks = new List<string>();
103 : if (command.Frameworks != null)
104 : frameworks.AddRange(command.Frameworks);
105 : frameworks.Add("Azure-ServiceBus");
106 : command.Frameworks = frameworks;
107 : }
108 :
109 : /// <summary>
110 : /// Prepares and validates a <see cref="ICommand{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
111 : /// </summary>
112 : /// <typeparam name="TCommand">The <see cref="Type"/> of<see cref="ICommand{TAuthenticationToken}"/> being sent.</typeparam>
113 : /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to send.</param>
114 : /// <param name="framework">The framework the <paramref name="command"/> is being sent from.</param>
115 2 : public virtual bool PrepareAndValidateCommand<TCommand>(TCommand command, string framework)
116 : where TCommand : ICommand<TAuthenticationToken>
117 : {
118 : Type commandType = command.GetType();
119 :
120 : if (command.Frameworks != null && command.Frameworks.Contains(framework))
121 : {
122 : // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
123 : if (command.Frameworks.Count() != 1)
124 : {
125 : Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName));
126 : return false;
127 : }
128 : }
129 :
130 : ICommandValidator<TAuthenticationToken, TCommand> commandValidator = null;
131 : try
132 : {
133 : commandValidator = DependencyResolver.Resolve<ICommandValidator<TAuthenticationToken, TCommand>>();
134 : }
135 : catch (Exception exception)
136 : {
137 : Logger.LogDebug("Locating an ICommandValidator failed.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName), exception);
138 : }
139 :
140 : if (commandValidator != null && !commandValidator.IsCommandValid(command))
141 : {
142 : Logger.LogInfo("The provided command is not valid.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName));
143 : return false;
144 : }
145 :
146 : PrepareCommand(command, framework);
147 : return true;
148 : }
149 :
150 : /// <summary>
151 : /// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveCommandHandler"/>.
152 : /// </summary>
153 : /// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
154 : /// <param name="receiveCommandHandler">The handler method that will process the <see cref="ICommand{TAuthenticationToken}"/>.</param>
155 : /// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
156 : /// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="ICommand{TAuthenticationToken}"/> is being skipped.</param>
157 : /// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
158 : /// <returns>The <see cref="ICommand{TAuthenticationToken}"/> that was processed.</returns>
159 2 : public virtual ICommand<TAuthenticationToken> ReceiveCommand(string messageBody, Func<ICommand<TAuthenticationToken>, bool?> receiveCommandHandler, string messageId, Action skippedAction = null, Action lockRefreshAction = null)
160 : {
161 : ICommand<TAuthenticationToken> command;
162 : try
163 : {
164 : command = MessageSerialiser.DeserialiseCommand(messageBody);
165 : }
166 : catch (JsonSerializationException exception)
167 : {
168 : JsonSerializationException checkException = exception;
169 : bool safeToExit = false;
170 : do
171 : {
172 : if (checkException.Message.StartsWith("Could not load assembly"))
173 : {
174 : safeToExit = true;
175 : break;
176 : }
177 : } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
178 : if (safeToExit)
179 : {
180 : const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
181 : Match match = new Regex(pattern).Match(exception.Message);
182 : if (match.Success)
183 : {
184 : string[] typeParts = match.Value.Split(',');
185 : if (typeParts.Length == 2)
186 : {
187 : string classType = typeParts[0];
188 : bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
189 :
190 : if (!isRequired)
191 : {
192 : if (skippedAction != null)
193 : skippedAction();
194 : return null;
195 : }
196 : }
197 : }
198 : }
199 : throw;
200 : }
201 :
202 : string commandTypeName = command.GetType().FullName;
203 : CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
204 : AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
205 : Logger.LogInfo(string.Format("A command message arrived with the {0} was of type {1}.", messageId, commandTypeName));
206 :
207 : bool canRefresh;
208 : if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", commandTypeName), out canRefresh))
209 : canRefresh = false;
210 :
211 : if (canRefresh)
212 : {
213 : if (lockRefreshAction == null)
214 : Logger.LogWarning(string.Format("A command message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, commandTypeName));
215 : else
216 : lockRefreshAction();
217 : }
218 :
219 : // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
220 : bool? result = receiveCommandHandler(command);
221 : if (result != null && !result.Value)
222 : if (skippedAction != null)
223 : skippedAction();
224 :
225 : return command;
226 : }
227 :
228 : /// <summary>
229 : /// The default command handler that
230 : /// check if the <see cref="ICommand{TAuthenticationToken}"/> has already been processed by this framework,
231 : /// checks if the <see cref="ICommand{TAuthenticationToken}"/> is required,
232 : /// finds the handler from the provided <paramref name="routeManager"/>.
233 : /// </summary>
234 : /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to process.</param>
235 : /// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="ICommandHandler{TAuthenticationToken,TCommand}"/> from.</param>
236 : /// <param name="framework">The current framework.</param>
237 : /// <returns>
238 : /// True indicates the <paramref name="command"/> was successfully handled by a handler.
239 : /// False indicates the <paramref name="command"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
240 : /// Null indicates the command<paramref name="command"/> wasn't handled as it was already handled.
241 : /// </returns>
242 2 : public virtual bool? DefaultReceiveCommand(ICommand<TAuthenticationToken> command, RouteManager routeManager, string framework)
243 : {
244 : Type commandType = command.GetType();
245 :
246 : if (command.Frameworks != null && command.Frameworks.Contains(framework))
247 : {
248 : // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
249 : if (command.Frameworks.Count() != 1)
250 : {
251 : Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\DefaultReceiveCommand({1})", GetType().FullName, commandType.FullName));
252 : return null;
253 : }
254 : }
255 :
256 : CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
257 : AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
258 :
259 : bool isRequired = BusHelper.IsEventRequired(commandType);
260 :
261 : RouteHandlerDelegate commandHandler = routeManager.GetSingleHandler(command, isRequired);
262 : // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
263 : if (commandHandler == null)
264 : {
265 : Logger.LogDebug(string.Format("The command handler for '{0}' is not required.", commandType.FullName));
266 : return false;
267 : }
268 :
269 : Action<IMessage> handler = commandHandler.Delegate;
270 : handler(command);
271 : return true;
272 : }
273 :
274 : /// <summary>
275 : /// Prepares an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
276 : /// </summary>
277 : /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
278 : /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
279 : /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
280 2 : public virtual void PrepareEvent<TEvent>(TEvent @event, string framework)
281 : where TEvent : IEvent<TAuthenticationToken>
282 : {
283 : if (@event.AuthenticationToken == null || @event.AuthenticationToken.Equals(default(TAuthenticationToken)))
284 : @event.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
285 : @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
286 : @event.TimeStamp = DateTimeOffset.UtcNow;
287 :
288 : if (string.IsNullOrWhiteSpace(@event.OriginatingFramework))
289 : @event.OriginatingFramework = framework;
290 : var frameworks = new List<string>();
291 : if (@event.Frameworks != null)
292 : frameworks.AddRange(@event.Frameworks);
293 : frameworks.Add(framework);
294 : @event.Frameworks = frameworks;
295 : }
296 :
297 : /// <summary>
298 : /// Prepares and validates an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
299 : /// </summary>
300 : /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
301 : /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
302 : /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
303 2 : public virtual bool PrepareAndValidateEvent<TEvent>(TEvent @event, string framework)
304 : where TEvent : IEvent<TAuthenticationToken>
305 : {
306 : Type eventType = @event.GetType();
307 :
308 : if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
309 : {
310 : // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
311 : if (@event.Frameworks.Count() != 1)
312 : {
313 : Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, eventType.FullName));
314 : return false;
315 : }
316 : }
317 :
318 : PrepareEvent(@event, framework);
319 : return true;
320 : }
321 :
322 : /// <summary>
323 : /// Deserialises and processes the <paramref name="messageBody"/> received from the network through the provided <paramref name="receiveEventHandler"/>.
324 : /// </summary>
325 : /// <param name="messageBody">A serialised <see cref="IMessage"/>.</param>
326 : /// <param name="receiveEventHandler">The handler method that will process the <see cref="IEvent{TAuthenticationToken}"/>.</param>
327 : /// <param name="messageId">The network id of the <see cref="IMessage"/>.</param>
328 : /// <param name="skippedAction">The <see cref="Action"/> to call when the <see cref="IEvent{TAuthenticationToken}"/> is being skipped.</param>
329 : /// <param name="lockRefreshAction">The <see cref="Action"/> to call to refresh the network lock.</param>
330 : /// <returns>The <see cref="IEvent{TAuthenticationToken}"/> that was processed.</returns>
331 2 : public virtual IEvent<TAuthenticationToken> ReceiveEvent(string messageBody, Func<IEvent<TAuthenticationToken>, bool?> receiveEventHandler, string messageId, Action skippedAction = null, Action lockRefreshAction = null)
332 : {
333 : IEvent<TAuthenticationToken> @event;
334 : try
335 : {
336 :
337 : @event = MessageSerialiser.DeserialiseEvent(messageBody);
338 : }
339 : catch (JsonSerializationException exception)
340 : {
341 : JsonSerializationException checkException = exception;
342 : bool safeToExit = false;
343 : do
344 : {
345 : if (checkException.Message.StartsWith("Could not load assembly"))
346 : {
347 : safeToExit = true;
348 : break;
349 : }
350 : } while ((checkException = checkException.InnerException as JsonSerializationException) != null);
351 : if (safeToExit)
352 : {
353 : const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
354 : Match match = new Regex(pattern).Match(exception.Message);
355 : if (match.Success)
356 : {
357 : string[] typeParts = match.Value.Split(',');
358 : if (typeParts.Length == 2)
359 : {
360 : string classType = typeParts[0];
361 : bool isRequired = BusHelper.IsEventRequired(string.Format("{0}.IsRequired", classType));
362 :
363 : if (!isRequired)
364 : {
365 : if (skippedAction != null)
366 : skippedAction();
367 : return null;
368 : }
369 : }
370 : }
371 : }
372 : throw;
373 : }
374 :
375 : string eventTypeName = @event.GetType().FullName;
376 : CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
377 : AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
378 : Logger.LogInfo(string.Format("An event message arrived with the {0} was of type {1}.", messageId, eventTypeName));
379 :
380 : bool canRefresh;
381 : if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", eventTypeName), out canRefresh))
382 : canRefresh = false;
383 :
384 : if (canRefresh)
385 : {
386 : if (lockRefreshAction == null)
387 : Logger.LogWarning(string.Format("An event message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, eventTypeName));
388 : else
389 : lockRefreshAction();
390 : }
391 :
392 : // a false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
393 : bool? result = receiveEventHandler(@event);
394 : if (result != null && !result.Value)
395 : if (skippedAction != null)
396 : skippedAction();
397 :
398 : return @event;
399 : }
400 :
401 : /// <summary>
402 : /// Refreshes the network lock.
403 : /// </summary>
404 2 : public virtual void RefreshLock(CancellationTokenSource brokeredMessageRenewCancellationTokenSource, BrokeredMessage message, string type = "message")
405 : {
406 : Task.Factory.StartNewSafely(() =>
407 : {
408 : // The capturing of ObjectDisposedException is because even the properties can throw it.
409 : try
410 : {
411 : long loop = long.MinValue;
412 : while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
413 : {
414 : // Based on LockedUntilUtc property to determine if the lock expires soon
415 : // We lock for 45 seconds to ensure any thread based issues are mitigated.
416 : if (DateTime.UtcNow > message.LockedUntilUtc.AddSeconds(-45))
417 : {
418 : // If so, renew the lock
419 : for (int i = 0; i < 10; i++)
420 : {
421 : try
422 : {
423 : message.RenewLock();
424 : try
425 : {
426 : Logger.LogDebug(string.Format("Renewed the lock on {1} '{0}'.", message.MessageId, type));
427 : }
428 : catch
429 : {
430 : Trace.TraceError("Renewed the lock on {1} '{0}'.", message.MessageId, type);
431 : }
432 :
433 : break;
434 : }
435 : catch (ObjectDisposedException)
436 : {
437 : return;
438 : }
439 : catch (MessageLockLostException exception)
440 : {
441 : try
442 : {
443 : Logger.LogWarning(string.Format("Renewing the lock on {1} '{0}' failed as the message lock was lost.", message.MessageId, type), exception: exception);
444 : }
445 : catch
446 : {
447 : Trace.TraceError("Renewing the lock on {1} '{0}' failed as the message lock was lost.\r\n{2}", message.MessageId, type, exception.Message);
448 : }
449 : return;
450 : }
451 : catch (Exception exception)
452 : {
453 : try
454 : {
455 : Logger.LogWarning(string.Format("Renewing the lock on {1} '{0}' failed.", message.MessageId, type), exception: exception);
456 : }
457 : catch
458 : {
459 : Trace.TraceError("Renewing the lock on {1} '{0}' failed.\r\n{2}", message.MessageId, type, exception.Message);
460 : }
461 : if (i == 9)
462 : return;
463 : }
464 : }
465 : }
466 :
467 : if (loop++ % 5 == 0)
468 : Thread.Yield();
469 : else
470 : Thread.Sleep(500);
471 : if (loop == long.MaxValue)
472 : loop = long.MinValue;
473 : }
474 : try
475 : {
476 : brokeredMessageRenewCancellationTokenSource.Dispose();
477 : }
478 : catch (ObjectDisposedException) { }
479 : }
480 : catch (ObjectDisposedException) { }
481 : }, brokeredMessageRenewCancellationTokenSource.Token);
482 : }
483 :
484 : /// <summary>
485 : /// The default event handler that
486 : /// check if the <see cref="IEvent{TAuthenticationToken}"/> has already been processed by this framework,
487 : /// checks if the <see cref="IEvent{TAuthenticationToken}"/> is required,
488 : /// finds the handler from the provided <paramref name="routeManager"/>.
489 : /// </summary>
490 : /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to process.</param>
491 : /// <param name="routeManager">The <see cref="RouteManager"/> to get the <see cref="IEventHandler{TAuthenticationToken,TCommand}"/> from.</param>
492 : /// <param name="framework">The current framework.</param>
493 : /// <returns>
494 : /// True indicates the <paramref name="event"/> was successfully handled by a handler.
495 : /// False indicates the <paramref name="event"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
496 : /// Null indicates the <paramref name="event"/> wasn't handled as it was already handled.
497 : /// </returns>
498 2 : public virtual bool? DefaultReceiveEvent(IEvent<TAuthenticationToken> @event, RouteManager routeManager, string framework)
499 : {
500 : Type eventType = @event.GetType();
501 :
502 : if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
503 : {
504 : // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
505 : if (@event.Frameworks.Count() != 1)
506 : {
507 : Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\DefaultReceiveEvent({1})", GetType().FullName, eventType.FullName));
508 : return null;
509 : }
510 : }
511 :
512 : CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
513 : AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
514 :
515 : bool isRequired = BusHelper.IsEventRequired(eventType);
516 :
517 : IEnumerable<Action<IMessage>> handlers = routeManager.GetHandlers(@event, isRequired).Select(x => x.Delegate).ToList();
518 : // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
519 : if (!handlers.Any())
520 : {
521 : Logger.LogDebug(string.Format("The event handler for '{0}' is not required.", eventType.FullName));
522 : return false;
523 : }
524 :
525 : foreach (Action<IMessage> handler in handlers)
526 : handler(@event);
527 : return true;
528 : }
529 :
530 : /// <summary>
531 : /// Manually registers the provided <paramref name="handler"/>
532 : /// on the provided <paramref name="routeManger"/>
533 : /// </summary>
534 : /// <typeparam name="TMessage">The <see cref="Type"/> of <see cref="IMessage"/> the <paramref name="handler"/> can handle.</typeparam>
535 2 : public virtual void RegisterHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
536 : where TMessage : IMessage
537 : {
538 : Action<TMessage> registerableHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
539 :
540 : routeManger.RegisterHandler(registerableHandler, targetedType);
541 :
542 : telemetryHelper.TrackEvent(string.Format("Cqrs/RegisterHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "Azure/Bus" } });
543 : telemetryHelper.Flush();
544 : }
545 :
546 : /// <summary>
547 : /// Register an event handler that will listen and respond to all events.
548 : /// </summary>
549 2 : public void RegisterGlobalEventHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler,
550 : bool holdMessageLock = true) where TMessage : IMessage
551 : {
552 : Action<TMessage> registerableHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
553 :
554 : routeManger.RegisterGlobalEventHandler(registerableHandler);
555 :
556 : telemetryHelper.TrackEvent(string.Format("Cqrs/RegisterGlobalEventHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "Azure/Bus" } });
557 : telemetryHelper.Flush();
558 : }
559 : }
560 : }
|