Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using Akka.Actor;
13 : using cdmdotnet.Logging;
14 : using Cqrs.Authentication;
15 : using Cqrs.Bus;
16 : using Cqrs.Events;
17 : using Cqrs.Messages;
18 :
19 : namespace Cqrs.Akka.Events
20 : {
21 : /// <summary>
22 : /// An <see cref="IEventPublisher{TAuthenticationToken}"/> that proxies <see cref="IEvent{TAuthenticationToken}"/> back onto the <see cref="IActorRef"/> and then publishes the <see cref="IEvent{TAuthenticationToken}"/> on the public event bus.
23 : /// </summary>
24 : public class AkkaEventBus<TAuthenticationToken>
25 : : IAkkaEventPublisher<TAuthenticationToken>
26 : , IEventHandlerRegistrar
27 1 : {
/// <summary>
/// The <see cref="RouteManager"/> that handlers are registered with (see RegisterHandler) and resolved from (see PrepareAndValidateEvent).
/// Being static on a generic class, one instance is shared by every <see cref="AkkaEventBus{TAuthenticationToken}"/> with the same <typeparamref name="TAuthenticationToken"/>.
/// </summary>
28 : protected static RouteManager Routes { get; private set; }
29 :
/// <summary>
/// Creates the <see cref="RouteManager"/> exposed through <see cref="Routes"/>.
/// </summary>
30 : static AkkaEventBus()
31 : {
32 : Routes = new RouteManager();
33 : }
34 :
/// <summary>
/// The publisher every event is forwarded to once the handlers resolved from the routes have been invoked.
/// </summary>
35 : protected IEventPublisher<TAuthenticationToken> EventPublisher { get; private set; }
36 :
/// <summary>
/// The receiver supplied at construction. It is stored here but not otherwise used by the members visible in this class.
/// </summary>
37 : protected IEventReceiver<TAuthenticationToken> EventReceiver { get; private set; }
38 :
/// <summary>
/// Supplies the authentication token stamped onto an event that arrives without one (null or the default value).
/// </summary>
39 : protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
40 :
/// <summary>
/// Supplies the correlation id that is written onto every event being prepared.
/// </summary>
41 : protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
42 :
/// <summary>
/// Asked whether a given event type requires a handler before handlers are looked up.
/// </summary>
43 : protected IBusHelper BusHelper { get; private set; }
44 :
/// <summary>
/// Receives the informational and debug messages written while events are validated.
/// </summary>
45 : protected ILogger Logger { get; private set; }
46 :
/// <summary>
/// Instantiates a new instance of <see cref="AkkaEventBus{TAuthenticationToken}"/>, storing each supplied dependency in the property of the matching name.
/// </summary>
/// <param name="busHelper">Stored in <see cref="BusHelper"/>.</param>
/// <param name="authenticationTokenHelper">Stored in <see cref="AuthenticationTokenHelper"/>.</param>
/// <param name="correlationIdHelper">Stored in <see cref="CorrelationIdHelper"/>.</param>
/// <param name="logger">Stored in <see cref="Logger"/>.</param>
/// <param name="eventPublisher">Stored in <see cref="EventPublisher"/>.</param>
/// <param name="eventReceiver">Stored in <see cref="EventReceiver"/>.</param>
public AkkaEventBus(IBusHelper busHelper, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IEventPublisher<TAuthenticationToken> eventPublisher, IEventReceiver<TAuthenticationToken> eventReceiver)
{
    // The outward-facing bus endpoints.
    this.EventPublisher = eventPublisher;
    this.EventReceiver = eventReceiver;

    // Helpers used while preparing and validating events.
    this.AuthenticationTokenHelper = authenticationTokenHelper;
    this.CorrelationIdHelper = correlationIdHelper;
    this.BusHelper = busHelper;

    // Diagnostics.
    this.Logger = logger;
}
56 :
57 : #region Implementation of IEventPublisher<TAuthenticationToken>
58 :
/// <summary>
/// Prepares and validates the provided <paramref name="event"/>, invokes every handler resolved for it from the routes, then forwards it to <see cref="EventPublisher"/>.
/// Nothing is invoked or forwarded when validation rejects the event.
/// </summary>
/// <typeparam name="TEvent">The <see cref="Type"/> of the event being published.</typeparam>
/// <param name="event">The event to publish.</param>
public void Publish<TEvent>(TEvent @event)
    where TEvent : IEvent<TAuthenticationToken>
{
    IEnumerable<RouteHandlerDelegate> handlers;
    if (!PrepareAndValidateEvent(@event, "Akka", out handlers))
        return;

    // PrepareAndValidateEvent can succeed while leaving handlers null (it null-checks the routes result itself),
    // which means nothing registered here handles the event. Skip local dispatch instead of throwing a
    // NullReferenceException so the event still reaches the public bus below.
    if (handlers != null)
    {
        foreach (RouteHandlerDelegate eventHandler in handlers)
            eventHandler.Delegate(@event);
    }

    // Let everything else know about the event
    EventPublisher.Publish(@event);
}
73 :
/// <summary>
/// Prepares and validates each of the provided <paramref name="events"/> in order, invoking the handlers resolved for it from the routes,
/// then forwards the whole collection to <see cref="EventPublisher"/>.
/// </summary>
/// <typeparam name="TEvent">The <see cref="Type"/> of the events being published.</typeparam>
/// <param name="events">The events to publish. The sequence is materialised once so it is not enumerated twice.</param>
public virtual void Publish<TEvent>(IEnumerable<TEvent> events)
    where TEvent : IEvent<TAuthenticationToken>
{
    events = events.ToList();
    foreach (TEvent @event in events)
    {
        IEnumerable<RouteHandlerDelegate> handlers;
        // NOTE(review): a rejected event ends the whole call. Later events in the batch are not processed and
        // nothing is forwarded to EventPublisher, even though handlers for earlier events have already run.
        // Behaviour kept as-is; confirm this is intended.
        if (!PrepareAndValidateEvent(@event, "Akka", out handlers))
            return;

        // PrepareAndValidateEvent can succeed while leaving handlers null (it null-checks the routes result itself),
        // which means nothing registered here handles the event. Move on rather than throwing a
        // NullReferenceException and losing the rest of the batch.
        if (handlers == null)
            continue;

        foreach (RouteHandlerDelegate eventHandler in handlers)
            eventHandler.Delegate(@event);
    }

    // Let everything else know about the event
    EventPublisher.Publish(events);
}
92 :
93 : #endregion
94 :
/// <summary>
/// Stamps the provided <paramref name="event"/> before it is dispatched:
/// fills in a missing authentication token, overwrites the correlation id, records the time stamp and originating framework
/// the first time the event is seen, and appends <paramref name="framework"/> to the event's list of frameworks.
/// </summary>
/// <typeparam name="TEvent">The <see cref="Type"/> of the event being prepared.</typeparam>
/// <param name="event">The event to stamp. It is modified in place.</param>
/// <param name="framework">The name of the framework doing the stamping.</param>
public virtual void PrepareEvent<TEvent>(TEvent @event, string framework)
    where TEvent : IEvent<TAuthenticationToken>
{
    // Only supply a token when the event carries none (null or the default value).
    bool hasToken = @event.AuthenticationToken != null && !@event.AuthenticationToken.Equals(default(TAuthenticationToken));
    if (!hasToken)
    {
        @event.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
    }

    // The correlation id is always replaced with the current one.
    @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();

    // An empty originating framework marks an event that has not been stamped before.
    bool isFirstStamp = string.IsNullOrWhiteSpace(@event.OriginatingFramework);
    if (isFirstStamp)
    {
        @event.TimeStamp = DateTimeOffset.UtcNow;
        @event.OriginatingFramework = framework;
    }

    // Copy any frameworks already recorded, then append this one.
    List<string> stampedFrameworks = @event.Frameworks == null
        ? new List<string>()
        : new List<string>(@event.Frameworks);
    stampedFrameworks.Add(framework);
    @event.Frameworks = stampedFrameworks;
}
113 :
/// <summary>
/// Decides whether the provided <paramref name="event"/> should be handled here and, if so, prepares it and resolves its handlers.
/// </summary>
/// <typeparam name="TEvent">The <see cref="Type"/> of the event being validated.</typeparam>
/// <param name="event">The event to validate. It is stamped via <see cref="PrepareEvent{TEvent}"/> when accepted.</param>
/// <param name="framework">The name of the framework doing the processing.</param>
/// <param name="handlers">
/// On rejection, an empty sequence. On acceptance, whatever the routes returned for the event; this method itself allows for that being null.
/// </param>
/// <returns>
/// false when the event already lists <paramref name="framework"/> alongside at least one other framework; otherwise true.
/// </returns>
public virtual bool PrepareAndValidateEvent<TEvent>(TEvent @event, string framework, out IEnumerable<RouteHandlerDelegate> handlers)
    where TEvent : IEvent<TAuthenticationToken>
{
    Type eventType = @event.GetType();

    // An event that lists only this framework was merely pre-stamped and is fine to handle;
    // one that lists this framework together with others has already been through here.
    bool alreadyStamped = @event.Frameworks != null && @event.Frameworks.Contains(framework);
    if (alreadyStamped && @event.Frameworks.Count() != 1)
    {
        string container = string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, eventType.FullName);
        Logger.LogInfo("The provided event has already been processed in Akka.", container);
        handlers = Enumerable.Empty<RouteHandlerDelegate>();
        return false;
    }

    PrepareEvent(@event, framework);

    bool isRequired = BusHelper.IsEventRequired(eventType);
    handlers = Routes.GetHandlers(@event, isRequired);

    // No isRequired check is needed here: per the original author's note, a missing required handler raises an exception in the lookup above.
    bool hasHandlers = handlers != null && handlers.Any();
    if (!hasHandlers)
    {
        Logger.LogDebug(string.Format("An event handler for '{0}' is not required.", eventType.FullName));
    }

    return true;
}
142 :
143 :
144 : #region Implementation of IHandlerRegistrar
145 :
/// <summary>
/// Registers the provided <paramref name="handler"/> with the shared <see cref="Routes"/> so it is invoked for messages of type <typeparamref name="TMessage"/>.
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of message the handler responds to.</typeparam>
/// <param name="handler">The action to invoke for each matching message.</param>
/// <param name="targetedType">The <see cref="Type"/> the registration is associated with.</param>
/// <param name="holdMessageLock">Passed through unchanged to the route registration.</param>
/// <remarks>
/// In many cases the <paramref name="targetedType"/> will be the event handler class itself, what you actually want is the target of what is being updated.
/// </remarks>
public void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
    where TMessage : IMessage
{
    Routes.RegisterHandler(handler, targetedType, holdMessageLock);
}
156 :
/// <summary>
/// Registers the provided <paramref name="handler"/> without a targeted type, by delegating to the three-parameter overload with a null targeted type.
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of message the handler responds to.</typeparam>
/// <param name="handler">The action to invoke for each matching message.</param>
/// <param name="holdMessageLock">Passed through unchanged to the route registration.</param>
public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
    where TMessage : IMessage
{
    // Forward holdMessageLock explicitly: with only (handler, null) the call binds to the three-parameter
    // overload using its default of true, silently discarding a caller's false.
    RegisterHandler(handler, null, holdMessageLock);
}
164 :
165 : #endregion
166 : }
167 : }
|