Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Linq;
13 : using cdmdotnet.Logging;
14 : using Cqrs.Authentication;
15 : using Cqrs.Bus;
16 : using Cqrs.Commands;
17 : using Cqrs.Configuration;
18 : using Cqrs.Events;
19 : using Cqrs.Infrastructure;
20 : using Cqrs.Messages;
21 :
22 : namespace Cqrs.Akka.Commands
23 : {
24 : /// <summary>
25 : /// A <see cref="ICommandPublisher{TAuthenticationToken}"/> that resolves handlers, executes the handler and then publishes the <see cref="ICommand{TAuthenticationToken}"/> on the public command bus.
26 : /// </summary>
27 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
28 : public class AkkaCommandBus<TAuthenticationToken>
29 : : IAkkaCommandPublisher<TAuthenticationToken>
30 : , ICommandHandlerRegistrar
31 1 : {
32 : /// <summary>
33 : /// Gets the <see cref="RouteManager"/> that handlers are registered with and resolved from. The property is static and is assigned once by the static constructor.
34 : /// </summary>
35 : protected static RouteManager Routes { get; private set; }
36 :
37 : /// <summary>
38 : /// Gets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}">Authentication Token Helper</see> used to stamp commands that carry no authentication token. Assigned by the constructor.
39 : /// </summary>
40 : protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
41 :
42 : /// <summary>
43 : /// Gets the <see cref="ICorrelationIdHelper"/> used to stamp the correlation id onto commands. Assigned by the constructor.
44 : /// </summary>
45 : protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
46 :
47 : /// <summary>
48 : /// Gets the <see cref="IDependencyResolver"/> used to look up command validators. Assigned by the constructor.
49 : /// </summary>
50 : protected IDependencyResolver DependencyResolver { get; private set; }
51 :
52 : /// <summary>
53 : /// Gets the <see cref="IBusHelper"/> used to decide whether a handler is required for a command type. Assigned by the constructor.
54 : /// </summary>
55 : protected IBusHelper BusHelper { get; private set; }
56 :
57 : /// <summary>
58 : /// Gets the <see cref="ILogger"/> used for informational and debug output. Assigned by the constructor.
59 : /// </summary>
60 : protected ILogger Logger { get; private set; }
61 :
62 : /// <summary>
63 : /// Gets the <see cref="ICommandPublisher{TAuthenticationToken}"/> that commands are forwarded to after any local handler has run. Assigned by the constructor.
64 : /// </summary>
65 : protected ICommandPublisher<TAuthenticationToken> CommandPublisher { get; private set; }
66 :
67 : /// <summary>
68 : /// Gets the <see cref="ICommandReceiver{TAuthenticationToken}"/> supplied to the constructor. It is stored but not otherwise used in the visible code.
69 : /// </summary>
70 : protected ICommandReceiver<TAuthenticationToken> CommandReceiver { get; private set; }
71 :
72 : /// <summary>
73 : /// Gets the lists of events, keyed by correlation id, that are evaluated by <see cref="PublishAndWait{TCommand,TEvent}(TCommand,Cqrs.Events.IEventReceiver{TAuthenticationToken})"/>. Backed by a <see cref="ConcurrentDictionary{TKey,TValue}"/> created in the constructor.
74 : /// </summary>
75 : protected IDictionary<Guid, IList<IEvent<TAuthenticationToken>>> EventWaits { get; private set; }
76 :
/// <summary>
/// Creates the <see cref="RouteManager"/> exposed through the static <see cref="Routes"/> property.
/// </summary>
static AkkaCommandBus()
{
    Routes = new RouteManager();
}

/// <summary>
/// Instantiates a new instance of <see cref="AkkaCommandBus{TAuthenticationToken}"/>, storing each supplied collaborator in its matching property
/// and creating an empty, concurrent <see cref="EventWaits"/> dictionary.
/// </summary>
/// <param name="busHelper">Stored in <see cref="BusHelper"/>.</param>
/// <param name="authenticationTokenHelper">Stored in <see cref="AuthenticationTokenHelper"/>.</param>
/// <param name="correlationIdHelper">Stored in <see cref="CorrelationIdHelper"/>.</param>
/// <param name="dependencyResolver">Stored in <see cref="DependencyResolver"/>.</param>
/// <param name="logger">Stored in <see cref="Logger"/>.</param>
/// <param name="commandPublisher">Stored in <see cref="CommandPublisher"/>.</param>
/// <param name="commandReceiver">Stored in <see cref="CommandReceiver"/>.</param>
public AkkaCommandBus(IBusHelper busHelper, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, IDependencyResolver dependencyResolver, ILogger logger, ICommandPublisher<TAuthenticationToken> commandPublisher, ICommandReceiver<TAuthenticationToken> commandReceiver)
{
    // Injected collaborators, in parameter order.
    BusHelper = busHelper;
    AuthenticationTokenHelper = authenticationTokenHelper;
    CorrelationIdHelper = correlationIdHelper;
    DependencyResolver = dependencyResolver;
    Logger = logger;
    CommandPublisher = commandPublisher;
    CommandReceiver = commandReceiver;

    // Internally owned state.
    EventWaits = new ConcurrentDictionary<Guid, IList<IEvent<TAuthenticationToken>>>();
}
96 :
/// <summary>
/// Stamps the provided <paramref name="command"/> before it is handled:
/// fills in <see cref="IMessageWithAuthenticationToken{TAuthenticationToken}.AuthenticationToken"/> when it is null or the default value,
/// assigns <see cref="IMessage.CorrelationId"/> from the <see cref="CorrelationIdHelper"/>,
/// sets <see cref="IMessage.OriginatingFramework"/> to "Akka" when it is blank and
/// appends "Akka" to <see cref="IMessage.Frameworks"/>.
/// </summary>
/// <typeparam name="TCommand">The <see cref="Type"/> of command being prepared.</typeparam>
/// <param name="command">The command to stamp; it is modified in place.</param>
protected virtual void PrepareCommand<TCommand>(TCommand command)
    where TCommand : ICommand<TAuthenticationToken>
{
    bool tokenIsMissing = command.AuthenticationToken == null || command.AuthenticationToken.Equals(default(TAuthenticationToken));
    if (tokenIsMissing)
    {
        command.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
    }

    // The correlation id is always taken from the helper, even when the command already carries one.
    command.CorrelationId = CorrelationIdHelper.GetCorrelationId();

    if (string.IsNullOrWhiteSpace(command.OriginatingFramework))
    {
        command.OriginatingFramework = "Akka";
    }

    // "Akka" is appended unconditionally, so a command that arrived pre-stamped lists it twice afterwards.
    IEnumerable<string> existingFrameworks = command.Frameworks ?? Enumerable.Empty<string>();
    command.Frameworks = existingFrameworks.Concat(new[] { "Akka" }).ToList();
}
121 :
/// <summary>
/// Decides whether the provided <paramref name="command"/> should be processed and, if so, prepares it. In order, this method:
/// rejects the command when its <see cref="IMessage.Frameworks"/> contain "Akka" alongside any other entry (it has already been through Akka),
/// tries to resolve an <see cref="ICommandValidator{TAuthenticationToken,TCommand}"/> and rejects the command if one is found and validation fails,
/// calls <see cref="PrepareCommand{TCommand}"/>,
/// asks the <see cref="BusHelper"/> whether a handler is required and
/// looks up a single <see cref="RouteHandlerDelegate">command handler</see> for the command.
/// </summary>
/// <typeparam name="TCommand">The <see cref="Type"/> of command being processed.</typeparam>
/// <param name="command">The command to validate and prepare.</param>
/// <param name="commandHandler">
/// The located <see cref="RouteHandlerDelegate">command handler</see>; null when the command is rejected or when no handler was located.
/// </param>
/// <returns>
/// False if the command has already been processed in Akka,
/// False if a suitable <see cref="ICommandValidator{TAuthenticationToken,TCommand}"/> is located and the provided <paramref name="command"/> fails validation,
/// True otherwise, including when no <see cref="RouteHandlerDelegate">command handler</see> was located (in which case <paramref name="commandHandler"/> is null).
/// </returns>
protected virtual bool PrepareAndValidateCommand<TCommand>(TCommand command, out RouteHandlerDelegate commandHandler)
    where TCommand : ICommand<TAuthenticationToken>
{
    commandHandler = null;

    Type commandType = command.GetType();
    // One container string, naming this method, is used for every log entry written here.
    string logContainer = string.Format("{0}\\PrepareAndValidateCommand({1})", GetType().FullName, commandType.FullName);

    // A lone "Akka" entry only means the command was pre-stamped, which is fine to handle;
    // "Akka" alongside other entries means the command has already been through this bus.
    if (command.Frameworks != null && command.Frameworks.Contains("Akka") && command.Frameworks.Count() != 1)
    {
        Logger.LogInfo("The provided command has already been processed in Akka.", logContainer);
        return false;
    }

    // A validator is optional: a failed resolution is logged at debug level and treated as "no validator".
    ICommandValidator<TAuthenticationToken, TCommand> commandValidator = null;
    try
    {
        commandValidator = DependencyResolver.Resolve<ICommandValidator<TAuthenticationToken, TCommand>>();
    }
    catch (Exception exception)
    {
        Logger.LogDebug("Locating an ICommandValidator failed.", logContainer, exception);
    }

    if (commandValidator != null && !commandValidator.IsCommandValid(command))
    {
        Logger.LogInfo("The provided command is not valid.", logContainer);
        return false;
    }

    PrepareCommand(command);

    bool isRequired = BusHelper.IsEventRequired(commandType);

    commandHandler = Routes.GetSingleHandler(command, isRequired);
    // NOTE(review): presumably GetSingleHandler throws when a required handler is missing, so a null result
    // here means the handler was not required - confirm against RouteManager.
    if (commandHandler == null)
        Logger.LogDebug(string.Format("The command handler for '{0}' is not required.", commandType.FullName));

    return true;
}
177 :
178 : #region Implementation of ICommandPublisher<TAuthenticationToken>
179 :
/// <summary>
/// Publishes the provided <paramref name="command"/> on the command bus: the command is validated and prepared,
/// executed by the locally registered handler when there is one, and then passed to the <see cref="CommandPublisher"/>.
/// Nothing happens when the command is rejected by <see cref="PrepareAndValidateCommand{TCommand}"/>.
/// </summary>
/// <typeparam name="TCommand">The <see cref="Type"/> of command being published.</typeparam>
/// <param name="command">The command to publish.</param>
public virtual void Publish<TCommand>(TCommand command)
    where TCommand : ICommand<TAuthenticationToken>
{
    RouteHandlerDelegate localHandler;
    bool accepted = PrepareAndValidateCommand(command, out localHandler);
    if (accepted)
    {
        // No local handler means Akka won't handle the command itself and something else will.
        if (localHandler != null)
        {
            localHandler.Delegate(command);
        }

        // Let everything else know about the command. Double handling a command is usually bad, but it can be
        // useful, e.g. pushing from AWS to Azure so both systems handle it (although an event is the proper pattern there).
        CommandPublisher.Publish(command);
    }
}
197 :
/// <summary>
/// Publishes the provided <paramref name="commands"/> on the command bus. Each command is validated and prepared
/// individually and executed by the locally registered handler when there is one; every command that was accepted
/// is then passed to the <see cref="CommandPublisher"/> in a single call.
/// </summary>
/// <remarks>
/// A command rejected by <see cref="PrepareAndValidateCommand{TCommand}"/> is skipped and does not stop the remaining
/// commands from being processed. Previously the first rejection abandoned the whole batch, leaving commands that had
/// already been handled locally unpublished. Nothing is passed to the <see cref="CommandPublisher"/> when no command was accepted.
/// </remarks>
/// <typeparam name="TCommand">The <see cref="Type"/> of the commands being published.</typeparam>
/// <param name="commands">The commands to publish.</param>
public virtual void Publish<TCommand>(IEnumerable<TCommand> commands)
    where TCommand : ICommand<TAuthenticationToken>
{
    // Snapshot the input so it is enumerated exactly once, before any handler runs.
    IList<TCommand> sourceCommands = commands.ToList();
    IList<TCommand> acceptedCommands = new List<TCommand>(sourceCommands.Count);

    foreach (TCommand command in sourceCommands)
    {
        RouteHandlerDelegate commandHandler;
        if (!PrepareAndValidateCommand(command, out commandHandler))
            continue;

        // This could be null if Akka won't handle the command and something else will.
        if (commandHandler != null)
            commandHandler.Delegate(command);

        acceptedCommands.Add(command);
    }

    if (acceptedCommands.Count == 0)
        return;

    // Let everything else know about the commands. Double handling a command is usually bad, but it can be
    // useful, e.g. pushing from AWS to Azure so both systems handle it (although an event is the proper pattern there).
    CommandPublisher.Publish((IEnumerable<TCommand>)acceptedCommands);
}
218 :
/// <summary>
/// Publishes the provided <paramref name="command"></paramref> and waits, without a timeout, for an event of <typeparamref name="TEvent"/>.
/// </summary>
/// <typeparam name="TCommand">The <see cref="Type"/> of command being published.</typeparam>
/// <typeparam name="TEvent">The <see cref="Type"/> of event to wait for.</typeparam>
/// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
/// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
/// <returns>Whatever the overload taking a millisecond timeout returns when called with a timeout of -1.</returns>
public virtual TEvent PublishAndWait<TCommand, TEvent>(TCommand command, IEventReceiver<TAuthenticationToken> eventReceiver = null)
    where TCommand : ICommand<TAuthenticationToken>
{
    // -1 milliseconds means "wait indefinitely".
    const int waitIndefinitely = -1;

    TEvent receivedEvent = PublishAndWait<TCommand, TEvent>(command, waitIndefinitely, eventReceiver);
    return receivedEvent;
}
229 :
/// <summary>
/// Publishes the provided <paramref name="command"></paramref> and waits for the single event of <typeparamref name="TEvent"/>, or exits if the specified timeout is expired.
/// </summary>
/// <typeparam name="TCommand">The <see cref="Type"/> of command being published.</typeparam>
/// <typeparam name="TEvent">The <see cref="Type"/> of event to wait for.</typeparam>
/// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see cref="F:System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
/// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
/// <returns>Whatever the condition-based overload returns for a condition that selects the single waiting event that is a <typeparamref name="TEvent"/>.</returns>
public virtual TEvent PublishAndWait<TCommand, TEvent>(TCommand command, int millisecondsTimeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
    where TCommand : ICommand<TAuthenticationToken>
{
    // Selects the one waiting event that is a TEvent; SingleOrDefault throws if more than one matches.
    Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> pickSingleMatch = delegate(IEnumerable<IEvent<TAuthenticationToken>> waitingEvents)
    {
        IEvent<TAuthenticationToken> match = waitingEvents.SingleOrDefault(candidate => candidate is TEvent);
        return (TEvent)match;
    };

    return PublishAndWait<TCommand, TEvent>(command, pickSingleMatch, millisecondsTimeout, eventReceiver);
}
241 :
/// <summary>
/// Publishes the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
/// </summary>
/// <typeparam name="TCommand">The <see cref="Type"/> of command being published.</typeparam>
/// <typeparam name="TEvent">The <see cref="Type"/> of event to wait for.</typeparam>
/// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
/// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
/// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
/// <returns>Whatever the overload taking a millisecond timeout returns for the converted <paramref name="timeout"/>.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="timeout"/>, in whole milliseconds, is less than -1 or greater than <see cref="int.MaxValue"/>.
/// </exception>
public virtual TEvent PublishAndWait<TCommand, TEvent>(TCommand command, TimeSpan timeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
    where TCommand : ICommand<TAuthenticationToken>
{
    // Convert once; the range check guarantees the narrowing cast below cannot overflow.
    long totalMilliseconds = (long)timeout.TotalMilliseconds;
    if (totalMilliseconds < -1L || totalMilliseconds > int.MaxValue)
        throw new ArgumentOutOfRangeException("timeout", timeout, "The timeout must be -1 milliseconds (to wait indefinitely) or a value between 0 and Int32.MaxValue milliseconds.");

    return PublishAndWait<TCommand, TEvent>(command, (int)totalMilliseconds, eventReceiver);
}
256 :
/// <summary>
/// Publishes the provided <paramref name="command"></paramref> and waits, without a timeout, until the specified <paramref name="condition"/> yields an event of <typeparamref name="TEvent"/>.
/// </summary>
/// <typeparam name="TCommand">The <see cref="Type"/> of command being published.</typeparam>
/// <typeparam name="TEvent">The <see cref="Type"/> of event to wait for.</typeparam>
/// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
/// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
/// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
/// <returns>Whatever the overload taking a millisecond timeout returns when called with a timeout of -1.</returns>
public virtual TEvent PublishAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, IEventReceiver<TAuthenticationToken> eventReceiver = null)
    where TCommand : ICommand<TAuthenticationToken>
{
    // -1 milliseconds means "wait indefinitely".
    const int waitIndefinitely = -1;

    TEvent receivedEvent = PublishAndWait(command, condition, waitIndefinitely, eventReceiver);
    return receivedEvent;
}
268 :
/// <summary>
/// Publishes the provided <paramref name="command"></paramref> and waits until the specified <paramref name="condition"/> yields an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
/// </summary>
/// <remarks>
/// An empty list of events is registered in <see cref="EventWaits"/> under the command's correlation id before the command
/// is published, and is removed again when this method exits (normally or through an exception), so entries do not
/// accumulate and the same correlation id can be waited on again later.
/// </remarks>
/// <typeparam name="TCommand">The <see cref="Type"/> of command being published.</typeparam>
/// <typeparam name="TEvent">The <see cref="Type"/> of event to wait for.</typeparam>
/// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
/// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see cref="F:System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
/// <param name="eventReceiver">Must be null; specifying a different <see cref="IEventReceiver{TAuthenticationToken}" /> is not yet supported.</param>
/// <returns>The last value returned by <paramref name="condition"/>, or the default value of <typeparamref name="TEvent"/> if it was never evaluated.</returns>
/// <exception cref="NotSupportedException"><paramref name="eventReceiver"/> is not null.</exception>
public virtual TEvent PublishAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, int millisecondsTimeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
    where TCommand : ICommand<TAuthenticationToken>
{
    if (eventReceiver != null)
        throw new NotSupportedException("Specifying a different event receiver is not yet supported.");

    TEvent result = default(TEvent);

    // Remember the key that was registered: publishing may re-stamp the command's correlation id.
    Guid registeredCorrelationId = command.CorrelationId;
    EventWaits.Add(registeredCorrelationId, new List<IEvent<TAuthenticationToken>>());
    try
    {
        Publish(command);

        SpinWait.SpinUntil(() =>
        {
            // NOTE(review): PrepareCommand assigns command.CorrelationId from the CorrelationIdHelper during Publish,
            // so this lookup can target a different key than the one registered above - confirm the two always agree.
            IList<IEvent<TAuthenticationToken>> events = EventWaits[command.CorrelationId];

            result = condition(events);

            return result != null;
        }, millisecondsTimeout, SpinWait.DefaultSleepInMilliseconds);
    }
    finally
    {
        // Always release the registration so it neither leaks nor blocks a later wait on the same correlation id.
        EventWaits.Remove(registeredCorrelationId);
    }

    return result;
}
298 :
/// <summary>
/// Publishes the provided <paramref name="command"></paramref> and waits until the specified <paramref name="condition"/> yields an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
/// </summary>
/// <typeparam name="TCommand">The <see cref="Type"/> of command being published.</typeparam>
/// <typeparam name="TEvent">The <see cref="Type"/> of event to wait for.</typeparam>
/// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
/// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
/// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
/// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
/// <returns>Whatever the overload taking a millisecond timeout returns for the converted <paramref name="timeout"/>.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="timeout"/>, in whole milliseconds, is less than -1 or greater than <see cref="int.MaxValue"/>.
/// </exception>
public virtual TEvent PublishAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, TimeSpan timeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
    where TCommand : ICommand<TAuthenticationToken>
{
    // Convert once; the range check guarantees the narrowing cast below cannot overflow.
    long totalMilliseconds = (long)timeout.TotalMilliseconds;
    if (totalMilliseconds < -1L || totalMilliseconds > int.MaxValue)
        throw new ArgumentOutOfRangeException("timeout", timeout, "The timeout must be -1 milliseconds (to wait indefinitely) or a value between 0 and Int32.MaxValue milliseconds.");

    return PublishAndWait(command, condition, (int)totalMilliseconds, eventReceiver);
}
314 :
315 : #endregion
316 :
317 : #region Implementation of IHandlerRegistrar
318 :
/// <summary>
/// Registers the provided <paramref name="handler"/> with the shared <see cref="Routes"/> so it can listen and respond to events or commands.
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of message the <paramref name="handler"/> accepts.</typeparam>
/// <param name="handler">The delegate to register.</param>
/// <param name="targetedType">Passed through unchanged to the <see cref="RouteManager"/>.</param>
/// <param name="holdMessageLock">Passed through unchanged to the <see cref="RouteManager"/>.</param>
public void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
    where TMessage : IMessage
{
    RouteManager routes = Routes;
    routes.RegisterHandler(handler, targetedType, holdMessageLock);
}
327 :
/// <summary>
/// Registers the provided <paramref name="handler"/>, without a targeted type, so it can listen and respond to events or commands.
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of message the <paramref name="handler"/> accepts.</typeparam>
/// <param name="handler">The delegate to register.</param>
/// <param name="holdMessageLock">Forwarded to the overload that accepts a targeted type; previously it was ignored and the default of true was always used.</param>
public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
    where TMessage : IMessage
{
    // No targeted type is supplied by this overload, so null is passed on.
    RegisterHandler(handler, null, holdMessageLock);
}
336 :
337 : #endregion
338 : }
339 : }
|