Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Linq.Expressions;
13 : using System.Reflection;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Configuration;
16 : using Cqrs.Domain;
17 : using Cqrs.Events;
18 : using Cqrs.Messages;
19 : using Cqrs.MongoDB.DataStores.Indexes;
20 : using Cqrs.MongoDB.Events.Indexes;
21 : using Cqrs.MongoDB.Factories;
22 : using MongoDB.Bson.Serialization;
23 : using MongoDB.Driver;
24 : using MongoDB.Driver.Linq;
25 :
namespace Cqrs.MongoDB.Events
{
    /// <summary>
    /// A MongoDB based <see cref="EventStore{TAuthenticationToken}"/>.
    /// </summary>
    /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
    public class MongoDbEventStore<TAuthenticationToken> : EventStore<TAuthenticationToken>
    {
        /// <summary>
        /// Gets the <see cref="IMongoCollection{TData}"/> events are read from and written to.
        /// </summary>
        protected IMongoCollection<MongoDbEventData> MongoCollection { get; private set; }

        /// <summary>
        /// Gets the <see cref="IMongoDbEventStoreConnectionStringFactory"/> that supplies the connection string and database name.
        /// </summary>
        protected IMongoDbEventStoreConnectionStringFactory MongoDbEventStoreConnectionStringFactory { get; private set; }

        /// <summary>
        /// Gets the <see cref="IConfigurationManager"/> used to read settings.
        /// </summary>
        protected IConfigurationManager ConfigurationManager { get; private set; }

        /// <summary>
        /// Looks up a <see cref="BsonClassMap"/> for every concrete <see cref="IEvent{TAuthenticationToken}"/>
        /// found in the assemblies currently loaded into the <see cref="AppDomain"/>.
        /// </summary>
        static MongoDbEventStore()
        {
            // The value is never used; reading the static property is the point.
            // NOTE(review): presumably this forces MongoDbDataStoreFactory's static initialisation - confirm.
            IDictionary<Type, IList<object>> randomCallToStartStaticProperty = MongoDbDataStoreFactory.IndexTypesByEntityType;

            foreach (Assembly assembly in AppDomain.CurrentDomain.GetAssemblies())
            {
                IEnumerable<Type> eventTypes = GetLoadableTypes(assembly)
                    .Where
                    (
                        type =>
                            typeof(IEvent<TAuthenticationToken>).IsAssignableFrom(type)
                            && !type.IsAbstract
                    );
                foreach (Type eventType in eventTypes)
                    BsonClassMap.LookupClassMap(eventType);
            }
        }

        /// <summary>
        /// Gets every <see cref="Type"/> of the provided <paramref name="assembly"/> that can be loaded.
        /// </summary>
        /// <remarks>
        /// An exception escaping a static constructor leaves the whole class unusable, so an assembly
        /// that is only partially loadable must not abort the scan.
        /// </remarks>
        private static IEnumerable<Type> GetLoadableTypes(Assembly assembly)
        {
            try
            {
                return assembly.GetTypes();
            }
            catch (ReflectionTypeLoadException exception)
            {
                // Types contains a null entry for each type that could not be loaded.
                return exception.Types.Where(type => type != null);
            }
        }

        /// <summary>
        /// Instantiate a new instance of <see cref="MongoDbEventStore{TAuthenticationToken}"/>
        /// triggering any required index checks.
        /// </summary>
        /// <param name="eventBuilder">Passed to the base <see cref="EventStore{TAuthenticationToken}"/>.</param>
        /// <param name="eventDeserialiser">Passed to the base <see cref="EventStore{TAuthenticationToken}"/>.</param>
        /// <param name="logger">Passed to the base <see cref="EventStore{TAuthenticationToken}"/>.</param>
        /// <param name="mongoDbEventStoreConnectionStringFactory">Supplies the connection string and database name used by <see cref="GetCollection"/>.</param>
        /// <param name="configurationManager">Used by <see cref="VerifyIndex"/> to read settings.</param>
        public MongoDbEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, IMongoDbEventStoreConnectionStringFactory mongoDbEventStoreConnectionStringFactory, IConfigurationManager configurationManager)
            : base(eventBuilder, eventDeserialiser, logger)
        {
            // Both properties must be assigned before the virtual calls below, which read them.
            MongoDbEventStoreConnectionStringFactory = mongoDbEventStoreConnectionStringFactory;
            ConfigurationManager = configurationManager;

            // ReSharper disable DoNotCallOverridableMethodsInConstructor
            MongoCollection = GetCollection();
            VerifyIndexes();
            // ReSharper restore DoNotCallOverridableMethodsInConstructor
        }

        /// <summary>
        /// Get a <see cref="IMongoCollection{TDocument}"/>
        /// </summary>
        /// <returns>The collection of <see cref="MongoDbEventData"/> obtained from a newly created <see cref="MongoClient"/>.</returns>
        protected virtual IMongoCollection<MongoDbEventData> GetCollection()
        {
            var mongoClient = new MongoClient(MongoDbEventStoreConnectionStringFactory.GetEventStoreConnectionString());
            IMongoDatabase mongoDatabase = mongoClient.GetDatabase(MongoDbEventStoreConnectionStringFactory.GetEventStoreDatabaseName());

            // NOTE(review): the database name is also used as the collection name.
            // Left unchanged on purpose - changing it would point existing deployments at a different collection.
            return mongoDatabase.GetCollection<MongoDbEventData>(MongoDbEventStoreConnectionStringFactory.GetEventStoreDatabaseName());
        }

