Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Threading.Tasks;
13 : using Cqrs.Events;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Configuration;
16 : using Cqrs.Snapshots;
17 : using Microsoft.Azure.Documents;
18 : using Microsoft.Azure.Documents.Client;
19 :
20 : namespace Cqrs.Azure.DocumentDb.Events
21 : {
/// <summary>
/// A DocumentDb based <see cref="SnapshotStore"/>.
/// </summary>
/// <remarks>
/// Every operation obtains its own <see cref="DocumentClient"/> from
/// <see cref="AzureDocumentDbSnapshotStoreConnectionStringFactory"/> and disposes it when the operation completes.
/// </remarks>
public class AzureDocumentDbSnapshotStore
    : SnapshotStore
{
    /// <summary>
    /// The properties that must be unique. Passed to <see cref="AzureDocumentDbHelper"/> when the collection is created or read.
    /// </summary>
    protected readonly string[] UniqueIndexProperties = {"AggregateId", "Version"};

    /// <summary>
    /// Gets the <see cref="IAzureDocumentDbSnapshotStoreConnectionStringFactory"/> that supplies the
    /// <see cref="DocumentClient"/>, the database name and the collection name used by this store.
    /// </summary>
    protected IAzureDocumentDbSnapshotStoreConnectionStringFactory AzureDocumentDbSnapshotStoreConnectionStringFactory { get; private set; }

    /// <summary>
    /// Gets the <see cref="IAzureDocumentDbHelper"/> used to create or read the database and collection
    /// and to execute calls through its fault tolerant wrapper.
    /// </summary>
    protected IAzureDocumentDbHelper AzureDocumentDbHelper { get; private set; }

    /// <summary>
    /// Instantiate a new instance of <see cref="AzureDocumentDbSnapshotStore"/>.
    /// </summary>
    /// <param name="configurationManager">Forwarded to the <see cref="SnapshotStore"/> base constructor.</param>
    /// <param name="eventDeserialiser">The <see cref="ISnapshotDeserialiser"/>, forwarded to the <see cref="SnapshotStore"/> base constructor.</param>
    /// <param name="logger">Forwarded to the <see cref="SnapshotStore"/> base constructor.</param>
    /// <param name="correlationIdHelper">Forwarded to the <see cref="SnapshotStore"/> base constructor.</param>
    /// <param name="snapshotBuilder">Forwarded to the <see cref="SnapshotStore"/> base constructor.</param>
    /// <param name="azureDocumentDbHelper">Stored in <see cref="AzureDocumentDbHelper"/>.</param>
    /// <param name="azureDocumentDbSnapshotStoreConnectionStringFactory">Stored in <see cref="AzureDocumentDbSnapshotStoreConnectionStringFactory"/>.</param>
    public AzureDocumentDbSnapshotStore(IConfigurationManager configurationManager, ISnapshotDeserialiser eventDeserialiser, ILogger logger, ICorrelationIdHelper correlationIdHelper, ISnapshotBuilder snapshotBuilder, IAzureDocumentDbHelper azureDocumentDbHelper, IAzureDocumentDbSnapshotStoreConnectionStringFactory azureDocumentDbSnapshotStoreConnectionStringFactory)
        : base(configurationManager, eventDeserialiser, snapshotBuilder, logger, correlationIdHelper)
    {
        AzureDocumentDbHelper = azureDocumentDbHelper;
        AzureDocumentDbSnapshotStoreConnectionStringFactory = azureDocumentDbSnapshotStoreConnectionStringFactory;
    }

    #region Overrides of SnapshotStore

    /// <summary>
    /// Get the latest <see cref="Snapshot"/> from storage.
    /// </summary>
    /// <param name="aggregateRootType">Passed through to <see cref="GetAsync"/>.</param>
    /// <param name="streamName">The stream name; stored documents are matched on it via their <c>AggregateId</c>.</param>
    /// <returns>
    /// The most recent <see cref="Snapshot"/> stored for <paramref name="streamName"/>,
    /// or the default value (<c>null</c>) when no matching document exists.
    /// </returns>
    protected override Snapshot Get(Type aggregateRootType, string streamName)
    {
        // The overridden member is synchronous, so this blocks on the asynchronous implementation.
        // Failures therefore surface wrapped in an AggregateException.
        return GetAsync(aggregateRootType, streamName).Result;
    }

    /// <summary>
    /// Saves the provided <paramref name="snapshot"/> into storage.
    /// </summary>
    /// <param name="snapshot">the <see cref="Snapshot"/> to save and store.</param>
    public override void Save(Snapshot snapshot)
    {
        // Every log entry written by this method shares the same container name.
        string logContainer = string.Format("{0}\\Save", GetType().Name);

        Logger.LogDebug("Persisting aggregate root snapshot", logContainer);
        try
        {
            using (DocumentClient client = AzureDocumentDbSnapshotStoreConnectionStringFactory.GetSnapshotStoreConnectionClient())
            {
                Database database = AzureDocumentDbHelper.CreateOrReadDatabase(client, AzureDocumentDbSnapshotStoreConnectionStringFactory.GetSnapshotStoreConnectionDatabaseName()).Result;
                string collectionName = AzureDocumentDbSnapshotStoreConnectionStringFactory.GetSnapshotStoreConnectionCollectionName();
                DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).Result;

                Logger.LogDebug("Creating document for snapshot asynchronously", logContainer);
                AzureDocumentDbHelper.ExecuteFaultTollerantFunction
                (
                    () =>
                    {
                        Task<ResourceResponse<Document>> work = client.CreateDocumentAsync
                        (
                            collection.SelfLink,
                            BuildEventData(snapshot)
                        );
                        // Block until the document has been created. A failure is thrown from Wait()
                        // as an AggregateException, inside the fault tolerant wrapper.
                        work.Wait();
                    }
                );
            }
        }
        finally
        {
            // Written whether or not the save succeeded.
            Logger.LogDebug("Persisting aggregate root snapshot... Done", logContainer);
        }
    }

    #endregion

    /// <summary>
    /// Get the latest <see cref="Snapshot"/> from storage.
    /// </summary>
    /// <param name="aggregateRootType">Not used by this implementation.</param>
    /// <param name="streamName">The stream name; stored documents are matched on it via their <c>AggregateId</c>.</param>
    /// <returns>
    /// The most recent <see cref="Snapshot"/> stored for <paramref name="streamName"/>,
    /// or the default value (<c>null</c>) when no matching document exists.
    /// </returns>
    protected virtual async Task<Snapshot> GetAsync(Type aggregateRootType, string streamName)
    {
        using (DocumentClient client = AzureDocumentDbSnapshotStoreConnectionStringFactory.GetSnapshotStoreConnectionClient())
        {
            // Await the helper tasks instead of blocking on them. ConfigureAwait(false) keeps the
            // continuations off any captured synchronisation context, so the blocking call in Get
            // cannot deadlock on a continuation of this method.
            Database database = await AzureDocumentDbHelper.CreateOrReadDatabase(client, AzureDocumentDbSnapshotStoreConnectionStringFactory.GetSnapshotStoreConnectionDatabaseName()).ConfigureAwait(false);
            string collectionName = AzureDocumentDbSnapshotStoreConnectionStringFactory.GetSnapshotStoreConnectionCollectionName();
            DocumentCollection collection = await AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).ConfigureAwait(false);

            IOrderedQueryable<EventData> query = client.CreateDocumentQuery<EventData>(collection.SelfLink);

            // The filter is applied to the queryable; nothing is executed until ToList() below.
            IEnumerable<EventData> results = query.Where(snapshot => snapshot.AggregateId == streamName);

            return AzureDocumentDbHelper.ExecuteFaultTollerantFunction(() =>
                results
                    // Materialise the matching documents, then pick the highest version in memory.
                    .ToList()
                    .OrderByDescending(eventData => eventData.Version)
                    .Take(1)
                    .Select(EventDeserialiser.Deserialise)
                    .SingleOrDefault()
            );
        }
    }
}
130 : }
|