Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using cdmdotnet.Logging;
13 : using Cqrs.Azure.BlobStorage;
14 : using Cqrs.Domain;
15 : using Cqrs.Events;
16 : using Cqrs.Messages;
17 : using Microsoft.WindowsAzure.Storage;
18 : using Microsoft.WindowsAzure.Storage.Table;
19 :
20 : namespace Cqrs.Azure.Storage.Events
21 : {
22 : /// <summary>
23 : /// An Azure Storage based <see cref="EventStore{TAuthenticationToken}"/>.
24 : /// </summary>
25 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
26 : public class TableStorageEventStore<TAuthenticationToken>
27 : : BlobStorage.Events.TableStorageEventStore<TAuthenticationToken>
28 1 : {
/// <summary>
/// Instantiates a new <see cref="TableStorageEventStore{TAuthenticationToken}"/>, forwarding every dependency to the base class
/// together with a factory that creates <see cref="RawTableStorageEventStorer"/> instances.
/// </summary>
/// <param name="eventBuilder">The <see cref="IEventBuilder{TAuthenticationToken}"/> handed to the base class.</param>
/// <param name="eventDeserialiser">The <see cref="IEventDeserialiser{TAuthenticationToken}"/> handed to the base class.</param>
/// <param name="logger">The <see cref="ILogger"/> handed to the base class.</param>
/// <param name="tableStorageEventStoreConnectionStringFactory">The <see cref="ITableStorageStoreConnectionStringFactory"/> handed to the base class.</param>
public TableStorageEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, ITableStorageStoreConnectionStringFactory tableStorageEventStoreConnectionStringFactory)
    : base
    (
        eventBuilder,
        eventDeserialiser,
        logger,
        tableStorageEventStoreConnectionStringFactory,
        // Factory the base class invokes to build its raw store(s); the flag selects the correlation-id flavour.
        (storeLogger, connectionStringFactory, isCorrelationIdStore) => new RawTableStorageEventStorer(storeLogger, connectionStringFactory, isCorrelationIdStore)
    )
{
}
36 :
37 : #region Overrides of EventStore<TAuthenticationToken>
38 :
/// <summary>
/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>.
/// </summary>
/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="useLastEventOnly">When true, only the <see cref="IEvent{TAuthenticationToken}"/> with the highest version is returned.</param>
/// <param name="fromVersion">Only events with a version strictly greater than this value are returned.</param>
/// <returns>The matching events, deserialised and materialised into a list, ordered by descending version.</returns>
public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
{
    string streamName = GenerateStreamName(aggregateRootType, aggregateId);

    // Create the table query: restrict to the stream's partition and to rows whose AggregateId is the stream name.
    var rangeQuery = new TableQuery<DynamicTableEntity>().Where
    (
        TableQuery.CombineFilters
        (
            TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(streamName)),
            TableOperators.And,
            TableQuery.GenerateFilterCondition("AggregateId", QueryComparisons.Equal, streamName)
        )
    );

    var operationContext = new OperationContext();
    // The AggregateId check is repeated in memory, and the version filter and ordering are applied
    // to the converted EventData rather than in the table query.
    IEnumerable<EventData> query = TableStorageStore.ReadableSource.ExecuteQuery(rangeQuery)
#pragma warning disable 0436
        .Select(eventData => EntityPropertyConverter.ConvertBack<EventData>(eventData.Properties, operationContext))
#pragma warning restore 0436
        .Where(eventData => eventData.AggregateId == streamName && eventData.Version > fromVersion)
        .OrderByDescending(eventData => eventData.Version);

    if (useLastEventOnly)
    {
        // The sequence is already in memory, so plain LINQ-to-Objects Take is sufficient;
        // wrapping it in an IQueryable only adds expression-tree overhead.
        query = query.Take(1);
    }

    return query
        .Select(eventData => EventDeserialiser.Deserialise(eventData))
        .ToList();
}
76 :
/// <summary>
/// Retrieves every stored <see cref="EventData"/> whose partition corresponds to the given <paramref name="correlationId"/>,
/// ordered by ascending <see cref="EventData.Timestamp"/>.
/// </summary>
/// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
/// <returns>The matching <see cref="EventData"/> items materialised into a list.</returns>
public override IEnumerable<EventData> Get(Guid correlationId)
{
    // The partition key is the safe-storage form of the correlation id formatted without dashes ("N").
    string partitionKey = StorageStore<object, object>.GetSafeStorageKey(correlationId.ToString("N"));
    string partitionFilter = TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey);
    TableQuery<DynamicTableEntity> correlationQuery = new TableQuery<DynamicTableEntity>().Where(partitionFilter);

    var conversionContext = new OperationContext();
    return CorrelationIdTableStorageStore.ReadableSource
        .ExecuteQuery(correlationQuery)
