Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Linq.Expressions;
13 : using System.Reflection;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Configuration;
16 : using Cqrs.Domain;
17 : using Cqrs.Events;
18 : using Cqrs.Messages;
19 : using Cqrs.MongoDB.DataStores.Indexes;
20 : using Cqrs.MongoDB.Events.Indexes;
21 : using Cqrs.MongoDB.Factories;
22 : using MongoDB.Bson.Serialization;
23 : using MongoDB.Driver;
24 : using MongoDB.Driver.Linq;
25 :
26 : namespace Cqrs.MongoDB.Events
27 : {
28 : /// <summary>
29 : /// A MongoDB based <see cref="EventStore{TAuthenticationToken}"/>.
30 : /// </summary>
31 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
32 : public class MongoDbEventStore<TAuthenticationToken>
33 : : EventStore<TAuthenticationToken>
34 1 : {
/// <summary>
/// Gets the <see cref="IMongoCollection{TDocument}"/> of <see cref="MongoDbEventData"/> this store queries and inserts into.
/// Only this class can assign it; the constructor sets it from <see cref="GetCollection"/>.
/// </summary>
protected IMongoCollection<MongoDbEventData> MongoCollection { get; private set; }

/// <summary>
/// Gets the <see cref="IMongoDbEventStoreConnectionStringFactory"/> that supplies the connection string and database name used by <see cref="GetCollection"/>.
/// Only this class can assign it; the constructor sets it.
/// </summary>
protected IMongoDbEventStoreConnectionStringFactory MongoDbEventStoreConnectionStringFactory { get; private set; }

/// <summary>
/// Gets the <see cref="IConfigurationManager"/> used to read settings while preparing indexes.
/// Only this class can assign it; the constructor sets it.
/// </summary>
protected IConfigurationManager ConfigurationManager { get; private set; }
49 :
/// <summary>
/// Type initialiser. Reads <see cref="MongoDbDataStoreFactory.IndexTypesByEntityType"/> and then asks
/// <see cref="BsonClassMap"/> to look up a class map for every non-abstract type, in every assembly
/// currently loaded in the <see cref="AppDomain"/>, that is assignable to <see cref="IEvent{TAuthenticationToken}"/>.
/// </summary>
static MongoDbEventStore()
{
    // The value is never used; the read itself is the point (presumably to force MongoDbDataStoreFactory's static state to initialise - confirm).
    IDictionary<Type, IList<object>> randomCallToStartStaticProperty = MongoDbDataStoreFactory.IndexTypesByEntityType;

    Type eventInterfaceType = typeof(IEvent<TAuthenticationToken>);
    foreach (Assembly assembly in AppDomain.CurrentDomain.GetAssemblies())
    {
        Type[] candidateTypes;
        try
        {
            candidateTypes = assembly.GetTypes();
        }
        catch (ReflectionTypeLoadException exception)
        {
            // One unloadable type must not take down the type initialiser (which would make this class unusable).
            // Types holds everything that did load, with null entries for the types that failed.
            candidateTypes = exception.Types;
        }

        foreach (Type eventType in candidateTypes)
        {
            if (eventType == null || !eventInterfaceType.IsAssignableFrom(eventType) || eventType.IsAbstract)
                continue;
            BsonClassMap.LookupClassMap(eventType);
        }
    }
}
68 :
/// <summary>
/// Instantiates a new instance of <see cref="MongoDbEventStore{TAuthenticationToken}"/>,
/// resolving the MongoDB collection and triggering any required index checks.
/// </summary>
/// <param name="eventBuilder">Passed through to the base <see cref="EventStore{TAuthenticationToken}"/>.</param>
/// <param name="eventDeserialiser">Passed through to the base <see cref="EventStore{TAuthenticationToken}"/>.</param>
/// <param name="logger">Passed through to the base <see cref="EventStore{TAuthenticationToken}"/>.</param>
/// <param name="mongoDbEventStoreConnectionStringFactory">Stored in <see cref="MongoDbEventStoreConnectionStringFactory"/>.</param>
/// <param name="configurationManager">Stored in <see cref="ConfigurationManager"/>.</param>
public MongoDbEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, IMongoDbEventStoreConnectionStringFactory mongoDbEventStoreConnectionStringFactory, IConfigurationManager configurationManager)
    : base(eventBuilder, eventDeserialiser, logger)
{
    // Both dependencies must be in place before the virtual calls below, which read them.
    this.ConfigurationManager = configurationManager;
    this.MongoDbEventStoreConnectionStringFactory = mongoDbEventStoreConnectionStringFactory;

    // ReSharper disable DoNotCallOverridableMethodsInConstructor
    this.MongoCollection = this.GetCollection();
    this.VerifyIndexes();
    // ReSharper restore DoNotCallOverridableMethodsInConstructor
}
84 :
/// <summary>
/// Creates a <see cref="MongoClient"/> from the configured connection string and returns the
/// <see cref="IMongoCollection{TDocument}"/> of <see cref="MongoDbEventData"/> to store events in.
/// </summary>
/// <returns>The collection whose name is the value returned by the factory's database-name method, inside the database of that same name.</returns>
protected virtual IMongoCollection<MongoDbEventData> GetCollection()
{
    IMongoDbEventStoreConnectionStringFactory factory = MongoDbEventStoreConnectionStringFactory;

    var connectionString = factory.GetEventStoreConnectionString();
    var client = new MongoClient(connectionString);

    var databaseName = factory.GetEventStoreDatabaseName();
    IMongoDatabase database = client.GetDatabase(databaseName);

    // The collection name is read from the same factory method as the database name.
    var collectionName = factory.GetEventStoreDatabaseName();
    return database.GetCollection<MongoDbEventData>(collectionName);
}
95 :
/// <summary>
/// Runs <see cref="VerifyIndex"/> for each <see cref="MongoDbIndex{TEntity}"/> this store requires:
/// by correlation ID, by aggregate ID and version, by timestamp, and by timestamp and event type - in that order.
/// </summary>
protected virtual void VerifyIndexes()
{
    // Each index is constructed immediately before it is verified.
    MongoDbIndex<MongoDbEventData> byCorrelationId = new ByCorrelationIdMongoDbIndex();
    VerifyIndex(byCorrelationId);

    MongoDbIndex<MongoDbEventData> byAggregateIdAndVersion = new ByAggregateIdAndVersionMongoDbIndex();
    VerifyIndex(byAggregateIdAndVersion);

    MongoDbIndex<MongoDbEventData> byTimestamp = new ByTimestampMongoDbIndex();
    VerifyIndex(byTimestamp);

    MongoDbIndex<MongoDbEventData> byTimestampAndEventType = new ByTimestampAndEventTypeMongoDbIndex();
    VerifyIndex(byTimestampAndEventType);
}
106 :
/// <summary>
/// Verify the provided <paramref name="mongoIndex"/> is defined and ready to go, by asking
/// <see cref="MongoCollection"/> to create an index from its selectors, direction, uniqueness and name.
/// </summary>
/// <param name="mongoIndex">The index definition to create.</param>
/// <remarks>
/// The setting "Cqrs.MongoDb.EventStore.ThrowExceptionsOnIndexPreparation" controls what happens when index
/// creation fails: when it is missing, unparsable or true the exception propagates; when it is false the
/// failure is logged as a warning and otherwise ignored.
/// </remarks>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="mongoIndex"/> is null.</exception>
protected virtual void VerifyIndex(MongoDbIndex<MongoDbEventData> mongoIndex)
{
    if (mongoIndex == null)
        throw new ArgumentNullException("mongoIndex");

    IndexKeysDefinitionBuilder<MongoDbEventData> indexKeysBuilder = Builders<MongoDbEventData>.IndexKeys;
    IndexKeysDefinition<MongoDbEventData> indexKey = null;

    // Every selector takes the index's single direction. The first selector starts the key definition
    // and each subsequent one is appended to it, producing a compound key.
    foreach (Expression<Func<MongoDbEventData, object>> selector in mongoIndex.Selectors)
    {
        if (indexKey == null)
            indexKey = mongoIndex.IsAcending ? indexKeysBuilder.Ascending(selector) : indexKeysBuilder.Descending(selector);
        else
            indexKey = mongoIndex.IsAcending ? indexKey.Ascending(selector) : indexKey.Descending(selector);
    }

    // Default to throwing when the setting is absent or not a valid boolean.
    bool throwExceptions;
    if (!bool.TryParse(ConfigurationManager.GetSetting("Cqrs.MongoDb.EventStore.ThrowExceptionsOnIndexPreparation"), out throwExceptions))
        throwExceptions = true;

    try
    {
        // An index without selectors has no keys; fail with a clear message rather than handing null to the driver.
        // Raised inside the try so that it honours the setting exactly like any other preparation failure.
        if (indexKey == null)
            throw new InvalidOperationException(string.Format("The index '{0}' defines no selectors, so no index keys can be built.", mongoIndex.Name));

        MongoCollection.Indexes.CreateOne
        (
            indexKey,
            new CreateIndexOptions
            {
                Unique = mongoIndex.IsUnique,
                Name = mongoIndex.Name
            }
        );
    }
    catch (Exception exception)
    {
        if (throwExceptions)
            throw;

        // Best-effort mode: keep going, but leave a trace so a missing index can be diagnosed later.
        Logger.LogWarning(string.Format("Preparing the index '{0}' failed and was ignored: {1}", mongoIndex.Name, exception.Message), "MongoDbEventStore\\VerifyIndex", exception);
    }
}
157 :
158 : #region Overrides of EventStore<TAuthenticationToken>
159 :
/// <summary>
/// Gets the <see cref="IEvent{TAuthenticationToken}"/> instances stored for the <see cref="IAggregateRoot{TAuthenticationToken}"/>
/// of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>,
/// ordered from the highest version to the lowest.
/// </summary>
/// <param name="aggregateRootType">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="useLastEventOnly">When true, only the first event of the descending ordering (the highest matching version) is loaded.</param>
/// <param name="fromVersion">Only events with a version strictly greater than this value are loaded.</param>
public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
{
    string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

    IEnumerable<MongoDbEventData> newestFirst =
        from eventData in MongoCollection.AsQueryable()
        where eventData.AggregateId == streamName && eventData.Version > fromVersion
        orderby eventData.Version descending
        select eventData;

    // Re-enter the queryable pipeline before Take so the limit is applied to the query rather than to an in-memory sequence.
    IEnumerable<MongoDbEventData> selectedEvents = useLastEventOnly
        ? newestFirst.AsQueryable().Take(1)
        : newestFirst;

    // Deserialisation happens client-side while the results are enumerated into the list.
    return selectedEvents
        .Select(EventDeserialiser.Deserialise)
        .ToList();
}
183 :
/// <summary>
/// Gets the <see cref="IEvent{TAuthenticationToken}"/> instances stored for the <see cref="IAggregateRoot{TAuthenticationToken}"/>
/// of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>,
/// up to and including the provided <paramref name="version"/>, ordered from the highest version to the lowest.
/// </summary>
/// <param name="aggregateRootType">The <see cref="System.Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="version">Only events with a version less than or equal to this value are loaded.</param>
public override IEnumerable<IEvent<TAuthenticationToken>> GetToVersion(Type aggregateRootType, Guid aggregateId, int version)
{
    string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

    IEnumerable<MongoDbEventData> eventsUpToVersion =
        from eventData in MongoCollection.AsQueryable()
        where eventData.AggregateId == streamName && eventData.Version <= version
        orderby eventData.Version descending
        select eventData;

    // Deserialisation happens client-side while the results are enumerated into the list.
    return eventsUpToVersion
        .Select(EventDeserialiser.Deserialise)
        .ToList();
}
203 :
/// <summary>
/// Gets the <see cref="IEvent{TAuthenticationToken}"/> instances stored for the <see cref="IAggregateRoot{TAuthenticationToken}"/>
/// of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>,
/// whose timestamp is at or before the provided <paramref name="versionedDate"/>, ordered from the highest version to the lowest.
/// </summary>
/// <param name="aggregateRootType">The <see cref="System.Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="versionedDate">Only events with a timestamp less than or equal to this <see cref="System.DateTime"/> are loaded.</param>
public override IEnumerable<IEvent<TAuthenticationToken>> GetToDate(Type aggregateRootType, Guid aggregateId, DateTime versionedDate)
{
    string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

    IEnumerable<MongoDbEventData> eventsUpToDate =
        from eventData in MongoCollection.AsQueryable()
        where eventData.AggregateId == streamName && eventData.Timestamp <= versionedDate
        orderby eventData.Version descending
        select eventData;

    // Deserialisation happens client-side while the results are enumerated into the list.
    return eventsUpToDate
        .Select(EventDeserialiser.Deserialise)
        .ToList();
}
223 :
/// <summary>
/// Gets the <see cref="IEvent{TAuthenticationToken}"/> instances stored for the <see cref="IAggregateRoot{TAuthenticationToken}"/>
/// of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>,
/// whose timestamp lies between <paramref name="fromVersionedDate"/> and <paramref name="toVersionedDate"/> (both inclusive),
/// ordered from the highest version to the lowest.
/// </summary>
/// <param name="aggregateRootType">The <see cref="System.Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="fromVersionedDate">Only events with a timestamp greater than or equal to this <see cref="System.DateTime"/> are loaded.</param>
/// <param name="toVersionedDate">Only events with a timestamp less than or equal to this <see cref="System.DateTime"/> are loaded.</param>
public override IEnumerable<IEvent<TAuthenticationToken>> GetBetweenDates(Type aggregateRootType, Guid aggregateId, DateTime fromVersionedDate, DateTime toVersionedDate)
{
    string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

    IEnumerable<MongoDbEventData> eventsInRange =
        from eventData in MongoCollection.AsQueryable()
        where eventData.AggregateId == streamName && eventData.Timestamp >= fromVersionedDate && eventData.Timestamp <= toVersionedDate
        orderby eventData.Version descending
        select eventData;

    // Deserialisation happens client-side while the results are enumerated into the list.
    return eventsInRange
        .Select(EventDeserialiser.Deserialise)
        .ToList();
}
244 :
/// <summary>
/// Gets the stored <see cref="EventData"/> of every event with the given <paramref name="correlationId"/>, ordered by timestamp ascending.
/// </summary>
/// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
public override IEnumerable<EventData> Get(Guid correlationId)
{
    IEnumerable<MongoDbEventData> correlatedEvents =
        from eventData in MongoCollection.AsQueryable()
        where eventData.CorrelationId == correlationId
        orderby eventData.Timestamp
        select eventData;

    // Materialise now so the caller receives a list rather than a deferred query; the stored records are returned as-is.
    List<MongoDbEventData> results = correlatedEvents.ToList();
    return results;
}
258 :
/// <summary>
/// Persist the provided <paramref name="eventData"/> into storage by inserting it into <see cref="MongoCollection"/>,
/// logging at debug level before the insert, after it (with the time taken), and on exit whether or not the insert succeeded.
/// </summary>
/// <param name="eventData">The <see cref="EventData"/> to persist.</param>
protected override void PersistEvent(EventData eventData)
{
    // Use the instance directly when it is already a MongoDbEventData; otherwise construct one from it.
    MongoDbEventData safeEventData = eventData as MongoDbEventData ?? new MongoDbEventData(eventData);

    Logger.LogDebug("Adding an event to the MongoDB database", "MongoDbEventStore\\PersistEvent");
    try
    {
        // A Stopwatch measures elapsed time monotonically; subtracting wall-clock readings is wrong
        // whenever the system clock is adjusted (NTP, DST) during the insert.
        System.Diagnostics.Stopwatch stopwatch = System.Diagnostics.Stopwatch.StartNew();
        MongoCollection.InsertOne(safeEventData);
        stopwatch.Stop();
        Logger.LogDebug(string.Format("Adding data in the MongoDB database took {0}.", stopwatch.Elapsed), "MongoDbEventStore\\PersistEvent");
    }
    finally
    {
        Logger.LogDebug("Adding an event to the MongoDB database... Done", "MongoDbEventStore\\PersistEvent");
    }
}
281 :
282 : #endregion
283 : }
284 : }
|