Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using cdmdotnet.Logging;
13 : using Cqrs.Domain;
14 : using Cqrs.Events;
15 : using Cqrs.Messages;
16 : using Microsoft.WindowsAzure.Storage.Table;
17 :
18 : namespace Cqrs.Azure.BlobStorage.Events
19 : {
20 : /// <summary>
21 : /// An Azure Storage based <see cref="EventStore{TAuthenticationToken}"/>.
22 : /// </summary>
23 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
24 : public class TableStorageEventStore<TAuthenticationToken>
25 : : EventStore<TAuthenticationToken>
26 1 : {
27 : /// <summary>
28 : /// The pattern used to generate the stream name.
29 : /// </summary>
30 : protected const string TableCqrsEventStoreStreamNamePattern = "{0}.{1}";
31 :
32 : /// <summary>
33 : /// Gets or sets the underlying <see cref="TableStorageStore"/> used for persisting and reading <see cref="IEvent{TAuthenticationToken}"/> data.
34 : /// </summary>
35 : protected RawTableStorageEventStore TableStorageStore { get; set; }
36 :
37 : /// <summary>
38 : /// Gets or sets the underlying <see cref="TableStorageStore"/> used specifically for <see cref="Get(Guid)"/>.
39 : /// </summary>
40 : protected RawTableStorageEventStore CorrelationIdTableStorageStore { get; set; }
41 :
42 : /// <summary>
43 : /// Initializes a new instance of the <see cref="TableStorageEventStore{TAuthenticationToken}"/> class using the specified container.
44 : /// </summary>
45 1 : public TableStorageEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, ITableStorageStoreConnectionStringFactory tableStorageEventStoreConnectionStringFactory, Func<ILogger, ITableStorageStoreConnectionStringFactory, bool, RawTableStorageEventStore> createRawTableStorageEventStoreFunction = null)
46 : : base(eventBuilder, eventDeserialiser, logger)
47 : {
48 : if (createRawTableStorageEventStoreFunction == null)
49 : createRawTableStorageEventStoreFunction = (logger1, tableStorageEventStoreConnectionStringFactory1, isCorrelationIdTableStorageStore) => new RawTableStorageEventStore(logger1, tableStorageEventStoreConnectionStringFactory1, isCorrelationIdTableStorageStore);
50 : TableStorageStore = createRawTableStorageEventStoreFunction(logger, tableStorageEventStoreConnectionStringFactory, false);
51 : CorrelationIdTableStorageStore = createRawTableStorageEventStoreFunction(logger, tableStorageEventStoreConnectionStringFactory, true);
52 : }
53 :
54 : #region Overrides of EventStore<TAuthenticationToken>
55 :
56 : /// <summary>
57 : /// Generate a unique stream name based on the provided <paramref name="aggregateRootType"/> and the <paramref name="aggregateId"/>.
58 : /// </summary>
59 : /// <param name="aggregateRootType">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
60 : /// <param name="aggregateId">The ID of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
61 1 : protected override string GenerateStreamName(Type aggregateRootType, Guid aggregateId)
62 : {
63 : return string.Format(TableCqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);
64 : }
65 :
66 : /// <summary>
67 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>.
68 : /// </summary>
69 : /// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
70 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
71 : /// <param name="useLastEventOnly">Loads only the last event<see cref="IEvent{TAuthenticationToken}"/>.</param>
72 : /// <param name="fromVersion">Load events starting from this version</param>
73 1 : public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
74 : {
75 : string streamName = GenerateStreamName(aggregateRootType, aggregateId);
76 :
77 : // Create the table query.
78 : var rangeQuery = new TableQuery<EventDataTableEntity<EventData>>().Where
79 : (
80 : TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(streamName))
81 : );
82 :
83 : IEnumerable<EventData> query = TableStorageStore.ReadableSource.ExecuteQuery(rangeQuery)
84 : .Select(eventData => eventData.EventData)
85 : .Where(eventData => eventData.AggregateId == streamName && eventData.Version > fromVersion)
86 : .OrderByDescending(eventData => eventData.Version);
87 :
88 : if (useLastEventOnly)
89 : query = query.AsQueryable().Take(1);
90 :
91 : return query
92 : .Select(eventData => EventDeserialiser.Deserialise(eventData))
93 : .ToList();
94 : }
95 :
96 : /// <summary>
97 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> up to and including the provided <paramref name="version"/>.
98 : /// </summary>
99 : /// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
100 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
101 : /// <param name="version">Load events up-to and including from this version</param>
102 1 : public override IEnumerable<IEvent<TAuthenticationToken>> GetToVersion(Type aggregateRootType, Guid aggregateId, int version)
103 : {
104 : string streamName = GenerateStreamName(aggregateRootType, aggregateId);
105 :
106 : // Create the table query.
107 : var rangeQuery = new TableQuery<EventDataTableEntity<EventData>>().Where
108 : (
109 : TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(streamName))
110 : );
111 :
112 : IEnumerable<EventData> query = TableStorageStore.ReadableSource.ExecuteQuery(rangeQuery)
113 : .Select(eventData => eventData.EventData)
114 : .Where(eventData => eventData.AggregateId == streamName && eventData.Version <= version)
115 : .OrderByDescending(eventData => eventData.Version);
116 :
117 : return query
118 : .Select(eventData => EventDeserialiser.Deserialise(eventData))
119 : .ToList();
120 : }
121 :
122 : /// <summary>
123 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> up to and including the provided <paramref name="versionedDate"/>.
124 : /// </summary>
125 : /// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
126 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
127 : /// <param name="versionedDate">Load events up-to and including from this <see cref="DateTime"/></param>
128 1 : public override IEnumerable<IEvent<TAuthenticationToken>> GetToDate(Type aggregateRootType, Guid aggregateId, DateTime versionedDate)
129 : {
130 : string streamName = GenerateStreamName(aggregateRootType, aggregateId);
131 :
132 : // Create the table query.
133 : var rangeQuery = new TableQuery<EventDataTableEntity<EventData>>().Where
134 : (
135 : TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(streamName))
136 : );
137 :
138 : IEnumerable<EventData> query = TableStorageStore.ReadableSource.ExecuteQuery(rangeQuery)
139 : .Select(eventData => eventData.EventData)
140 : .Where(eventData => eventData.AggregateId == streamName && eventData.Timestamp <= versionedDate)
141 : .OrderByDescending(eventData => eventData.Version);
142 :
143 : return query
144 : .Select(eventData => EventDeserialiser.Deserialise(eventData))
145 : .ToList();
146 : }
147 :
148 : /// <summary>
149 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> from and including the provided <paramref name="fromVersionedDate"/> up to and including the provided <paramref name="toVersionedDate"/>.
150 : /// </summary>
151 : /// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
152 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
153 : /// <param name="fromVersionedDate">Load events from and including from this <see cref="DateTime"/></param>
154 : /// <param name="toVersionedDate">Load events up-to and including from this <see cref="DateTime"/></param>
155 1 : public override IEnumerable<IEvent<TAuthenticationToken>> GetBetweenDates(Type aggregateRootType, Guid aggregateId, DateTime fromVersionedDate, DateTime toVersionedDate)
156 : {
157 : string streamName = GenerateStreamName(aggregateRootType, aggregateId);
158 :
159 : // Create the table query.
160 : var rangeQuery = new TableQuery<EventDataTableEntity<EventData>>().Where
161 : (
162 : TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(streamName))
163 : );
164 :
165 : IEnumerable<EventData> query = TableStorageStore.ReadableSource.ExecuteQuery(rangeQuery)
166 : .Select(eventData => eventData.EventData)
167 : .Where(eventData => eventData.AggregateId == streamName && eventData.Timestamp >= fromVersionedDate && eventData.Timestamp <= toVersionedDate)
168 : .OrderByDescending(eventData => eventData.Version);
169 :
170 : return query
171 : .Select(eventData => EventDeserialiser.Deserialise(eventData))
172 : .ToList();
173 : }
174 :
175 : /// <summary>
176 : /// Get all <see cref="IEvent{TAuthenticationToken}"/> instances for the given <paramref name="correlationId"/>.
177 : /// </summary>
178 : /// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
179 1 : public override IEnumerable<EventData> Get(Guid correlationId)
180 : {
181 : // Create the table query.
182 : var rangeQuery = new TableQuery<EventDataTableEntity<EventData>>().Where
183 : (
184 : TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(correlationId.ToString("N")))
185 : );
186 :
187 : IEnumerable<EventData> query = CorrelationIdTableStorageStore.ReadableSource.ExecuteQuery(rangeQuery)
188 : .Select(eventData => eventData.EventData)
189 : .OrderBy(eventData => eventData.Timestamp);
190 :
191 : return query.ToList();
192 : }
193 :
194 : /// <summary>
195 : /// Persist the provided <paramref name="eventData"/> into storage.
196 : /// </summary>
197 : /// <param name="eventData">The <see cref="EventData"/> to persist.</param>
198 1 : protected override void PersistEvent(EventData eventData)
199 : {
200 : Logger.LogDebug("Adding data to the table storage event-store aggregate folder", "TableStorageStore\\Add");
201 : TableStorageStore.Add(eventData);
202 : Logger.LogDebug("Adding data to the table storage event-store by-correlation folder", "TableStorageStore\\Add");
203 : CorrelationIdTableStorageStore.Add(eventData);
204 : }
205 :
206 : #endregion
207 :
208 : /// <summary>
209 : /// An Azure Storage based <see cref="Cqrs.Azure.BlobStorage.TableStorageStore{TData,TCollectionItemData}"/>.
210 : /// </summary>
211 : public class RawTableStorageEventStore
212 : : TableStorageStore<EventDataTableEntity<EventData>, EventData>
213 1 : {
214 : private string TableName { get; set; }
215 :
216 : /// <summary>
217 : /// Indicates if this is a <see cref="Cqrs.Azure.BlobStorage.TableStorageStore{TData,TCollectionItemData}"/>
218 : /// for <see cref="IEventStore{TAuthenticationToken}.Get(Guid)"/>
219 : /// </summary>
220 : protected bool IsCorrelationIdTableStorageStore { get; set; }
221 :
222 : /// <summary>
223 : /// Initializes a new instance of the <see cref="RawTableStorageEventStore"/> class using the specified container.
224 : /// </summary>
225 1 : public RawTableStorageEventStore(ILogger logger, ITableStorageStoreConnectionStringFactory tableStorageEventStoreConnectionStringFactory, bool isCorrelationIdTableStorageStore = false)
226 : : base(logger)
227 : {
228 : GetContainerName = tableStorageEventStoreConnectionStringFactory.GetBaseContainerName;
229 : IsContainerPublic = () => false;
230 :
231 : IsCorrelationIdTableStorageStore = isCorrelationIdTableStorageStore;
232 : TableName = IsCorrelationIdTableStorageStore ? "EventStoreByCorrelationId" : "EventStore";
233 :
234 : // ReSharper disable DoNotCallOverridableMethodsInConstructor
235 : Initialise(tableStorageEventStoreConnectionStringFactory);
236 : // ReSharper restore DoNotCallOverridableMethodsInConstructor
237 : }
238 :
239 : #region Overrides of StorageStore<EventData,CloudTable>
240 :
241 : /// <summary>
242 : /// Returns <see cref="TableName"/>.
243 : /// </summary>
244 : /// <param name="sourceName">Is not used.</param>
245 : /// <returns><see cref="TableName"/></returns>
246 1 : protected override string GetSafeSourceName(string sourceName)
247 : {
248 : return TableName;
249 : }
250 :
251 : #endregion
252 :
253 : #region Overrides of TableStorageStore<EventData>
254 :
255 : /// <summary>
256 : /// Creates a new <see cref="EventDataTableEntity{TEventData}"/>.
257 : /// </summary>
258 1 : protected override ITableEntity CreateTableEntity(EventData data)
259 : {
260 : return new EventDataTableEntity<EventData>(data, IsCorrelationIdTableStorageStore);
261 : }
262 :
263 : /// <summary>
264 : /// Will mark the <paramref name="data"/> as logically (or soft).
265 : /// </summary>
266 1 : public override void Remove(EventData data)
267 : {
268 : throw new InvalidOperationException("Event store entries are not deletable.");
269 : }
270 :
271 : /// <summary>
272 : /// Will throw an <see cref="InvalidOperationException"/> as this is not supported.
273 : /// </summary>
274 1 : protected override TableOperation GetUpdatableTableEntity(EventData data)
275 : {
276 : throw new InvalidOperationException("Event store entries are not updateable.");
277 : }
278 :
279 : /// <summary>
280 : /// Will throw an <see cref="InvalidOperationException"/> as this is not supported.
281 : /// </summary>
282 1 : protected override TableOperation GetUpdatableTableEntity(EventDataTableEntity<EventData> data)
283 : {
284 : return GetUpdatableTableEntity(data.EventData);
285 : }
286 :
287 : #endregion
288 : }
289 : }
290 : }
|