Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using cdmdotnet.Logging;
13 : using Cqrs.Domain;
14 : using Cqrs.Events;
15 : using Cqrs.Messages;
16 : using Microsoft.WindowsAzure.Storage.Table;
17 :
18 : namespace Cqrs.Azure.BlobStorage.Events
19 : {
20 : /// <summary>
21 : /// An Azure Storage based <see cref="EventStore{TAuthenticationToken}"/>.
22 : /// </summary>
23 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
24 : public class TableStorageEventStore<TAuthenticationToken>
25 : : EventStore<TAuthenticationToken>
26 1 : {
27 : /// <summary>
28 : /// The pattern used to generate the stream name.
29 : /// </summary>
30 : protected const string TableCqrsEventStoreStreamNamePattern = "{0}.{1}";
31 :
32 : /// <summary>
33 : /// Gets or sets the underlying <see cref="RawTableStorageEventStore"/> used for persisting and reading <see cref="IEvent{TAuthenticationToken}"/> data.
34 : /// </summary>
35 : protected RawTableStorageEventStore TableStorageStore { get; set; }
36 :
37 : /// <summary>
38 : /// Gets or sets the underlying <see cref="RawTableStorageEventStore"/> used specifically for <see cref="Get(Guid)"/>.
39 : /// </summary>
40 : protected RawTableStorageEventStore CorrelationIdTableStorageStore { get; set; }
41 :
/// <summary>
/// Instantiates a new instance of the <see cref="TableStorageEventStore{TAuthenticationToken}"/> class, creating one <see cref="RawTableStorageEventStore"/> for the aggregate streams and a second one for look-ups by correlation ID.
/// </summary>
/// <param name="eventBuilder">Passed through to the base <see cref="EventStore{TAuthenticationToken}"/>.</param>
/// <param name="eventDeserialiser">Passed through to the base <see cref="EventStore{TAuthenticationToken}"/>.</param>
/// <param name="logger">Passed through to the base <see cref="EventStore{TAuthenticationToken}"/> and to each <see cref="RawTableStorageEventStore"/> that gets created.</param>
/// <param name="tableStorageEventStoreConnectionStringFactory">Handed to each <see cref="RawTableStorageEventStore"/> that gets created.</param>
/// <param name="createRawTableStorageEventStoreFunction">
/// Optional factory used to build both stores; the <see cref="bool"/> argument is <c>false</c> for the aggregate store and <c>true</c> for the correlation ID store.
/// When <c>null</c>, the <see cref="RawTableStorageEventStore"/> constructor is called directly.
/// </param>
public TableStorageEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, ITableStorageStoreConnectionStringFactory tableStorageEventStoreConnectionStringFactory, Func<ILogger, ITableStorageStoreConnectionStringFactory, bool, RawTableStorageEventStore> createRawTableStorageEventStoreFunction = null)
	: base(eventBuilder, eventDeserialiser, logger)
{
	// Fall back to plain construction when the caller did not supply a factory.
	Func<ILogger, ITableStorageStoreConnectionStringFactory, bool, RawTableStorageEventStore> storeFactory = createRawTableStorageEventStoreFunction
		?? ((log, connectionStringFactory, forCorrelationId) => new RawTableStorageEventStore(log, connectionStringFactory, forCorrelationId));

	TableStorageStore = storeFactory(logger, tableStorageEventStoreConnectionStringFactory, false);
	CorrelationIdTableStorageStore = storeFactory(logger, tableStorageEventStoreConnectionStringFactory, true);
}
53 :
54 : #region Overrides of EventStore<TAuthenticationToken>
55 :
/// <summary>
/// Builds the stream name for an aggregate by formatting <see cref="TableCqrsEventStoreStreamNamePattern"/> with the <see cref="Type.FullName"/> of <paramref name="aggregateRootType"/> followed by the <paramref name="aggregateId"/>.
/// </summary>
/// <param name="aggregateRootType">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="aggregateId">The ID of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <returns>The formatted stream name.</returns>
protected override string GenerateStreamName(Type aggregateRootType, Guid aggregateId)
{
	string aggregateTypeName = aggregateRootType.FullName;
	string streamName = string.Format(TableCqrsEventStoreStreamNamePattern, aggregateTypeName, aggregateId);
	return streamName;
}
65 :
/// <summary>
/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>.
/// </summary>
/// <param name="aggregateRootType">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="useLastEventOnly">When <c>true</c>, only the <see cref="IEvent{TAuthenticationToken}"/> with the highest version is returned.</param>
/// <param name="fromVersion">Only events with a version greater than this value are returned.</param>
/// <returns>The matching events, deserialised and ordered by version from highest to lowest.</returns>
public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
{
	string streamName = GenerateStreamName(aggregateRootType, aggregateId);

	// Restrict the table query to the partition keyed by the (storage-safe) stream name.
	var rangeQuery = new TableQuery<EventDataTableEntity<EventData>>().Where
	(
		TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(streamName))
	);

	// Highest version first, so that taking a single item yields the latest event.
	IEnumerable<EventData> query = TableStorageStore.ReadableSource.ExecuteQuery(rangeQuery)
		.Select(eventData => eventData.EventData)
		.Where(eventData => eventData.AggregateId == streamName && eventData.Version > fromVersion)
		.OrderByDescending(eventData => eventData.Version);

	if (useLastEventOnly)
	{
		// The sequence is already typed as IEnumerable<EventData>, so take directly
		// instead of wrapping it in a queryable just to read one element.
		query = query.Take(1);
	}

	return query
		.Select(eventData => EventDeserialiser.Deserialise(eventData))
		.ToList();
}
95 :
/// <summary>
/// Reads every <see cref="EventData"/> stored against the given <paramref name="correlationId"/> from the <see cref="CorrelationIdTableStorageStore"/>.
/// </summary>
/// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
/// <returns>The matching <see cref="EventData"/>, ordered by timestamp from earliest to latest.</returns>
public override IEnumerable<EventData> Get(Guid correlationId)
{
	// The partition key is the storage-safe form of the correlation ID in "N" format.
	string partitionKey = StorageStore<object, object>.GetSafeStorageKey(correlationId.ToString("N"));
	string partitionFilter = TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey);
	var correlationQuery = new TableQuery<EventDataTableEntity<EventData>>().Where(partitionFilter);

	return CorrelationIdTableStorageStore.ReadableSource
		.ExecuteQuery(correlationQuery)
		.Select(entity => entity.EventData)
		.OrderBy(data => data.Timestamp)
		.ToList();
}
114 :
/// <summary>
/// Writes the provided <paramref name="eventData"/> to the <see cref="TableStorageStore"/> and then to the <see cref="CorrelationIdTableStorageStore"/>, logging a debug message before each write.
/// </summary>
/// <param name="eventData">The <see cref="EventData"/> to persist.</param>
protected override void PersistEvent(EventData eventData)
{
	const string logContainer = "TableStorageStore\\Add";

	// Aggregate stream first, by-correlation copy second.
	Logger.LogDebug("Adding data to the table storage event-store aggregate folder", logContainer);
	TableStorageStore.Add(eventData);

	Logger.LogDebug("Adding data to the table storage event-store by-correlation folder", logContainer);
	CorrelationIdTableStorageStore.Add(eventData);
}
126 :
127 : #endregion
128 :
/// <summary>
/// An Azure Storage based <see cref="Cqrs.Azure.BlobStorage.TableStorageStore{TData,TCollectionItemData}"/> that stores <see cref="EventData"/> as <see cref="EventDataTableEntity{TEventData}"/> rows.
/// Entries cannot be updated or removed; both operations throw an <see cref="InvalidOperationException"/>.
/// </summary>
public class RawTableStorageEventStore
	: TableStorageStore<EventDataTableEntity<EventData>, EventData>
{
	/// <summary>
	/// The name of the table this store targets; returned by <see cref="GetSafeSourceName"/>.
	/// </summary>
	private string TableName { get; set; }

	/// <summary>
	/// Indicates if this is a <see cref="Cqrs.Azure.BlobStorage.TableStorageStore{TData,TCollectionItemData}"/>
	/// for <see cref="IEventStore{TAuthenticationToken}.Get(Guid)"/>
	/// </summary>
	protected bool IsCorrelationIdTableStorageStore { get; set; }

	/// <summary>
	/// Instantiates a new instance of the <see cref="RawTableStorageEventStore"/> class.
	/// </summary>
	/// <param name="logger">Passed through to the base class.</param>
	/// <param name="tableStorageEventStoreConnectionStringFactory">Supplies the base container name and is passed to <c>Initialise</c>.</param>
	/// <param name="isCorrelationIdTableStorageStore">
	/// When <c>true</c> the store targets the "EventStoreByCorrelationId" table, otherwise the "EventStore" table.
	/// </param>
	/// <exception cref="ArgumentNullException">Thrown when <paramref name="tableStorageEventStoreConnectionStringFactory"/> is <c>null</c>.</exception>
	public RawTableStorageEventStore(ILogger logger, ITableStorageStoreConnectionStringFactory tableStorageEventStoreConnectionStringFactory, bool isCorrelationIdTableStorageStore = false)
		: base(logger)
	{
		// Fail with a descriptive exception instead of a NullReferenceException on the dereference below.
		if (tableStorageEventStoreConnectionStringFactory == null)
		{
			throw new ArgumentNullException("tableStorageEventStoreConnectionStringFactory");
		}

		GetContainerName = tableStorageEventStoreConnectionStringFactory.GetBaseContainerName;
		IsContainerPublic = () => false;

		// The table name must be set before Initialise runs, as GetSafeSourceName returns it.
		IsCorrelationIdTableStorageStore = isCorrelationIdTableStorageStore;
		TableName = IsCorrelationIdTableStorageStore ? "EventStoreByCorrelationId" : "EventStore";

		// ReSharper disable DoNotCallOverridableMethodsInConstructor
		Initialise(tableStorageEventStoreConnectionStringFactory);
		// ReSharper restore DoNotCallOverridableMethodsInConstructor
	}

	#region Overrides of StorageStore<EventData,CloudTable>

	/// <summary>
	/// Returns <see cref="TableName"/>.
	/// </summary>
	/// <param name="sourceName">Is not used.</param>
	/// <returns><see cref="TableName"/></returns>
	protected override string GetSafeSourceName(string sourceName)
	{
		return TableName;
	}

	#endregion

	#region Overrides of TableStorageStore<EventData>

	/// <summary>
	/// Creates a new <see cref="EventDataTableEntity{TEventData}"/> wrapping the provided <paramref name="data"/>.
	/// </summary>
	/// <param name="data">The <see cref="EventData"/> to wrap.</param>
	/// <returns>A new <see cref="EventDataTableEntity{TEventData}"/> created with the current value of <see cref="IsCorrelationIdTableStorageStore"/>.</returns>
	protected override ITableEntity CreateTableEntity(EventData data)
	{
		return new EventDataTableEntity<EventData>(data, IsCorrelationIdTableStorageStore);
	}

	/// <summary>
	/// Will throw an <see cref="InvalidOperationException"/> as event store entries cannot be removed.
	/// </summary>
	/// <param name="data">Is not used.</param>
	/// <exception cref="InvalidOperationException">Always thrown.</exception>
	public override void Remove(EventData data)
	{
		throw new InvalidOperationException("Event store entries are not deletable.");
	}

	/// <summary>
	/// Will throw an <see cref="InvalidOperationException"/> as this is not supported.
	/// </summary>
	/// <param name="data">Is not used.</param>
	/// <exception cref="InvalidOperationException">Always thrown.</exception>
	protected override TableOperation GetUpdatableTableEntity(EventData data)
	{
		throw new InvalidOperationException("Event store entries are not updateable.");
	}

	/// <summary>
	/// Delegates to <see cref="GetUpdatableTableEntity(EventData)"/>, which will throw an <see cref="InvalidOperationException"/> as this is not supported.
	/// </summary>
	/// <param name="data">The entity whose <see cref="EventData"/> is forwarded; may be <c>null</c>.</param>
	protected override TableOperation GetUpdatableTableEntity(EventDataTableEntity<EventData> data)
	{
		// Do not dereference a null entity: the caller should get the InvalidOperationException
		// from the EventData overload rather than a NullReferenceException from here.
		return GetUpdatableTableEntity(data == null ? null : data.EventData);
	}

	#endregion
}
210 : }
211 : }
|