Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.IO;
12 : using System.Linq;
13 : using Chinchilla.Logging;
14 : using Cqrs.Domain;
15 : using Cqrs.Events;
16 : using Cqrs.Messages;
17 :
18 : namespace Cqrs.Azure.BlobStorage.Events
19 : {
20 : /// <summary>
21 : /// An Azure blob storage based <see cref="EventStore{TAuthenticationToken}"/>.
22 : /// </summary>
23 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
24 : public class BlobStorageEventStore<TAuthenticationToken>
25 : : EventStore<TAuthenticationToken>
26 1 : {
/// <summary>
/// The <see cref="RawBlobStorageEventStore"/> this event store reads <see cref="EventData"/> from and persists it to.
/// Assigned by the constructor.
/// </summary>
protected RawBlobStorageEventStore BlobStorageStore
{
    get;
    private set;
}
31 :
/// <summary>
/// Instantiates a new instance of the <see cref="BlobStorageEventStore{TAuthenticationToken}"/> class,
/// creating the underlying <see cref="RawBlobStorageEventStore"/> it stores data in.
/// </summary>
/// <param name="eventBuilder">The <see cref="IEventBuilder{TAuthenticationToken}"/> handed to the base <see cref="EventStore{TAuthenticationToken}"/>.</param>
/// <param name="eventDeserialiser">The <see cref="IEventDeserialiser{TAuthenticationToken}"/> handed to the base <see cref="EventStore{TAuthenticationToken}"/>.</param>
/// <param name="logger">The <see cref="ILogger"/> handed to both the base class and the <see cref="RawBlobStorageEventStore"/>.</param>
/// <param name="blobStorageEventStoreConnectionStringFactory">The <see cref="IBlobStorageStoreConnectionStringFactory"/> used to construct the <see cref="RawBlobStorageEventStore"/>.</param>
public BlobStorageEventStore(
    IEventBuilder<TAuthenticationToken> eventBuilder,
    IEventDeserialiser<TAuthenticationToken> eventDeserialiser,
    ILogger logger,
    IBlobStorageStoreConnectionStringFactory blobStorageEventStoreConnectionStringFactory)
    : base(eventBuilder, eventDeserialiser, logger)
{
    RawBlobStorageEventStore rawStore = new RawBlobStorageEventStore(logger, blobStorageEventStoreConnectionStringFactory);
    BlobStorageStore = rawStore;
}
40 :
41 : #region Overrides of EventStore<TAuthenticationToken>
42 :
/// <summary>
/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>.
/// </summary>
/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="useLastEventOnly">When true, only the single matching event with the highest version is returned.</param>
/// <param name="fromVersion">Exclusive lower bound: only events with a version greater than this value are loaded. The default of -1 places no lower limit on versions that start at zero.</param>
/// <returns>The matching events, deserialised, ordered by version from highest to lowest.</returns>
public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
{
    // The stream name doubles as the folder the events are read from and as the value of EventData.AggregateId.
    string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

    IEnumerable<EventData> query = BlobStorageStore
        .GetByFolder(streamName)
        .Where(eventData => eventData.AggregateId == streamName && eventData.Version > fromVersion)
        .OrderByDescending(eventData => eventData.Version);

    if (useLastEventOnly)
    {
        // Highest version comes first, so the first element is the last event raised.
        // The sequence is already in-memory LINQ, so there is no need to wrap it in an IQueryable.
        query = query.Take(1);
    }

    return query
        .Select(EventDeserialiser.Deserialise)
        .ToList();
}
66 :
/// <summary>
/// Loads the <see cref="IEvent{TAuthenticationToken}"/> instances raised by the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/>
/// identified by <paramref name="aggregateId"/>, limited to events whose version is no greater than <paramref name="version"/>.
/// </summary>
/// <param name="aggregateRootType">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the events were raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="version">Inclusive upper bound on the version of the events to load.</param>
/// <returns>The matching events, deserialised, ordered by version from highest to lowest.</returns>
public override IEnumerable<IEvent<TAuthenticationToken>> GetToVersion(Type aggregateRootType, Guid aggregateId, int version)
{
    // The stream name is both the folder to read and the expected EventData.AggregateId.
    string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

    IEnumerable<EventData> matchingEvents =
        from eventData in BlobStorageStore.GetByFolder(streamName)
        where eventData.AggregateId == streamName && eventData.Version <= version
        orderby eventData.Version descending
        select eventData;

    return matchingEvents.Select(EventDeserialiser.Deserialise).ToList();
}
86 :
/// <summary>
/// Loads the <see cref="IEvent{TAuthenticationToken}"/> instances raised by the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/>
/// identified by <paramref name="aggregateId"/>, limited to events time-stamped on or before <paramref name="versionedDate"/>.
/// </summary>
/// <param name="aggregateRootType">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the events were raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="versionedDate">Inclusive upper bound on the timestamp of the events to load.</param>
/// <returns>The matching events, deserialised, ordered by version from highest to lowest.</returns>
public override IEnumerable<IEvent<TAuthenticationToken>> GetToDate(Type aggregateRootType, Guid aggregateId, DateTime versionedDate)
{
    // The stream name is both the folder to read and the expected EventData.AggregateId.
    string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

    IEnumerable<EventData> matchingEvents =
        from eventData in BlobStorageStore.GetByFolder(streamName)
        where eventData.AggregateId == streamName && eventData.Timestamp <= versionedDate
        orderby eventData.Version descending
        select eventData;

    return matchingEvents.Select(EventDeserialiser.Deserialise).ToList();
}
106 :
/// <summary>
/// Loads the <see cref="IEvent{TAuthenticationToken}"/> instances raised by the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/>
/// identified by <paramref name="aggregateId"/>, limited to events time-stamped between <paramref name="fromVersionedDate"/> and <paramref name="toVersionedDate"/>, both inclusive.
/// </summary>
/// <param name="aggregateRootType">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the events were raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="fromVersionedDate">Inclusive lower bound on the timestamp of the events to load.</param>
/// <param name="toVersionedDate">Inclusive upper bound on the timestamp of the events to load.</param>
/// <returns>The matching events, deserialised, ordered by version from highest to lowest.</returns>
public override IEnumerable<IEvent<TAuthenticationToken>> GetBetweenDates(Type aggregateRootType, Guid aggregateId, DateTime fromVersionedDate,
    DateTime toVersionedDate)
{
    // The stream name is both the folder to read and the expected EventData.AggregateId.
    string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

    IEnumerable<EventData> matchingEvents =
        from eventData in BlobStorageStore.GetByFolder(streamName)
        where eventData.AggregateId == streamName && eventData.Timestamp >= fromVersionedDate && eventData.Timestamp <= toVersionedDate
        orderby eventData.Version descending
        select eventData;

    return matchingEvents.Select(EventDeserialiser.Deserialise).ToList();
}
128 :
/// <summary>
/// Loads the raw <see cref="EventData"/> stored for the given <paramref name="correlationId"/>.
/// </summary>
/// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="EventData"/> to retrieve.</param>
/// <returns>The matching <see cref="EventData"/>, not deserialised, ordered by timestamp from oldest to newest.</returns>
public override IEnumerable<EventData> Get(Guid correlationId)
{
    // Read from the by-correlation folder for this correlation ID (formatted without dashes).
    string correlationFolder = string.Format("..\\by-correlation\\{0:N}", correlationId);

    IEnumerable<EventData> correlatedEvents =
        from eventData in BlobStorageStore.GetByFolder(correlationFolder)
        where eventData.CorrelationId == correlationId
        orderby eventData.Timestamp
        select eventData;

    return correlatedEvents.ToList();
}
142 :
/// <summary>
/// Writes the provided <paramref name="eventData"/> to storage twice: once via
/// <see cref="RawBlobStorageEventStore"/>'s Add method and once via
/// <see cref="RawBlobStorageEventStore.AddToCorrelationFolder"/>, logging a debug message before each write.
/// </summary>
/// <param name="eventData">The <see cref="EventData"/> to persist.</param>
protected override void PersistEvent(EventData eventData)
{
    // Both debug messages are logged against the same container name.
    const string logContainer = "BlobStorageStore\\Add";

    Logger.LogDebug("Adding data to the blob storage event-store aggregate folder", logContainer);
    BlobStorageStore.Add(eventData);

    Logger.LogDebug("Adding data to the blob storage event-store by-correlation folder", logContainer);
    BlobStorageStore.AddToCorrelationFolder(eventData);
}
154 :
155 : #endregion
156 :
157 : /// <summary>
158 : /// The raw <see cref="Cqrs.Azure.BlobStorage.BlobStorageStore{TEventData}"/>.
159 : /// </summary>
160 : public class RawBlobStorageEventStore
161 : : BlobStorageStore<EventData>
162 1 : {
/// <summary>
/// Instantiates a new instance of the <see cref="RawBlobStorageEventStore"/> class, configuring the container name,
/// container visibility and blob file naming before initialising the store.
/// </summary>
/// <param name="logger">The <see cref="ILogger"/> handed to the base store.</param>
/// <param name="blobStorageEventStoreConnectionStringFactory">The <see cref="IBlobStorageStoreConnectionStringFactory"/> that supplies the base container name and is passed to Initialise.</param>
public RawBlobStorageEventStore(ILogger logger, IBlobStorageStoreConnectionStringFactory blobStorageEventStoreConnectionStringFactory)
    : base(logger)
{
    GetContainerName = blobStorageEventStoreConnectionStringFactory.GetBaseContainerName;
    IsContainerPublic = () => false;
    GenerateFileName = data =>
    {
        // <AggregateId>/<version zero-padded to 10 digits>\<event ID without dashes>
        string versionedEventName = string.Format("{0:D10}\\{1}", data.Version, data.EventId.ToString("N"));
        return Path.Combine(data.AggregateId, versionedEventName);
    };

    // ReSharper disable DoNotCallOverridableMethodsInConstructor
    Initialise(blobStorageEventStoreConnectionStringFactory);
    // ReSharper restore DoNotCallOverridableMethodsInConstructor
}
177 :
/// <summary>
/// Saves the provided <paramref name="data"/> as a JSON blob under the by-correlation folder,
/// at <c>by-correlation\{CorrelationId:N}\{GenerateFileName(data)}</c>.
/// </summary>
/// <param name="data">The <see cref="EventData"/> to save.</param>
/// <remarks>
/// Any exception raised while uploading or setting the blob properties is logged and then re-thrown.
/// </remarks>
public virtual void AddToCorrelationFolder(EventData data)
{
    AsyncSaveData
    (
        data,
        (taskData, cloudBlockBlob) =>
        {
            try
            {
                // Release the serialised payload as soon as the upload has consumed it.
                using (Stream serialisedData = Serialise(taskData))
                {
                    cloudBlockBlob.UploadFromStream(serialisedData);
                }

                // The content type is applied after the upload, via a separate properties update.
                cloudBlockBlob.Properties.ContentType = "application/json";
                cloudBlockBlob.SetProperties();
                return cloudBlockBlob.Uri;
            }
            catch (Exception exception)
            {
                Logger.LogError("There was an issue persisting data to blob storage.", exception: exception);
                throw;
            }
        },
        // NOTE(review): the name is built from the captured 'data' rather than the lambda's 'taskData';
        // presumably these are the same instance - confirm against AsyncSaveData.
        taskData => string.Format("by-correlation\\{0:N}\\{1}", data.CorrelationId, GenerateFileName(data))
    );
}
204 : }
205 : }
206 : }
|