Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using cdmdotnet.Logging;
13 : using Cqrs.Azure.BlobStorage;
14 : using Cqrs.Events;
15 : using Microsoft.WindowsAzure.Storage;
16 : using Microsoft.WindowsAzure.Storage.Table;
17 :
18 : namespace Cqrs.Azure.Storage.Events
19 : {
/// <summary>
/// An event store backed by Azure Table Storage that reads rows as <see cref="DynamicTableEntity"/> instances
/// and converts their flattened properties back into <see cref="EventData"/> using <see cref="EntityPropertyConverter"/>.
/// </summary>
/// <typeparam name="TAuthenticationToken">The type of authentication token carried by the events.</typeparam>
public class TableStorageEventStore<TAuthenticationToken>
	: BlobStorage.Events.TableStorageEventStore<TAuthenticationToken>
{
	/// <summary>
	/// Initializes a new instance of the <see cref="TableStorageEventStore{TAuthenticationToken}"/> class using the specified container.
	/// </summary>
	/// <param name="eventBuilder">Passed through to the base event store.</param>
	/// <param name="eventDeserialiser">Passed through to the base event store; also used by this class to deserialise <see cref="EventData"/>.</param>
	/// <param name="logger">Passed through to the base event store and to each <see cref="RawTableStorageEventStore"/> it creates.</param>
	/// <param name="tableStorageEventStoreConnectionStringFactory">Passed through to the base event store and to each <see cref="RawTableStorageEventStore"/> it creates.</param>
	public TableStorageEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, ITableStorageStoreConnectionStringFactory tableStorageEventStoreConnectionStringFactory)
		// The trailing delegate tells the base class how to build the underlying raw stores,
		// so that the RawTableStorageEventStore declared in this class is used.
		: base(eventBuilder, eventDeserialiser, logger, tableStorageEventStoreConnectionStringFactory, (logger1, tableStorageEventStoreConnectionStringFactory1, isCorrelationIdTableStorageStore) => new RawTableStorageEventStore(logger1, tableStorageEventStoreConnectionStringFactory1, isCorrelationIdTableStorageStore))
	{
	}

	#region Overrides of EventStore<TAuthenticationToken>

	/// <summary>
	/// Gets the events stored for the stream generated from <paramref name="aggregateRootType"/> and <paramref name="aggregateId"/>.
	/// </summary>
	/// <param name="aggregateRootType">The <see cref="Type"/> used, together with <paramref name="aggregateId"/>, to generate the stream name.</param>
	/// <param name="aggregateId">The identifier used, together with <paramref name="aggregateRootType"/>, to generate the stream name.</param>
	/// <param name="useLastEventOnly">When true, only the event with the highest version is returned.</param>
	/// <param name="fromVersion">Only events with a version strictly greater than this value are returned.</param>
	/// <returns>A materialised list of the deserialised events, ordered by version from highest to lowest.</returns>
	public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
	{
		string streamName = GenerateStreamName(aggregateRootType, aggregateId);

		// Create the table query: restrict to the stream's partition and to rows whose AggregateId is the stream name.
		var rangeQuery = new TableQuery<DynamicTableEntity>().Where
		(
			TableQuery.CombineFilters
			(
				TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(streamName)),
				TableOperators.And,
				TableQuery.GenerateFilterCondition("AggregateId", QueryComparisons.Equal, streamName)
			)
		);

		var operationContext = new OperationContext();
		// The version filter and the ordering are applied in memory, after each row has been converted back to EventData.
		IEnumerable<EventData> query = TableStorageStore.ReadableSource.ExecuteQuery(rangeQuery)
			.Select(eventData => EntityPropertyConverter.ConvertBack<EventData>(eventData.Properties, operationContext))
			.Where(eventData => eventData.AggregateId == streamName && eventData.Version > fromVersion)
			.OrderByDescending(eventData => eventData.Version);

		if (useLastEventOnly)
		{
			// The sequence is already an in-memory IEnumerable, so LINQ-to-Objects Take is sufficient;
			// wrapping it with AsQueryable() first only adds expression-tree overhead.
			query = query.Take(1);
		}

		return query
			.Select(eventData => EventDeserialiser.Deserialise(eventData))
			.ToList();
	}

	/// <summary>
	/// Gets the raw <see cref="EventData"/> stored in the correlation id store for the provided <paramref name="correlationId"/>.
	/// </summary>
	/// <param name="correlationId">The correlation identifier; its "N" formatted value is used to build the partition key that is queried.</param>
	/// <returns>A materialised list of <see cref="EventData"/> ordered by <see cref="EventData.Timestamp"/> ascending.</returns>
	public override IEnumerable<EventData> Get(Guid correlationId)
	{
		// Create the table query: every row in the partition keyed by the correlation id.
		var rangeQuery = new TableQuery<DynamicTableEntity>().Where
		(
			TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(correlationId.ToString("N")))
		);

		var operationContext = new OperationContext();
		IEnumerable<EventData> query = CorrelationIdTableStorageStore.ReadableSource.ExecuteQuery(rangeQuery)
			.Select(eventData => EntityPropertyConverter.ConvertBack<EventData>(eventData.Properties, operationContext))
			.OrderBy(eventData => eventData.Timestamp);

		return query.ToList();
	}

	#endregion

	/// <summary>
	/// The raw table store used by the enclosing event store. It writes <see cref="EventData"/> as a
	/// <see cref="DynamicTableEntity"/> with flattened properties and uses a table name suffixed with "V2".
	/// </summary>
	public class RawTableStorageEventStore
		: BlobStorage.Events.TableStorageEventStore<TAuthenticationToken>.RawTableStorageEventStore
	{
		/// <summary>
		/// Initializes a new instance of the <see cref="RawTableStorageEventStore"/> class using the specified container.
		/// </summary>
		/// <param name="logger">Passed through to the base store.</param>
		/// <param name="tableStorageEventStoreConnectionStringFactory">Passed through to the base store.</param>
		/// <param name="isCorrelationIdTableStorageStore">Passed through to the base store; defaults to false.</param>
		public RawTableStorageEventStore(ILogger logger, ITableStorageStoreConnectionStringFactory tableStorageEventStoreConnectionStringFactory, bool isCorrelationIdTableStorageStore = false)
			: base(logger, tableStorageEventStoreConnectionStringFactory, isCorrelationIdTableStorageStore)
		{
		}

		#region Overrides of StorageStore<EventData,CloudTable>

		/// <summary>
		/// Gets the table name to use for <paramref name="sourceName"/>: the base implementation's safe name,
		/// reduced to its trailing 34 characters when longer, with "V2" appended.
		/// </summary>
		/// <param name="sourceName">The source name handed to the base implementation.</param>
		/// <returns>The safe name, at most 36 characters long, always ending in "V2".</returns>
		protected override string GetSafeSourceName(string sourceName)
		{
			// NOTE(review): the rationale for 34 is not visible in this file - confirm before changing it.
			const int maximumBaseNameLength = 34;

			string tableName = base.GetSafeSourceName(sourceName);
			if (tableName.Length > maximumBaseNameLength)
			{
				// Keep the END of the name rather than the start.
				tableName = tableName.Substring(tableName.Length - maximumBaseNameLength);
			}
			return tableName + "V2";
		}

		#endregion

		#region Overrides of TableStorageStore<EventData>

		/// <summary>
		/// Creates the <see cref="ITableEntity"/> that is written to table storage for the provided <paramref name="data"/>.
		/// </summary>
		/// <param name="data">The <see cref="EventData"/> to store.</param>
		/// <returns>
		/// A <see cref="DynamicTableEntity"/> whose partition and row keys are taken from an
		/// <see cref="EventDataTableEntity{TEventData}"/> built for <paramref name="data"/> and whose properties are the flattened <paramref name="data"/>.
		/// </returns>
		protected override ITableEntity CreateTableEntity(EventData data)
		{
			// This entity is only constructed to obtain the partition key and row key; it is not what gets returned.
			var tableEntity = new EventDataTableEntity<EventData>(data, IsCorrelationIdTableStorageStore);
			//Flatten object of type TData and convert it to EntityProperty Dictionary
			Dictionary<string, EntityProperty> flattenedProperties = EntityPropertyConverter.Flatten(data, new OperationContext());

			// Create a DynamicTableEntity and set its PK and RK
			DynamicTableEntity dynamicTableEntity = new DynamicTableEntity(tableEntity.PartitionKey, tableEntity.RowKey)
			{
				Properties = flattenedProperties
			};

			return dynamicTableEntity;
		}

		#endregion
	}
}
123 : }
|