Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using Cqrs.Akka.Domain;
13 : using Cqrs.Domain;
14 : using Cqrs.Domain.Exceptions;
15 : using Cqrs.Domain.Factories;
16 : using Cqrs.Events;
17 : using Cqrs.Infrastructure;
18 : using Cqrs.Snapshots;
19 :
20 : namespace Cqrs.Akka.Snapshots
21 : {
22 : /// <summary>
23 : /// Provides basic repository methods for operations with instances of <see cref="IAggregateRoot{TAuthenticationToken}"/>
24 : /// utilising <see cref="Snapshot">snapshots</see> for optimised rehydration.
25 : /// </summary>
26 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of authentication token.</typeparam>
27 : public class AkkaSnapshotRepository<TAuthenticationToken>
28 : : IAkkaSnapshotAggregateRepository<TAuthenticationToken>
29 1 : {
30 : /// <summary>
31 : /// Gets or sets the <see cref="ISnapshotStore"/>.
32 : /// </summary>
33 : protected ISnapshotStore SnapshotStore { get; private set; }
34 :
35 : /// <summary>
36 : /// Gets or sets the <see cref="ISnapshotStrategy{TAuthenticationToken}"/>.
37 : /// </summary>
38 : protected ISnapshotStrategy<TAuthenticationToken> SnapshotStrategy { get; private set; }
39 :
40 : /// <summary>
41 : /// Gets or sets the <see cref="IAggregateRepository{TAuthenticationToken}"/>.
42 : /// </summary>
43 : protected IAggregateRepository<TAuthenticationToken> Repository { get; private set; }
44 :
45 : /// <summary>
46 : /// Gets or sets the <see cref="IEventStore{TAuthenticationToken}"/>.
47 : /// </summary>
48 : protected IEventStore<TAuthenticationToken> EventStore { get; private set; }
49 :
50 : /// <summary>
51 : /// Gets or sets the <see cref="IAggregateFactory"/>.
52 : /// </summary>
53 : protected IAggregateFactory AggregateFactory { get; private set; }
54 :
55 : /// <summary>
56 : /// Instantiates a new instance of <see cref="SnapshotRepository{TAuthenticationToken}"/>.
57 : /// </summary>
58 1 : public AkkaSnapshotRepository(ISnapshotStore snapshotStore, ISnapshotStrategy<TAuthenticationToken> snapshotStrategy, IAggregateRepository<TAuthenticationToken> repository, IEventStore<TAuthenticationToken> eventStore, IAggregateFactory aggregateFactory)
59 : {
60 : SnapshotStore = snapshotStore;
61 : SnapshotStrategy = snapshotStrategy;
62 : Repository = repository;
63 : EventStore = eventStore;
64 : AggregateFactory = aggregateFactory;
65 : }
66 :
67 : /// <summary>
68 : /// Calls <see cref="TryMakeSnapshot"/> then <see cref="IAggregateRepository{TAuthenticationToken}.Save{TAggregateRoot}"/> on <see cref="Repository"/>.
69 : /// </summary>
70 : /// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
71 : /// <param name="aggregate">The <see cref="IAggregateRoot{TAuthenticationToken}"/> to save and persist.</param>
72 : /// <param name="expectedVersion">The version number the <see cref="IAggregateRoot{TAuthenticationToken}"/> is expected to be at.</param>
73 1 : public void Save<TAggregateRoot>(TAggregateRoot aggregate, int? expectedVersion = null)
74 : where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
75 : {
76 : // We need to grab these first as the changes will have been commitedd already by the time we go to make the snapshot.
77 : IEnumerable<IEvent<TAuthenticationToken>> uncommittedChanges = aggregate.GetUncommittedChanges();
78 : // Save the evets first then snapshot the system.
79 : Repository.Save(aggregate, expectedVersion);
80 : TryMakeSnapshot(aggregate, uncommittedChanges);
81 : }
82 :
83 : /// <summary>
84 : /// Retrieves an <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <typeparamref name="TAggregateRoot"/>,
85 : /// First using <see cref="TryRestoreAggregateFromSnapshot{TAggregateRoot}"/>, otherwise via <see cref="IAggregateRepository{TAuthenticationToken}.Get{TAggregateRoot}"/> on <see cref="Repository"/>
86 : /// Then does rehydration.
87 : /// </summary>
88 : /// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
89 : /// <param name="aggregateId">The identifier of the <see cref="IAggregateRoot{TAuthenticationToken}"/> to retrieve.</param>
90 : /// <param name="events">
91 : /// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
92 : /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
93 : /// </param>
94 1 : public TAggregateRoot Get<TAggregateRoot>(Guid aggregateId, IList<IEvent<TAuthenticationToken>> events = null)
95 : where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
96 : {
97 : var aggregate = AggregateFactory.Create<TAggregateRoot>();
98 : int snapshotVersion = TryRestoreAggregateFromSnapshot(aggregateId, aggregate);
99 : if (snapshotVersion == -1)
100 : {
101 : return Repository.Get<TAggregateRoot>(aggregateId);
102 : }
103 : IEnumerable<IEvent<TAuthenticationToken>> theseEvents = events ?? EventStore.Get<TAggregateRoot>(aggregateId, false, snapshotVersion).Where(desc => desc.Version > snapshotVersion);
104 : aggregate.LoadFromHistory(theseEvents);
105 :
106 : return aggregate;
107 : }
108 :
109 : /// <summary>
110 : /// Retrieves an <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <typeparamref name="TAggregateRoot"/> up to and including the provided <paramref name="version"/>.
111 : /// </summary>
112 : /// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
113 : /// <param name="aggregateId">The identifier of the <see cref="IAggregateRoot{TAuthenticationToken}"/> to retrieve.</param>
114 : /// <param name="version">Load events up-to and including from this version</param>
115 : /// <param name="events">
116 : /// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
117 : /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
118 : /// </param>
119 1 : public TAggregateRoot GetToVersion<TAggregateRoot>(Guid aggregateId, int version, IList<IEvent<TAuthenticationToken>> events = null)
120 : where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
121 : {
122 : throw new InvalidOperationException("Verion replay is not appriopriate with snapshots.");
123 : }
124 :
125 : /// <summary>
126 : /// Retrieves an <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <typeparamref name="TAggregateRoot"/> up to and including the provided <paramref name="versionedDate"/>.
127 : /// </summary>
128 : /// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
129 : /// <param name="aggregateId">The identifier of the <see cref="IAggregateRoot{TAuthenticationToken}"/> to retrieve.</param>
130 : /// <param name="versionedDate">Load events up-to and including from this <see cref="DateTime"/></param>
131 : /// <param name="events">
132 : /// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
133 : /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
134 : /// </param>
135 1 : public TAggregateRoot GetToDate<TAggregateRoot>(Guid aggregateId, DateTime versionedDate, IList<IEvent<TAuthenticationToken>> events = null)
136 : where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
137 : {
138 : throw new InvalidOperationException("Verion replay is not appriopriate with snapshots.");
139 : }
140 :
141 : /// <summary>
142 : /// Calls <see cref="ISnapshotStrategy{TAuthenticationToken}.IsSnapshotable"/> on <see cref="SnapshotStrategy"/>
143 : /// If the <typeparamref name="TAggregateRoot"/> is snapshot-able <see cref="ISnapshotStore.Get{TAggregateRoot}"/> is called on <see cref="SnapshotStore"/>.
144 : /// The Restore method is then called on
145 : /// </summary>
146 : /// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
147 : /// <param name="id">The identifier of the <see cref="IAggregateRoot{TAuthenticationToken}"/> to restore, since the <paramref name="aggregate"/> may be completely uninitialised.</param>
148 : /// <param name="aggregate">The <typeparamref name="TAggregateRoot"/></param>
149 : /// <returns>-1 if no restoration was made, otherwise version number the <typeparamref name="TAggregateRoot"/> was rehydrated to.</returns>
150 : /// <remarks>There may be more events after the snapshot that still need to rehydrated into the <typeparamref name="TAggregateRoot"/> after restoration.</remarks>
151 1 : protected virtual int TryRestoreAggregateFromSnapshot<TAggregateRoot>(Guid id, TAggregateRoot aggregate)
152 : {
153 : int version = -1;
154 : if (SnapshotStrategy.IsSnapshotable(aggregate.GetType()))
155 : {
156 : Snapshot snapshot = SnapshotStore.Get(aggregate.GetType(), id);
157 : if (snapshot != null)
158 : {
159 : aggregate.AsDynamic().Restore(snapshot);
160 : version = snapshot.Version;
161 : }
162 : }
163 : return version;
164 : }
165 :
166 : /// <summary>
167 : /// Calls <see cref="ISnapshotStrategy{TAuthenticationToken}.ShouldMakeSnapShot"/> on <see cref="SnapshotStrategy"/>
168 : /// If the <see cref="IAggregateRoot{TAuthenticationToken}"/> is snapshot-able <see cref="SnapshotAggregateRoot{TAuthenticationToken,TSnapshot}.GetSnapshot"/> is called
169 : /// The <see cref="Snapshot.Version"/> is calculated, finally <see cref="ISnapshotStore.Save"/> is called on <see cref="SnapshotStore"/>.
170 : /// </summary>
171 : /// <param name="aggregate">The <see cref="IAggregateRoot{TAuthenticationToken}"/> to try and snapshot.</param>
172 : /// <param name="uncommittedChanges">A collection of uncommited changes to assess. If null the aggregate will be asked to provide them.</param>
173 1 : protected virtual void TryMakeSnapshot(IAggregateRoot<TAuthenticationToken> aggregate, IEnumerable<IEvent<TAuthenticationToken>> uncommittedChanges)
174 : {
175 : if (!SnapshotStrategy.ShouldMakeSnapShot(aggregate, uncommittedChanges))
176 : return;
177 : dynamic snapshot = aggregate.AsDynamic().GetSnapshot().RealObject;
178 : var rsnapshot = snapshot as Snapshot;
179 : if (rsnapshot != null)
180 : {
181 : rsnapshot.Version = aggregate.Version;
182 : SnapshotStore.Save(rsnapshot);
183 : }
184 : else
185 : {
186 : snapshot.Version = aggregate.Version;
187 : SnapshotStore.Save(snapshot);
188 : }
189 : }
190 :
191 : #region Implementation of IAkkaAggregateRepository<TAuthenticationToken>
192 :
193 : /// <summary>
194 : /// If <paramref name="events"/> is null, loads the events from <see cref="IEventStore{TAuthenticationToken}"/>, checks for duplicates and then
195 : /// rehydrates the <paramref name="aggregate"/> with the events.
196 : /// </summary>
197 : /// <typeparam name="TAggregateRoot">The <see cref="Type"/> of <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
198 : /// <param name="aggregate">The <typeparamref name="TAggregateRoot"/> to rehydrate.</param>
199 : /// <param name="events">
200 : /// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
201 : /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
202 : /// </param>
203 : /// <param name="throwExceptionOnNoEvents">If true will throw an instance of <see cref="AggregateNotFoundException{TAggregateRoot,TAuthenticationToken}"/> if no aggregate events or provided or found in the <see cref="IEventStore{TAuthenticationToken}"/>.</param>
204 1 : public void LoadAggregateHistory<TAggregateRoot>(TAggregateRoot aggregate, IList<IEvent<TAuthenticationToken>> events = null, bool throwExceptionOnNoEvents = true)
205 : where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
206 : {
207 : int snapshotVersion = TryRestoreAggregateFromSnapshot(aggregate.Id, aggregate);
208 :
209 : IEnumerable<IEvent<TAuthenticationToken>> theseEvents = events ?? EventStore.Get(aggregate.GetType(), aggregate.Id, false, snapshotVersion).Where(desc => desc.Version > snapshotVersion);
210 : aggregate.LoadFromHistory(theseEvents);
211 : }
212 :
213 : #endregion
214 : }
215 : }
|