Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Diagnostics;
13 : using System.Linq;
14 : using Chinchilla.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Commands;
17 : using Cqrs.Configuration;
18 : using Cqrs.Events;
19 : using Cqrs.Messages;
20 : using SpinWait = Cqrs.Infrastructure.SpinWait;
21 :
22 : namespace Cqrs.Bus
23 : {
24 : /// <summary>
25 : /// An in process command bus
26 : /// (<see cref="ICommandPublisher{TAuthenticationToken}"/> and <see cref="ICommandReceiver{TAuthenticationToken}"/>)
27 : /// event bus
28 : /// (<see cref="IEventPublisher{TAuthenticationToken}"/> and <see cref="IEventHandler{TAuthenticationToken,TTarget,TEvent}"/>)
29 : /// as well as a <see cref="IEventHandlerRegistrar"/> and <see cref="ICommandHandlerRegistrar"/> that requires no networking.
30 : /// </summary>
31 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
32 : public class InProcessBus<TAuthenticationToken>
33 : : IPublishAndWaitCommandPublisher<TAuthenticationToken>
34 : , IEventPublisher<TAuthenticationToken>
35 : , IEventHandlerRegistrar
36 : , ICommandHandlerRegistrar
37 : , ICommandReceiver<TAuthenticationToken>
38 : , IEventReceiver<TAuthenticationToken>
39 1 : {
40 : /// <summary>
41 : /// Gets or sets the Route Manager
42 : /// </summary>
43 : private static RouteManager Routes { get; set; }
44 :
45 : /// <summary>
46 : /// Gets or sets the Authentication Token Helper
47 : /// </summary>
48 : protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
49 :
50 : /// <summary>
51 : /// Gets or sets the CorrelationId Helper
52 : /// </summary>
53 : protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
54 :
55 : /// <summary>
56 : /// Gets or sets the Dependency Resolver
57 : /// </summary>
58 : protected IDependencyResolver DependencyResolver { get; private set; }
59 :
60 : /// <summary>
61 : /// Gets or sets the Logger
62 : /// </summary>
63 : protected ILogger Logger { get; private set; }
64 :
65 : /// <summary>
66 : /// Gets or sets the Configuration Manager
67 : /// </summary>
68 : protected IConfigurationManager ConfigurationManager { get; private set; }
69 :
70 : /// <summary>
71 : /// Gets or sets the Bus Helper
72 : /// </summary>
73 : protected IBusHelper BusHelper { get; private set; }
74 :
75 : /// <summary>
76 : /// Gets or sets the current list of events waiting to be evaluated for <see cref="PublishAndWait{TCommand,TEvent}(TCommand,Cqrs.Events.IEventReceiver{TAuthenticationToken})"/>
77 : /// </summary>
78 : protected IDictionary<Guid, IList<IEvent<TAuthenticationToken>>> EventWaits { get; private set; }
79 :
80 : /// <summary>
81 : /// Gets or sets the Telemetry Helper
82 : /// </summary>
83 : protected ITelemetryHelper TelemetryHelper { get; set; }
84 :
85 : static InProcessBus()
86 : {
87 : Routes = new RouteManager();
88 : }
89 :
90 : /// <summary>
91 : /// Instantiates a new instance of the <see cref="InProcessBus{TAuthenticationToken}"/> class.
92 : /// </summary>
93 1 : public InProcessBus(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, IDependencyResolver dependencyResolver, ILogger logger, IConfigurationManager configurationManager, IBusHelper busHelper)
94 : {
95 : AuthenticationTokenHelper = authenticationTokenHelper;
96 : CorrelationIdHelper = correlationIdHelper;
97 : DependencyResolver = dependencyResolver;
98 : Logger = logger;
99 : ConfigurationManager = configurationManager;
100 : BusHelper = busHelper;
101 : EventWaits = new ConcurrentDictionary<Guid, IList<IEvent<TAuthenticationToken>>>();
102 : TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.InProcessBus.UseApplicationInsightTelemetryHelper", correlationIdHelper);
103 : }
104 :
105 : /// <summary>
106 : /// Sets the
107 : /// <see cref="IMessageWithAuthenticationToken{TAuthenticationToken}.AuthenticationToken"/>,
108 : /// <see cref="IMessage.CorrelationId"/>,
109 : /// <see cref="IMessage.OriginatingFramework"/> to "Built-In" and
110 : /// adds a value of "Built-In" to the <see cref="IMessage.Frameworks"/>
111 : /// if not already done so
112 : /// </summary>
113 1 : protected virtual void PrepareCommand<TCommand>(TCommand command)
114 : where TCommand : ICommand<TAuthenticationToken>
115 : {
116 : if (command.AuthenticationToken == null || command.AuthenticationToken.Equals(default(TAuthenticationToken)))
117 : command.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
118 : command.CorrelationId = CorrelationIdHelper.GetCorrelationId();
119 :
120 : if (string.IsNullOrWhiteSpace(command.OriginatingFramework))
121 : command.OriginatingFramework = "Built-In";
122 :
123 : var frameworks = new List<string>();
124 : if (command.Frameworks != null)
125 : frameworks.AddRange(command.Frameworks);
126 : frameworks.Add("Built-In");
127 : command.Frameworks = frameworks;
128 : }
129 :
130 : /// <summary>
131 : /// Locates a suitable <see cref="ICommandValidator{TAuthenticationToken,TCommand}"/> to validate the provided <paramref name="command"/> and validates the provided <paramref name="command"/> if one is located
132 : /// Calls <see cref="PrepareCommand{TCommand}"/>
133 : /// Checks if the provided <paramref name="command"/> is required to be processed
134 : /// Locates a single <see cref="RouteHandlerDelegate">command handler</see> for the provided <paramref name="command"/>
135 : /// </summary>
136 : /// <returns>
137 : /// False if a suitable <see cref="ICommandValidator{TAuthenticationToken,TCommand}"/> is located and the provided <paramref name="command"/> fails validation,
138 : /// False if no <see cref="RouteHandlerDelegate">command handler</see> is found but the command isn't required to be handled,
139 : /// True otherwise.
140 : /// </returns>
141 1 : protected virtual bool PrepareAndValidateCommand<TCommand>(TCommand command, out RouteHandlerDelegate commandHandler)
142 : where TCommand : ICommand<TAuthenticationToken>
143 : {
144 : Type commandType = command.GetType();
145 :
146 : if (command.Frameworks != null && command.Frameworks.Contains("Built-In"))
147 : {
148 : Logger.LogInfo("The provided command has already been processed by the Built-In bus.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, commandType.FullName));
149 : commandHandler = null;
150 : return false;
151 : }
152 :
153 : ICommandValidator<TAuthenticationToken, TCommand> commandValidator = null;
154 : try
155 : {
156 : commandValidator = DependencyResolver.Resolve<ICommandValidator<TAuthenticationToken, TCommand>>();
157 : }
158 : catch (Exception exception)
159 : {
160 : Logger.LogDebug("Locating an ICommandValidator failed.", string.Format("{0}\\Handle({1})", GetType().FullName, commandType.FullName), exception);
161 : }
162 :
163 : if (commandValidator != null && !commandValidator.IsCommandValid(command))
164 : {
165 : Logger.LogInfo("The provided command is not valid.", string.Format("{0}\\Handle({1})", GetType().FullName, commandType.FullName));
166 : commandHandler = null;
167 : return false;
168 : }
169 :
170 : PrepareCommand(command);
171 :
172 : bool isRequired = BusHelper.IsEventRequired(commandType);
173 :
174 : commandHandler = Routes.GetSingleHandler(command, isRequired);
175 : // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
176 : if (commandHandler == null)
177 : {
178 : Logger.LogDebug(string.Format("The command handler for '{0}' is not required.", commandType.FullName));
179 : return false;
180 : }
181 :
182 : return true;
183 : }
184 :
185 : #region Implementation of ICommandPublisher<TAuthenticationToken>
186 :
187 : /// <summary>
188 : /// Publishes the provided <paramref name="command"/> on the command bus.
189 : /// </summary>
190 : void ICommandPublisher<TAuthenticationToken>.Publish<TCommand>(TCommand command)
191 : {
192 : Send(command);
193 : }
194 :
195 : /// <summary>
196 : /// Publishes the provided <paramref name="command"/> on the command bus.
197 : /// </summary>
198 1 : public virtual void Send<TCommand>(TCommand command)
199 : where TCommand : ICommand<TAuthenticationToken>
200 : {
201 : DateTimeOffset startedAt = DateTimeOffset.UtcNow;
202 : Stopwatch mainStopWatch = Stopwatch.StartNew();
203 : string responseCode = "200";
204 : bool wasSuccessfull = false;
205 :
206 : IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "InProcessBus" } };
207 : string telemetryName = string.Format("{0}/{1}", command.GetType().FullName, command.Id);
208 : var telemeteredCommand = command as ITelemeteredMessage;
209 : if (telemeteredCommand != null)
210 : telemetryName = telemeteredCommand.TelemetryName;
211 : telemetryName = string.Format("Command/{0}", telemetryName);
212 :
213 : try
214 : {
215 : RouteHandlerDelegate commandHandler;
216 : if (!PrepareAndValidateCommand(command, out commandHandler))
217 : return;
218 :
219 : try
220 : {
221 : Action<IMessage> handler = commandHandler.Delegate;
222 : handler(command);
223 : }
224 : catch (Exception exception)
225 : {
226 : responseCode = "500";
227 : Logger.LogError("An issue occurred while trying to publish a command.", exception: exception, metaData: new Dictionary<string, object> {{"Command", command}});
228 : throw;
229 : }
230 :
231 : Logger.LogInfo(string.Format("A command was sent of type {0}.", command.GetType().FullName));
232 : wasSuccessfull = true;
233 : }
234 : finally
235 : {
236 : mainStopWatch.Stop();
237 : TelemetryHelper.TrackDependency("InProcessBus/CommandBus", "Command", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
238 : }
239 : }
240 :
241 : /// <summary>
242 : /// Publishes the provided <paramref name="commands"/> on the command bus.
243 : /// </summary>
244 : void ICommandPublisher<TAuthenticationToken>.Publish<TCommand>(IEnumerable<TCommand> commands)
245 : {
246 : Send(commands);
247 : }
248 :
249 : /// <summary>
250 : /// Publishes the provided <paramref name="commands"/> on the command bus.
251 : /// </summary>
252 1 : public virtual void Send<TCommand>(IEnumerable<TCommand> commands)
253 : where TCommand : ICommand<TAuthenticationToken>
254 : {
255 : IEnumerable<TCommand> sourceCommands = commands.ToList();
256 :
257 : DateTimeOffset startedAt = DateTimeOffset.UtcNow;
258 : Stopwatch mainStopWatch = Stopwatch.StartNew();
259 : string responseCode = "500";
260 : bool wasSuccessfull = false;
261 :
262 : IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "InProcessBus" } };
263 : string telemetryName = "Commands";
264 : string telemetryNames = string.Empty;
265 : foreach (TCommand command in sourceCommands)
266 : {
267 : string subTelemetryName = string.Format("{0}/{1}", command.GetType().FullName, command.Id);
268 : var telemeteredCommand = command as ITelemeteredMessage;
269 : if (telemeteredCommand != null)
270 : subTelemetryName = telemeteredCommand.TelemetryName;
271 : telemetryNames = string.Format("{0}{1},", telemetryNames, subTelemetryName);
272 : }
273 : if (telemetryNames.Length > 0)
274 : telemetryNames = telemetryNames.Substring(0, telemetryNames.Length - 1);
275 : telemetryProperties.Add("Commands", telemetryNames);
276 :
277 : try
278 : {
279 : foreach (TCommand command in sourceCommands)
280 : Send(command);
281 :
282 : responseCode = "200";
283 : wasSuccessfull = true;
284 : }
285 : finally
286 : {
287 : mainStopWatch.Stop();
288 : TelemetryHelper.TrackDependency("InProcessBus/CommandBus", "Command", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
289 : }
290 : }
291 :
292 : #endregion
293 :
294 : #region Implementation of ISendAndWaitCommandSender<TAuthenticationToken>
295 :
296 : /// <summary>
297 : /// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/>
298 : /// </summary>
299 : /// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
300 : /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
301 1 : public virtual TEvent SendAndWait<TCommand, TEvent>(TCommand command, IEventReceiver<TAuthenticationToken> eventReceiver = null)
302 : where TCommand : ICommand<TAuthenticationToken>
303 : {
304 : return SendAndWait<TCommand, TEvent>(command, -1, eventReceiver);
305 : }
306 :
307 : /// <summary>
308 : /// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
309 : /// </summary>
310 : /// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
311 : /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see cref="F:System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
312 : /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
313 1 : public virtual TEvent SendAndWait<TCommand, TEvent>(TCommand command, int millisecondsTimeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
314 : where TCommand : ICommand<TAuthenticationToken>
315 : {
316 : return SendAndWait(command, events => (TEvent)events.SingleOrDefault(@event => @event is TEvent), millisecondsTimeout, eventReceiver);
317 : }
318 :
319 : /// <summary>
320 : /// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
321 : /// </summary>
322 : /// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
323 : /// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
324 : /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
325 1 : public virtual TEvent SendAndWait<TCommand, TEvent>(TCommand command, TimeSpan timeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
326 : where TCommand : ICommand<TAuthenticationToken>
327 : {
328 : long num = (long)timeout.TotalMilliseconds;
329 : if (num < -1L || num > int.MaxValue)
330 : throw new ArgumentOutOfRangeException("timeout", timeout, "SpinWait_SpinUntil_TimeoutWrong");
331 : return SendAndWait<TCommand, TEvent>(command, (int)timeout.TotalMilliseconds, eventReceiver);
332 : }
333 :
334 : /// <summary>
335 : /// Sends the provided <paramref name="command"></paramref> and waits until the specified condition is satisfied an event of <typeparamref name="TEvent"/>
336 : /// </summary>
337 : /// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
338 : /// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
339 : /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
340 1 : public virtual TEvent SendAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, IEventReceiver<TAuthenticationToken> eventReceiver = null)
341 : where TCommand : ICommand<TAuthenticationToken>
342 : {
343 : return PublishAndWait(command, condition, eventReceiver);
344 : }
345 :
346 : /// <summary>
347 : /// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
348 : /// </summary>
349 : /// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
350 : /// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
351 : /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see cref="F:System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
352 : /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
353 1 : public virtual TEvent SendAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, int millisecondsTimeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
354 : where TCommand : ICommand<TAuthenticationToken>
355 : {
356 : return PublishAndWait(command, condition, millisecondsTimeout, eventReceiver);
357 : }
358 :
359 : /// <summary>
360 : /// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
361 : /// </summary>
362 : /// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
363 : /// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
364 : /// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
365 : /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
366 1 : public virtual TEvent SendAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, TimeSpan timeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
367 : where TCommand : ICommand<TAuthenticationToken>
368 : {
369 : return PublishAndWait(command, condition, timeout, eventReceiver);
370 : }
371 :
372 : #endregion
373 :
374 : #region Implementation of IEventPublisher<TAuthenticationToken>
375 :
376 : /// <summary>
377 : /// Publishes the provided <paramref name="event"/> on the event bus.
378 : /// </summary>
379 1 : public virtual void Publish<TEvent>(TEvent @event)
380 : where TEvent : IEvent<TAuthenticationToken>
381 : {
382 : Type eventType = @event.GetType();
383 : string eventName = eventType.FullName;
384 : ISagaEvent<TAuthenticationToken> sagaEvent = @event as ISagaEvent<TAuthenticationToken>;
385 : if (sagaEvent != null)
386 : eventName = string.Format("Cqrs.Events.SagaEvent[{0}]", sagaEvent.Event.GetType().FullName);
387 :
388 : DateTimeOffset startedAt = DateTimeOffset.UtcNow;
389 : Stopwatch mainStopWatch = Stopwatch.StartNew();
390 : string responseCode = "200";
391 : bool wasSuccessfull = false;
392 :
393 : IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "InProcessBus" } };
394 : string telemetryName = string.Format("{0}/{1}/{2}", eventName, @event.GetIdentity(), @event.Id);
395 : var telemeteredEvent = @event as ITelemeteredMessage;
396 : if (telemeteredEvent != null)
397 : telemetryName = telemeteredEvent.TelemetryName;
398 : telemetryName = string.Format("Event/{0}", telemetryName);
399 :
400 : try
401 : {
402 : if (@event.Frameworks != null && @event.Frameworks.Contains("Built-In"))
403 : {
404 : Logger.LogInfo("The provided event has already been processed by the Built-In bus.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, eventType.FullName));
405 : return;
406 : }
407 :
408 : if (@event.AuthenticationToken == null || @event.AuthenticationToken.Equals(default(TAuthenticationToken)))
409 : @event.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
410 : @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
411 :
412 : if (string.IsNullOrWhiteSpace(@event.OriginatingFramework))
413 : {
414 : @event.TimeStamp = DateTimeOffset.UtcNow;
415 : @event.OriginatingFramework = "Built-In";
416 : }
417 :
418 : var frameworks = new List<string>();
419 : if (@event.Frameworks != null)
420 : frameworks.AddRange(@event.Frameworks);
421 : frameworks.Add("Built-In");
422 : @event.Frameworks = frameworks;
423 :
424 : bool isRequired;
425 : if (!ConfigurationManager.TryGetSetting(string.Format("{0}.IsRequired", eventName), out isRequired))
426 : isRequired = true;
427 :
428 : IEnumerable<Action<IMessage>> handlers = Routes.GetHandlers(@event, isRequired).Select(x => x.Delegate).ToList();
429 : // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
430 : if (!handlers.Any())
431 : Logger.LogDebug(string.Format("An event handler for '{0}' is not required.", eventName));
432 :
433 : foreach (Action<IMessage> handler in handlers)
434 : {
435 : IList<IEvent<TAuthenticationToken>> events;
436 : if (EventWaits.TryGetValue(@event.CorrelationId, out events))
437 : events.Add(@event);
438 : handler(@event);
439 : }
440 :
441 : Logger.LogInfo(string.Format("An event was sent of type {0}.", eventName));
442 : wasSuccessfull = true;
443 : }
444 : catch (Exception exception)
445 : {
446 : responseCode = "500";
447 : Logger.LogError("An issue occurred while trying to publish an event.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
448 : throw;
449 : }
450 : finally
451 : {
452 : mainStopWatch.Stop();
453 : TelemetryHelper.TrackDependency("InProcessBus/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
454 : }
455 : }
456 :
457 : /// <summary>
458 : /// Publishes the provided <paramref name="events"/> on the event bus.
459 : /// </summary>
460 1 : public virtual void Publish<TEvent>(IEnumerable<TEvent> events)
461 : where TEvent : IEvent<TAuthenticationToken>
462 : {
463 : IEnumerable<TEvent> sourceEvents = events.ToList();
464 :
465 : DateTimeOffset startedAt = DateTimeOffset.UtcNow;
466 : Stopwatch mainStopWatch = Stopwatch.StartNew();
467 : string responseCode = "500";
468 : bool wasSuccessfull = false;
469 :
470 : IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "InProcessBus" } };
471 : string telemetryName = "Events";
472 : string telemetryNames = string.Empty;
473 : foreach (TEvent @event in sourceEvents)
474 : {
475 : string subTelemetryName = string.Format("{0}/{1}/{2}", @event.GetType().FullName, @event.GetIdentity(), @event.Id);
476 : var telemeteredCommand = @event as ITelemeteredMessage;
477 : if (telemeteredCommand != null)
478 : subTelemetryName = telemeteredCommand.TelemetryName;
479 : telemetryNames = string.Format("{0}{1},", telemetryNames, subTelemetryName);
480 : }
481 : if (telemetryNames.Length > 0)
482 : telemetryNames = telemetryNames.Substring(0, telemetryNames.Length - 1);
483 : telemetryProperties.Add("Events", telemetryNames);
484 :
485 : try
486 : {
487 : foreach (TEvent @event in sourceEvents)
488 : Publish(@event);
489 :
490 : responseCode = "200";
491 : wasSuccessfull = true;
492 : }
493 : finally
494 : {
495 : mainStopWatch.Stop();
496 : TelemetryHelper.TrackDependency("InProcessBus/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
497 : }
498 : }
499 :
500 : #endregion
501 :
502 : #region Implementation of IPublishAndWaitCommandPublisher<TAuthenticationToken>
503 :
504 : /// <summary>
505 : /// Publishes the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/>
506 : /// </summary>
507 : /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
508 : /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
509 1 : public TEvent PublishAndWait<TCommand, TEvent>(TCommand command, IEventReceiver<TAuthenticationToken> eventReceiver = null) where TCommand : ICommand<TAuthenticationToken>
510 : {
511 : return PublishAndWait<TCommand, TEvent>(command, -1, eventReceiver);
512 : }
513 :
514 : /// <summary>
515 : /// Publishes the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
516 : /// </summary>
517 : /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
518 : /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see cref="F:System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
519 : /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
520 1 : public TEvent PublishAndWait<TCommand, TEvent>(TCommand command, int millisecondsTimeout, IEventReceiver<TAuthenticationToken> eventReceiver = null) where TCommand : ICommand<TAuthenticationToken>
521 : {
522 : return PublishAndWait(command, events => (TEvent)events.SingleOrDefault(@event => @event is TEvent), millisecondsTimeout, eventReceiver);
523 : }
524 :
525 : /// <summary>
526 : /// Publishes the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
527 : /// </summary>
528 : /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
529 : /// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
530 : /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
531 1 : public TEvent PublishAndWait<TCommand, TEvent>(TCommand command, TimeSpan timeout, IEventReceiver<TAuthenticationToken> eventReceiver = null) where TCommand : ICommand<TAuthenticationToken>
532 : {
533 : long num = (long)timeout.TotalMilliseconds;
534 : if (num < -1L || num > int.MaxValue)
535 : throw new ArgumentOutOfRangeException("timeout", timeout, "SpinWait_SpinUntil_TimeoutWrong");
536 : return PublishAndWait<TCommand, TEvent>(command, (int)timeout.TotalMilliseconds, eventReceiver);
537 : }
538 :
539 : /// <summary>
540 : /// Publishes the provided <paramref name="command"></paramref> and waits until the specified condition is satisfied an event of <typeparamref name="TEvent"/>
541 : /// </summary>
542 : /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
543 : /// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
544 : /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
545 1 : public TEvent PublishAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, IEventReceiver<TAuthenticationToken> eventReceiver = null) where TCommand : ICommand<TAuthenticationToken>
546 : {
547 : return PublishAndWait(command, condition, -1, eventReceiver);
548 : }
549 :
550 : /// <summary>
551 : /// Publishes the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
552 : /// </summary>
553 : /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
554 : /// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
555 : /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see cref="F:System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
556 : /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
557 1 : public TEvent PublishAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, int millisecondsTimeout,
558 : IEventReceiver<TAuthenticationToken> eventReceiver = null) where TCommand : ICommand<TAuthenticationToken>
559 : {
560 : DateTimeOffset startedAt = DateTimeOffset.UtcNow;
561 : Stopwatch mainStopWatch = Stopwatch.StartNew();
562 : string responseCode = "200";
563 : bool wasSuccessfull = false;
564 :
565 : IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "InProcessBus" } };
566 : string telemetryName = string.Format("{0}/{1}", command.GetType().FullName, command.Id);
567 : var telemeteredCommand = command as ITelemeteredMessage;
568 : if (telemeteredCommand != null)
569 : telemetryName = telemeteredCommand.TelemetryName;
570 : telemetryName = string.Format("Command/{0}", telemetryName);
571 :
572 : TEvent result;
573 :
574 : try
575 : {
576 : if (eventReceiver != null)
577 : throw new NotSupportedException("Specifying a different event receiver is not yet supported.");
578 : RouteHandlerDelegate commandHandler;
579 : if (!PrepareAndValidateCommand(command, out commandHandler))
580 : return (TEvent)(object)null;
581 :
582 : result = (TEvent)(object)null;
583 : EventWaits.Add(command.CorrelationId, new List<IEvent<TAuthenticationToken>>());
584 :
585 : Action<IMessage> handler = commandHandler.Delegate;
586 : handler(command);
587 : Logger.LogInfo(string.Format("A command was sent of type {0}.", command.GetType().FullName));
588 : wasSuccessfull = true;
589 : }
590 : finally
591 : {
592 : mainStopWatch.Stop();
593 : TelemetryHelper.TrackDependency("InProcessBus/CommandBus", "Command", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
594 : }
595 :
596 : SpinWait.SpinUntil(() =>
597 : {
598 : IList<IEvent<TAuthenticationToken>> events = EventWaits[command.CorrelationId];
599 :
600 : result = condition(events);
601 :
602 : return result != null;
603 : }, millisecondsTimeout, SpinWait.DefaultSleepInMilliseconds);
604 :
605 : TelemetryHelper.TrackDependency("InProcessBus/CommandBus", "Command/AndWait", string.Format("Command/AndWait{0}", telemetryName.Substring(7)), null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
606 : return result;
607 : }
608 :
609 : /// <summary>
610 : /// Publishes the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
611 : /// </summary>
612 : /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
613 : /// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
614 : /// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
615 : /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
616 1 : public TEvent PublishAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, TimeSpan timeout,
617 : IEventReceiver<TAuthenticationToken> eventReceiver = null) where TCommand : ICommand<TAuthenticationToken>
618 : {
619 : long num = (long)timeout.TotalMilliseconds;
620 : if (num < -1L || num > int.MaxValue)
621 : throw new ArgumentOutOfRangeException("timeout", timeout, "SpinWait_SpinUntil_TimeoutWrong");
622 : return PublishAndWait(command, condition, (int)timeout.TotalMilliseconds, eventReceiver);
623 : }
624 :
625 : #endregion
626 :
627 : #region Implementation of IHandlerRegistrar
628 :
629 : /// <summary>
630 : /// Register an event or command handler that will listen and respond to events or commands.
631 : /// </summary>
632 1 : public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
633 : where TMessage : IMessage
634 : {
635 : Action<TMessage> registerableHandler = BusHelper.BuildTelemeteredActionHandler<TMessage, TAuthenticationToken>(TelemetryHelper, handler, holdMessageLock, "In-Process/Bus");
636 :
637 : Routes.RegisterHandler(registerableHandler, targetedType);
638 :
639 : TelemetryHelper.TrackEvent(string.Format("Cqrs/RegisterHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "In-Process/Bus" } });
640 : TelemetryHelper.Flush();
641 : }
642 :
643 : /// <summary>
644 : /// Register an event or command handler that will listen and respond to events or commands.
645 : /// </summary>
646 1 : public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
647 : where TMessage : IMessage
648 : {
649 : RegisterHandler(handler, null, holdMessageLock);
650 : }
651 :
652 : /// <summary>
653 : /// Register an event handler that will listen and respond to all events.
654 : /// </summary>
655 1 : public void RegisterGlobalEventHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true) where TMessage : IMessage
656 : {
657 : Action<TMessage> registerableHandler = BusHelper.BuildTelemeteredActionHandler<TMessage, TAuthenticationToken>(TelemetryHelper, handler, holdMessageLock, "In-Process/Bus");
658 :
659 : Routes.RegisterGlobalEventHandler(registerableHandler, holdMessageLock);
660 :
661 : TelemetryHelper.TrackEvent(string.Format("Cqrs/RegisterGlobalEventHandler/{0}", typeof(TMessage).FullName), new Dictionary<string, string> { { "Type", "In-Process/Bus" } });
662 : TelemetryHelper.Flush();
663 : }
664 :
665 : #endregion
666 :
667 : #region Implementation of ICommandReceiver
668 :
669 : /// <summary>
670 : /// Receives a <see cref="ICommand{TAuthenticationToken}"/> from the command bus.
671 : /// </summary>
672 1 : public virtual bool? ReceiveCommand(ICommand<TAuthenticationToken> command)
673 : {
674 : Send(command);
675 : return true;
676 : }
677 :
678 : /// <summary>
679 : /// Receives an <see cref="IEvent{TAuthenticationToken}"/> from the event bus.
680 : /// </summary>
681 1 : public virtual bool? ReceiveEvent(IEvent<TAuthenticationToken> @event)
682 : {
683 : Publish(@event);
684 : return true;
685 : }
686 :
687 : void ICommandReceiver.Start()
688 : {
689 : // This is in-process so doesn't need to do anything
690 : }
691 :
692 : #endregion
693 :
694 : #region Implementation of IEventReceiver
695 :
696 : void IEventReceiver.Start()
697 : {
698 : // This is in-process so doesn't need to do anything
699 : }
700 :
701 : #endregion
702 : }
703 : }
|