Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Linq.Expressions;
13 : using System.Reflection;
14 : using Chinchilla.Logging;
15 : using Cqrs.Configuration;
16 : using Cqrs.Domain;
17 : using Cqrs.Events;
18 : using Cqrs.MongoDB.DataStores.Indexes;
19 : using Cqrs.MongoDB.Events.Indexes;
20 : using Cqrs.MongoDB.Factories;
21 : using Cqrs.Snapshots;
22 : using MongoDB.Bson.Serialization;
23 : using MongoDB.Driver;
24 : using MongoDB.Driver.Linq;
25 :
26 : namespace Cqrs.MongoDB.Events
27 : {
28 : /// <summary>
29 : /// Stores the most recent <see cref="Snapshot">snapshots</see> for replay and <see cref="IAggregateRoot{TAuthenticationToken}"/> rehydration on a <see cref="SnapshotAggregateRoot{TAuthenticationToken,TSnapshot}"/> in MongoDB.
30 : /// </summary>
31 : public class MongoDbSnapshotStore
32 : : SnapshotStore
33 1 : {
34 : /// <summary>
35 : /// Gets or sets the <see cref="IMongoCollection{TData}"/>
36 : /// </summary>
37 : protected IMongoCollection<MongoDbEventData> MongoCollection { get; private set; }
38 :
39 : /// <summary>
40 : /// Gets or sets the <see cref="IMongoDbEventStoreConnectionStringFactory"/>
41 : /// </summary>
42 : protected IMongoDbSnapshotStoreConnectionStringFactory MongoDbSnapshotStoreConnectionStringFactory { get; private set; }
43 :
44 : static MongoDbSnapshotStore()
45 : {
46 : IDictionary<Type, IList<object>> randomCallToStartStaticProperty = MongoDbDataStoreFactory.IndexTypesByEntityType;
47 :
48 : foreach (Assembly assembly in AppDomain.CurrentDomain.GetAssemblies())
49 : {
50 : var eventTypes = assembly
51 : .GetTypes()
52 : .Where
53 : (
54 : type =>
55 : typeof(EventData).IsAssignableFrom(type)
56 : && !type.IsAbstract
57 : );
58 : foreach (Type eventType in eventTypes)
59 : BsonClassMap.LookupClassMap(eventType);
60 : }
61 : }
62 :
63 : /// <summary>
64 : /// Instantiate a new instance of <see cref="MongoDbEventStore{TAuthenticationToken}"/>
65 : /// triggering any require index checks.
66 : /// </summary>
67 1 : public MongoDbSnapshotStore(IConfigurationManager configurationManager, ISnapshotDeserialiser eventDeserialiser, ILogger logger, ICorrelationIdHelper correlationIdHelper, ISnapshotBuilder snapshotBuilder, IMongoDbSnapshotStoreConnectionStringFactory mongoDbSnapshotStoreConnectionStringFactory)
68 : : base(configurationManager, eventDeserialiser, snapshotBuilder, logger, correlationIdHelper)
69 : {
70 : MongoDbSnapshotStoreConnectionStringFactory = mongoDbSnapshotStoreConnectionStringFactory;
71 :
72 : // ReSharper disable DoNotCallOverridableMethodsInConstructor
73 : MongoCollection = GetCollection();
74 : VerifyIndexes();
75 : // ReSharper restore DoNotCallOverridableMethodsInConstructor
76 : }
77 :
78 : /// <summary>
79 : /// Get a <see cref="IMongoCollection{TDocument}"/>
80 : /// </summary>
81 1 : protected virtual IMongoCollection<MongoDbEventData> GetCollection()
82 : {
83 : var mongoClient = new MongoClient(MongoDbSnapshotStoreConnectionStringFactory.GetSnapshotStoreConnectionString());
84 : IMongoDatabase mongoDatabase = mongoClient.GetDatabase(MongoDbSnapshotStoreConnectionStringFactory.GetSnapshotStoreDatabaseName());
85 :
86 : return mongoDatabase.GetCollection<MongoDbEventData>(MongoDbSnapshotStoreConnectionStringFactory.GetSnapshotStoreDatabaseName());
87 : }
88 :
89 : /// <summary>
90 : /// Verify all required <see cref="MongoDbIndex{TEntity}"/> are defined and ready to go.
91 : /// </summary>
92 1 : protected virtual void VerifyIndexes()
93 : {
94 : VerifyIndex(new ByCorrelationIdMongoDbIndex());
95 : VerifyIndex(new ByAggregateIdAndVersionMongoDbIndex());
96 : VerifyIndex(new ByTimestampMongoDbIndex());
97 : VerifyIndex(new ByTimestampAndEventTypeMongoDbIndex());
98 : }
99 :
100 : /// <summary>
101 : /// Verify the provided <paramref name="mongoIndex"/> is defined and ready to go.
102 : /// </summary>
103 1 : protected virtual void VerifyIndex(MongoDbIndex<MongoDbEventData> mongoIndex)
104 : {
105 : IndexKeysDefinitionBuilder<MongoDbEventData> indexKeysBuilder = Builders<MongoDbEventData>.IndexKeys;
106 : IndexKeysDefinition<MongoDbEventData> indexKey = null;
107 :
108 : IList<Expression<Func<MongoDbEventData, object>>> selectors = mongoIndex.Selectors.ToList();
109 : for (int i = 0; i < selectors.Count; i++)
110 : {
111 : Expression<Func<MongoDbEventData, object>> expression = selectors[i];
112 : if (mongoIndex.IsAcending)
113 : {
114 : if (i == 0)
115 : indexKey = indexKeysBuilder.Ascending(expression);
116 : else
117 : indexKey = indexKey.Ascending(expression);
118 : }
119 : else
120 : {
121 : if (i == 0)
122 : indexKey = indexKeysBuilder.Descending(expression);
123 : else
124 : indexKey = indexKey.Descending(expression);
125 : }
126 : }
127 :
128 : bool throwExceptions;
129 : if (!bool.TryParse(ConfigurationManager.GetSetting("Cqrs.MongoDb.SnapshotStore.ThrowExceptionsOnIndexPreparation"), out throwExceptions))
130 : throwExceptions = true;
131 : try
132 : {
133 : MongoCollection.Indexes.CreateOne
134 : (
135 : indexKey,
136 : new CreateIndexOptions
137 : {
138 : Unique = mongoIndex.IsUnique,
139 : Name = mongoIndex.Name
140 : }
141 : );
142 :
143 : }
144 : catch
145 : {
146 : if (throwExceptions)
147 : throw;
148 : }
149 : }
150 :
151 : #region Implementation of ISnapshotStore
152 :
153 : /// <summary>
154 : /// Get the latest <see cref="Snapshot"/> from storage.
155 : /// </summary>
156 : /// <returns>The most recent <see cref="Snapshot"/> of</returns>
157 1 : protected override Snapshot Get(Type aggregateRootType, string streamName)
158 : {
159 : MongoDbEventData query = MongoCollection
160 : .AsQueryable()
161 : .Where(snapshot => snapshot.AggregateId == streamName)
162 : .OrderByDescending(eventData => eventData.Version)
163 : .Take(1)
164 : .SingleOrDefault();
165 :
166 : return EventDeserialiser.Deserialise(query);
167 : }
168 :
169 : /// <summary>
170 : /// Saves the provided <paramref name="snapshot"/> into storage.
171 : /// </summary>
172 : /// <param name="snapshot">the <see cref="Snapshot"/> to save and store.</param>
173 1 : public override void Save(Snapshot snapshot)
174 : {
175 : EventData eventData = BuildEventData(snapshot);
176 : var safeEventData = new MongoDbEventData(eventData);
177 :
178 : Logger.LogDebug("Adding an event to the MongoDB database", "MongoDbSnapshotStore\\Save");
179 : try
180 : {
181 : DateTime start = DateTime.Now;
182 : MongoCollection.InsertOne(safeEventData);
183 : DateTime end = DateTime.Now;
184 : Logger.LogDebug(string.Format("Adding data in the MongoDB database took {0}.", end - start), "MongoDbSnapshotStore\\Save");
185 : }
186 : finally
187 : {
188 : Logger.LogDebug("Adding an event to the MongoDB database... Done", "MongoDbSnapshotStore\\Save");
189 : }
190 : }
191 :
192 : #endregion
193 : }
194 : }
|