Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.IO;
12 : using System.Linq;
13 : using cdmdotnet.Logging;
14 : using Cqrs.Events;
15 :
16 : namespace Cqrs.Azure.BlobStorage.Events
17 : {
18 : public class BlobStorageEventStore<TAuthenticationToken>
19 : : EventStore<TAuthenticationToken>
20 0 : {
21 : protected RawBlobStorageEventStore BlobStorageStore { get; private set; }
22 :
23 : /// <summary>
24 : /// Initializes a new instance of the <see cref="BlobStorageEventStore{TAuthenticationToken}"/> class using the specified container.
25 : /// </summary>
26 1 : public BlobStorageEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, IBlobStorageStoreConnectionStringFactory blobStorageEventStoreConnectionStringFactory)
27 : : base(eventBuilder, eventDeserialiser, logger)
28 : {
29 : BlobStorageStore = new RawBlobStorageEventStore(logger, blobStorageEventStoreConnectionStringFactory);
30 : }
31 :
32 : #region Overrides of EventStore<TAuthenticationToken>
33 :
34 0 : public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
35 : {
36 : string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);
37 :
38 : IEnumerable<EventData> query = BlobStorageStore
39 : .GetByFolder(streamName)
40 : .Where(eventData => eventData.AggregateId == streamName && eventData.Version > fromVersion)
41 : .OrderByDescending(eventData => eventData.Version);
42 :
43 : if (useLastEventOnly)
44 : query = query.AsQueryable().Take(1);
45 :
46 : return query
47 : .Select(EventDeserialiser.Deserialise)
48 : .ToList();
49 : }
50 :
51 0 : public override IEnumerable<EventData> Get(Guid correlationId)
52 : {
53 : IEnumerable<EventData> query = BlobStorageStore
54 : .GetByFolder(string.Format("..\\by-correlation\\{0:N}", correlationId))
55 : .Where(eventData => eventData.CorrelationId == correlationId)
56 : .OrderBy(eventData => eventData.Timestamp);
57 :
58 : return query.ToList();
59 : }
60 :
61 0 : protected override void PersistEvent(EventData eventData)
62 : {
63 : Logger.LogDebug("Adding data to the blob storage event-store aggregate folder", "BlobStorageStore\\Add");
64 : BlobStorageStore.Add(eventData);
65 : Logger.LogDebug("Adding data to the blob storage event-store by-correlation folder", "BlobStorageStore\\Add");
66 : BlobStorageStore.AddToCorrelationFolder(eventData);
67 : }
68 :
69 : #endregion
70 :
71 : public class RawBlobStorageEventStore
72 : : BlobStorageStore<EventData>
73 0 : {
74 : /// <summary>
75 : /// Initializes a new instance of the <see cref="RawBlobStorageEventStore"/> class using the specified container.
76 : /// </summary>
77 1 : public RawBlobStorageEventStore(ILogger logger, IBlobStorageStoreConnectionStringFactory blobStorageEventStoreConnectionStringFactory)
78 : : base(logger)
79 : {
80 : GetContainerName = blobStorageEventStoreConnectionStringFactory.GetBaseContainerName;
81 : IsContainerPublic = () => false;
82 : GenerateFileName = data => Path.Combine(data.AggregateId, string.Format("{0:D10}\\{1}",data.Version, data.EventId.ToString("N")));
83 :
84 : // ReSharper disable DoNotCallOverridableMethodsInConstructor
85 : Initialise(blobStorageEventStoreConnectionStringFactory);
86 : // ReSharper restore DoNotCallOverridableMethodsInConstructor
87 : }
88 :
89 0 : public void AddToCorrelationFolder(EventData data)
90 : {
91 : AsyncSaveData
92 : (
93 : data,
94 : (taskData, cloudBlockBlob) =>
95 : {
96 : try
97 : {
98 : cloudBlockBlob.UploadFromStream(Serialise(taskData));
99 : cloudBlockBlob.Properties.ContentType = "application/json";
100 : cloudBlockBlob.SetProperties();
101 : return cloudBlockBlob.Uri;
102 : }
103 : catch (Exception exception)
104 : {
105 : Logger.LogError("There was an issue persisting data to blob storage.", exception: exception);
106 : throw;
107 : }
108 : },
109 : taskData => string.Format("by-correlation\\{0:N}\\{1}", data.CorrelationId, GenerateFileName(data))
110 : );
111 : }
112 : }
113 : }
114 : }
|