Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using cdmdotnet.Logging;
13 : using Cqrs.Events;
14 : using Microsoft.WindowsAzure.Storage.Table;
15 :
16 : namespace Cqrs.Azure.BlobStorage.Events
17 : {
18 : public class TableStorageEventStore<TAuthenticationToken>
19 : : EventStore<TAuthenticationToken>
20 0 : {
21 : protected const string TableCqrsEventStoreStreamNamePattern = "{0}.{1}";
22 :
23 : protected RawTableStorageEventStore TableStorageStore { get; set; }
24 :
25 : protected RawTableStorageEventStore CorrelationIdTableStorageStore { get; set; }
26 :
27 : /// <summary>
28 : /// Initializes a new instance of the <see cref="TableStorageEventStore{TAuthenticationToken}"/> class using the specified container.
29 : /// </summary>
30 1 : public TableStorageEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, ITableStorageStoreConnectionStringFactory tableStorageEventStoreConnectionStringFactory, Func<ILogger, ITableStorageStoreConnectionStringFactory, bool, RawTableStorageEventStore> createRawTableStorageEventStoreFunction = null)
31 : : base(eventBuilder, eventDeserialiser, logger)
32 : {
33 : if (createRawTableStorageEventStoreFunction == null)
34 : createRawTableStorageEventStoreFunction = (logger1, tableStorageEventStoreConnectionStringFactory1, isCorrelationIdTableStorageStore) => new RawTableStorageEventStore(logger1, tableStorageEventStoreConnectionStringFactory1, isCorrelationIdTableStorageStore);
35 : TableStorageStore = createRawTableStorageEventStoreFunction(logger, tableStorageEventStoreConnectionStringFactory, false);
36 : CorrelationIdTableStorageStore = createRawTableStorageEventStoreFunction(logger, tableStorageEventStoreConnectionStringFactory, true);
37 : }
38 :
39 : #region Overrides of EventStore<TAuthenticationToken>
40 :
41 0 : protected override string GenerateStreamName(Type aggregateRootType, Guid aggregateId)
42 : {
43 : return string.Format(TableCqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);
44 : }
45 :
46 0 : public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
47 : {
48 : string streamName = GenerateStreamName(aggregateRootType, aggregateId);
49 :
50 : // Create the table query.
51 : var rangeQuery = new TableQuery<EventDataTableEntity<EventData>>().Where
52 : (
53 : TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(streamName))
54 : );
55 :
56 : IEnumerable<EventData> query = TableStorageStore.ReadableSource.ExecuteQuery(rangeQuery)
57 : .Select(eventData => eventData.EventData)
58 : .Where(eventData => eventData.AggregateId == streamName && eventData.Version > fromVersion)
59 : .OrderByDescending(eventData => eventData.Version);
60 :
61 : if (useLastEventOnly)
62 : query = query.AsQueryable().Take(1);
63 :
64 : return query
65 : .Select(eventData => EventDeserialiser.Deserialise(eventData))
66 : .ToList();
67 : }
68 :
69 0 : public override IEnumerable<EventData> Get(Guid correlationId)
70 : {
71 : // Create the table query.
72 : var rangeQuery = new TableQuery<EventDataTableEntity<EventData>>().Where
73 : (
74 : TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(correlationId.ToString("N")))
75 : );
76 :
77 : IEnumerable<EventData> query = CorrelationIdTableStorageStore.ReadableSource.ExecuteQuery(rangeQuery)
78 : .Select(eventData => eventData.EventData)
79 : .OrderBy(eventData => eventData.Timestamp);
80 :
81 : return query.ToList();
82 : }
83 :
84 0 : protected override void PersistEvent(EventData eventData)
85 : {
86 : Logger.LogDebug("Adding data to the table storage event-store aggregate folder", "TableStorageStore\\Add");
87 : TableStorageStore.Add(eventData);
88 : Logger.LogDebug("Adding data to the table storage event-store by-correlation folder", "TableStorageStore\\Add");
89 : CorrelationIdTableStorageStore.Add(eventData);
90 : }
91 :
92 : #endregion
93 :
94 : public class RawTableStorageEventStore
95 : : TableStorageStore<EventDataTableEntity<EventData>, EventData>
96 0 : {
97 : private string TableName { get; set; }
98 :
99 : protected bool IsCorrelationIdTableStorageStore { get; set; }
100 :
101 : /// <summary>
102 : /// Initializes a new instance of the <see cref="RawTableStorageEventStore"/> class using the specified container.
103 : /// </summary>
104 1 : public RawTableStorageEventStore(ILogger logger, ITableStorageStoreConnectionStringFactory tableStorageEventStoreConnectionStringFactory, bool isCorrelationIdTableStorageStore = false)
105 : : base(logger)
106 : {
107 : GetContainerName = tableStorageEventStoreConnectionStringFactory.GetBaseContainerName;
108 : IsContainerPublic = () => false;
109 :
110 : IsCorrelationIdTableStorageStore = isCorrelationIdTableStorageStore;
111 : TableName = IsCorrelationIdTableStorageStore ? "EventStoreByCorrelationId" : "EventStore";
112 :
113 : // ReSharper disable DoNotCallOverridableMethodsInConstructor
114 : Initialise(tableStorageEventStoreConnectionStringFactory);
115 : // ReSharper restore DoNotCallOverridableMethodsInConstructor
116 : }
117 :
118 : #region Overrides of StorageStore<EventData,CloudTable>
119 :
120 0 : protected override string GetSafeSourceName(string sourceName)
121 : {
122 : return TableName;
123 : }
124 :
125 : #endregion
126 :
127 : #region Overrides of TableStorageStore<EventData>
128 :
129 0 : protected override ITableEntity CreateTableEntity(EventData data)
130 : {
131 : return new EventDataTableEntity<EventData>(data, IsCorrelationIdTableStorageStore);
132 : }
133 :
134 : /// <summary>
135 : /// Will mark the <paramref name="data"/> as logically (or soft).
136 : /// </summary>
137 1 : public override void Remove(EventData data)
138 : {
139 : throw new InvalidOperationException("Event store entries are not deletable.");
140 : }
141 :
142 0 : protected override TableOperation GetUpdatableTableEntity(EventData data)
143 : {
144 : throw new InvalidOperationException("Event store entries are not updatable.");
145 : }
146 :
147 0 : protected override TableOperation GetUpdatableTableEntity(EventDataTableEntity<EventData> data)
148 : {
149 : return GetUpdatableTableEntity(data.EventData);
150 : }
151 :
152 : #endregion
153 : }
154 : }
155 : }
|