Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Threading.Tasks;
13 : using Cqrs.Events;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Domain;
16 : using Cqrs.Messages;
17 : using Microsoft.Azure.Documents;
18 : using Microsoft.Azure.Documents.Client;
19 : using Microsoft.Azure.Documents.Linq;
20 :
21 : namespace Cqrs.Azure.DocumentDb.Events
22 : {
23 : /// <summary>
24 : /// A DocumentDb based <see cref="EventStore{TAuthenticationToken}"/>.
25 : /// </summary>
26 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
27 : public class AzureDocumentDbEventStore<TAuthenticationToken>
28 : : EventStore<TAuthenticationToken>
29 1 : {
30 : /// <summary>
31 : /// The properties that must be unique.
32 : /// </summary>
33 : protected readonly string[] UniqueIndexProperties = {"AggregateId", "Version"};
34 :
35 : /// <summary>
36 : /// Gets or sets the <see cref="IAzureDocumentDbEventStoreConnectionStringFactory"/>
37 : /// </summary>
38 : protected IAzureDocumentDbEventStoreConnectionStringFactory AzureDocumentDbEventStoreConnectionStringFactory { get; private set; }
39 :
40 : /// <summary>
41 : /// Gets or sets the <see cref="IAzureDocumentDbHelper"/>
42 : /// </summary>
43 : protected IAzureDocumentDbHelper AzureDocumentDbHelper { get; private set; }
44 :
45 : /// <summary>
46 : /// Instantiate a new instance of <see cref="AzureDocumentDbEventStore{TAuthenticationToken}"/>.
47 : /// </summary>
48 1 : public AzureDocumentDbEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, IAzureDocumentDbHelper azureDocumentDbHelper, IAzureDocumentDbEventStoreConnectionStringFactory azureDocumentDbEventStoreConnectionStringFactory)
49 : : base(eventBuilder, eventDeserialiser, logger)
50 : {
51 : AzureDocumentDbHelper = azureDocumentDbHelper;
52 : AzureDocumentDbEventStoreConnectionStringFactory = azureDocumentDbEventStoreConnectionStringFactory;
53 : }
54 :
55 : /// <summary>
56 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>.
57 : /// </summary>
58 : /// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
59 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
60 : /// <param name="useLastEventOnly">Loads only the last event<see cref="IEvent{TAuthenticationToken}"/>.</param>
61 : /// <param name="fromVersion">Load events starting from this version</param>
62 1 : public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
63 : {
64 : return GetAsync(aggregateRootType, aggregateId, useLastEventOnly, fromVersion).Result;
65 : }
66 :
67 : /// <summary>
68 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> up to and including the provided <paramref name="version"/>.
69 : /// </summary>
70 : /// <param name="aggregateRootType"> <see cref="System.Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
71 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
72 : /// <param name="version">Load events up-to and including from this version</param>
73 1 : public override IEnumerable<IEvent<TAuthenticationToken>> GetToVersion(Type aggregateRootType, Guid aggregateId, int version)
74 : {
75 : return GetToVersionAsync(aggregateRootType, aggregateId, version).Result;
76 : }
77 :
78 : /// <summary>
79 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> up to and including the provided <paramref name="versionedDate"/>.
80 : /// </summary>
81 : /// <param name="aggregateRootType"> <see cref="System.Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
82 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
83 : /// <param name="versionedDate">Load events up-to and including from this <see cref="System.DateTime"/></param>
84 1 : public override IEnumerable<IEvent<TAuthenticationToken>> GetToDate(Type aggregateRootType, Guid aggregateId, DateTime versionedDate)
85 : {
86 : return GetToDateAsync(aggregateRootType, aggregateId, versionedDate).Result;
87 : }
88 :
89 : /// <summary>
90 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> from and including the provided <paramref name="fromVersionedDate"/> up to and including the provided <paramref name="toVersionedDate"/>.
91 : /// </summary>
92 : /// <param name="aggregateRootType"> <see cref="System.Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
93 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
94 : /// <param name="fromVersionedDate">Load events from and including from this <see cref="System.DateTime"/></param>
95 : /// <param name="toVersionedDate">Load events up-to and including from this <see cref="System.DateTime"/></param>
96 1 : public override IEnumerable<IEvent<TAuthenticationToken>> GetBetweenDates(Type aggregateRootType, Guid aggregateId, DateTime fromVersionedDate, DateTime toVersionedDate)
97 : {
98 : return GetBetweenDatesAsync(aggregateRootType, aggregateId, fromVersionedDate, toVersionedDate).Result;
99 : }
100 :
101 : /// <summary>
102 : /// Get all <see cref="IEvent{TAuthenticationToken}"/> instances for the given <paramref name="correlationId"/>.
103 : /// </summary>
104 : /// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
105 1 : public override IEnumerable<EventData> Get(Guid correlationId)
106 : {
107 : return GetAsync(correlationId).Result;
108 : }
109 :
110 : /// <summary>
111 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <typeparamref name="T"/> with the ID matching the provided <paramref name="aggregateId"/>.
112 : /// </summary>
113 : /// <typeparam name="T"><see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</typeparam>
114 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
115 : /// <param name="useLastEventOnly">Loads only the last event<see cref="IEvent{TAuthenticationToken}"/>.</param>
116 : /// <param name="fromVersion">Load events starting from this version</param>
117 : /// <returns></returns>
118 1 : protected async Task<IEnumerable<IEvent<TAuthenticationToken>>> GetAsync<T>(Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
119 : {
120 : return Get(typeof(T), aggregateId, useLastEventOnly, fromVersion);
121 : }
122 :
123 : /// <summary>
124 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>.
125 : /// </summary>
126 : /// <param name="aggregateRootType"><see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
127 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
128 : /// <param name="useLastEventOnly">Loads only the last event<see cref="IEvent{TAuthenticationToken}"/>.</param>
129 : /// <param name="fromVersion">Load events starting from this version</param>
130 1 : protected async Task<IEnumerable<IEvent<TAuthenticationToken>>> GetAsync(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
131 : {
132 : using (DocumentClient client = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionClient())
133 : {
134 : Database database = AzureDocumentDbHelper.CreateOrReadDatabase(client, AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionDatabaseName()).Result;
135 : //DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, string.Format("{0}_{1}", AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName(), typeof(T).FullName)).Result;
136 : string collectionName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName();
137 : DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).Result;
138 :
139 : IOrderedQueryable<EventData> query = client.CreateDocumentQuery<EventData>(collection.SelfLink);
140 : string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);
141 :
142 : IEnumerable<EventData> results = query.Where(x => x.AggregateId == streamName && x.Version > fromVersion);
143 :
144 : return AzureDocumentDbHelper.ExecuteFaultTollerantFunction(() =>
145 : results
146 : .ToList()
147 : .OrderByDescending(x => x.EventId)
148 : .Select(EventDeserialiser.Deserialise)
149 : );
150 : }
151 : }
152 :
153 : /// <summary>
154 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> up to and including the provided <paramref name="version"/>.
155 : /// </summary>
156 : /// <param name="aggregateRootType"> <see cref="System.Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
157 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
158 : /// <param name="version">Load events up-to and including from this version</param>
159 1 : public async Task<IEnumerable<IEvent<TAuthenticationToken>>> GetToVersionAsync(Type aggregateRootType, Guid aggregateId, int version)
160 : {
161 : using (DocumentClient client = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionClient())
162 : {
163 : Database database = AzureDocumentDbHelper.CreateOrReadDatabase(client, AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionDatabaseName()).Result;
164 : //DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, string.Format("{0}_{1}", AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName(), typeof(T).FullName)).Result;
165 : string collectionName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName();
166 : DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).Result;
167 :
168 : IOrderedQueryable<EventData> query = client.CreateDocumentQuery<EventData>(collection.SelfLink);
169 : string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);
170 :
171 : IEnumerable<EventData> results = query.Where(x => x.AggregateId == streamName && x.Version <= version);
172 :
173 : return AzureDocumentDbHelper.ExecuteFaultTollerantFunction(() =>
174 : results
175 : .ToList()
176 : .OrderByDescending(x => x.EventId)
177 : .Select(EventDeserialiser.Deserialise)
178 : );
179 : }
180 : }
181 :
182 : /// <summary>
183 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> up to and including the provided <paramref name="versionedDate"/>.
184 : /// </summary>
185 : /// <param name="aggregateRootType"> <see cref="System.Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
186 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
187 : /// <param name="versionedDate">Load events up-to and including from this <see cref="System.DateTime"/></param>
188 1 : public async Task<IEnumerable<IEvent<TAuthenticationToken>>> GetToDateAsync(Type aggregateRootType, Guid aggregateId, DateTime versionedDate)
189 : {
190 : using (DocumentClient client = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionClient())
191 : {
192 : Database database = AzureDocumentDbHelper.CreateOrReadDatabase(client, AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionDatabaseName()).Result;
193 : //DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, string.Format("{0}_{1}", AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName(), typeof(T).FullName)).Result;
194 : string collectionName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName();
195 : DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).Result;
196 :
197 : IOrderedQueryable<EventData> query = client.CreateDocumentQuery<EventData>(collection.SelfLink);
198 : string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);
199 :
200 : IEnumerable<EventData> results = query.Where(x => x.AggregateId == streamName && x.Timestamp <= versionedDate);
201 :
202 : return AzureDocumentDbHelper.ExecuteFaultTollerantFunction(() =>
203 : results
204 : .ToList()
205 : .OrderByDescending(x => x.EventId)
206 : .Select(EventDeserialiser.Deserialise)
207 : );
208 : }
209 : }
210 :
211 : /// <summary>
212 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> from and including the provided <paramref name="fromVersionedDate"/> up to and including the provided <paramref name="toVersionedDate"/>.
213 : /// </summary>
214 : /// <param name="aggregateRootType"> <see cref="System.Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
215 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
216 : /// <param name="fromVersionedDate">Load events from and including from this <see cref="System.DateTime"/></param>
217 : /// <param name="toVersionedDate">Load events up-to and including from this <see cref="System.DateTime"/></param>
218 1 : public async Task<IEnumerable<IEvent<TAuthenticationToken>>> GetBetweenDatesAsync(Type aggregateRootType, Guid aggregateId, DateTime fromVersionedDate, DateTime toVersionedDate)
219 : {
220 : using (DocumentClient client = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionClient())
221 : {
222 : Database database = AzureDocumentDbHelper.CreateOrReadDatabase(client, AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionDatabaseName()).Result;
223 : //DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, string.Format("{0}_{1}", AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName(), typeof(T).FullName)).Result;
224 : string collectionName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName();
225 : DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).Result;
226 :
227 : IOrderedQueryable<EventData> query = client.CreateDocumentQuery<EventData>(collection.SelfLink);
228 : string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);
229 :
230 : IEnumerable<EventData> results = query.Where(eventData => eventData.AggregateId == streamName && eventData.Timestamp >= fromVersionedDate && eventData.Timestamp <= toVersionedDate);
231 :
232 : return AzureDocumentDbHelper.ExecuteFaultTollerantFunction(() =>
233 : results
234 : .ToList()
235 : .OrderByDescending(x => x.EventId)
236 : .Select(EventDeserialiser.Deserialise)
237 : );
238 : }
239 : }
240 :
241 : /// <summary>
242 : /// Get all <see cref="IEvent{TAuthenticationToken}"/> instances for the given <paramref name="correlationId"/>.
243 : /// </summary>
244 : /// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
245 1 : protected async Task<IEnumerable<EventData>> GetAsync(Guid correlationId)
246 : {
247 : using (DocumentClient client = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionClient())
248 : {
249 : Database database = AzureDocumentDbHelper.CreateOrReadDatabase(client, AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionDatabaseName()).Result;
250 : //DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, string.Format("{0}_{1}", AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName(), typeof(T).FullName)).Result;
251 : string collectionName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName();
252 : DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).Result;
253 :
254 : IOrderedQueryable<EventData> query = client.CreateDocumentQuery<EventData>(collection.SelfLink);
255 :
256 : IEnumerable<EventData> results = query.Where(x => x.CorrelationId == correlationId);
257 :
258 : return AzureDocumentDbHelper.ExecuteFaultTollerantFunction(() =>
259 : results
260 : .ToList()
261 : .OrderBy(x => x.Timestamp)
262 : );
263 : }
264 : }
265 :
266 : /// <summary>
267 : /// Persist the provided <paramref name="eventData"/> into storage.
268 : /// </summary>
269 : /// <param name="eventData">The <see cref="EventData"/> to persist.</param>
270 1 : protected override void PersistEvent(EventData eventData)
271 : {
272 : Logger.LogDebug("Persisting aggregate root event", string.Format("{0}\\PersitEvent", GetType().Name));
273 : try
274 : {
275 : using (DocumentClient client = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionClient())
276 : {
277 : Database database = AzureDocumentDbHelper.CreateOrReadDatabase(client, AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionDatabaseName()).Result;
278 : //DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, string.Format("{0}_{1}", AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName(), eventData.EventType)).Result;
279 : //string collectionName = string.Format("{0}::{1}", AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName(), eventData.AggregateId.Substring(0, eventData.AggregateId.IndexOf("/", StringComparison.Ordinal)));
280 : string collectionName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName();
281 : DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).Result;
282 :
283 : Logger.LogDebug("Creating document for event asynchronously", string.Format("{0}\\PersitEvent", GetType().Name));
284 : AzureDocumentDbHelper.ExecuteFaultTollerantFunction
285 : (
286 : () =>
287 : {
288 : Task<ResourceResponse<Document>> work = client.CreateDocumentAsync
289 : (
290 : collection.SelfLink,
291 : eventData
292 : );
293 : work.ConfigureAwait(false);
294 : work.Wait();
295 : }
296 : );
297 : }
298 : }
299 : finally
300 : {
301 : Logger.LogDebug("Persisting aggregate root event... Done", string.Format("{0}\\PersitEvent", GetType().Name));
302 : }
303 : }
304 : }
305 : }
|