Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Text;
13 : using System.Threading;
14 : using System.Threading.Tasks;
15 : using cdmdotnet.Logging;
16 : using Cqrs.Domain.Exceptions;
17 : using Microsoft.Azure.Documents;
18 : using Microsoft.Azure.Documents.Client;
19 : using Microsoft.Azure.Documents.Linq;
20 :
21 : namespace Cqrs.Azure.DocumentDb
22 : {
23 : /// <summary>
24 : /// A helper for Azure Document DB.
25 : /// </summary>
26 : public class AzureDocumentDbHelper : IAzureDocumentDbHelper
27 1 : {
28 : /// <summary>
29 : /// Gets or sets the <see cref="ILogger"/>.
30 : /// </summary>
31 : protected ILogger Logger { get; private set; }
32 :
33 : /// <summary>
34 : /// Gets the <see cref="IAzureDocumentDbConnectionCache"/>
35 : /// </summary>
36 : protected IAzureDocumentDbConnectionCache AzureDocumentDbConnectionCache { get; private set; }
37 :
38 : /// <summary>
39 : /// Instantiates a new instance of <see cref="AzureDocumentDbHelper"/>.
40 : /// </summary>
41 1 : public AzureDocumentDbHelper(ILogger logger, IAzureDocumentDbConnectionCache azureDocumentDbConnectionCache)
42 : {
43 : Logger = logger;
44 : AzureDocumentDbConnectionCache = azureDocumentDbConnectionCache;
45 : }
46 :
47 : /// <summary>
48 : /// Gets a <see cref="Database"/> creating one if it doesn't already exist.
49 : /// </summary>
50 : /// <param name="client">The <see cref="DocumentClient"/> to use.</param>
51 : /// <param name="databaseName">The name of the database to get.</param>
52 1 : public async Task<Database> CreateOrReadDatabase(DocumentClient client, string databaseName)
53 : {
54 : Logger.LogDebug("Getting Azure database", "AzureDocumentDbHelper\\CreateOrReadDatabase");
55 : DateTime start = DateTime.Now;
56 : Database result;
57 : string databaseCacheKey = string.Format("AzureDocumentDbDatabase::{0}", databaseName);
58 : if (AzureDocumentDbConnectionCache.TryGetDatabase(databaseCacheKey, out result))
59 : {
60 : Logger.LogDebug(string.Format("Returning cached database took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadDatabase");
61 : try
62 : {
63 : return result;
64 : }
65 : finally
66 : {
67 : Logger.LogDebug("Returning cached database took... Done", "AzureDocumentDbHelper\\CreateOrReadDatabase");
68 : }
69 : }
70 : try
71 : {
72 : IEnumerable<Database> query = client.CreateDatabaseQuery()
73 : .Where(database => database.Id == databaseName)
74 : .AsEnumerable();
75 : Logger.LogDebug("Checking if the database exists", "AzureDocumentDbHelper\\CreateOrReadDatabase");
76 : start = DateTime.Now;
77 : result = query.SingleOrDefault();
78 : if (result != null)
79 : {
80 : Logger.LogDebug(string.Format("Returning the existing database took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadDatabase");
81 : try
82 : {
83 : AzureDocumentDbConnectionCache.SetDatabase(databaseCacheKey, result);
84 : return result;
85 : }
86 : finally
87 : {
88 : Logger.LogDebug("Returning the existing database... Done", "AzureDocumentDbHelper\\CreateOrReadDatabase");
89 : }
90 : }
91 : Logger.LogDebug("Creating and returning a new database", "AzureDocumentDbHelper\\CreateOrReadDatabase");
92 : start = DateTime.Now;
93 :
94 : result = ExecuteFaultTollerantFunction(() => client.CreateDatabaseAsync(new Database { Id = databaseName }).Result);
95 :
96 : Logger.LogDebug(string.Format("Getting Azure database took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadDatabase");
97 : AzureDocumentDbConnectionCache.SetDatabase(databaseCacheKey, result);
98 : return result;
99 : }
100 : finally
101 : {
102 : Logger.LogDebug("Getting Azure database... Done", "AzureDocumentDbHelper\\CreateOrReadDatabase");
103 : }
104 : }
105 :
106 : /// <summary>
107 : /// Gets a <see cref="DocumentCollection"/> creating one if it doesn't already exist.
108 : /// </summary>
109 : /// <param name="client">The <see cref="DocumentClient"/> to use.</param>
110 : /// <param name="database">The <see cref="Database"/> to look in.</param>
111 : /// <param name="collectionName">The name of the collection to get.</param>
112 : /// <param name="uniqiueIndexPropertyNames">Any unique properties the collection should enforce.</param>
113 1 : public async Task<DocumentCollection> CreateOrReadCollection(DocumentClient client, Database database, string collectionName, string[] uniqiueIndexPropertyNames = null)
114 : {
115 : Logger.LogDebug("Getting Azure collection", "AzureDocumentDbHelper\\CreateOrReadCollection");
116 : DateTime start = DateTime.Now;
117 : DocumentCollection result;
118 : string documentCollectionCacheKey = string.Format("AzureDocumentDbDocumentCollection::{0}", collectionName);
119 : if (AzureDocumentDbConnectionCache.TryGetDocumentCollection(documentCollectionCacheKey, out result))
120 : {
121 : Logger.LogDebug(string.Format("Returning cached collection took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadCollection");
122 : try
123 : {
124 : return result;
125 : }
126 : finally
127 : {
128 : Logger.LogDebug("Returning cached collection took... Done", "AzureDocumentDbHelper\\CreateOrReadCollection");
129 : }
130 : }
131 : try
132 : {
133 : IEnumerable<DocumentCollection> query = client.CreateDocumentCollectionQuery(database.SelfLink)
134 : .Where(documentCollection => documentCollection.Id == collectionName)
135 : .AsEnumerable();
136 : Logger.LogDebug("Checking if the collection exists", "AzureDocumentDbHelper\\CreateOrReadCollection");
137 : start = DateTime.Now;
138 : result = query.SingleOrDefault();
139 : if (result != null)
140 : {
141 : Logger.LogDebug(string.Format("Returning the existing document collection took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadCollection");
142 : try
143 : {
144 : // AzureDocumentDbConnectionCache.SetDocumentCollection(documentCollectionCacheKey, result);
145 : return result;
146 : }
147 : finally
148 : {
149 : Logger.LogDebug("Returning the existing document collection... Done", "AzureDocumentDbHelper\\CreateOrReadCollection");
150 : }
151 : }
152 : Logger.LogDebug("Creating and returning a new collection", "AzureDocumentDbHelper\\CreateOrReadCollection");
153 : start = DateTime.Now;
154 : result = ExecuteFaultTollerantFunction(() => client.CreateDocumentCollectionAsync(database.SelfLink, new DocumentCollection { Id = collectionName }).Result);
155 : if (uniqiueIndexPropertyNames != null)
156 : {
157 : StringBuilder body = new StringBuilder(@"
158 : function()
159 : {
160 : var context = getContext();
161 : var collection = context.getCollection();
162 : var request = context.getRequest();
163 :
164 : // document to be created in the current operation
165 : var documentToCreate = request.getBody();
166 :
167 : function lookForDuplicates(propertyNames, propertyValues, continuation)
168 : {
169 : var queryString = 'SELECT * FROM c WHERE ';
170 : var queryParameters = [];
171 : for (index = 0; index < propertyNames.length; index++)
172 : {
173 : if (index > 0)
174 : queryString = queryString + ' AND';
175 : queryString = queryString + ' c.' + propertyNames[index] + ' = @property' + index;
176 : queryParameters.push({ name: '@property' + index, value: propertyValues[index] });
177 : }
178 : var query =
179 : {
180 : query: queryString,
181 : parameters: queryParameters
182 : };
183 : var requestOptions =
184 : {
185 : continuation: continuation
186 : };
187 :
188 : var isAccepted = collection.queryDocuments(collection.getSelfLink(), query, requestOptions,
189 : function(err, results, responseOptions)
190 : {
191 : if (err)
192 : {
193 : throw new Error('Error querying for documents with duplicate: ' + err.message);
194 : }
195 : if (results.length > 0)
196 : {
197 : // At least one document with property exists.
198 : throw new Error('Document with the property: ' + JSON.stringify(propertyNames) + ' and value: ' + JSON.stringify(propertyValues) + ', already exists: ' + JSON.stringify(results[0]));
199 : }
200 : else if (responseOptions.continuation)
201 : {
202 : // Else if the query came back empty, but with a continuation token; repeat the query w/ the token.
203 : // This is highly unlikely; but is included to serve as an example for larger queries.
204 : lookForDuplicates(propertyNames, propertyValues, responseOptions.continuation);
205 : }
206 : else
207 : {
208 : // Success, no duplicates found! Do nothing.
209 : }
210 : }
211 : );
212 :
213 : // If we hit execution bounds - throw an exception.
214 : // This is highly unlikely; but is included to serve as an example for more complex operations.
215 : if (!isAccepted)
216 : {
217 : throw new Error('Timeout querying for document with duplicates.');
218 : }
219 : }
220 :
221 : ");
222 : string propertyNames = uniqiueIndexPropertyNames.Aggregate("", (current, uniqiueIndexPropertyName) => string.Format("{0}{1}\"{2}\"", current, string.IsNullOrWhiteSpace(current) ? string.Empty : ", ", uniqiueIndexPropertyName));
223 : string propertyValues = uniqiueIndexPropertyNames.Aggregate("", (current, uniqiueIndexPropertyName) => string.Format("{0}{1}documentToCreate[\"{2}\"]", current, string.IsNullOrWhiteSpace(current) ? string.Empty : ", ", uniqiueIndexPropertyName));
224 : foreach (string uniqiueIndexPropertyName in uniqiueIndexPropertyNames)
225 : {
226 : /*
227 : if (uniqiueIndexPropertyName.Contains("::"))
228 : {
229 : string[] values = uniqiueIndexPropertyName.Split(new[] { "::" }, StringSplitOptions.RemoveEmptyEntries);
230 : string preFilter = null;
231 : string propertyName = values[0];
232 : string subPropertyName = values[1];
233 : if (values.Length == 3)
234 : {
235 : preFilter = values[0];
236 : propertyName = values[1];
237 : subPropertyName = values[2];
238 : }
239 : body.Append(@"
240 : if (!(""" + propertyName + @""" in documentToCreate)) {
241 : throw new Error('Document must include a """ + propertyName + @""" property.');
242 : }
243 : // get property
244 : var propertyData = documentToCreate[""" + propertyName + @"""];
245 : var property = JSON.parse(propertyData);
246 : ");
247 : }
248 : else
249 : {
250 : body.Append(@"
251 : if (!(""" + uniqiueIndexPropertyName + @""" in documentToCreate)) {
252 : throw new Error('Document must include a """ + uniqiueIndexPropertyName + @""" property.');
253 : lookForDuplicates(""" + uniqiueIndexPropertyName + @""", documentToCreate[""" + uniqiueIndexPropertyName + @"""]);");
254 : }
255 : */
256 : body.Append(@"
257 : if (!(""" + uniqiueIndexPropertyName + @""" in documentToCreate))
258 : throw new Error('Document must include a """ + uniqiueIndexPropertyName + @""" property.');");
259 : }
260 :
261 : body.Append(@"
262 : lookForDuplicates([" + propertyNames + @"], [" + propertyValues + @"]);
263 : }");
264 :
265 : var trigger = new Trigger
266 : {
267 : Id = "ValidateUniqueConstraints",
268 : Body = body.ToString(),
269 : TriggerOperation = TriggerOperation.Create,
270 : TriggerType = TriggerType.Pre
271 : };
272 : ExecuteFaultTollerantFunction(() => client.CreateTriggerAsync(result.SelfLink, trigger).Result);
273 : }
274 : Logger.LogDebug(string.Format("Getting Azure document collection took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadCollection");
275 : // AzureDocumentDbConnectionCache.SetDocumentCollection(documentCollectionCacheKey, result);
276 : return result;
277 : }
278 : finally
279 : {
280 : Logger.LogDebug("Getting Azure collection... Done", "AzureDocumentDbHelper\\CreateOrReadCollection");
281 : }
282 : }
283 :
284 : /// <summary>
285 : /// Process the provided <paramref name="documentClientException"/> checking for operations that can be retired.
286 : /// </summary>
287 : /// <param name="documentClientException">The <see cref="DocumentClientException"/> to check.</param>
288 1 : protected virtual void ProcessFaultTollerantExceptions(DocumentClientException documentClientException)
289 : {
290 : var statusCode = (int)documentClientException.StatusCode;
291 : if (statusCode == 429 || statusCode == 503)
292 : Thread.Sleep(documentClientException.RetryAfter);
293 : else
294 : {
295 : Logger.LogWarning("Non-fault tolerant exception raised via DocumentClientException.", "AzureDocumentDbDataStore\\ProcessFaultTollerantExceptions");
296 : if (documentClientException.Error.Message == "Resource with specified id or name already exists.")
297 : throw new DuplicateCreateCommandException(documentClientException);
298 : throw new DocumentDbException(documentClientException);
299 : }
300 : }
301 :
302 : /// <summary>
303 : /// Execute the provided <paramref name="func"/> in a fault tolerant way.
304 : /// </summary>
305 : /// <param name="func">The <see cref="Func{T}"/> to execute.</param>
306 1 : public virtual T ExecuteFaultTollerantFunction<T>(Func<T> func)
307 : {
308 : while (true)
309 : {
310 : try
311 : {
312 : return func();
313 : }
314 : catch (DocumentClientException documentClientException)
315 : {
316 : Logger.LogWarning("DocumentClientException thrown.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction");
317 : ProcessFaultTollerantExceptions(documentClientException);
318 : }
319 : catch (AggregateException aggregateException)
320 : {
321 : var documentClientException = aggregateException.InnerException as DocumentClientException;
322 : if (documentClientException != null)
323 : {
324 : Logger.LogWarning("DocumentClientException thrown via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", documentClientException);
325 : ProcessFaultTollerantExceptions(documentClientException);
326 : }
327 : else
328 : Logger.LogWarning("Non DocumentClientException raised via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", aggregateException);
329 : }
330 : }
331 : }
332 :
333 : /// <summary>
334 : /// Execute the provided <paramref name="func"/> in a fault tolerant way.
335 : /// </summary>
336 : /// <param name="func">The <see cref="Action"/> to execute.</param>
337 1 : public virtual void ExecuteFaultTollerantFunction(Action func)
338 : {
339 : while (true)
340 : {
341 : try
342 : {
343 : func();
344 : return;
345 : }
346 : catch (DocumentClientException documentClientException)
347 : {
348 : ProcessFaultTollerantExceptions(documentClientException);
349 : }
350 : catch (AggregateException aggregateException)
351 : {
352 : var documentClientException = aggregateException.InnerException as DocumentClientException;
353 : if (documentClientException != null)
354 : {
355 : Logger.LogWarning("DocumentClientException thrown via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", documentClientException);
356 : ProcessFaultTollerantExceptions(documentClientException);
357 : }
358 : else
359 : Logger.LogWarning("Non DocumentClientException raised via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", aggregateException);
360 : }
361 : }
362 : }
363 : }
364 : }
|