Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Linq;
13 : using Akka.Actor;
14 : using Akka.Event;
15 : using Chinchilla.Logging;
16 : using Cqrs.Akka.Configuration;
17 : using Cqrs.Akka.Domain;
18 : using Cqrs.Configuration;
19 : using Cqrs.Domain;
20 : using Cqrs.Domain.Factories;
21 : using Cqrs.Ninject.Configuration;
22 : using Ninject;
23 : using Ninject.Activation;
24 :
25 : namespace Cqrs.Ninject.Akka
26 : {
27 : /// <summary>
28 : /// Provides an ability to resolve instances of objects and Akka.NET objects using Ninject
29 : /// </summary>
30 : public class AkkaNinjectDependencyResolver
31 : : NinjectDependencyResolver
32 : , IAkkaAggregateResolver
33 : , IAkkaSagaResolver
34 : , IHandlerResolver
35 1 : {
36 : /// <summary>
37 : /// The inner resolver used by Akka.NET
38 : /// </summary>
39 : protected global::Akka.DI.Ninject.NinjectDependencyResolver RawAkkaNinjectDependencyResolver { get; set; }
40 :
41 : /// <summary>
42 : /// The <see cref="ActorSystem"/> as part of Akka.NET.
43 : /// </summary>
44 : protected ActorSystem AkkaSystem { get; private set; }
45 :
46 : /// <summary>
47 : /// A generic type, quick reference, lookup for fast resolving of Akka.NET objects since the patterns calls for them to be treated like statics
48 : /// </summary>
49 : protected IDictionary<Type, IActorRef> AkkaActors { get; private set; }
50 :
51 : /// <summary>
52 : /// The <see cref="IAggregateFactory"/> that will be used to create new instances of Akka.NET objects.
53 : /// </summary>
54 : protected IAggregateFactory AggregateFactory { get; private set; }
55 :
56 : /// <summary>
57 : /// Instantiates a new instance of <see cref="AkkaNinjectDependencyResolver"/>
58 : /// </summary>
59 1 : public AkkaNinjectDependencyResolver(IKernel kernel, ActorSystem system)
60 : : base(kernel)
61 : {
62 : RawAkkaNinjectDependencyResolver = new global::Akka.DI.Ninject.NinjectDependencyResolver(kernel, AkkaSystem = system);
63 : AkkaActors = new ConcurrentDictionary<Type, IActorRef>();
64 : // ReSharper disable DoNotCallOverridableMethodsInConstructor
65 : AggregateFactory = Resolve<IAggregateFactory>();
66 : // ReSharper restore DoNotCallOverridableMethodsInConstructor
67 : }
68 :
69 : /// <summary>
70 : /// Checks if an instance of <see cref="IDependencyResolver"/> is already registered, if one is registered, it in unregistered and this instance is registered as the <see cref="IDependencyResolver"/>.
71 : /// It then checks if an instance of <see cref="IAkkaAggregateResolver"/> is already registered, if one is registered, it in unregistered and this instance is registered as the <see cref="IAkkaAggregateResolver"/>
72 : /// </summary>
73 1 : protected override void BindDependencyResolver()
74 : {
75 : bool isDependencyResolverBound = Kernel.GetBindings(typeof(IDependencyResolver)).Any();
76 : if (isDependencyResolverBound)
77 : Kernel.Unbind<IDependencyResolver>();
78 : Kernel.Bind<IDependencyResolver>()
79 : .ToConstant(this)
80 : .InSingletonScope();
81 :
82 : isDependencyResolverBound = Kernel.GetBindings(typeof(IAkkaAggregateResolver)).Any();
83 : if (!isDependencyResolverBound)
84 : {
85 : Kernel.Bind<IAkkaAggregateResolver>()
86 : .ToConstant(this)
87 : .InSingletonScope();
88 : }
89 : }
90 :
91 : /// <summary>
92 : /// Starts the <see cref="AkkaNinjectDependencyResolver"/>
93 : /// </summary>
94 : /// <remarks>
95 : /// This exists so the static constructor can be triggered.
96 : /// </remarks>
97 1 : public new static void Start(IKernel kernel = null, bool prepareProvidedKernel = false)
98 : {
99 : // Create the ActorSystem and Dependency Resolver
100 : ActorSystem system = ActorSystem.Create("Cqrs");
101 :
102 : Func<IKernel, NinjectDependencyResolver> originalDependencyResolverCreator = DependencyResolverCreator;
103 : Func<IKernel, NinjectDependencyResolver> dependencyResolverCreator = container => new AkkaNinjectDependencyResolver(container, system);
104 : if (originalDependencyResolverCreator == null)
105 : DependencyResolverCreator = dependencyResolverCreator;
106 : else
107 : DependencyResolverCreator = container =>
108 : {
109 : originalDependencyResolverCreator(container);
110 : return dependencyResolverCreator(container);
111 : };
112 :
113 : NinjectDependencyResolver.Start(kernel, prepareProvidedKernel);
114 :
115 : // Setup an actor that will handle deadletter type messages
116 : var deadletterWatchMonitorProps = Props.Create(() => new DeadletterToLoggerProxy(Current.Resolve<ILogger>()));
117 : var deadletterWatchActorRef = system.ActorOf(deadletterWatchMonitorProps, "DeadLetterMonitoringActor");
118 :
119 : // subscribe to the event stream for messages of type "DeadLetter"
120 : system.EventStream.Subscribe(deadletterWatchActorRef, typeof(DeadLetter));
121 :
122 : }
123 :
124 : /// <summary>
125 : /// Calls <see cref="ActorSystem.Shutdown"/>
126 : /// </summary>
127 1 : public static void Stop()
128 : {
129 : var di = Current as AkkaNinjectDependencyResolver;
130 : if (di != null)
131 : di.AkkaSystem.Shutdown();
132 : }
133 :
134 : #region Overrides of NinjectDependencyResolver
135 :
136 : /// <summary>
137 : /// Resolves instances of <paramref name="serviceType"/> using <see cref="Resolve(System.Type, Object)"/>.
138 : /// </summary>
139 1 : public override object Resolve(Type serviceType)
140 : {
141 : return Resolve(serviceType, null);
142 : }
143 :
144 : #endregion
145 :
146 : #region Implementation of IAkkaAggregateResolver
147 :
148 : /// <summary>
149 : /// Resolves instances of <typeparamref name="TAggregate"/> using <see cref="AkkaResolve"/>.
150 : /// </summary>
151 1 : public virtual IActorRef ResolveActor<TAggregate, TAuthenticationToken>(Guid rsn)
152 : where TAggregate : IAggregateRoot<TAuthenticationToken>
153 : {
154 : return (IActorRef)AkkaResolve(typeof(TAggregate), rsn, true);
155 : }
156 :
157 : /// <summary>
158 : /// Resolves instances of <typeparamref name="T"/> using <see cref="AkkaResolve"/>.
159 : /// </summary>
160 1 : public IActorRef ResolveActor<T>()
161 : {
162 : return (IActorRef)AkkaResolve(typeof(T), null, true);
163 : }
164 :
165 : #endregion
166 :
167 : #region Implementation of IAkkaSagaResolver
168 :
169 : /// <summary>
170 : /// Resolves instances of <typeparamref name="TSaga"/> using <see cref="ResolveSagaActor{TSaga,TAuthenticationToken}"/>.
171 : /// </summary>
172 : IActorRef IAkkaSagaResolver.ResolveActor<TSaga, TAuthenticationToken>(Guid rsn)
173 : {
174 : return ResolveSagaActor<TSaga, TAuthenticationToken>(rsn);
175 : }
176 :
177 : /// <summary>
178 : /// Resolves instances of <typeparamref name="TSaga"/> using <see cref="AkkaResolve"/>.
179 : /// </summary>
180 1 : public virtual IActorRef ResolveSagaActor<TSaga, TAuthenticationToken>(Guid rsn)
181 : where TSaga : ISaga<TAuthenticationToken>
182 : {
183 : return (IActorRef)AkkaResolve(typeof(TSaga), rsn, true);
184 : }
185 :
186 : #endregion
187 :
188 : /// <summary>
189 : /// Resolves instances of <paramref name="serviceType"/> using <see cref="IDependencyResolver.Resolve{T}"/>.
190 : /// </summary>
191 1 : protected virtual object RootResolve(Type serviceType)
192 : {
193 : return base.Resolve(serviceType);
194 : }
195 :
196 : /// <summary>
197 : /// Resolves instances of <paramref name="serviceType"/> using <see cref="AkkaResolve"/>.
198 : /// </summary>
199 1 : public virtual object Resolve(Type serviceType, object rsn)
200 : {
201 : return AkkaResolve(serviceType, rsn);
202 : }
203 :
204 : /// <summary>
205 : /// Resolves instances of <paramref name="serviceType"/> looking up <see cref="AkkaActors"/>, then <see cref="IDependencyResolver.Resolve{T}"/> and finally <see cref="AggregateFactory"/>.
206 : /// </summary>
207 1 : public virtual object AkkaResolve(Type serviceType, object rsn, bool isAForcedActorSearch = false)
208 : {
209 : do
210 : {
211 : IActorRef actorReference;
212 : try
213 : {
214 : if (AkkaActors.TryGetValue(serviceType, out actorReference))
215 : return actorReference;
216 : if (!isAForcedActorSearch)
217 : return base.Resolve(serviceType);
218 : }
219 : catch (ActivationException) { throw; }
220 : catch ( /*ActorInitialization*/Exception) { /* */ }
221 :
222 : Props properties;
223 : Type typeToTest = serviceType;
224 : while (typeToTest != null)
225 : {
226 : Type[] types = typeToTest.GenericTypeArguments;
227 : if (types.Length == 1)
228 : {
229 : Type aggregateType = typeof (AkkaAggregateRoot<>).MakeGenericType(typeToTest.GenericTypeArguments.Single());
230 : if (typeToTest == aggregateType)
231 : {
232 : typeToTest = aggregateType;
233 : break;
234 : }
235 : Type sagaType = typeof (AkkaSaga<>).MakeGenericType(typeToTest.GenericTypeArguments.Single());
236 : if (typeToTest == sagaType)
237 : {
238 : typeToTest = sagaType;
239 : break;
240 : }
241 : }
242 : typeToTest = typeToTest.BaseType;
243 : }
244 :
245 : // This sorts out an out-of-order binder issue
246 : if (AggregateFactory == null)
247 : AggregateFactory = Resolve<IAggregateFactory>();
248 :
249 : if (typeToTest == null || !(typeToTest).IsAssignableFrom(serviceType))
250 : properties = Props.Create(() => (ActorBase)RootResolve(serviceType));
251 : else
252 : properties = Props.Create(() => (ActorBase) AggregateFactory.Create(serviceType, rsn as Guid?, false));
253 : string actorName = serviceType.FullName.Replace("`", string.Empty);
254 : int index = actorName.IndexOf("[[", StringComparison.Ordinal);
255 : if (index > -1)
256 : actorName = actorName.Substring(0, index);
257 : try
258 : {
259 : actorReference = AkkaSystem.ActorOf(properties, string.Format("{0}~{1}", actorName, rsn));
260 : }
261 : catch (InvalidActorNameException)
262 : {
263 : // This means that the actor has been created since we tried to get it... funnily enough concurrency doesn't actually mean concurrency.
264 : continue;
265 : }
266 : AkkaActors.Add(serviceType, actorReference);
267 : return actorReference;
268 : } while (true);
269 : }
270 : }
271 : }
|