Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Collections.ObjectModel;
12 : using System.Linq;
13 : using System.Runtime.Serialization;
14 : using System.Threading;
15 : using Chinchilla.Logging;
16 : using Cqrs.Commands;
17 : using Cqrs.Configuration;
18 : using Cqrs.Domain.Exceptions;
19 : using Cqrs.Events;
20 : using Cqrs.Infrastructure;
21 :
22 : namespace Cqrs.Domain
23 : {
/// <summary>
/// An independent component that reacts to domain <see cref="IEvent{TAuthenticationToken}"/> in a cross-<see cref="IAggregateRoot{TAuthenticationToken}"/>, eventually consistent manner. Time can also be a trigger. A <see cref="Saga{TAuthenticationToken}"/> can sometimes be purely reactive, and sometimes represent workflows.
///
/// From an implementation perspective, a <see cref="Saga{TAuthenticationToken}"/> is a state machine that is driven forward by incoming <see cref="IEvent{TAuthenticationToken}"/> (which may come from many <see cref="AggregateRoot{TAuthenticationToken}"/> or other <see cref="Saga{TAuthenticationToken}"/>). Some states will have side effects, such as sending <see cref="ICommand{TAuthenticationToken}"/>, talking to external web services, or sending emails.
/// </summary>
/// <remarks>
/// Isn't a <see cref="Saga{TAuthenticationToken}"/> just leaked domain logic?
/// No.
/// A <see cref="Saga{TAuthenticationToken}"/> can do things that no individual <see cref="AggregateRoot{TAuthenticationToken}"/> can sensibly do. Thus, it's not a logic leak since the logic didn't belong in an <see cref="AggregateRoot{TAuthenticationToken}"/> anyway. Furthermore, we're not breaking encapsulation in any way, since <see cref="Saga{TAuthenticationToken}"/> operate with <see cref="ICommand{TAuthenticationToken}"/> and <see cref="IEvent{TAuthenticationToken}"/>, which are part of the public API.
///
/// How can I make my <see cref="Saga{TAuthenticationToken}"/> react to an <see cref="IEvent{TAuthenticationToken}"/> that did not happen?
/// The <see cref="Saga{TAuthenticationToken}"/>, besides reacting to domain <see cref="IEvent{TAuthenticationToken}"/>, can be "woken up" by recurrent internal alarms. Implementing such alarms is easy. See cron in Unix, or triggered WebJobs in Azure for examples.
///
/// How does the <see cref="Saga{TAuthenticationToken}"/> interact with the write side?
/// By sending an <see cref="ICommand{TAuthenticationToken}"/> to it.
/// </remarks>
public abstract class Saga<TAuthenticationToken> : ISaga<TAuthenticationToken>
{
    /// <summary>
    /// Write lock taken whenever <see cref="Changes"/> or <see cref="Commands"/> is replaced and while events are being applied.
    /// </summary>
    private ReaderWriterLockSlim Lock { get; set; }

    /// <summary>
    /// The events that have been applied but not yet committed. Always a read-only snapshot that is replaced, never mutated in place.
    /// </summary>
    private ICollection<ISagaEvent<TAuthenticationToken>> Changes { get; set; }

    /// <summary>
    /// The commands that have been queued but not yet published. Always a read-only snapshot that is replaced, never mutated in place.
    /// </summary>
    private ICollection<ICommand<TAuthenticationToken>> Commands { get; set; }

    /// <summary>
    /// The identifier of this <see cref="ISaga{TAuthenticationToken}"/>. This is an alias of <see cref="Id"/>.
    /// </summary>
    [DataMember]
    public Guid Rsn
    {
        get { return Id; }
        private set { Id = value; }
    }

    /// <summary>
    /// The identifier of this <see cref="ISaga{TAuthenticationToken}"/>.
    /// </summary>
    [DataMember]
    public Guid Id { get; protected set; }

    /// <summary>
    /// The current version of this <see cref="ISaga{TAuthenticationToken}"/>.
    /// </summary>
    public int Version { get; protected set; }

    /// <summary>
    /// Gets the <see cref="ICommandPublisher{TAuthenticationToken}"/>.
    /// </summary>
    protected ICommandPublisher<TAuthenticationToken> CommandPublisher { get; private set; }

    /// <summary>
    /// Gets the <see cref="IDependencyResolver"/>.
    /// </summary>
    protected IDependencyResolver DependencyResolver { get; private set; }

    /// <summary>
    /// Gets the <see cref="ILogger"/>.
    /// </summary>
    protected ILogger Logger { get; private set; }

    /// <summary>
    /// Instantiates a new <see cref="Saga{TAuthenticationToken}"/> with an empty set of uncommitted changes and unpublished commands,
    /// then calls <see cref="Initialise"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <see cref="Initialise"/> is virtual and is invoked from this constructor, so an override runs before the
    /// derived class' own constructor body and before <see cref="DependencyResolver"/>, <see cref="Logger"/> and
    /// <see cref="CommandPublisher"/> have been assigned by the other constructors.
    /// </remarks>
    protected Saga()
    {
        Lock = new ReaderWriterLockSlim();
        Changes = new ReadOnlyCollection<ISagaEvent<TAuthenticationToken>>(new List<ISagaEvent<TAuthenticationToken>>());
        Commands = new ReadOnlyCollection<ICommand<TAuthenticationToken>>(new List<ICommand<TAuthenticationToken>>());
        Initialise();
    }

    /// <summary>
    /// Initialise any properties. The default implementation does nothing.
    /// </summary>
    protected virtual void Initialise()
    {
    }

    /// <summary>
    /// Instantiates a new <see cref="Saga{TAuthenticationToken}"/>, storing the provided <paramref name="dependencyResolver"/> and
    /// <paramref name="logger"/> and resolving the <see cref="CommandPublisher"/> from the <paramref name="dependencyResolver"/>.
    /// </summary>
    /// <param name="dependencyResolver">The <see cref="IDependencyResolver"/> used to resolve the <see cref="ICommandPublisher{TAuthenticationToken}"/>.</param>
    /// <param name="logger">The <see cref="ILogger"/> made available to derived classes.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="dependencyResolver"/> is <c>null</c>.</exception>
    protected Saga(IDependencyResolver dependencyResolver, ILogger logger)
        : this()
    {
        // Fail with a named argument rather than an anonymous NullReferenceException on the Resolve call below.
        if (dependencyResolver == null)
        {
            throw new ArgumentNullException("dependencyResolver");
        }

        DependencyResolver = dependencyResolver;
        Logger = logger;
        CommandPublisher = DependencyResolver.Resolve<ICommandPublisher<TAuthenticationToken>>();
    }

    /// <summary>
    /// Instantiates a new <see cref="Saga{TAuthenticationToken}"/> as per <see cref="Saga{TAuthenticationToken}(IDependencyResolver, ILogger)"/>
    /// and sets <see cref="Rsn"/> (and therefore <see cref="Id"/>) to the provided <paramref name="rsn"/>.
    /// </summary>
    /// <param name="dependencyResolver">The <see cref="IDependencyResolver"/> used to resolve the <see cref="ICommandPublisher{TAuthenticationToken}"/>.</param>
    /// <param name="logger">The <see cref="ILogger"/> made available to derived classes.</param>
    /// <param name="rsn">The identifier of this <see cref="ISaga{TAuthenticationToken}"/>.</param>
    protected Saga(IDependencyResolver dependencyResolver, ILogger logger, Guid rsn)
        : this(dependencyResolver, logger)
    {
        Rsn = rsn;
    }

    /// <summary>
    /// Get all applied changes that haven't yet been committed.
    /// </summary>
    /// <returns>The current read-only snapshot of uncommitted changes.</returns>
    public virtual IEnumerable<ISagaEvent<TAuthenticationToken>> GetUncommittedChanges()
    {
        return Changes;
    }

    /// <summary>
    /// Mark all applied changes as committed, increment <see cref="Version"/> by the number of uncommitted changes
    /// and flush the <see cref="Changes">internal collection of changes</see>.
    /// </summary>
    public virtual void MarkChangesAsCommitted()
    {
        Lock.EnterWriteLock();
        try
        {
            Version = Version + Changes.Count;
            Changes = new ReadOnlyCollection<ISagaEvent<TAuthenticationToken>>(new List<ISagaEvent<TAuthenticationToken>>());
        }
        finally
        {
            Lock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Apply all the <see cref="IEvent{TAuthenticationToken}">events</see> in <paramref name="history"/>
    /// using event replay to this instance.
    /// </summary>
    /// <param name="history">The events to replay. They are applied in ascending <see cref="ISagaEvent{TAuthenticationToken}.Version"/> order.</param>
    /// <exception cref="EventsOutOfOrderException">
    /// Thrown when the version of an event is not exactly one greater than the current <see cref="Version"/>.
    /// Events applied before the offending one remain applied.
    /// </exception>
    public virtual void LoadFromHistory(IEnumerable<ISagaEvent<TAuthenticationToken>> history)
    {
        Type sagaType = GetType();
        foreach (ISagaEvent<TAuthenticationToken> @event in history.OrderBy(e => e.Version))
        {
            // Replay increments Version once per event, so the next event must carry exactly Version + 1.
            int expectedVersion = Version + 1;
            if (@event.Version != expectedVersion)
            {
                throw new EventsOutOfOrderException(@event.Rsn, sagaType, expectedVersion, @event.Version);
            }

            ApplyChange(@event, true);
        }
    }

    /// <summary>
    /// Get all pending commands that haven't been published yet.
    /// </summary>
    /// <returns>The current read-only snapshot of unpublished commands.</returns>
    public virtual IEnumerable<ICommand<TAuthenticationToken>> GetUnpublishedCommands()
    {
        return Commands;
    }

    /// <summary>
    /// Queue the provided <paramref name="command"/> for publishing.
    /// </summary>
    /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to append to the unpublished commands.</param>
    protected virtual void QueueCommand(ICommand<TAuthenticationToken> command)
    {
        Lock.EnterWriteLock();
        try
        {
            // Build a fresh snapshot so that collections already handed out by GetUnpublishedCommands never change.
            var queued = new List<ICommand<TAuthenticationToken>>(Commands.Count + 1);
            queued.AddRange(Commands);
            queued.Add(command);
            Commands = new ReadOnlyCollection<ICommand<TAuthenticationToken>>(queued);
        }
        finally
        {
            Lock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Mark all queued commands as published and flush the internal collection of commands.
    /// </summary>
    public virtual void MarkCommandsAsPublished()
    {
        Lock.EnterWriteLock();
        try
        {
            Commands = new ReadOnlyCollection<ICommand<TAuthenticationToken>>(new List<ICommand<TAuthenticationToken>>());
        }
        finally
        {
            Lock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Call the "Apply" method with a signature matching the provided <paramref name="event"/> without using event replay to this instance.
    /// </summary>
    /// <remarks>
    /// This means a method named "Apply", with return type void and one parameter must exist to be applied.
    /// If no method exists, nothing is applied
    /// The parameter type must match exactly the <see cref="Type"/> of the provided <paramref name="event"/>.
    /// </remarks>
    protected virtual void ApplyChange(ISagaEvent<TAuthenticationToken> @event)
    {
        ApplyChange(@event, false);
    }

    /// <summary>
    /// Wraps the provided <paramref name="event"/> in a <see cref="SagaEvent{TAuthenticationToken}"/>, calls the "SetId" method dynamically if the method exists,
    /// then calls <see cref="ApplyChange(Cqrs.Events.ISagaEvent{TAuthenticationToken})"/>
    /// </summary>
    protected virtual void ApplyChange(IEvent<TAuthenticationToken> @event)
    {
        var sagaEvent = new SagaEvent<TAuthenticationToken>(@event);
        // Set ID
        this.AsDynamic().SetId(sagaEvent);
        ApplyChange(sagaEvent);
    }

    /// <summary>
    /// Sets <see cref="ISagaEvent{TAuthenticationToken}.Rsn"/> on the provided <paramref name="sagaEvent"/>
    /// to the identity obtained from its <see cref="ISagaEvent{TAuthenticationToken}.Event"/>.
    /// </summary>
    protected virtual void SetId(ISagaEvent<TAuthenticationToken> sagaEvent)
    {
        sagaEvent.Rsn = sagaEvent.Event.GetIdentity();
    }

    /// <summary>
    /// Applies a single <paramref name="event"/> by delegating to <see cref="ApplyChanges(IEnumerable{ISagaEvent{TAuthenticationToken}}, bool)"/>.
    /// </summary>
    private void ApplyChange(ISagaEvent<TAuthenticationToken> @event, bool isEventReplay)
    {
        ApplyChanges(new[] { @event }, isEventReplay);
    }

    /// <summary>
    /// Call the "Apply" method with a signature matching each <see cref="ISagaEvent{TAuthenticationToken}"/> in the provided <paramref name="events"/> without using event replay to this instance.
    /// </summary>
    /// <remarks>
    /// This means a method named "Apply", with return type void and one parameter must exist to be applied.
    /// If no method exists, nothing is applied
    /// The parameter type must match exactly the <see cref="Type"/> of the <see cref="IEvent{TAuthenticationToken}"/> in the provided <paramref name="events"/>.
    /// </remarks>
    protected virtual void ApplyChanges(IEnumerable<ISagaEvent<TAuthenticationToken>> events)
    {
        ApplyChanges(events, false);
    }

    /// <summary>
    /// Calls the "SetId" method dynamically if the method exists on the first <see cref="IEvent{TAuthenticationToken}"/> in the provided <paramref name="events"/>,
    /// then calls <see cref="ApplyChanges(System.Collections.Generic.IEnumerable{Cqrs.Events.ISagaEvent{TAuthenticationToken}})"/>
    /// </summary>
    /// <remarks>
    /// NOTE(review): only the first wrapped event is passed to "SetId"; the remaining events are wrapped but
    /// "SetId" is not called for them. Confirm this is intended before relying on their identifiers.
    /// </remarks>
    protected virtual void ApplyChanges(IEnumerable<IEvent<TAuthenticationToken>> events)
    {
        IList<ISagaEvent<TAuthenticationToken>> sagaEvents = new List<ISagaEvent<TAuthenticationToken>>();
        bool isFirst = true;
        // Materialise once so that a lazily evaluated source is enumerated in full before any event is wrapped.
        foreach (IEvent<TAuthenticationToken> @event in events.ToList())
        {
            var sagaEvent = new SagaEvent<TAuthenticationToken>(@event);
            // Set ID
            if (isFirst)
            {
                this.AsDynamic().SetId(sagaEvent);
                isFirst = false;
            }

            sagaEvents.Add(sagaEvent);
        }

        ApplyChanges(sagaEvents);
    }

    /// <summary>
    /// Calls the "Apply" method dynamically for each of the provided <paramref name="events"/> while holding the write lock.
    /// </summary>
    /// <param name="events">The events to apply, in order.</param>
    /// <param name="isEventReplay">
    /// When <c>true</c> each applied event sets <see cref="Id"/> and increments <see cref="Version"/> and nothing is recorded as uncommitted.
    /// When <c>false</c> each applied event is appended to the uncommitted <see cref="Changes"/>.
    /// </param>
    /// <remarks>
    /// If applying an event throws, the events that were successfully applied before it are still recorded in <see cref="Changes"/>
    /// before the exception propagates.
    /// NOTE(review): <see cref="Lock"/> is created with the default, non-recursive policy, so an "Apply" method that calls back into
    /// a member that takes the lock on the same thread (such as <see cref="QueueCommand"/>) would raise a <see cref="LockRecursionException"/>.
    /// </remarks>
    private void ApplyChanges(IEnumerable<ISagaEvent<TAuthenticationToken>> events, bool isEventReplay)
    {
        // Allocated before the lock is taken so that a failed allocation can never leave the write lock held.
        IList<ISagaEvent<TAuthenticationToken>> changes = new List<ISagaEvent<TAuthenticationToken>>();
        Lock.EnterWriteLock();
        try
        {
            try
            {
                dynamic dynamicThis = this.AsDynamic();
                foreach (ISagaEvent<TAuthenticationToken> @event in events)
                {
                    dynamicThis.Apply(@event.Event);
                    if (isEventReplay)
                    {
                        Id = @event.Rsn;
                        Version++;
                    }
                    else
                    {
                        changes.Add(@event);
                    }
                }
            }
            finally
            {
                // Runs on failure too, so state already mutated by successful Apply calls is still tracked.
                // Nothing is recorded during replay, so skip rebuilding the snapshot when there is nothing to add.
                if (changes.Count > 0)
                {
                    Changes = new ReadOnlyCollection<ISagaEvent<TAuthenticationToken>>(Changes.Concat(changes).ToList());
                }
            }
        }
        finally
        {
            Lock.ExitWriteLock();
        }
    }
}
305 : }
|