Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Text;
13 : using System.Threading;
14 : using System.Threading.Tasks;
15 : using cdmdotnet.Logging;
16 : using Cqrs.Domain.Exceptions;
17 : using Microsoft.Azure.Documents;
18 : using Microsoft.Azure.Documents.Client;
19 : using Microsoft.Azure.Documents.Linq;
20 :
21 : namespace Cqrs.Azure.DocumentDb
22 : {
23 : public class AzureDocumentDbHelper : IAzureDocumentDbHelper
24 0 : {
25 : protected ILogger Logger { get; private set; }
26 :
27 : protected IAzureDocumentDbConnectionCache AzureDocumentDbConnectionCache { get; private set; }
28 :
29 0 : public AzureDocumentDbHelper(ILogger logger, IAzureDocumentDbConnectionCache azureDocumentDbConnectionCache)
30 : {
31 : Logger = logger;
32 : AzureDocumentDbConnectionCache = azureDocumentDbConnectionCache;
33 : }
34 :
35 0 : public async Task<Database> CreateOrReadDatabase(DocumentClient client, string databaseName)
36 : {
37 : Logger.LogDebug("Getting Azure database", "AzureDocumentDbHelper\\CreateOrReadDatabase");
38 : DateTime start = DateTime.Now;
39 : Database result;
40 : string databaseCacheKey = string.Format("AzureDocumentDbDatabase::{0}", databaseName);
41 : if (AzureDocumentDbConnectionCache.TryGetDatabase(databaseCacheKey, out result))
42 : {
43 : Logger.LogDebug(string.Format("Returning cached database took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadDatabase");
44 : try
45 : {
46 : return result;
47 : }
48 : finally
49 : {
50 : Logger.LogDebug(string.Format("Returning cached database took... Done"), "AzureDocumentDbHelper\\CreateOrReadDatabase");
51 : }
52 : }
53 : try
54 : {
55 : IEnumerable<Database> query = client.CreateDatabaseQuery()
56 : .Where(database => database.Id == databaseName)
57 : .AsEnumerable();
58 : Logger.LogDebug("Checking if the database exists", "AzureDocumentDbHelper\\CreateOrReadDatabase");
59 : start = DateTime.Now;
60 : result = query.SingleOrDefault();
61 : if (result != null)
62 : {
63 : Logger.LogDebug(string.Format("Returning the existing database took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadDatabase");
64 : try
65 : {
66 : AzureDocumentDbConnectionCache.SetDatabase(databaseCacheKey, result);
67 : return result;
68 : }
69 : finally
70 : {
71 : Logger.LogDebug(string.Format("Returning the existing database... Done"), "AzureDocumentDbHelper\\CreateOrReadDatabase");
72 : }
73 : }
74 : Logger.LogDebug("Creating and returning a new database", "AzureDocumentDbHelper\\CreateOrReadDatabase");
75 : start = DateTime.Now;
76 :
77 : result = ExecuteFaultTollerantFunction(() => client.CreateDatabaseAsync(new Database { Id = databaseName }).Result);
78 :
79 : Logger.LogDebug(string.Format("Getting Azure database took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadDatabase");
80 : AzureDocumentDbConnectionCache.SetDatabase(databaseCacheKey, result);
81 : return result;
82 : }
83 : finally
84 : {
85 : Logger.LogDebug("Getting Azure database... Done", "AzureDocumentDbHelper\\CreateOrReadDatabase");
86 : }
87 : }
88 :
89 0 : public async Task<DocumentCollection> CreateOrReadCollection(DocumentClient client, Database database, string collectionName, string[] uniqiueIndexPropertyNames = null)
90 : {
91 : Logger.LogDebug("Getting Azure collection", "AzureDocumentDbHelper\\CreateOrReadCollection");
92 : DateTime start = DateTime.Now;
93 : DocumentCollection result;
94 : string documentCollectionCacheKey = string.Format("AzureDocumentDbDocumentCollection::{0}", collectionName);
95 : if (AzureDocumentDbConnectionCache.TryGetDocumentCollection(documentCollectionCacheKey, out result))
96 : {
97 : Logger.LogDebug(string.Format("Returning cached collection took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadCollection");
98 : try
99 : {
100 : return result;
101 : }
102 : finally
103 : {
104 : Logger.LogDebug(string.Format("Returning cached collection took... Done"), "AzureDocumentDbHelper\\CreateOrReadCollection");
105 : }
106 : }
107 : try
108 : {
109 : IEnumerable<DocumentCollection> query = client.CreateDocumentCollectionQuery(database.SelfLink)
110 : .Where(documentCollection => documentCollection.Id == collectionName)
111 : .AsEnumerable();
112 : Logger.LogDebug("Checking if the collection exists", "AzureDocumentDbHelper\\CreateOrReadCollection");
113 : start = DateTime.Now;
114 : result = query.SingleOrDefault();
115 : if (result != null)
116 : {
117 : Logger.LogDebug(string.Format("Returning the existing document collection took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadCollection");
118 : try
119 : {
120 : // AzureDocumentDbConnectionCache.SetDocumentCollection(documentCollectionCacheKey, result);
121 : return result;
122 : }
123 : finally
124 : {
125 : Logger.LogDebug(string.Format("Returning the existing document collection... Done"), "AzureDocumentDbHelper\\CreateOrReadCollection");
126 : }
127 : }
128 : Logger.LogDebug("Creating and returning a new collection", "AzureDocumentDbHelper\\CreateOrReadCollection");
129 : start = DateTime.Now;
130 : result = ExecuteFaultTollerantFunction(() => client.CreateDocumentCollectionAsync(database.SelfLink, new DocumentCollection { Id = collectionName }).Result);
131 : if (uniqiueIndexPropertyNames != null)
132 : {
133 : StringBuilder body = new StringBuilder(@"
134 : function()
135 : {
136 : var context = getContext();
137 : var collection = context.getCollection();
138 : var request = context.getRequest();
139 :
140 : // document to be created in the current operation
141 : var documentToCreate = request.getBody();
142 :
143 : function lookForDuplicates(propertyNames, propertyValues, continuation)
144 : {
145 : var queryString = 'SELECT * FROM c WHERE ';
146 : var queryParameters = [];
147 : for (index = 0; index < propertyNames.length; index++)
148 : {
149 : if (index > 0)
150 : queryString = queryString + ' AND';
151 : queryString = queryString + ' c.' + propertyNames[index] + ' = @property' + index;
152 : queryParameters.push({ name: '@property' + index, value: propertyValues[index] });
153 : }
154 : var query =
155 : {
156 : query: queryString,
157 : parameters: queryParameters
158 : };
159 : var requestOptions =
160 : {
161 : continuation: continuation
162 : };
163 :
164 : var isAccepted = collection.queryDocuments(collection.getSelfLink(), query, requestOptions,
165 : function(err, results, responseOptions)
166 : {
167 : if (err)
168 : {
169 : throw new Error('Error querying for documents with duplicate: ' + err.message);
170 : }
171 : if (results.length > 0)
172 : {
173 : // At least one document with property exists.
174 : throw new Error('Document with the property: ' + JSON.stringify(propertyNames) + ' and value: ' + JSON.stringify(propertyValues) + ', already exists: ' + JSON.stringify(results[0]));
175 : }
176 : else if (responseOptions.continuation)
177 : {
178 : // Else if the query came back empty, but with a continuation token; repeat the query w/ the token.
179 : // This is highly unlikely; but is included to serve as an example for larger queries.
180 : lookForDuplicates(propertyNames, propertyValues, responseOptions.continuation);
181 : }
182 : else
183 : {
184 : // Success, no duplicates found! Do nothing.
185 : }
186 : }
187 : );
188 :
189 : // If we hit execution bounds - throw an exception.
190 : // This is highly unlikely; but is included to serve as an example for more complex operations.
191 : if (!isAccepted)
192 : {
193 : throw new Error('Timeout querying for document with duplicates.');
194 : }
195 : }
196 :
197 : ");
198 : string propertyNames = uniqiueIndexPropertyNames.Aggregate("", (current, uniqiueIndexPropertyName) => string.Format("{0}{1}\"{2}\"", current, string.IsNullOrWhiteSpace(current) ? string.Empty : ", ", uniqiueIndexPropertyName));
199 : string propertyValues = uniqiueIndexPropertyNames.Aggregate("", (current, uniqiueIndexPropertyName) => string.Format("{0}{1}documentToCreate[\"{2}\"]", current, string.IsNullOrWhiteSpace(current) ? string.Empty : ", ", uniqiueIndexPropertyName));
200 : foreach (string uniqiueIndexPropertyName in uniqiueIndexPropertyNames)
201 : {
202 : /*
203 : if (uniqiueIndexPropertyName.Contains("::"))
204 : {
205 : string[] values = uniqiueIndexPropertyName.Split(new[] { "::" }, StringSplitOptions.RemoveEmptyEntries);
206 : string preFilter = null;
207 : string propertyName = values[0];
208 : string subPropertyName = values[1];
209 : if (values.Length == 3)
210 : {
211 : preFilter = values[0];
212 : propertyName = values[1];
213 : subPropertyName = values[2];
214 : }
215 : body.Append(@"
216 : if (!(""" + propertyName + @""" in documentToCreate)) {
217 : throw new Error('Document must include a """ + propertyName + @""" property.');
218 : }
219 : // get property
220 : var propertyData = documentToCreate[""" + propertyName + @"""];
221 : var property = JSON.parse(propertyData);
222 : ");
223 : }
224 : else
225 : {
226 : body.Append(@"
227 : if (!(""" + uniqiueIndexPropertyName + @""" in documentToCreate)) {
228 : throw new Error('Document must include a """ + uniqiueIndexPropertyName + @""" property.');
229 : lookForDuplicates(""" + uniqiueIndexPropertyName + @""", documentToCreate[""" + uniqiueIndexPropertyName + @"""]);");
230 : }
231 : */
232 : body.Append(@"
233 : if (!(""" + uniqiueIndexPropertyName + @""" in documentToCreate))
234 : throw new Error('Document must include a """ + uniqiueIndexPropertyName + @""" property.');");
235 : }
236 :
237 : body.Append(@"
238 : lookForDuplicates([" + propertyNames + @"], [" + propertyValues + @"]);
239 : }");
240 :
241 : var trigger = new Trigger
242 : {
243 : Id = "ValidateUniqueConstraints",
244 : Body = body.ToString(),
245 : TriggerOperation = TriggerOperation.Create,
246 : TriggerType = TriggerType.Pre
247 : };
248 : ExecuteFaultTollerantFunction(() => client.CreateTriggerAsync(result.SelfLink, trigger).Result);
249 : }
250 : Logger.LogDebug(string.Format("Getting Azure document collection took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadCollection");
251 : // AzureDocumentDbConnectionCache.SetDocumentCollection(documentCollectionCacheKey, result);
252 : return result;
253 : }
254 : finally
255 : {
256 : Logger.LogDebug("Getting Azure collection... Done", "AzureDocumentDbHelper\\CreateOrReadCollection");
257 : }
258 : }
259 :
260 0 : protected virtual void ProcessFaultTollerantExceptions(DocumentClientException documentClientException)
261 : {
262 : var statusCode = (int)documentClientException.StatusCode;
263 : if (statusCode == 429 || statusCode == 503)
264 : Thread.Sleep(documentClientException.RetryAfter);
265 : else
266 : {
267 : Logger.LogWarning("Non-fault tolerant exception raised via DocumentClientException.", "AzureDocumentDbDataStore\\ProcessFaultTollerantExceptions");
268 : if (documentClientException.Error.Message == "Resource with specified id or name already exists.")
269 : throw new DuplicateCreateCommandException(documentClientException);
270 : throw new DocumentDbException("Non-fault tolerant exception raised.", documentClientException);
271 : }
272 : }
273 :
274 0 : public virtual T ExecuteFaultTollerantFunction<T>(Func<T> func)
275 : {
276 : while (true)
277 : {
278 : try
279 : {
280 : return func();
281 : }
282 : catch (DocumentClientException documentClientException)
283 : {
284 : Logger.LogWarning("DocumentClientException thrown.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction");
285 : ProcessFaultTollerantExceptions(documentClientException);
286 : }
287 : catch (AggregateException aggregateException)
288 : {
289 : var documentClientException = aggregateException.InnerException as DocumentClientException;
290 : if (documentClientException != null)
291 : {
292 : Logger.LogWarning("DocumentClientException thrown via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", documentClientException);
293 : ProcessFaultTollerantExceptions(documentClientException);
294 : }
295 : else
296 : Logger.LogWarning("Non DocumentClientException raised via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", aggregateException);
297 : }
298 : }
299 : }
300 :
301 0 : public virtual void ExecuteFaultTollerantFunction(Action func)
302 : {
303 : while (true)
304 : {
305 : try
306 : {
307 : func();
308 : return;
309 : }
310 : catch (DocumentClientException documentClientException)
311 : {
312 : ProcessFaultTollerantExceptions(documentClientException);
313 : }
314 : catch (AggregateException aggregateException)
315 : {
316 : var documentClientException = aggregateException.InnerException as DocumentClientException;
317 : if (documentClientException != null)
318 : {
319 : Logger.LogWarning("DocumentClientException thrown via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", documentClientException);
320 : ProcessFaultTollerantExceptions(documentClientException);
321 : }
322 : else
323 : Logger.LogWarning("Non DocumentClientException raised via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", aggregateException);
324 : }
325 : }
326 : }
327 : }
328 : }
|