Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Linq;
13 : using cdmdotnet.Logging;
14 : using Cqrs.Authentication;
15 : using Cqrs.Bus;
16 : using Cqrs.Commands;
17 : using Cqrs.Configuration;
18 : using Cqrs.Events;
19 : using Cqrs.Infrastructure;
20 : using Cqrs.Messages;
21 :
22 : namespace Cqrs.Akka.Commands
23 : {
/// <summary>
/// A <see cref="ICommandPublisher{TAuthenticationToken}"/> that resolves the handler registered for a command, executes that handler
/// and then publishes the <see cref="ICommand{TAuthenticationToken}"/> on the public command bus.
/// </summary>
/// <typeparam name="TAuthenticationToken">The type of the authentication token carried by commands and events.</typeparam>
public class AkkaCommandBus<TAuthenticationToken>
    : IAkkaCommandSender<TAuthenticationToken>
    , ICommandHandlerRegistrar
{
    /// <summary>
    /// The framework name stamped onto commands that pass through this bus.
    /// </summary>
    private const string FrameworkName = "Akka";

    /// <summary>
    /// The handler routes shared by every instance of this (closed generic) bus type.
    /// </summary>
    protected static RouteManager Routes { get; private set; }

    /// <summary>
    /// Supplies the authentication token used when a command does not carry one.
    /// </summary>
    protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }

    /// <summary>
    /// Supplies the correlation id stamped onto every command.
    /// </summary>
    protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }

    /// <summary>
    /// Used to locate an optional <see cref="ICommandValidator{TAuthenticationToken,TCommand}"/> for each command.
    /// </summary>
    protected IDependencyResolver DependencyResolver { get; private set; }

    /// <summary>
    /// Creates the shared <see cref="Routes"/>.
    /// </summary>
    static AkkaCommandBus()
    {
        Routes = new RouteManager();
    }

    /// <summary>
    /// Instantiates a new instance of <see cref="AkkaCommandBus{TAuthenticationToken}"/> and stores the provided collaborators.
    /// </summary>
    public AkkaCommandBus(IBusHelper busHelper, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, IDependencyResolver dependencyResolver, ILogger logger, ICommandPublisher<TAuthenticationToken> commandSender, ICommandReceiver<TAuthenticationToken> commandReceiver)
    {
        Logger = logger;
        BusHelper = busHelper;
        AuthenticationTokenHelper = authenticationTokenHelper;
        CorrelationIdHelper = correlationIdHelper;
        DependencyResolver = dependencyResolver;
        EventWaits = new ConcurrentDictionary<Guid, IList<IEvent<TAuthenticationToken>>>();
        CommandSender = commandSender;
        CommandReceiver = commandReceiver;
    }

    /// <summary>
    /// Consulted to decide whether a handler must exist for a given command type.
    /// </summary>
    protected IBusHelper BusHelper { get; private set; }

    /// <summary>
    /// The logger used for diagnostic output.
    /// </summary>
    protected ILogger Logger { get; private set; }

    /// <summary>
    /// The publisher every accepted command is forwarded to after any local handler has run.
    /// </summary>
    protected ICommandPublisher<TAuthenticationToken> CommandSender { get; private set; }

    /// <summary>
    /// The receiver supplied at construction. It is stored but not used by this class itself.
    /// </summary>
    protected ICommandReceiver<TAuthenticationToken> CommandReceiver { get; private set; }

    /// <summary>
    /// The events collected per correlation id for callers currently blocked in one of the SendAndWait overloads.
    /// </summary>
    protected IDictionary<Guid, IList<IEvent<TAuthenticationToken>>> EventWaits { get; private set; }

    /// <summary>
    /// Stamps the provided <paramref name="command"/> before it is handled: the authentication token is filled in when it is missing,
    /// the correlation id is always overwritten from <see cref="CorrelationIdHelper"/>, the originating framework is set when blank,
    /// and "Akka" is appended to the list of frameworks the command has passed through.
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to stamp.</param>
    protected virtual void PrepareCommand<TCommand>(TCommand command)
        where TCommand : ICommand<TAuthenticationToken>
    {
        if (command.AuthenticationToken == null || command.AuthenticationToken.Equals(default(TAuthenticationToken)))
            command.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
        command.CorrelationId = CorrelationIdHelper.GetCorrelationId();

        if (string.IsNullOrWhiteSpace(command.OriginatingFramework))
            command.OriginatingFramework = FrameworkName;

        // Build a fresh list so the sequence already on the command is never mutated in place.
        var frameworks = new List<string>();
        if (command.Frameworks != null)
            frameworks.AddRange(command.Frameworks);
        frameworks.Add(FrameworkName);
        command.Frameworks = frameworks;
    }

    /// <summary>
    /// Decides whether the provided <paramref name="command"/> should be processed by this bus and, if so, stamps it
    /// (see <see cref="PrepareCommand{TCommand}"/>) and resolves its handler.
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to check and prepare.</param>
    /// <param name="commandHandler">
    /// The handler resolved from <see cref="Routes"/>; null when the command was rejected or when no handler was found.
    /// </param>
    /// <returns>
    /// false when the command was already processed by Akka alongside another framework or when a resolved
    /// <see cref="ICommandValidator{TAuthenticationToken,TCommand}"/> reports it invalid; otherwise true.
    /// </returns>
    protected virtual bool PrepareAndValidateCommand<TCommand>(TCommand command, out RouteHandlerDelegate commandHandler)
        where TCommand : ICommand<TAuthenticationToken>
    {
        Type commandType = command.GetType();

        if (command.Frameworks != null && command.Frameworks.Contains(FrameworkName))
        {
            // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
            if (command.Frameworks.Count() != 1)
            {
                Logger.LogInfo("The provided command has already been processed in Akka.", string.Format("{0}\\PrepareAndValidateCommand({1})", GetType().FullName, commandType.FullName));
                commandHandler = null;
                return false;
            }
        }

        // A validator is optional: failing to resolve one is logged and treated as "no validator registered".
        ICommandValidator<TAuthenticationToken, TCommand> commandValidator = null;
        try
        {
            commandValidator = DependencyResolver.Resolve<ICommandValidator<TAuthenticationToken, TCommand>>();
        }
        catch (Exception exception)
        {
            Logger.LogDebug("Locating an ICommandValidator failed.", string.Format("{0}\\Handle({1})", GetType().FullName, commandType.FullName), exception);
        }

        if (commandValidator != null && !commandValidator.IsCommandValid(command))
        {
            Logger.LogInfo("The provided command is not valid.", string.Format("{0}\\Handle({1})", GetType().FullName, commandType.FullName));
            commandHandler = null;
            return false;
        }

        PrepareCommand(command);

        bool isRequired = BusHelper.IsEventRequired(commandType);

        commandHandler = Routes.GetSingleHandler(command, isRequired);
        // No isRequired check is needed here: presumably GetSingleHandler throws when a required handler is missing,
        // so a null result means the handler is optional.
        if (commandHandler == null)
            Logger.LogDebug(string.Format("The command handler for '{0}' is not required.", commandType.FullName));

        return true;
    }

    #region Implementation of ICommandSender<TAuthenticationToken>

    /// <summary>
    /// Validates and stamps the provided <paramref name="command"/>, executes its locally registered handler (if any)
    /// and then forwards it to <see cref="CommandSender"/>. A rejected command is neither handled nor forwarded.
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
    public virtual void Publish<TCommand>(TCommand command)
        where TCommand : ICommand<TAuthenticationToken>
    {
        RouteHandlerDelegate commandHandler;
        if (!PrepareAndValidateCommand(command, out commandHandler))
            return;

        // This could be null if Akka won't handle the command and something else will.
        if (commandHandler != null)
            commandHandler.Delegate(command);

        // Let everything else know about the command (usually double handling a command is bad... but sometimes it might be useful... like pushing from AWS to Azure so both systems handle it... although an event really is the proper pattern to use here.
        CommandSender.Publish(command);
    }

    /// <summary>
    /// Sends the provided <paramref name="command"/>; this simply calls <see cref="Publish{TCommand}(TCommand)"/>.
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
    public virtual void Send<TCommand>(TCommand command)
        where TCommand : ICommand<TAuthenticationToken>
    {
        Publish(command);
    }

    /// <summary>
    /// Validates and stamps each of the provided <paramref name="commands"/>, executes the locally registered handler (if any)
    /// of every accepted command and then forwards the accepted commands to <see cref="CommandSender"/> as one batch.
    /// Rejected commands are skipped, exactly as the single-command overload skips a rejected command.
    /// </summary>
    /// <param name="commands">The commands to publish. The sequence is enumerated exactly once.</param>
    public virtual void Publish<TCommand>(IEnumerable<TCommand> commands)
        where TCommand : ICommand<TAuthenticationToken>
    {
        IList<TCommand> sourceCommands = commands.ToList();
        IList<TCommand> acceptedCommands = new List<TCommand>(sourceCommands.Count);
        foreach (TCommand command in sourceCommands)
        {
            RouteHandlerDelegate commandHandler;
            // A rejected command must not abort the batch: earlier commands have already been handled locally
            // and still have to be forwarded, and later commands still have to be processed.
            if (!PrepareAndValidateCommand(command, out commandHandler))
                continue;

            // This could be null if Akka won't handle the command and something else will.
            if (commandHandler != null)
                commandHandler.Delegate(command);

            acceptedCommands.Add(command);
        }

        // Every command of a non-empty batch was rejected, so there is nothing to forward.
        if (sourceCommands.Count > 0 && acceptedCommands.Count == 0)
            return;

        // Let everything else know about the command (usually double handling a command is bad... but sometimes it might be useful... like pushing from AWS to Azure so both systems handle it... although an event really is the proper pattern to use here.
        CommandSender.Publish((IEnumerable<TCommand>)acceptedCommands);
    }

    /// <summary>
    /// Sends the provided <paramref name="commands"/>; this simply calls <see cref="Publish{TCommand}(IEnumerable{TCommand})"/>.
    /// </summary>
    /// <param name="commands">The commands to send.</param>
    public virtual void Send<TCommand>(IEnumerable<TCommand> commands)
        where TCommand : ICommand<TAuthenticationToken>
    {
        Publish(commands);
    }

    /// <summary>
    /// Sends the provided <paramref name="command"></paramref> and waits indefinitely for an event of <typeparamref name="TEvent"/>
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
    /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
    /// <returns>The event that was waited for.</returns>
    public virtual TEvent SendAndWait<TCommand, TEvent>(TCommand command, IEventReceiver<TAuthenticationToken> eventReceiver = null)
        where TCommand : ICommand<TAuthenticationToken>
    {
        return SendAndWait<TCommand, TEvent>(command, -1, eventReceiver);
    }

    /// <summary>
    /// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
    /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see cref="F:System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
    /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
    /// <returns>The single collected event of type <typeparamref name="TEvent"/>, or the default value if the timeout expired first.</returns>
    /// <exception cref="InvalidOperationException">More than one event of type <typeparamref name="TEvent"/> was collected for the command.</exception>
    public virtual TEvent SendAndWait<TCommand, TEvent>(TCommand command, int millisecondsTimeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
        where TCommand : ICommand<TAuthenticationToken>
    {
        return SendAndWait(command, events => (TEvent)events.SingleOrDefault(@event => @event is TEvent), millisecondsTimeout, eventReceiver);
    }

    /// <summary>
    /// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
    /// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
    /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
    /// <returns>The single collected event of type <typeparamref name="TEvent"/>, or the default value if the timeout expired first.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is less than -1 millisecond or greater than <see cref="int.MaxValue"/> milliseconds.</exception>
    public virtual TEvent SendAndWait<TCommand, TEvent>(TCommand command, TimeSpan timeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
        where TCommand : ICommand<TAuthenticationToken>
    {
        return SendAndWait<TCommand, TEvent>(command, GetTimeoutInMilliseconds(timeout), eventReceiver);
    }

    /// <summary>
    /// Sends the provided <paramref name="command"></paramref> and waits indefinitely until the specified <paramref name="condition"/> yields an event of <typeparamref name="TEvent"/>
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
    /// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
    /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
    /// <returns>The event returned by <paramref name="condition"/>.</returns>
    public virtual TEvent SendAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, IEventReceiver<TAuthenticationToken> eventReceiver = null)
        where TCommand : ICommand<TAuthenticationToken>
    {
        return SendAndWait(command, condition, -1, eventReceiver);
    }

    /// <summary>
    /// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
    /// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
    /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see cref="F:System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
    /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
    /// <returns>The event returned by <paramref name="condition"/>, or the default value of <typeparamref name="TEvent"/> if the timeout expired first.</returns>
    /// <exception cref="NotSupportedException"><paramref name="eventReceiver"/> is not null.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="condition"/> is null.</exception>
    /// <exception cref="ArgumentException">Another wait is already registered for the same correlation id.</exception>
    public virtual TEvent SendAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, int millisecondsTimeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
        where TCommand : ICommand<TAuthenticationToken>
    {
        if (eventReceiver != null)
            throw new NotSupportedException("Specifying a different event receiver is not yet supported.");
        // Fail before the command is sent rather than inside the wait loop after it has already been dispatched.
        if (condition == null)
            throw new ArgumentNullException("condition");

        // Send() ends up in PrepareCommand, which overwrites the correlation id from CorrelationIdHelper.
        // Stamp it now so the wait list is registered under the id the command will actually carry.
        // NOTE(review): assumes GetCorrelationId() returns the same value again when PrepareCommand calls it - confirm.
        command.CorrelationId = CorrelationIdHelper.GetCorrelationId();
        Guid correlationId = command.CorrelationId;

        // default(TEvent) rather than a cast of null, which would throw for a value-type TEvent.
        TEvent result = default(TEvent);
        EventWaits.Add(correlationId, new List<IEvent<TAuthenticationToken>>());
        try
        {
            Send(command);

            SpinWait.SpinUntil(() =>
            {
                IList<IEvent<TAuthenticationToken>> events;
                // Look up by the id captured above so a later change to the command cannot break the lookup.
                if (!EventWaits.TryGetValue(correlationId, out events))
                    return false;

                result = condition(events);

                return result != null;
            }, millisecondsTimeout, SpinWait.DefaultSleepInMilliseconds);
        }
        finally
        {
            // Always release the registration: leaving it behind leaks the list and makes the next
            // wait under the same correlation id fail when it tries to register.
            EventWaits.Remove(correlationId);
        }

        return result;
    }

    /// <summary>
    /// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
    /// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
    /// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
    /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
    /// <returns>The event returned by <paramref name="condition"/>, or the default value of <typeparamref name="TEvent"/> if the timeout expired first.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is less than -1 millisecond or greater than <see cref="int.MaxValue"/> milliseconds.</exception>
    public virtual TEvent SendAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, TimeSpan timeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
        where TCommand : ICommand<TAuthenticationToken>
    {
        return SendAndWait(command, condition, GetTimeoutInMilliseconds(timeout), eventReceiver);
    }

    /// <summary>
    /// Converts the provided <paramref name="timeout"/> to whole milliseconds, rejecting values that cannot be used as a wait timeout.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is less than -1 millisecond or greater than <see cref="int.MaxValue"/> milliseconds.</exception>
    private static int GetTimeoutInMilliseconds(TimeSpan timeout)
    {
        long totalMilliseconds = (long)timeout.TotalMilliseconds;
        if (totalMilliseconds < -1L || totalMilliseconds > int.MaxValue)
            throw new ArgumentOutOfRangeException("timeout", timeout, "The timeout must be -1 millisecond (wait indefinitely) or between 0 and Int32.MaxValue milliseconds.");
        return (int)totalMilliseconds;
    }

    #endregion

    #region Implementation of IHandlerRegistrar

    /// <summary>
    /// Register an event or command handler that will listen and respond to events or commands.
    /// </summary>
    /// <param name="handler">The delegate to execute for messages of type <typeparamref name="TMessage"/>.</param>
    /// <param name="targetedType">Passed through to <see cref="Routes"/> together with the handler; may be null.</param>
    /// <param name="holdMessageLock">Passed through to <see cref="Routes"/> together with the handler.</param>
    public void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
        where TMessage : IMessage
    {
        Routes.RegisterHandler(handler, targetedType, holdMessageLock);
    }

    /// <summary>
    /// Register an event or command handler that will listen and respond to events or commands.
    /// </summary>
    /// <param name="handler">The delegate to execute for messages of type <typeparamref name="TMessage"/>.</param>
    /// <param name="holdMessageLock">Passed through to <see cref="Routes"/> together with the handler.</param>
    public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
        where TMessage : IMessage
    {
        // Forward holdMessageLock explicitly; omitting it would silently fall back to the default of true.
        RegisterHandler(handler, null, holdMessageLock);
    }

    #endregion
}
295 : }
|