Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Threading;
13 : using System.Threading.Tasks;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Domain.Exceptions;
16 : using Microsoft.Azure.Documents;
17 : using Microsoft.Azure.Documents.Client;
18 : using System.Collections.ObjectModel;
19 :
20 : namespace Cqrs.Azure.DocumentDb
21 : {
22 : /// <summary>
23 : /// A helper for Azure Document DB.
24 : /// </summary>
25 : public class AzureDocumentDbHelper : IAzureDocumentDbHelper
26 1 : {
/// <summary>
/// Gets the <see cref="ILogger"/> this helper writes its diagnostic messages to.
/// Only this class can assign it; derived classes may read it.
/// </summary>
protected ILogger Logger { get; private set; }

/// <summary>
/// Gets the <see cref="IAzureDocumentDbConnectionCache"/> consulted for previously resolved
/// databases and document collections, and updated with databases resolved by this helper.
/// </summary>
protected IAzureDocumentDbConnectionCache AzureDocumentDbConnectionCache { get; private set; }

/// <summary>
/// Creates a new <see cref="AzureDocumentDbHelper"/> from the provided dependencies.
/// </summary>
/// <param name="logger">The <see cref="ILogger"/> exposed via <see cref="Logger"/>.</param>
/// <param name="azureDocumentDbConnectionCache">The cache exposed via <see cref="AzureDocumentDbConnectionCache"/>.</param>
public AzureDocumentDbHelper(
    ILogger logger,
    IAzureDocumentDbConnectionCache azureDocumentDbConnectionCache)
{
    // Both dependencies are stored as provided; no validation is performed here.
    this.AzureDocumentDbConnectionCache = azureDocumentDbConnectionCache;
    this.Logger = logger;
}
45 :
/// <summary>
/// Resolves the <see cref="Database"/> with the id <paramref name="databaseName"/>. The connection cache is
/// checked first, then the account is queried, and if no match exists a new database is created.
/// Databases found by query or newly created are stored in the connection cache before being returned.
/// </summary>
/// <param name="client">The <see cref="DocumentClient"/> used to query for and create the database.</param>
/// <param name="databaseName">The id of the database to get or create.</param>
/// <returns>The cached, existing or newly created <see cref="Database"/>.</returns>
/// <remarks>
/// Although declared <c>async</c>, the body contains no <c>await</c>: the query is enumerated synchronously
/// and the create call blocks on the task's <c>Result</c>.
/// </remarks>
public async Task<Database> CreateOrReadDatabase(DocumentClient client, string databaseName)
{
    const string logContainer = "AzureDocumentDbHelper\\CreateOrReadDatabase";

    Logger.LogDebug("Getting Azure database", logContainer);
    DateTime startedAt = DateTime.Now;
    string cacheKey = string.Format("AzureDocumentDbDatabase::{0}", databaseName);

    // Fast path: a previously resolved database is returned straight from the cache.
    Database resolved;
    if (AzureDocumentDbConnectionCache.TryGetDatabase(cacheKey, out resolved))
    {
        Logger.LogDebug(string.Format("Returning cached database took {0}", DateTime.Now - startedAt), logContainer);
        Logger.LogDebug("Returning cached database took... Done", logContainer);
        return resolved;
    }

    try
    {
        // The query is only built here; it is executed by SingleOrDefault() below.
        IEnumerable<Database> matches = client.CreateDatabaseQuery()
            .Where(database => database.Id == databaseName)
            .AsEnumerable();
        Logger.LogDebug("Checking if the database exists", logContainer);
        startedAt = DateTime.Now;
        resolved = matches.SingleOrDefault();

        if (resolved == null)
        {
            // No database with that id exists yet, so create it (with retry handling).
            Logger.LogDebug("Creating and returning a new database", logContainer);
            startedAt = DateTime.Now;

            resolved = ExecuteFaultTollerantFunction(() => client.CreateDatabaseAsync(new Database { Id = databaseName }).Result);

            Logger.LogDebug(string.Format("Getting Azure database took {0}", DateTime.Now - startedAt), logContainer);
            AzureDocumentDbConnectionCache.SetDatabase(cacheKey, resolved);
            return resolved;
        }

        Logger.LogDebug(string.Format("Returning the existing database took {0}", DateTime.Now - startedAt), logContainer);
        try
        {
            AzureDocumentDbConnectionCache.SetDatabase(cacheKey, resolved);
            return resolved;
        }
        finally
        {
            // Logged even when caching the database fails.
            Logger.LogDebug("Returning the existing database... Done", logContainer);
        }
    }
    finally
    {
        Logger.LogDebug("Getting Azure database... Done", logContainer);
    }
}
104 :
/// <summary>
/// Gets the <see cref="DocumentCollection"/> with the id <paramref name="collectionName"/>. The connection
/// cache is checked first, then <paramref name="database"/> is queried, and if no match exists a new
/// collection is created.
/// </summary>
/// <param name="client">The <see cref="DocumentClient"/> used to query for and create the collection.</param>
/// <param name="database">The <see cref="Database"/> to look in; its <c>SelfLink</c> is used for the query and the create call.</param>
/// <param name="collectionName">The id of the collection to get or create.</param>
/// <param name="uniqiueIndexPropertyNames">
/// Property names that together form a single unique key on a newly created collection. Dots in a name are
/// converted to path separators (e.g. <c>Address.City</c> becomes <c>/Address/City</c>).
/// When <c>null</c> or empty, no unique key policy is applied. Ignored when the collection already exists.
/// </param>
/// <returns>The cached, existing or newly created <see cref="DocumentCollection"/>.</returns>
/// <remarks>
/// Although declared <c>async</c>, the body contains no <c>await</c>: the query is enumerated synchronously
/// and the create call blocks on the task's <c>Result</c>.
/// </remarks>
public async Task<DocumentCollection> CreateOrReadCollection(DocumentClient client, Database database, string collectionName, string[] uniqiueIndexPropertyNames = null)
{
    const string logContainer = "AzureDocumentDbHelper\\CreateOrReadCollection";

    Logger.LogDebug("Getting Azure collection", logContainer);
    DateTime start = DateTime.Now;
    DocumentCollection result;
    string documentCollectionCacheKey = string.Format("AzureDocumentDbDocumentCollection::{0}", collectionName);
    if (AzureDocumentDbConnectionCache.TryGetDocumentCollection(documentCollectionCacheKey, out result))
    {
        Logger.LogDebug(string.Format("Returning cached collection took {0}", DateTime.Now - start), logContainer);
        Logger.LogDebug("Returning cached collection took... Done", logContainer);
        return result;
    }

    // NOTE(review): this method never writes to the connection cache (the SetDocumentCollection calls were
    // commented out in the original code), so a cache hit depends on something else populating it.
    // Confirm whether that is intentional before re-enabling caching here.
    try
    {
        // The query is only built here; it is executed by SingleOrDefault() below.
        IEnumerable<DocumentCollection> query = client.CreateDocumentCollectionQuery(database.SelfLink)
            .Where(documentCollection => documentCollection.Id == collectionName)
            .AsEnumerable();
        Logger.LogDebug("Checking if the collection exists", logContainer);
        start = DateTime.Now;
        result = query.SingleOrDefault();
        if (result != null)
        {
            Logger.LogDebug(string.Format("Returning the existing document collection took {0}", DateTime.Now - start), logContainer);
            Logger.LogDebug("Returning the existing document collection... Done", logContainer);
            return result;
        }

        Logger.LogDebug("Creating and returning a new collection", logContainer);
        start = DateTime.Now;

        DocumentCollection myCollection = new DocumentCollection();
        myCollection.Id = collectionName;

        // An empty array is treated like null: previously First() threw InvalidOperationException for it.
        if (uniqiueIndexPropertyNames != null && uniqiueIndexPropertyNames.Length > 0)
        {
            var paths = new Collection<string>();
            foreach (string name in uniqiueIndexPropertyNames)
            {
                // Convert the dotted property name into a document path, e.g. "A.B" -> "/A/B".
                paths.Add(string.Format("/{0}", name.Replace('.', '/')));
            }

            // All paths are combined into one composite unique key.
            myCollection.UniqueKeyPolicy = new UniqueKeyPolicy
            {
                UniqueKeys = new Collection<UniqueKey> { new UniqueKey { Paths = paths } }
            };
        }

        result = ExecuteFaultTollerantFunction(() => client.CreateDocumentCollectionAsync(database.SelfLink, myCollection).Result);

        Logger.LogDebug(string.Format("Getting Azure document collection took {0}", DateTime.Now - start), logContainer);
        return result;
    }
    finally
    {
        Logger.LogDebug("Getting Azure collection... Done", logContainer);
    }
}
179 :
/// <summary>
/// Processes the provided <paramref name="documentClientException"/>, checking for operations that can be retried.
/// For HTTP status 429 (too many requests) or 503 (service unavailable) the current thread sleeps for the
/// exception's <c>RetryAfter</c> period and the method returns so the caller can retry.
/// Any other exception is treated as non-retryable and is wrapped and rethrown.
/// </summary>
/// <param name="documentClientException">The <see cref="DocumentClientException"/> to check.</param>
/// <exception cref="DuplicateCreateCommandException">
/// Thrown when the error message reports that a resource with the specified id or name already exists.
/// </exception>
/// <exception cref="DocumentDbException">Thrown for every other non-retryable failure.</exception>
protected virtual void ProcessFaultTollerantExceptions(DocumentClientException documentClientException)
{
    // The status code may be absent; a nullable cast avoids the InvalidOperationException
    // that unwrapping a missing value would raise (which would hide the real failure).
    int? statusCode = (int?)documentClientException.StatusCode;
    if (statusCode == 429 || statusCode == 503)
    {
        Thread.Sleep(documentClientException.RetryAfter);
        return;
    }

    Logger.LogWarning("Non-fault tolerant exception raised via DocumentClientException.", "AzureDocumentDbDataStore\\ProcessFaultTollerantExceptions");

    // Guard against a missing error payload so the original exception is still surfaced to the caller.
    bool resourceAlreadyExists = documentClientException.Error != null
        && documentClientException.Error.Message == "Resource with specified id or name already exists.";
    if (resourceAlreadyExists)
    {
        throw new DuplicateCreateCommandException(documentClientException);
    }

    throw new DocumentDbException(documentClientException);
}
197 :
/// <summary>
/// Executes the provided <paramref name="func"/> in a fault tolerant way: when it fails with a
/// <see cref="DocumentClientException"/> (thrown directly or as the inner exception of an
/// <see cref="AggregateException"/>), the exception is passed to
/// <see cref="ProcessFaultTollerantExceptions"/> and, if that call returns, <paramref name="func"/> is run again.
/// </summary>
/// <typeparam name="T">The type of the value returned by <paramref name="func"/>.</typeparam>
/// <param name="func">The <see cref="Func{T}"/> to execute.</param>
/// <returns>The value returned by the first successful call to <paramref name="func"/>.</returns>
/// <exception cref="AggregateException">
/// Rethrown unchanged when its inner exception is not a <see cref="DocumentClientException"/>.
/// </exception>
public virtual T ExecuteFaultTollerantFunction<T>(Func<T> func)
{
    while (true)
    {
        try
        {
            return func();
        }
        catch (DocumentClientException documentClientException)
        {
            Logger.LogWarning("DocumentClientException thrown.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction");
            // Either waits out a retryable failure (then the loop retries) or throws.
            ProcessFaultTollerantExceptions(documentClientException);
        }
        catch (AggregateException aggregateException)
        {
            // Blocking on Task.Result wraps the real failure in an AggregateException.
            var documentClientException = aggregateException.InnerException as DocumentClientException;
            if (documentClientException == null)
            {
                Logger.LogWarning("Non DocumentClientException raised via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", aggregateException);
                // Nothing indicates this failure is transient; previously it was swallowed and the
                // call was retried forever without any delay. Surface it to the caller instead.
                throw;
            }

            Logger.LogWarning("DocumentClientException thrown via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", documentClientException);
            ProcessFaultTollerantExceptions(documentClientException);
        }
    }
}
228 :
/// <summary>
/// Executes the provided <paramref name="func"/> in a fault tolerant way: when it fails with a
/// <see cref="DocumentClientException"/> (thrown directly or as the inner exception of an
/// <see cref="AggregateException"/>), the exception is passed to
/// <see cref="ProcessFaultTollerantExceptions"/> and, if that call returns, <paramref name="func"/> is run again.
/// </summary>
/// <param name="func">The <see cref="Action"/> to execute.</param>
/// <exception cref="AggregateException">
/// Rethrown unchanged when its inner exception is not a <see cref="DocumentClientException"/>.
/// </exception>
public virtual void ExecuteFaultTollerantFunction(Action func)
{
    while (true)
    {
        try
        {
            func();
            return;
        }
        catch (DocumentClientException documentClientException)
        {
            // Logged for parity with the Func<T> overload, which reports the direct failure too.
            Logger.LogWarning("DocumentClientException thrown.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction");
            // Either waits out a retryable failure (then the loop retries) or throws.
            ProcessFaultTollerantExceptions(documentClientException);
        }
        catch (AggregateException aggregateException)
        {
            // Blocking on a task wraps the real failure in an AggregateException.
            var documentClientException = aggregateException.InnerException as DocumentClientException;
            if (documentClientException == null)
            {
                Logger.LogWarning("Non DocumentClientException raised via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", aggregateException);
                // Nothing indicates this failure is transient; previously it was swallowed and the
                // call was retried forever without any delay. Surface it to the caller instead.
                throw;
            }

            Logger.LogWarning("DocumentClientException thrown via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", documentClientException);
            ProcessFaultTollerantExceptions(documentClientException);
        }
    }
}
259 : }
260 : }
|