Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Configuration;
12 : using System.Data.Linq;
13 : using System.Linq;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Configuration;
16 : using Cqrs.DataStores;
17 : using Cqrs.Domain;
18 : using Cqrs.Entities;
19 : using Cqrs.Exceptions;
20 : using Cqrs.Messages;
21 :
22 : namespace Cqrs.Events
23 : {
24 : /// <summary>
25 : /// A simplified SqlServer based <see cref="EventStore{TAuthenticationToken}"/> that uses LinqToSql and follows a rigid schema.
26 : /// </summary>
27 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
28 : public class SqlEventStore<TAuthenticationToken>
29 : : EventStore<TAuthenticationToken>
30 1 : {
/// <summary>
/// Application setting key whose value is used directly as the connection value for the event store
/// when no connection string name has been configured.
/// </summary>
internal const string SqlEventStoreDbFileOrServerOrConnectionApplicationKey = "SqlEventStoreDbFileOrServerOrConnection";

/// <summary>
/// Application setting key holding the name of the connection string to use for the event store.
/// </summary>
internal const string SqlEventStoreConnectionNameApplicationKey = "Cqrs.SqlEventStore.ConnectionStringName";

/// <summary>
/// Legacy application setting key for the command timeout applied when querying by correlation ID.
/// </summary>
internal const string OldSqlEventStoreGetByCorrelationIdCommandTimeout = "SqlEventStoreGetByCorrelationIdCommandTimeout";

/// <summary>
/// Application setting key for the command timeout applied when querying by correlation ID.
/// </summary>
internal const string SqlEventStoreGetByCorrelationIdCommandTimeout = "Cqrs.SqlEventStore.GetByCorrelationId.CommandTimeout";

/// <summary>
/// Pattern for the application setting key that configures a custom table name;
/// <c>{0}</c> is replaced with the full name of the aggregate root type.
/// </summary>
internal const string SqlEventStoreTableNameApplicationKeyPattern = "Cqrs.SqlEventStore.CustomTableNames.{0}";

/// <summary>
/// Gets the <see cref="IConfigurationManager"/> used to read application settings. Only this class can set it.
/// </summary>
protected IConfigurationManager ConfigurationManager { get; private set; }
45 :
/// <summary>
/// Creates a new <see cref="SqlEventStore{TAuthenticationToken}"/>.
/// </summary>
/// <param name="eventBuilder">The <see cref="IEventBuilder{TAuthenticationToken}"/> passed to the base class.</param>
/// <param name="eventDeserialiser">The <see cref="IEventDeserialiser{TAuthenticationToken}"/> passed to the base class.</param>
/// <param name="logger">The <see cref="ILogger"/> passed to the base class.</param>
/// <param name="configurationManager">The <see cref="IConfigurationManager"/> used to read connection and table settings.</param>
public SqlEventStore(
	IEventBuilder<TAuthenticationToken> eventBuilder,
	IEventDeserialiser<TAuthenticationToken> eventDeserialiser,
	ILogger logger,
	IConfigurationManager configurationManager)
	: base(eventBuilder, eventDeserialiser, logger)
{
	this.ConfigurationManager = configurationManager;
}
54 :
55 : #region Overrides of EventStore<TAuthenticationToken>
56 :
/// <summary>
/// Loads the <see cref="IEvent{TAuthenticationToken}"/> instances stored for the <see cref="IAggregateRoot{TAuthenticationToken}"/>
/// of type <paramref name="aggregateRootType"/> whose ID matches <paramref name="aggregateId"/>.
/// </summary>
/// <param name="aggregateRootType">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the events were raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="useLastEventOnly">When true, only the event with the highest version is loaded.</param>
/// <param name="fromVersion">Only events with a version greater than this value are loaded.</param>
/// <returns>The deserialised events, ordered from the highest version to the lowest.</returns>
public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
{
	string aggregateTypeName = aggregateRootType.FullName;
	string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateTypeName, aggregateId);

	using (DataContext context = CreateDbDataContext(aggregateTypeName))
	{
		IQueryable<EventData> table = GetEventStoreTable(context);
		IQueryable<EventData> matches = table
			.Where(row => row.AggregateId == streamName && row.Version > fromVersion)
			.OrderByDescending(row => row.Version);

		// Restrict the query itself so that only a single row is requested.
		if (useLastEventOnly)
		{
			matches = matches.Take(1);
		}

		// Deserialisation happens in memory; ToList() materialises the results before the context is disposed.
		return matches
			.AsEnumerable()
			.Select(EventDeserialiser.Deserialise)
			.ToList();
	}
}
83 :
/// <summary>
/// Loads the <see cref="IEvent{TAuthenticationToken}"/> instances stored for the <see cref="IAggregateRoot{TAuthenticationToken}"/>
/// of type <paramref name="aggregateRootType"/> whose ID matches <paramref name="aggregateId"/>,
/// up to and including the provided <paramref name="version"/>.
/// </summary>
/// <param name="aggregateRootType">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the events were raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="version">Only events with a version less than or equal to this value are loaded.</param>
/// <returns>The deserialised events, ordered from the highest version to the lowest.</returns>
public override IEnumerable<IEvent<TAuthenticationToken>> GetToVersion(Type aggregateRootType, Guid aggregateId, int version)
{
	string aggregateTypeName = aggregateRootType.FullName;
	string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateTypeName, aggregateId);

	using (DataContext context = CreateDbDataContext(aggregateTypeName))
	{
		IQueryable<EventData> table = GetEventStoreTable(context);
		IQueryable<EventData> matches = table
			.Where(row => row.AggregateId == streamName && row.Version <= version)
			.OrderByDescending(row => row.Version);

		// Deserialisation happens in memory; ToList() materialises the results before the context is disposed.
		return matches
			.AsEnumerable()
			.Select(EventDeserialiser.Deserialise)
			.ToList();
	}
}
106 :
/// <summary>
/// Loads the <see cref="IEvent{TAuthenticationToken}"/> instances stored for the <see cref="IAggregateRoot{TAuthenticationToken}"/>
/// of type <paramref name="aggregateRootType"/> whose ID matches <paramref name="aggregateId"/>,
/// up to and including the provided <paramref name="versionedDate"/>.
/// </summary>
/// <param name="aggregateRootType">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the events were raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="versionedDate">Only events with a timestamp less than or equal to this <see cref="DateTime"/> are loaded.</param>
/// <returns>The deserialised events, ordered from the highest version to the lowest.</returns>
public override IEnumerable<IEvent<TAuthenticationToken>> GetToDate(Type aggregateRootType, Guid aggregateId, DateTime versionedDate)
{
	string aggregateTypeName = aggregateRootType.FullName;
	string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateTypeName, aggregateId);

	using (DataContext context = CreateDbDataContext(aggregateTypeName))
	{
		IQueryable<EventData> table = GetEventStoreTable(context);
		IQueryable<EventData> matches = table
			.Where(row => row.AggregateId == streamName && row.Timestamp <= versionedDate)
			.OrderByDescending(row => row.Version);

		// Deserialisation happens in memory; ToList() materialises the results before the context is disposed.
		return matches
			.AsEnumerable()
			.Select(EventDeserialiser.Deserialise)
			.ToList();
	}
}
129 :
/// <summary>
/// Loads the <see cref="IEvent{TAuthenticationToken}"/> instances stored for the <see cref="IAggregateRoot{TAuthenticationToken}"/>
/// of type <paramref name="aggregateRootType"/> whose ID matches <paramref name="aggregateId"/>,
/// from and including <paramref name="fromVersionedDate"/> up to and including <paramref name="toVersionedDate"/>.
/// </summary>
/// <param name="aggregateRootType">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the events were raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="fromVersionedDate">Only events with a timestamp greater than or equal to this <see cref="DateTime"/> are loaded.</param>
/// <param name="toVersionedDate">Only events with a timestamp less than or equal to this <see cref="DateTime"/> are loaded.</param>
/// <returns>The deserialised events, ordered from the highest version to the lowest.</returns>
public override IEnumerable<IEvent<TAuthenticationToken>> GetBetweenDates(Type aggregateRootType, Guid aggregateId, DateTime fromVersionedDate, DateTime toVersionedDate)
{
	string aggregateTypeName = aggregateRootType.FullName;
	string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateTypeName, aggregateId);

	using (DataContext context = CreateDbDataContext(aggregateTypeName))
	{
		IQueryable<EventData> table = GetEventStoreTable(context);
		IQueryable<EventData> matches = table
			.Where(row => row.AggregateId == streamName && row.Timestamp >= fromVersionedDate && row.Timestamp <= toVersionedDate)
			.OrderByDescending(row => row.Version);

		// Deserialisation happens in memory; ToList() materialises the results before the context is disposed.
		return matches
			.AsEnumerable()
			.Select(EventDeserialiser.Deserialise)
			.ToList();
	}
}
153 :
/// <summary>
/// Loads every stored <see cref="EventData"/> whose correlation ID matches the given <paramref name="correlationId"/>.
/// </summary>
/// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the events to retrieve.</param>
/// <returns>The matching <see cref="EventData"/> records, ordered by timestamp ascending.</returns>
public override IEnumerable<EventData> Get(Guid correlationId)
{
	using (DataContext context = CreateDbDataContext())
	{
		// Prefer the current setting key and only consult the legacy key when the current one is absent.
		string rawTimeout;
		bool hasTimeoutSetting =
			ConfigurationManager.TryGetSetting(SqlEventStoreGetByCorrelationIdCommandTimeout, out rawTimeout)
			|| ConfigurationManager.TryGetSetting(OldSqlEventStoreGetByCorrelationIdCommandTimeout, out rawTimeout);

		// A value that is not a valid integer is ignored, leaving the context's existing timeout in place.
		int timeout;
		if (hasTimeoutSetting && int.TryParse(rawTimeout, out timeout))
		{
			context.CommandTimeout = timeout;
		}

		IQueryable<EventData> table = GetEventStoreTable(context);
		return table
			.Where(row => row.CorrelationId == correlationId)
			.OrderBy(row => row.Timestamp)
			.ToList();
	}
}
178 :
/// <summary>
/// Persist the provided <paramref name="eventData"/> into SQL Server.
/// </summary>
/// <remarks>
/// The text before the first <c>/</c> in <see cref="EventData.AggregateId"/> is passed to
/// <see cref="CreateDbDataContext"/> as the aggregate root type name, so that any custom table
/// configured for that type is used. When the value contains no <c>/</c>, no type name is passed
/// and the default data context is used rather than failing with an <see cref="ArgumentOutOfRangeException"/>.
/// </remarks>
/// <param name="eventData">The <see cref="EventData"/> to persist.</param>
protected override void PersistEvent(EventData eventData)
{
	string streamName = eventData.AggregateId;

	// The separator is a fixed ASCII character, so an ordinal search is used rather than a culture-sensitive one.
	int separatorIndex = streamName.IndexOf('/');

	// IndexOf returns -1 when the separator is missing; Substring(0, -1) would throw.
	string aggregateRootTypeName = separatorIndex < 0
		? null
		: streamName.Substring(0, separatorIndex);

	using (DataContext dbDataContext = CreateDbDataContext(aggregateRootTypeName))
	{
		Add(dbDataContext, eventData);
	}
}
190 :
191 : #endregion
192 :
/// <summary>
/// Creates a new <see cref="DataContext"/> using connection settings read from <see cref="ConfigurationManager"/>.
/// </summary>
/// <remarks>
/// Connection resolution: when the connection string name setting is present and non-empty, the named
/// connection string is read from <see cref="System.Configuration.ConfigurationManager.ConnectionStrings"/>.
/// Otherwise the event store connection setting is used and, failing that, the data store connection setting.
/// Table resolution: when <paramref name="aggregateRootTypeName"/> is provided and a custom table name setting
/// exists for it, a value of <c>true</c> derives the table name from the type name (with <c>.</c> replaced by <c>_</c>),
/// a value of <c>false</c> uses the default context, and any other value is used as the table name itself.
/// </remarks>
/// <param name="aggregateRootTypeName">The full name of the aggregate root type, used to look up a custom table name. Optional.</param>
/// <exception cref="MissingConnectionStringException">The configured connection string name does not exist.</exception>
/// <exception cref="MissingApplicationSettingForConnectionStringException">No connection setting could be found.</exception>
protected virtual DataContext CreateDbDataContext(string aggregateRootTypeName = null)
{
	string connectionStringKey;
	string applicationKey;

	bool hasConnectionName = ConfigurationManager.TryGetSetting(SqlEventStoreConnectionNameApplicationKey, out applicationKey)
		&& !string.IsNullOrEmpty(applicationKey);

	if (hasConnectionName)
	{
		ConnectionStringSettings namedConnection = System.Configuration.ConfigurationManager.ConnectionStrings[applicationKey];
		if (namedConnection == null)
		{
			throw new MissingConnectionStringException(applicationKey);
		}
		connectionStringKey = namedConnection.ConnectionString;
	}
	else
	{
		// Try the event store specific setting first, then fall back to the data store setting.
		bool resolved = ConfigurationManager.TryGetSetting(SqlEventStoreDbFileOrServerOrConnectionApplicationKey, out connectionStringKey)
			&& !string.IsNullOrEmpty(connectionStringKey);
		if (!resolved)
		{
			resolved = ConfigurationManager.TryGetSetting(SqlDataStore<Entity>.SqlDataStoreDbFileOrServerOrConnectionApplicationKey, out connectionStringKey)
				&& !string.IsNullOrEmpty(connectionStringKey);
		}
		if (!resolved)
		{
			throw new MissingApplicationSettingForConnectionStringException(SqlEventStoreConnectionNameApplicationKey);
		}
	}

	// Without a type name there is no custom table to look up.
	if (string.IsNullOrWhiteSpace(aggregateRootTypeName))
	{
		return new SqlEventStoreDataContext(connectionStringKey);
	}

	string tableName;
	string tableSettingKey = string.Format(SqlEventStoreTableNameApplicationKeyPattern, aggregateRootTypeName);
	bool hasTableSetting = ConfigurationManager.TryGetSetting(tableSettingKey, out tableName)
		&& !string.IsNullOrEmpty(tableName);
	if (!hasTableSetting)
	{
		return new SqlEventStoreDataContext(connectionStringKey);
	}

	// A non-boolean value is an explicit table name.
	bool autoname;
	if (!bool.TryParse(tableName, out autoname))
	{
		return SqlEventStoreDataContext.New<EventData>(tableName, connectionStringKey);
	}

	// "true" derives the table name from the aggregate type name; "false" keeps the default table.
	if (autoname)
	{
		return SqlEventStoreDataContext.New<EventData>(aggregateRootTypeName.Replace(".", "_"), connectionStringKey);
	}

	return new SqlEventStoreDataContext(connectionStringKey);
}
233 :
/// <summary>
/// Gets the typed <see cref="Table{TEntity}"/> of <see cref="EventData"/> from the provided <paramref name="dbDataContext"/>.
/// </summary>
/// <param name="dbDataContext">The <see cref="DataContext"/> to get the table from.</param>
/// <returns>The <see cref="Table{TEntity}"/> used to query and insert <see cref="EventData"/>.</returns>
protected virtual Table<EventData> GetEventStoreTable(DataContext dbDataContext)
{
	Table<EventData> eventTable = dbDataContext.GetTable<EventData>();
	return eventTable;
}
243 :
/// <summary>
/// Persist the provided <paramref name="data"/> into SQL Server using the provided <paramref name="dbDataContext"/>.
/// </summary>
/// <remarks>
/// Any exception raised while inserting is logged and then rethrown unchanged.
/// The time taken is logged only when the insert succeeds.
/// </remarks>
/// <param name="dbDataContext">The <see cref="DataContext"/> to insert into and submit changes on.</param>
/// <param name="data">The <see cref="EventData"/> to persist.</param>
protected virtual void Add(DataContext dbDataContext, EventData data)
{
	Logger.LogDebug("Adding data to the SQL eventstore database", "SqlEventStore\\Add");
	try
	{
		// A Stopwatch is monotonic; DateTime.Now is wall-clock time and gives wrong (even negative)
		// durations across daylight-saving changes or system clock adjustments.
		System.Diagnostics.Stopwatch stopwatch = System.Diagnostics.Stopwatch.StartNew();
		GetEventStoreTable(dbDataContext).InsertOnSubmit(data);
		dbDataContext.SubmitChanges();
		stopwatch.Stop();
		Logger.LogDebug(string.Format("Adding data in the SQL eventstore database took {0}.", stopwatch.Elapsed), "SqlEventStore\\Add");
	}
	catch (Exception exception)
	{
		Logger.LogError("There was an issue persisting data to the SQL event store.", exception: exception);
		throw;
	}
	finally
	{
		Logger.LogDebug("Adding data to the SQL eventstore database... Done", "SqlEventStore\\Add");
	}
}
268 : }
269 : }
|