#pragma warning disable 0436
        .Select(entity => EntityPropertyConverter.ConvertBack<EventData>(entity.Properties, conversionContext))
#pragma warning restore 0436
        .OrderBy(storedEvent => storedEvent.Timestamp)
        .ToList();
}
98 :
99 : #endregion
100 :
/// <summary>
/// A <see cref="RawTableStorageEventStore"/> that appends "V2" to its source names and stores
/// <see cref="EventData"/> as <see cref="DynamicTableEntity"/> rows with flattened properties.
/// </summary>
public class RawTableStorageEventStorer
    : RawTableStorageEventStore
{
    /// <summary>
    /// Instantiates a new <see cref="RawTableStorageEventStorer"/>, forwarding all arguments to the base class.
    /// </summary>
    /// <param name="logger">The <see cref="ILogger"/> handed to the base class.</param>
    /// <param name="tableStorageEventStoreConnectionStringFactory">The <see cref="ITableStorageStoreConnectionStringFactory"/> handed to the base class.</param>
    /// <param name="isCorrelationIdTableStorageStore">Forwarded to the base class; defaults to false.</param>
    public RawTableStorageEventStorer(ILogger logger, ITableStorageStoreConnectionStringFactory tableStorageEventStoreConnectionStringFactory, bool isCorrelationIdTableStorageStore = false)
        : base(logger, tableStorageEventStoreConnectionStringFactory, isCorrelationIdTableStorageStore)
    {
    }

    #region Overrides of StorageStore<EventData,CloudTable>

    /// <summary>
    /// Takes the name produced by the base implementation, keeps at most its trailing 34 characters
    /// and appends "V2" to the result.
    /// </summary>
    /// <param name="sourceName">The name passed to the base implementation.</param>
    /// <returns>The (possibly truncated) base name followed by "V2".</returns>
    protected override string GetSafeSourceName(string sourceName)
    {
        const int maximumBaseLength = 34;

        string safeName = base.GetSafeSourceName(sourceName);
        int surplus = safeName.Length - maximumBaseLength;
        if (surplus > 0)
        {
            // Drop characters from the front so that the last 34 characters are retained.
            safeName = safeName.Substring(surplus);
        }

        return safeName + "V2";
    }

    #endregion

    #region Overrides of TableStorageStore<EventData>

    /// <summary>
    /// Builds a <see cref="DynamicTableEntity"/> for the provided <paramref name="data"/>: the keys are taken from an
    /// <see cref="EventDataTableEntity{TEventData}"/> created for it, and the flattened <paramref name="data"/> becomes
    /// <see cref="DynamicTableEntity.Properties"/>.
    /// </summary>
    /// <param name="data">The <see cref="EventData"/> to convert.</param>
    /// <returns>The populated <see cref="DynamicTableEntity"/>.</returns>
    protected override ITableEntity CreateTableEntity(EventData data)
    {
        // Only used as the source of the partition and row keys.
        var keySource = new EventDataTableEntity<EventData>(data, IsCorrelationIdTableStorageStore);

        // Flatten the event data into an EntityProperty dictionary.
#pragma warning disable 0436
        Dictionary<string, EntityProperty> properties = EntityPropertyConverter.Flatten(data, new OperationContext());
#pragma warning restore 0436

        return new DynamicTableEntity(keySource.PartitionKey, keySource.RowKey)
        {
            Properties = properties
        };
    }

    #endregion
}
155 : }
156 : }
|