Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using cdmdotnet.Logging;
13 : using Cqrs.Configuration;
14 : using Cqrs.Domain.Exceptions;
15 : using Cqrs.Domain.Factories;
16 : using Cqrs.Events;
17 :
18 : namespace Cqrs.Domain
19 : {
20 : /// <summary>
21 : /// Provides basic repository methods for operations with instances of <see cref="IAggregateRoot{TAuthenticationToken}"/> using an <see cref="IEventStore{TAuthenticationToken}"/>
22 : /// that also publishes events once saved.
23 : /// </summary>
24 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of authentication token.</typeparam>
25 : public class AggregateRepository<TAuthenticationToken>
26 : : IAggregateRepository<TAuthenticationToken>
27 1 : {
/// <summary>
/// Gets the <see cref="IEventStore{TAuthenticationToken}"/> used to store and retrieve events. Set by the constructor.
/// </summary>
protected IEventStore<TAuthenticationToken> EventStore { get; private set; }

/// <summary>
/// Gets the <see cref="IEventPublisher{TAuthenticationToken}"/> that events are published on once they have been saved into the <see cref="EventStore"/>. Set by the constructor.
/// </summary>
protected IEventPublisher<TAuthenticationToken> Publisher { get; private set; }

/// <summary>
/// Gets the <see cref="IAggregateFactory"/> used to create aggregate root instances. Set by the constructor.
/// </summary>
protected IAggregateFactory AggregateFactory { get; private set; }

/// <summary>
/// Gets the <see cref="ICorrelationIdHelper"/> that supplies the correlation id assigned to each saved event. Set by the constructor.
/// </summary>
protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }

/// <summary>
/// Gets the <see cref="IConfigurationManager"/> that the "Cqrs.AggregateFactory.TryDependencyResolutionFirst" settings are read from. Set by the constructor.
/// </summary>
protected IConfigurationManager ConfigurationManager { get; private set; }

/// <summary>
/// Instantiates a new instance of <see cref="AggregateRepository{TAuthenticationToken}"/>, storing each supplied dependency in its matching property.
/// </summary>
/// <param name="aggregateFactory">The value for <see cref="AggregateFactory"/>.</param>
/// <param name="eventStore">The value for <see cref="EventStore"/>.</param>
/// <param name="publisher">The value for <see cref="Publisher"/>.</param>
/// <param name="correlationIdHelper">The value for <see cref="CorrelationIdHelper"/>.</param>
/// <param name="configurationManager">The value for <see cref="ConfigurationManager"/>.</param>
public AggregateRepository(IAggregateFactory aggregateFactory, IEventStore<TAuthenticationToken> eventStore, IEventPublisher<TAuthenticationToken> publisher, ICorrelationIdHelper correlationIdHelper, IConfigurationManager configurationManager)
{
    // The arguments are stored exactly as given; no validation is performed here.
    AggregateFactory = aggregateFactory;
    EventStore = eventStore;
    Publisher = publisher;
    CorrelationIdHelper = correlationIdHelper;
    ConfigurationManager = configurationManager;
}
64 :
/// <summary>
/// Save and persist the uncommitted events of the provided <paramref name="aggregate"/>, optionally providing the version number the <see cref="IAggregateRoot{TAuthenticationToken}"/> is expected to be at,
/// and then publish every event that was saved.
/// </summary>
/// <remarks>
/// Nothing is saved or published when the <paramref name="aggregate"/> has no uncommitted changes.
/// Publishing only starts after every event has been saved and the changes have been marked as committed.
/// </remarks>
/// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
/// <param name="aggregate">The <see cref="IAggregateRoot{TAuthenticationToken}"/> to save and persist.</param>
/// <param name="expectedVersion">The version number the <see cref="IAggregateRoot{TAuthenticationToken}"/> is expected to be at. When null no concurrency check is made.</param>
/// <exception cref="ConcurrencyException">Thrown when <paramref name="expectedVersion"/> is provided and the <see cref="EventStore"/> query made with it returns any events.</exception>
/// <exception cref="AggregateOrEventMissingIdException">Thrown when an event has an empty identifier and the <paramref name="aggregate"/> identifier is empty as well.</exception>
public virtual void Save<TAggregateRoot>(TAggregateRoot aggregate, int? expectedVersion = null)
    where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
{
    IList<IEvent<TAuthenticationToken>> uncommittedChanges = aggregate.GetUncommittedChanges().ToList();
    if (uncommittedChanges.Count == 0)
        return;

    // The runtime type does not change while saving, so resolve it once instead of once per event.
    Type aggregateType = aggregate.GetType();

    if (expectedVersion != null)
    {
        // NOTE(review): presumably this asks the store for events from the expected version onwards,
        // so any result means another writer got there first - confirm against IEventStore.Get.
        IEnumerable<IEvent<TAuthenticationToken>> eventStoreResults = EventStore.Get(aggregateType, aggregate.Id, false, expectedVersion.Value);
        if (eventStoreResults.Any())
            throw new ConcurrencyException(aggregate.Id);
    }

    var eventsToPublish = new List<IEvent<TAuthenticationToken>>(uncommittedChanges.Count);

    // Each saved event gets the next version number following the aggregate's current version.
    int version = aggregate.Version;
    foreach (IEvent<TAuthenticationToken> @event in uncommittedChanges)
    {
        // An event without its own identifier inherits the aggregate's; if both are empty the event cannot be saved.
        var eventWithIdentity = @event as IEventWithIdentity<TAuthenticationToken>;
        if (eventWithIdentity != null)
        {
            if (eventWithIdentity.Rsn == Guid.Empty)
                eventWithIdentity.Rsn = aggregate.Id;
            if (eventWithIdentity.Rsn == Guid.Empty)
                throw new AggregateOrEventMissingIdException(aggregateType, @event.GetType());
        }
        else
        {
            if (@event.Id == Guid.Empty)
                @event.Id = aggregate.Id;
            if (@event.Id == Guid.Empty)
                throw new AggregateOrEventMissingIdException(aggregateType, @event.GetType());
        }

        version++;

        @event.Version = version;
        @event.TimeStamp = DateTimeOffset.UtcNow;
        @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
        EventStore.Save(aggregateType, @event);
        eventsToPublish.Add(@event);
    }

    aggregate.MarkChangesAsCommitted();
    foreach (IEvent<TAuthenticationToken> @event in eventsToPublish)
        PublishEvent(@event);
}
121 :
/// <summary>
/// Publish the saved <paramref name="event"/> by handing it to the <see cref="Publisher"/>.
/// </summary>
/// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to publish.</param>
protected virtual void PublishEvent(IEvent<TAuthenticationToken> @event)
{
    IEventPublisher<TAuthenticationToken> eventPublisher = Publisher;
    eventPublisher.Publish(@event);
}
129 :
/// <summary>
/// Retrieves an <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <typeparamref name="TAggregateRoot"/>.
/// </summary>
/// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
/// <param name="aggregateId">The identifier of the <see cref="IAggregateRoot{TAuthenticationToken}"/> to retrieve.</param>
/// <param name="events">
/// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
/// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
/// </param>
/// <returns>The <typeparamref name="TAggregateRoot"/> produced by <see cref="LoadAggregate{TAggregateRoot}"/>.</returns>
public virtual TAggregateRoot Get<TAggregateRoot>(Guid aggregateId, IList<IEvent<TAuthenticationToken>> events = null)
    where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
{
    // All of the work is delegated to the overridable loader.
    TAggregateRoot loadedAggregate = LoadAggregate<TAggregateRoot>(aggregateId, events);
    return loadedAggregate;
}
144 :
/// <summary>
/// Retrieves an <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <typeparamref name="TAggregateRoot"/> up to and including the provided <paramref name="version"/>.
/// </summary>
/// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
/// <param name="aggregateId">The identifier of the <see cref="IAggregateRoot{TAuthenticationToken}"/> to retrieve.</param>
/// <param name="version">Load events up to and including this version.</param>
/// <param name="events">
/// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
/// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
/// </param>
/// <returns>The <typeparamref name="TAggregateRoot"/> produced by <see cref="LoadAggregateToVersion{TAggregateRoot}"/>.</returns>
public virtual TAggregateRoot GetToVersion<TAggregateRoot>(Guid aggregateId, int version, IList<IEvent<TAuthenticationToken>> events = null)
    where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
{
    // All of the work is delegated to the overridable loader.
    TAggregateRoot loadedAggregate = LoadAggregateToVersion<TAggregateRoot>(aggregateId, version, events);
    return loadedAggregate;
}
160 :
/// <summary>
/// Retrieves an <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <typeparamref name="TAggregateRoot"/> up to and including the provided <paramref name="versionedDate"/>.
/// </summary>
/// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
/// <param name="aggregateId">The identifier of the <see cref="IAggregateRoot{TAuthenticationToken}"/> to retrieve.</param>
/// <param name="versionedDate">Load events up to and including this <see cref="DateTime"/>.</param>
/// <param name="events">
/// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
/// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
/// </param>
/// <returns>The <typeparamref name="TAggregateRoot"/> produced by <see cref="LoadAggregateToDate{TAggregateRoot}"/>.</returns>
public virtual TAggregateRoot GetToDate<TAggregateRoot>(Guid aggregateId, DateTime versionedDate, IList<IEvent<TAuthenticationToken>> events = null)
    where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
{
    // All of the work is delegated to the overridable loader.
    TAggregateRoot loadedAggregate = LoadAggregateToDate<TAggregateRoot>(aggregateId, versionedDate, events);
    return loadedAggregate;
}
176 :
/// <summary>
/// Calls <see cref="IAggregateFactory.Create"/> on the <see cref="AggregateFactory"/> to get a <typeparamref name="TAggregateRoot"/>. No history is loaded.
/// </summary>
/// <typeparam name="TAggregateRoot">The <see cref="Type"/> of <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
/// <param name="id">The id of the <typeparamref name="TAggregateRoot"/> to create.</param>
/// <returns>The <typeparamref name="TAggregateRoot"/> returned by the factory.</returns>
protected virtual TAggregateRoot CreateAggregate<TAggregateRoot>(Guid id)
    where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
{
    return AggregateFactory.Create<TAggregateRoot>(id);
}
189 :
/// <summary>
/// Calls <see cref="IAggregateFactory.Create"/> to get a <typeparamref name="TAggregateRoot"/> and then calls <see cref="LoadAggregateHistory{TAggregateRoot}"/>.
/// </summary>
/// <typeparam name="TAggregateRoot">The <see cref="Type"/> of <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
/// <param name="id">The id of the <typeparamref name="TAggregateRoot"/> to create.</param>
/// <param name="events">
/// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
/// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
/// </param>
/// <returns>The created and rehydrated <typeparamref name="TAggregateRoot"/>.</returns>
protected virtual TAggregateRoot LoadAggregate<TAggregateRoot>(Guid id, IList<IEvent<TAuthenticationToken>> events = null)
    where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
{
    var aggregate = AggregateFactory.Create<TAggregateRoot>(id, ShouldTryDependencyResolutionFirst<TAggregateRoot>());

    LoadAggregateHistory(aggregate, events);
    return aggregate;
}

/// <summary>
/// Calls <see cref="IAggregateFactory.Create"/> to get a <typeparamref name="TAggregateRoot"/> and then calls <see cref="LoadAggregateHistoryToVersion{TAggregateRoot}"/>.
/// </summary>
/// <typeparam name="TAggregateRoot">The <see cref="Type"/> of <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
/// <param name="id">The id of the <typeparamref name="TAggregateRoot"/> to create.</param>
/// <param name="version">Load events up to and including this version.</param>
/// <param name="events">
/// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
/// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
/// </param>
/// <returns>The created and rehydrated <typeparamref name="TAggregateRoot"/>.</returns>
protected virtual TAggregateRoot LoadAggregateToVersion<TAggregateRoot>(Guid id, int version, IList<IEvent<TAuthenticationToken>> events = null)
    where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
{
    var aggregate = AggregateFactory.Create<TAggregateRoot>(id, ShouldTryDependencyResolutionFirst<TAggregateRoot>());

    LoadAggregateHistoryToVersion(aggregate, version, events);
    return aggregate;
}

/// <summary>
/// Calls <see cref="IAggregateFactory.Create"/> to get a <typeparamref name="TAggregateRoot"/> and then calls <see cref="LoadAggregateHistoryToDate{TAggregateRoot}"/>.
/// </summary>
/// <typeparam name="TAggregateRoot">The <see cref="Type"/> of <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
/// <param name="id">The id of the <typeparamref name="TAggregateRoot"/> to create.</param>
/// <param name="versionedDate">Load events up to and including this <see cref="DateTime"/>.</param>
/// <param name="events">
/// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
/// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
/// </param>
/// <returns>The created and rehydrated <typeparamref name="TAggregateRoot"/>.</returns>
protected virtual TAggregateRoot LoadAggregateToDate<TAggregateRoot>(Guid id, DateTime versionedDate, IList<IEvent<TAuthenticationToken>> events = null)
    where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
{
    var aggregate = AggregateFactory.Create<TAggregateRoot>(id, ShouldTryDependencyResolutionFirst<TAggregateRoot>());

    LoadAggregateHistoryToDate(aggregate, versionedDate, events);
    return aggregate;
}

/// <summary>
/// Reads the "try dependency resolution first" flag passed to <see cref="IAggregateFactory.Create"/>.
/// The setting specific to <typeparamref name="TAggregateRoot"/> (keyed by its full type name) is tried first,
/// then the general setting; if neither can be read the result is false.
/// </summary>
/// <typeparam name="TAggregateRoot">The <see cref="Type"/> of <see cref="IAggregateRoot{TAuthenticationToken}"/> the setting is looked up for.</typeparam>
private bool ShouldTryDependencyResolutionFirst<TAggregateRoot>()
    where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
{
    bool tryDependencyResolutionFirst;

    string aggregateSpecificKey = string.Format("Cqrs.AggregateFactory.TryDependencyResolutionFirst.{0}", typeof(TAggregateRoot).FullName);
    if (ConfigurationManager.TryGetSetting(aggregateSpecificKey, out tryDependencyResolutionFirst))
        return tryDependencyResolutionFirst;

    if (ConfigurationManager.TryGetSetting("Cqrs.AggregateFactory.TryDependencyResolutionFirst", out tryDependencyResolutionFirst))
        return tryDependencyResolutionFirst;

    return false;
}
257 :
/// <summary>
/// If <paramref name="events"/> is null, loads the events from <see cref="EventStore"/>, checks for duplicated versions and then
/// rehydrates the <paramref name="aggregate"/> with the events.
/// </summary>
/// <typeparam name="TAggregateRoot">The <see cref="Type"/> of <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
/// <param name="aggregate">The <typeparamref name="TAggregateRoot"/> to rehydrate.</param>
/// <param name="events">
/// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
/// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
/// </param>
/// <param name="throwExceptionOnNoEvents">If true will throw an instance of <see cref="AggregateNotFoundException{TAggregateRoot,TAuthenticationToken}"/> if no aggregate events are provided or found in the <see cref="EventStore"/>.</param>
/// <exception cref="AggregateNotFoundException{TAggregateRoot,TAuthenticationToken}">Thrown when there are no events and <paramref name="throwExceptionOnNoEvents"/> is true.</exception>
/// <exception cref="DuplicateEventException{TAggregateRoot,TAuthenticationToken}">Thrown when more than one event carries the same version.</exception>
public virtual void LoadAggregateHistory<TAggregateRoot>(TAggregateRoot aggregate, IList<IEvent<TAuthenticationToken>> events = null, bool throwExceptionOnNoEvents = true)
    where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
{
    IList<IEvent<TAuthenticationToken>> theseEvents = events ?? EventStore.Get<TAggregateRoot>(aggregate.Id).ToList();
    RehydrateFromEvents(aggregate, theseEvents, throwExceptionOnNoEvents);
}

/// <summary>
/// If <paramref name="events"/> is null, loads the events up to <paramref name="version"/> from <see cref="EventStore"/>, checks for duplicated versions and then
/// rehydrates the <paramref name="aggregate"/> with the events.
/// </summary>
/// <typeparam name="TAggregateRoot">The <see cref="Type"/> of <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
/// <param name="aggregate">The <typeparamref name="TAggregateRoot"/> to rehydrate.</param>
/// <param name="version">Load events up to and including this version. Only used when <paramref name="events"/> is null.</param>
/// <param name="events">
/// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
/// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
/// </param>
/// <param name="throwExceptionOnNoEvents">If true will throw an instance of <see cref="AggregateNotFoundException{TAggregateRoot,TAuthenticationToken}"/> if no aggregate events are provided or found in the <see cref="EventStore"/>.</param>
/// <exception cref="AggregateNotFoundException{TAggregateRoot,TAuthenticationToken}">Thrown when there are no events and <paramref name="throwExceptionOnNoEvents"/> is true.</exception>
/// <exception cref="DuplicateEventException{TAggregateRoot,TAuthenticationToken}">Thrown when more than one event carries the same version.</exception>
public virtual void LoadAggregateHistoryToVersion<TAggregateRoot>(TAggregateRoot aggregate, int version, IList<IEvent<TAuthenticationToken>> events = null, bool throwExceptionOnNoEvents = true)
    where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
{
    IList<IEvent<TAuthenticationToken>> theseEvents = events ?? EventStore.GetToVersion<TAggregateRoot>(aggregate.Id, version).ToList();
    RehydrateFromEvents(aggregate, theseEvents, throwExceptionOnNoEvents);
}

/// <summary>
/// If <paramref name="events"/> is null, loads the events up to <paramref name="versionedDate"/> from <see cref="EventStore"/>, checks for duplicated versions and then
/// rehydrates the <paramref name="aggregate"/> with the events.
/// </summary>
/// <typeparam name="TAggregateRoot">The <see cref="Type"/> of <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
/// <param name="aggregate">The <typeparamref name="TAggregateRoot"/> to rehydrate.</param>
/// <param name="versionedDate">Load events up to and including this <see cref="DateTime"/>. Only used when <paramref name="events"/> is null.</param>
/// <param name="events">
/// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
/// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
/// </param>
/// <param name="throwExceptionOnNoEvents">If true will throw an instance of <see cref="AggregateNotFoundException{TAggregateRoot,TAuthenticationToken}"/> if no aggregate events are provided or found in the <see cref="EventStore"/>.</param>
/// <exception cref="AggregateNotFoundException{TAggregateRoot,TAuthenticationToken}">Thrown when there are no events and <paramref name="throwExceptionOnNoEvents"/> is true.</exception>
/// <exception cref="DuplicateEventException{TAggregateRoot,TAuthenticationToken}">Thrown when more than one event carries the same version.</exception>
public virtual void LoadAggregateHistoryToDate<TAggregateRoot>(TAggregateRoot aggregate, DateTime versionedDate, IList<IEvent<TAuthenticationToken>> events = null, bool throwExceptionOnNoEvents = true)
    where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
{
    IList<IEvent<TAuthenticationToken>> theseEvents = events ?? EventStore.GetToDate<TAggregateRoot>(aggregate.Id, versionedDate).ToList();
    RehydrateFromEvents(aggregate, theseEvents, throwExceptionOnNoEvents);
}

/// <summary>
/// Shared tail of the three history loaders: handles the "no events" case, rejects duplicated versions
/// and then replays <paramref name="theseEvents"/> on the <paramref name="aggregate"/>.
/// </summary>
/// <typeparam name="TAggregateRoot">The <see cref="Type"/> of <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
/// <param name="aggregate">The <typeparamref name="TAggregateRoot"/> to rehydrate.</param>
/// <param name="theseEvents">The already resolved events to replay.</param>
/// <param name="throwExceptionOnNoEvents">If true an empty <paramref name="theseEvents"/> raises <see cref="AggregateNotFoundException{TAggregateRoot,TAuthenticationToken}"/>; otherwise the aggregate is left untouched.</param>
private void RehydrateFromEvents<TAggregateRoot>(TAggregateRoot aggregate, IList<IEvent<TAuthenticationToken>> theseEvents, bool throwExceptionOnNoEvents)
    where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
{
    if (theseEvents.Count == 0)
    {
        if (throwExceptionOnNoEvents)
            throw new AggregateNotFoundException<TAggregateRoot, TAuthenticationToken>(aggregate.Id);
        return;
    }

    // Find the first version number that is carried by more than one event.
    var duplicatedEvents =
        theseEvents.GroupBy(x => x.Version)
            .Select(x => new { Version = x.Key, Total = x.Count() })
            .FirstOrDefault(x => x.Total > 1);
    if (duplicatedEvents != null)
        throw new DuplicateEventException<TAggregateRoot, TAuthenticationToken>(aggregate.Id, duplicatedEvents.Version);

    aggregate.LoadFromHistory(theseEvents);
}
355 : }
356 : }
|