Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using Akka.Actor;
13 : using cdmdotnet.Logging;
14 : using Cqrs.Authentication;
15 : using Cqrs.Bus;
16 : using Cqrs.Events;
17 : using Cqrs.Messages;
18 :
19 : namespace Cqrs.Akka.Events
20 : {
21 : /// <summary>
22 : /// An <see cref="IEventPublisher{TAuthenticationToken}"/> that proxies <see cref="IEvent{TAuthenticationToken}"/> back onto the <see cref="IActorRef"/> and then publishes the <see cref="IEvent{TAuthenticationToken}"/> on the public event bus.
23 : /// </summary>
24 : public class AkkaEventBus<TAuthenticationToken>
25 : : IAkkaEventPublisher<TAuthenticationToken>
26 : , IEventHandlerRegistrar
27 1 : {
28 : /// <summary>
29 : /// Gets the <see cref="RouteManager"/>
30 : /// </summary>
31 : protected static RouteManager Routes { get; private set; }
32 :
33 : /// <summary>
34 : /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}">Authentication Token Helper</see>
35 : /// </summary>
36 : protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
37 :
38 : /// <summary>
39 : /// Gets or sets the <see cref="ICorrelationIdHelper"/>
40 : /// </summary>
41 : protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
42 :
43 : /// <summary>
44 : /// Gets or sets the <see cref="IBusHelper"/>
45 : /// </summary>
46 : protected IBusHelper BusHelper { get; private set; }
47 :
48 : /// <summary>
49 : /// Gets or sets the <see cref="ILogger"/>
50 : /// </summary>
51 : protected ILogger Logger { get; private set; }
52 :
53 : /// <summary>
54 : /// Gets or sets the <see cref="IEventPublisher{TAuthenticationToken}"/>
55 : /// </summary>
56 : protected IEventPublisher<TAuthenticationToken> EventPublisher { get; private set; }
57 :
58 : /// <summary>
59 : /// Gets or sets the <see cref="IEventReceiver{TAuthenticationToken}"/>
60 : /// </summary>
61 : protected IEventReceiver<TAuthenticationToken> EventReceiver { get; private set; }
62 :
63 : static AkkaEventBus()
64 : {
65 : Routes = new RouteManager();
66 : }
67 :
68 : /// <summary>
69 : /// Instantiates a new instance of <see cref="AkkaEventBus{TAuthenticationToken}"/>
70 : /// </summary>
71 1 : public AkkaEventBus(IBusHelper busHelper, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IEventPublisher<TAuthenticationToken> eventPublisher, IEventReceiver<TAuthenticationToken> eventReceiver)
72 : {
73 : BusHelper = busHelper;
74 : AuthenticationTokenHelper = authenticationTokenHelper;
75 : CorrelationIdHelper = correlationIdHelper;
76 : Logger = logger;
77 : EventPublisher = eventPublisher;
78 : EventReceiver = eventReceiver;
79 : }
80 :
81 : #region Implementation of IEventPublisher<TAuthenticationToken>
82 :
83 : /// <summary>
84 : /// Publishes the provided <paramref name="event"/> on the event bus.
85 : /// </summary>
86 1 : public void Publish<TEvent>(TEvent @event)
87 : where TEvent : IEvent<TAuthenticationToken>
88 : {
89 : IEnumerable<RouteHandlerDelegate> handlers;
90 : if (!PrepareAndValidateEvent(@event, "Akka", out handlers))
91 : return;
92 :
93 : // This could be null if Akka won't handle the command and something else will.
94 : foreach (RouteHandlerDelegate eventHandler in handlers)
95 : eventHandler.Delegate(@event);
96 :
97 : // Let everything else know about the event
98 : EventPublisher.Publish(@event);
99 : }
100 :
101 : /// <summary>
102 : /// Publishes the provided <paramref name="events"/> on the event bus.
103 : /// </summary>
104 1 : public virtual void Publish<TEvent>(IEnumerable<TEvent> events)
105 : where TEvent : IEvent<TAuthenticationToken>
106 : {
107 : events = events.ToList();
108 : foreach (TEvent @event in events)
109 : {
110 : IEnumerable<RouteHandlerDelegate> handlers;
111 : if (!PrepareAndValidateEvent(@event, "Akka", out handlers))
112 : return;
113 :
114 : // This could be null if Akka won't handle the command and something else will.
115 : foreach (RouteHandlerDelegate eventHandler in handlers)
116 : eventHandler.Delegate(@event);
117 : }
118 :
119 : // Let everything else know about the event
120 : EventPublisher.Publish(events);
121 : }
122 :
123 : #endregion
124 :
125 : /// <summary>
126 : /// Prepares an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
127 : /// </summary>
128 : /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
129 : /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
130 : /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
131 1 : public virtual void PrepareEvent<TEvent>(TEvent @event, string framework)
132 : where TEvent : IEvent<TAuthenticationToken>
133 : {
134 : if (@event.AuthenticationToken == null || @event.AuthenticationToken.Equals(default(TAuthenticationToken)))
135 : @event.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
136 : @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
137 :
138 : if (string.IsNullOrWhiteSpace(@event.OriginatingFramework))
139 : {
140 : @event.TimeStamp = DateTimeOffset.UtcNow;
141 : @event.OriginatingFramework = framework;
142 : }
143 : var frameworks = new List<string>();
144 : if (@event.Frameworks != null)
145 : frameworks.AddRange(@event.Frameworks);
146 : frameworks.Add(framework);
147 : @event.Frameworks = frameworks;
148 : }
149 :
150 : /// <summary>
151 : /// Prepares and validates an <see cref="IEvent{TAuthenticationToken}"/> to be sent specifying the framework it is sent via.
152 : /// </summary>
153 : /// <typeparam name="TEvent">The <see cref="Type"/> of<see cref="IEvent{TAuthenticationToken}"/> being sent.</typeparam>
154 : /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to send.</param>
155 : /// <param name="framework">The framework the <paramref name="event"/> is being sent from.</param>
156 : /// <param name="handlers">The located <see cref="RouteHandlerDelegate">handlers</see> to be executed by passing the <paramref name="event"/>.</param>
157 1 : public virtual bool PrepareAndValidateEvent<TEvent>(TEvent @event, string framework, out IEnumerable<RouteHandlerDelegate> handlers)
158 : where TEvent : IEvent<TAuthenticationToken>
159 : {
160 : Type eventType = @event.GetType();
161 :
162 : if (@event.Frameworks != null && @event.Frameworks.Contains(framework))
163 : {
164 : // if this is the only framework in the list, then it's fine to handle as it's just pre-stamped, if there is more than one framework, then exit.
165 : if (@event.Frameworks.Count() != 1)
166 : {
167 : Logger.LogInfo("The provided event has already been processed in Akka.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, eventType.FullName));
168 : handlers = Enumerable.Empty<RouteHandlerDelegate>();
169 : return false;
170 : }
171 : }
172 :
173 : PrepareEvent(@event, framework);
174 :
175 : bool isRequired = BusHelper.IsEventRequired(eventType);
176 :
177 : handlers = Routes.GetHandlers(@event, isRequired);
178 : // This check doesn't require an isRequired check as there will be an exception raised above and handled below.
179 : if (handlers == null || !handlers.Any())
180 : Logger.LogDebug(string.Format("An event handler for '{0}' is not required.", eventType.FullName));
181 :
182 : return true;
183 : }
184 :
185 : #region Implementation of IHandlerRegistrar
186 :
187 : /// <summary>
188 : /// Register an event or command handler that will listen and respond to events or commands.
189 : /// </summary>
190 : /// <remarks>
191 : /// In many cases the <paramref name="targetedType"/> will be the event handler class itself, what you actually want is the target of what is being updated
192 : /// </remarks>
193 1 : public void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true) where TMessage : IMessage
194 : {
195 : Routes.RegisterHandler(handler, targetedType, holdMessageLock);
196 : }
197 :
198 : /// <summary>
199 : /// Register an event or command handler that will listen and respond to events or commands.
200 : /// </summary>
201 1 : public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true) where TMessage : IMessage
202 : {
203 : RegisterHandler(handler, null);
204 : }
205 :
206 : /// <summary>
207 : /// Register an event handler that will listen and respond to all events.
208 : /// </summary>
209 1 : public void RegisterGlobalEventHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true) where TMessage : IMessage
210 : {
211 : Routes.RegisterGlobalEventHandler(handler, holdMessageLock);
212 : }
213 :
214 : #endregion
215 : }
216 : }
|