Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Threading.Tasks;
13 : using Cqrs.Events;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Domain;
16 : using Cqrs.Messages;
17 : using Microsoft.Azure.Documents;
18 : using Microsoft.Azure.Documents.Client;
19 : using Microsoft.Azure.Documents.Linq;
20 :
21 : namespace Cqrs.Azure.DocumentDb.Events
22 : {
23 : /// <summary>
24 : /// A DocumentDb based <see cref="EventStore{TAuthenticationToken}"/>.
25 : /// </summary>
26 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
27 : public class AzureDocumentDbEventStore<TAuthenticationToken> : EventStore<TAuthenticationToken>
28 1 : {
/// <summary>
/// Names of the properties that must be unique.
/// This array is handed to <see cref="IAzureDocumentDbHelper"/> every time the event collection is created or read.
/// </summary>
protected readonly string[] UniqueIndexProperties = new[] { "AggregateId", "Version" };

/// <summary>
/// Gets the <see cref="IAzureDocumentDbEventStoreConnectionStringFactory"/> that supplies the client, the database name and the collection name of the event store.
/// Assigned by the constructor; the setter is private.
/// </summary>
protected IAzureDocumentDbEventStoreConnectionStringFactory AzureDocumentDbEventStoreConnectionStringFactory { get; private set; }

/// <summary>
/// Gets the <see cref="IAzureDocumentDbHelper"/> used to create or read the database and collection and to run fault tolerant operations.
/// Assigned by the constructor; the setter is private.
/// </summary>
protected IAzureDocumentDbHelper AzureDocumentDbHelper { get; private set; }
43 :
/// <summary>
/// Instantiates a new <see cref="AzureDocumentDbEventStore{TAuthenticationToken}"/>.
/// </summary>
/// <param name="eventBuilder">Passed through to the base <see cref="EventStore{TAuthenticationToken}"/> constructor.</param>
/// <param name="eventDeserialiser">Passed through to the base <see cref="EventStore{TAuthenticationToken}"/> constructor.</param>
/// <param name="logger">Passed through to the base <see cref="EventStore{TAuthenticationToken}"/> constructor.</param>
/// <param name="azureDocumentDbHelper">Stored in <see cref="AzureDocumentDbHelper"/>.</param>
/// <param name="azureDocumentDbEventStoreConnectionStringFactory">Stored in <see cref="AzureDocumentDbEventStoreConnectionStringFactory"/>.</param>
public AzureDocumentDbEventStore(
    IEventBuilder<TAuthenticationToken> eventBuilder,
    IEventDeserialiser<TAuthenticationToken> eventDeserialiser,
    ILogger logger,
    IAzureDocumentDbHelper azureDocumentDbHelper,
    IAzureDocumentDbEventStoreConnectionStringFactory azureDocumentDbEventStoreConnectionStringFactory)
    : base(eventBuilder, eventDeserialiser, logger)
{
    AzureDocumentDbEventStoreConnectionStringFactory = azureDocumentDbEventStoreConnectionStringFactory;
    AzureDocumentDbHelper = azureDocumentDbHelper;
}
53 :
/// <summary>
/// Synchronously gets the <see cref="IEvent{TAuthenticationToken}"/> collection for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> whose ID matches <paramref name="aggregateId"/>.
/// The work is delegated to the asynchronous overload and the calling thread blocks until it has finished.
/// </summary>
/// <param name="aggregateRootType"><see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="useLastEventOnly">Forwarded unchanged to the asynchronous overload.</param>
/// <param name="fromVersion">Forwarded unchanged to the asynchronous overload.</param>
/// <returns>The result of the asynchronous overload.</returns>
public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
{
    Task<IEnumerable<IEvent<TAuthenticationToken>>> pendingEvents = GetAsync(aggregateRootType, aggregateId, useLastEventOnly, fromVersion);

    // Reading Result blocks until the task completes; failures surface wrapped in an AggregateException.
    return pendingEvents.Result;
}
65 :
/// <summary>
/// Synchronously gets every <see cref="EventData"/> recorded for the given <paramref name="correlationId"/>.
/// The work is delegated to the asynchronous overload and the calling thread blocks until it has finished.
/// </summary>
/// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
/// <returns>The result of the asynchronous overload.</returns>
public override IEnumerable<EventData> Get(Guid correlationId)
{
    Task<IEnumerable<EventData>> pendingEvents = GetAsync(correlationId);

    // Reading Result blocks until the task completes; failures surface wrapped in an AggregateException.
    return pendingEvents.Result;
}
74 :
/// <summary>
/// Asynchronously gets the <see cref="IEvent{TAuthenticationToken}"/> collection for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <typeparamref name="T"/> whose ID matches <paramref name="aggregateId"/>.
/// This is a generic convenience wrapper that forwards <c>typeof(T)</c> to the <see cref="Type"/> based asynchronous overload.
/// </summary>
/// <typeparam name="T"><see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</typeparam>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="useLastEventOnly">Forwarded unchanged to the <see cref="Type"/> based asynchronous overload.</param>
/// <param name="fromVersion">Forwarded unchanged to the <see cref="Type"/> based asynchronous overload.</param>
/// <returns>A task producing the events returned by the <see cref="Type"/> based asynchronous overload.</returns>
protected async Task<IEnumerable<IEvent<TAuthenticationToken>>> GetAsync<T>(Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
{
    // Await the asynchronous overload directly instead of going through the blocking Get(...):
    // the previous body had no await at all, so this "async" method ran synchronously.
    // ConfigureAwait(false) because nothing after the await needs the caller's context.
    return await GetAsync(typeof(T), aggregateId, useLastEventOnly, fromVersion).ConfigureAwait(false);
}
87 :
/// <summary>
/// Asynchronously gets the <see cref="IEvent{TAuthenticationToken}"/> collection for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> whose ID matches <paramref name="aggregateId"/>.
/// </summary>
/// <param name="aggregateRootType"><see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in. Its <see cref="Type.FullName"/> is combined with <paramref name="aggregateId"/> to build the stream name that is matched against the stored aggregate ID.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="useLastEventOnly">When <c>true</c>, only the matching event with the highest version is returned.</param>
/// <param name="fromVersion">Only events with a version strictly greater than this value are loaded.</param>
/// <returns>A task producing the matching events, deserialised lazily as the sequence is enumerated.</returns>
protected async Task<IEnumerable<IEvent<TAuthenticationToken>>> GetAsync(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
{
    using (DocumentClient client = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionClient())
    {
        // Await the helper calls rather than blocking on .Result inside an async method.
        // ConfigureAwait(false): the synchronous Get(...) overloads block on this task, so the
        // continuations must not depend on the caller's synchronisation context.
        string databaseName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionDatabaseName();
        Database database = await AzureDocumentDbHelper.CreateOrReadDatabase(client, databaseName).ConfigureAwait(false);
        string collectionName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName();
        DocumentCollection collection = await AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).ConfigureAwait(false);

        IOrderedQueryable<EventData> query = client.CreateDocumentQuery<EventData>(collection.SelfLink);
        string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

        IEnumerable<EventData> results = query.Where(x => x.AggregateId == streamName && x.Version > fromVersion);

        return AzureDocumentDbHelper.ExecuteFaultTollerantFunction(() =>
        {
            // ToList() runs the query while the client is still alive and inside the fault tolerant wrapper.
            IEnumerable<EventData> loadedEvents = results.ToList();

            // Honour useLastEventOnly, which was previously accepted but never read.
            // "Last" is taken to be the highest Version, the same field the filter above uses.
            if (useLastEventOnly)
            {
                loadedEvents = loadedEvents
                    .OrderByDescending(x => x.Version)
                    .Take(1);
            }

            // NOTE(review): the EventId ordering is kept from the original implementation;
            // confirm whether ordering by Version was intended.
            return loadedEvents
                .OrderByDescending(x => x.EventId)
                .Select(EventDeserialiser.Deserialise);
        });
    }
}
117 :
/// <summary>
/// Asynchronously gets every <see cref="EventData"/> recorded for the given <paramref name="correlationId"/>, ordered by timestamp ascending.
/// </summary>
/// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
/// <returns>A task producing the matching <see cref="EventData"/> instances.</returns>
protected async Task<IEnumerable<EventData>> GetAsync(Guid correlationId)
{
    using (DocumentClient client = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionClient())
    {
        // Await the helper calls rather than blocking on .Result inside an async method.
        // ConfigureAwait(false): the synchronous Get(...) overload blocks on this task, so the
        // continuations must not depend on the caller's synchronisation context.
        string databaseName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionDatabaseName();
        Database database = await AzureDocumentDbHelper.CreateOrReadDatabase(client, databaseName).ConfigureAwait(false);
        string collectionName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName();
        DocumentCollection collection = await AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).ConfigureAwait(false);

        IOrderedQueryable<EventData> query = client.CreateDocumentQuery<EventData>(collection.SelfLink);

        IEnumerable<EventData> results = query.Where(x => x.CorrelationId == correlationId);

        // ToList() runs the query while the client is still alive and inside the fault tolerant wrapper;
        // the ordering is then applied to the in-memory list.
        return AzureDocumentDbHelper.ExecuteFaultTollerantFunction(() =>
            results
                .ToList()
                .OrderBy(x => x.Timestamp)
        );
    }
}
142 :
/// <summary>
/// Persists the provided <paramref name="eventData"/> as a new document in the event store collection.
/// The calling thread blocks until the document has been created.
/// </summary>
/// <param name="eventData">The <see cref="EventData"/> to persist.</param>
protected override void PersistEvent(EventData eventData)
{
    // Built once and reused as the second argument of every log call below.
    string logContainer = string.Format("{0}\\PersitEvent", GetType().Name);

    Logger.LogDebug("Persisting aggregate root event", logContainer);
    try
    {
        using (DocumentClient client = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionClient())
        {
            // This override is synchronous, so the helper tasks are deliberately blocked on here.
            string databaseName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionDatabaseName();
            Database database = AzureDocumentDbHelper.CreateOrReadDatabase(client, databaseName).Result;
            string collectionName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName();
            DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).Result;

            Logger.LogDebug("Creating document for event asynchronously", logContainer);
            AzureDocumentDbHelper.ExecuteFaultTollerantFunction
            (
                () =>
                {
                    // The "ValidateUniqueConstraints" pre-trigger is requested for this write.
                    // NOTE(review): presumably it enforces UniqueIndexProperties - confirm where the trigger is registered.
                    Task<ResourceResponse<Document>> work = client.CreateDocumentAsync
                    (
                        collection.SelfLink,
                        eventData,
                        new RequestOptions { PreTriggerInclude = new[] { "ValidateUniqueConstraints" } }
                    );

                    // The former work.ConfigureAwait(false) call was removed: its result was discarded,
                    // so it had no effect. Wait() blocks until the document has been created.
                    work.Wait();
                }
            );
        }
    }
    finally
    {
        Logger.LogDebug("Persisting aggregate root event... Done", logContainer);
    }
}
182 : }
183 : }
|