Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.IO;
12 : using System.Linq;
13 : using cdmdotnet.Logging;
14 : using Cqrs.Domain;
15 : using Cqrs.Events;
16 : using Cqrs.Messages;
17 :
18 : namespace Cqrs.Azure.BlobStorage.Events
19 : {
/// <summary>
/// An Azure blob storage based <see cref="EventStore{TAuthenticationToken}"/>.
/// </summary>
/// <remarks>
/// Each event is persisted twice by <see cref="PersistEvent"/>: once through <see cref="RawBlobStorageEventStore"/>'s regular add operation
/// and once through <see cref="RawBlobStorageEventStore.AddToCorrelationFolder"/>, so that events can also be looked up by <see cref="IMessage.CorrelationId"/>.
/// </remarks>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
public class BlobStorageEventStore<TAuthenticationToken>
	: EventStore<TAuthenticationToken>
{
	/// <summary>
	/// Get the <see cref="RawBlobStorageEventStore"/> used to read and write <see cref="EventData"/>.
	/// </summary>
	protected RawBlobStorageEventStore BlobStorageStore { get; private set; }

	/// <summary>
	/// Initializes a new instance of the <see cref="BlobStorageEventStore{TAuthenticationToken}"/> class using the specified container.
	/// </summary>
	/// <param name="eventBuilder">The <see cref="IEventBuilder{TAuthenticationToken}"/> handed to the base <see cref="EventStore{TAuthenticationToken}"/>.</param>
	/// <param name="eventDeserialiser">The <see cref="IEventDeserialiser{TAuthenticationToken}"/> handed to the base <see cref="EventStore{TAuthenticationToken}"/>.</param>
	/// <param name="logger">The <see cref="ILogger"/> used by this store and by the underlying <see cref="RawBlobStorageEventStore"/>.</param>
	/// <param name="blobStorageEventStoreConnectionStringFactory">The factory the underlying <see cref="RawBlobStorageEventStore"/> is created and initialised with.</param>
	public BlobStorageEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, IBlobStorageStoreConnectionStringFactory blobStorageEventStoreConnectionStringFactory)
		: base(eventBuilder, eventDeserialiser, logger)
	{
		BlobStorageStore = new RawBlobStorageEventStore(logger, blobStorageEventStoreConnectionStringFactory);
	}

	#region Overrides of EventStore<TAuthenticationToken>

	/// <summary>
	/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>.
	/// </summary>
	/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
	/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
	/// <param name="useLastEventOnly">Loads only the last event (the one with the highest version) <see cref="IEvent{TAuthenticationToken}"/>.</param>
	/// <param name="fromVersion">Load only events whose version is greater than this value; the default of -1 applies no lower bound to non-negative versions.</param>
	/// <returns>The matching events, deserialised and ordered by version descending (newest first).</returns>
	/// <exception cref="ArgumentNullException">Thrown when <paramref name="aggregateRootType"/> is <c>null</c>.</exception>
	public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
	{
		// Fail with a descriptive exception rather than a NullReferenceException when building the stream name.
		if (aggregateRootType == null)
		{
			throw new ArgumentNullException("aggregateRootType");
		}

		// The stream name doubles as the folder to read from and as the AggregateId value stored on each EventData.
		string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

		IEnumerable<EventData> query = BlobStorageStore
			.GetByFolder(streamName)
			.Where(eventData => eventData.AggregateId == streamName && eventData.Version > fromVersion)
			.OrderByDescending(eventData => eventData.Version);

		if (useLastEventOnly)
		{
			// The sequence is ordered newest-first, so the first element is the last event raised.
			query = query.Take(1);
		}

		return query
			.Select(EventDeserialiser.Deserialise)
			.ToList();
	}

	/// <summary>
	/// Get all <see cref="IEvent{TAuthenticationToken}"/> instances for the given <paramref name="correlationId"/>.
	/// </summary>
	/// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
	/// <returns>The raw <see cref="EventData"/> found in the matching by-correlation folder, ordered by timestamp ascending.</returns>
	public override IEnumerable<EventData> Get(Guid correlationId)
	{
		// The relative folder path mirrors the location AddToCorrelationFolder writes to.
		string correlationFolder = string.Format("..\\by-correlation\\{0:N}", correlationId);

		return BlobStorageStore
			.GetByFolder(correlationFolder)
			.Where(eventData => eventData.CorrelationId == correlationId)
			.OrderBy(eventData => eventData.Timestamp)
			.ToList();
	}

	/// <summary>
	/// Persist the provided <paramref name="eventData"/> into storage, once in the aggregate folder and once in the by-correlation folder.
	/// </summary>
	/// <param name="eventData">The <see cref="EventData"/> to persist.</param>
	protected override void PersistEvent(EventData eventData)
	{
		Logger.LogDebug("Adding data to the blob storage event-store aggregate folder", "BlobStorageStore\\Add");
		BlobStorageStore.Add(eventData);
		Logger.LogDebug("Adding data to the blob storage event-store by-correlation folder", "BlobStorageStore\\Add");
		BlobStorageStore.AddToCorrelationFolder(eventData);
	}

	#endregion

	/// <summary>
	/// The raw <see cref="Cqrs.Azure.BlobStorage.BlobStorageStore{TEventData}"/>.
	/// </summary>
	public class RawBlobStorageEventStore
		: BlobStorageStore<EventData>
	{
		/// <summary>
		/// Initializes a new instance of the <see cref="RawBlobStorageEventStore"/> class using the specified container.
		/// </summary>
		/// <param name="logger">The <see cref="ILogger"/> passed to the base store.</param>
		/// <param name="blobStorageEventStoreConnectionStringFactory">Supplies the base container name and is passed to the initialisation routine.</param>
		public RawBlobStorageEventStore(ILogger logger, IBlobStorageStoreConnectionStringFactory blobStorageEventStoreConnectionStringFactory)
			: base(logger)
		{
			GetContainerName = blobStorageEventStoreConnectionStringFactory.GetBaseContainerName;
			// Event data is never exposed through a public container.
			IsContainerPublic = () => false;
			// File layout: <aggregate id>\<version zero-padded to 10 digits>\<event id without dashes>.
			GenerateFileName = data => Path.Combine(data.AggregateId, string.Format("{0:D10}\\{1}",data.Version, data.EventId.ToString("N")));

			// ReSharper disable DoNotCallOverridableMethodsInConstructor
			Initialise(blobStorageEventStoreConnectionStringFactory);
			// ReSharper restore DoNotCallOverridableMethodsInConstructor
		}

		/// <summary>
		/// Add the provided <paramref name="data"/> into the correlation folder.
		/// </summary>
		/// <param name="data">The <see cref="EventData"/> to store under <c>by-correlation\&lt;correlation id&gt;\&lt;generated file name&gt;</c>.</param>
		public virtual void AddToCorrelationFolder(EventData data)
		{
			AsyncSaveData
			(
				data,
				(taskData, cloudBlockBlob) =>
				{
					try
					{
						// Dispose the serialised stream once the upload has finished instead of leaving it for the garbage collector.
						using (Stream serialisedData = Serialise(taskData))
						{
							cloudBlockBlob.UploadFromStream(serialisedData);
						}
						cloudBlockBlob.Properties.ContentType = "application/json";
						cloudBlockBlob.SetProperties();
						return cloudBlockBlob.Uri;
					}
					catch (Exception exception)
					{
						Logger.LogError("There was an issue persisting data to blob storage.", exception: exception);
						throw;
					}
				},
				// Build the name from the item handed to the callback rather than the captured argument, matching the upload callback above.
				taskData => string.Format("by-correlation\\{0:N}\\{1}", taskData.CorrelationId, GenerateFileName(taskData))
			);
		}
	}
}
144 : }
|