        /// <summary>
        /// Verify all required <see cref="MongoDbIndex{TEntity}"/> are defined and ready to go.
        /// </summary>
        protected virtual void VerifyIndexes()
        {
            VerifyIndex(new ByCorrelationIdMongoDbIndex());
            VerifyIndex(new ByAggregateIdAndVersionMongoDbIndex());
            VerifyIndex(new ByTimestampMongoDbIndex());
            VerifyIndex(new ByTimestampAndEventTypeMongoDbIndex());
        }

        /// <summary>
        /// Verify the provided <paramref name="mongoIndex"/> is defined and ready to go.
        /// </summary>
        /// <param name="mongoIndex">The index to create; its selectors, direction, uniqueness and name are all read.</param>
        /// <remarks>
        /// A failure to create the index is rethrown unless the setting
        /// "Cqrs.MongoDb.EventStore.ThrowExceptionsOnIndexPreparation" parses to <c>false</c>,
        /// in which case the failure is logged as a warning and ignored.
        /// </remarks>
        protected virtual void VerifyIndex(MongoDbIndex<MongoDbEventData> mongoIndex)
        {
            IndexKeysDefinitionBuilder<MongoDbEventData> indexKeysBuilder = Builders<MongoDbEventData>.IndexKeys;
            IndexKeysDefinition<MongoDbEventData> indexKey = null;

            // The first selector starts the definition, every following selector is chained onto it.
            foreach (Expression<Func<MongoDbEventData, object>> selector in mongoIndex.Selectors)
            {
                if (indexKey == null)
                    indexKey = mongoIndex.IsAcending
                        ? indexKeysBuilder.Ascending(selector)
                        : indexKeysBuilder.Descending(selector);
                else
                    indexKey = mongoIndex.IsAcending
                        ? indexKey.Ascending(selector)
                        : indexKey.Descending(selector);
            }

            // A missing or unparsable setting means failures are thrown.
            bool throwExceptions;
            if (!bool.TryParse(ConfigurationManager.GetSetting("Cqrs.MongoDb.EventStore.ThrowExceptionsOnIndexPreparation"), out throwExceptions))
                throwExceptions = true;

            try
            {
                MongoCollection.Indexes.CreateOne
                (
                    indexKey,
                    new CreateIndexOptions
                    {
                        Unique = mongoIndex.IsUnique,
                        Name = mongoIndex.Name
                    }
                );
            }
            catch (Exception exception)
            {
                if (throwExceptions)
                    throw;

                // Stay best-effort, but leave a trace so a missing index can be diagnosed.
                if (Logger != null)
                    Logger.LogWarning(string.Format("Preparing the index '{0}' on the MongoDB event store failed and was ignored.", mongoIndex.Name), "MongoDbEventStore\\VerifyIndex", exception);
            }
        }

        #region Overrides of EventStore<TAuthenticationToken>

        /// <summary>
        /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>.
        /// </summary>
        /// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
        /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
        /// <param name="useLastEventOnly">Loads only the last event<see cref="IEvent{TAuthenticationToken}"/>.</param>
        /// <param name="fromVersion">Load events with a version greater than this version.</param>
        /// <returns>The matching events, deserialised, ordered from the highest version to the lowest.</returns>
        public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
        {
            string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

            IQueryable<MongoDbEventData> query = MongoCollection
                .AsQueryable()
                .Where(eventData => eventData.AggregateId == streamName && eventData.Version > fromVersion)
                .OrderByDescending(eventData => eventData.Version);

            // Descending order puts the most recent event first, so one item is the last event.
            if (useLastEventOnly)
                query = query.Take(1);

            // Deserialisation is done in memory, after the query has been executed.
            return query
                .AsEnumerable()
                .Select(EventDeserialiser.Deserialise)
                .ToList();
        }

        /// <summary>
        /// Get all <see cref="IEvent{TAuthenticationToken}"/> instances for the given <paramref name="correlationId"/>.
        /// </summary>
        /// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
        /// <returns>The matching <see cref="EventData"/>, ordered by timestamp from the earliest to the latest.</returns>
        public override IEnumerable<EventData> Get(Guid correlationId)
        {
            return MongoCollection
                .AsQueryable()
                .Where(eventData => eventData.CorrelationId == correlationId)
                .OrderBy(eventData => eventData.Timestamp)
                .ToList();
        }

        /// <summary>
        /// Persist the provided <paramref name="eventData"/> into storage.
        /// </summary>
        /// <param name="eventData">The <see cref="EventData"/> to persist.</param>
        protected override void PersistEvent(EventData eventData)
        {
            // Anything that is not already a MongoDbEventData is wrapped in one before being inserted.
            MongoDbEventData safeEventData = eventData as MongoDbEventData ?? new MongoDbEventData(eventData);

            Logger.LogDebug("Adding an event to the MongoDB database", "MongoDbEventStore\\PersistEvent");
            try
            {
                // A stopwatch is not affected by daylight saving or system clock changes.
                System.Diagnostics.Stopwatch stopwatch = System.Diagnostics.Stopwatch.StartNew();
                MongoCollection.InsertOne(safeEventData);
                stopwatch.Stop();
                Logger.LogDebug(string.Format("Adding data in the MongoDB database took {0}.", stopwatch.Elapsed), "MongoDbEventStore\\PersistEvent");
            }
            finally
            {
                Logger.LogDebug("Adding an event to the MongoDB database... Done", "MongoDbEventStore\\PersistEvent");
            }
        }

        #endregion
    }
}
|