       1             : #region Copyright
       2             : // // -----------------------------------------------------------------------
       3             : // // <copyright company="cdmdotnet Limited">
       4             : // //   Copyright cdmdotnet Limited. All rights reserved.
       5             : // // </copyright>
       6             : // // -----------------------------------------------------------------------
       7             : #endregion
       8             : 
       9             : using System;
      10             : using System.Collections.Generic;
      11             : using System.Linq;
      12             : using System.Text;
      13             : using System.Threading;
      14             : using System.Threading.Tasks;
      15             : using cdmdotnet.Logging;
      16             : using Cqrs.Domain.Exceptions;
      17             : using Microsoft.Azure.Documents;
      18             : using Microsoft.Azure.Documents.Client;
      19             : using Microsoft.Azure.Documents.Linq;
      20             : 
      21             : namespace Cqrs.Azure.DocumentDb
      22             : {
      23             :         public class AzureDocumentDbHelper : IAzureDocumentDbHelper
      24           0 :         {
      25             :                 protected ILogger Logger { get; private set; }
      26             : 
      27             :                 protected IAzureDocumentDbConnectionCache AzureDocumentDbConnectionCache { get; private set; }
      28             : 
      29           0 :                 public AzureDocumentDbHelper(ILogger logger, IAzureDocumentDbConnectionCache azureDocumentDbConnectionCache)
      30             :                 {
      31             :                         Logger = logger;
      32             :                         AzureDocumentDbConnectionCache = azureDocumentDbConnectionCache;
      33             :                 }
      34             : 
      35           0 :                 public async Task<Database> CreateOrReadDatabase(DocumentClient client, string databaseName)
      36             :                 {
      37             :                         Logger.LogDebug("Getting Azure database", "AzureDocumentDbHelper\\CreateOrReadDatabase");
      38             :                         DateTime start = DateTime.Now;
      39             :                         Database result;
      40             :                         string databaseCacheKey = string.Format("AzureDocumentDbDatabase::{0}", databaseName);
      41             :                         if (AzureDocumentDbConnectionCache.TryGetDatabase(databaseCacheKey, out result))
      42             :                         {
      43             :                                 Logger.LogDebug(string.Format("Returning cached database took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadDatabase");
      44             :                                 try
      45             :                                 {
      46             :                                         return result;
      47             :                                 }
      48             :                                 finally
      49             :                                 {
      50             :                                         Logger.LogDebug(string.Format("Returning cached database took... Done"), "AzureDocumentDbHelper\\CreateOrReadDatabase");
      51             :                                 }
      52             :                         }
      53             :                         try
      54             :                         {
      55             :                                 IEnumerable<Database> query = client.CreateDatabaseQuery()
      56             :                                         .Where(database => database.Id == databaseName)
      57             :                                         .AsEnumerable();
      58             :                                 Logger.LogDebug("Checking if the database exists", "AzureDocumentDbHelper\\CreateOrReadDatabase");
      59             :                                 start = DateTime.Now;
      60             :                                 result = query.SingleOrDefault();
      61             :                                 if (result != null)
      62             :                                 {
      63             :                                         Logger.LogDebug(string.Format("Returning the existing database took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadDatabase");
      64             :                                         try
      65             :                                         {
      66             :                                                 AzureDocumentDbConnectionCache.SetDatabase(databaseCacheKey, result);
      67             :                                                 return result;
      68             :                                         }
      69             :                                         finally
      70             :                                         {
      71             :                                                 Logger.LogDebug(string.Format("Returning the existing database... Done"), "AzureDocumentDbHelper\\CreateOrReadDatabase");
      72             :                                         }
      73             :                                 }
      74             :                                 Logger.LogDebug("Creating and returning a new database", "AzureDocumentDbHelper\\CreateOrReadDatabase");
      75             :                                 start = DateTime.Now;
      76             : 
      77             :                                 result = ExecuteFaultTollerantFunction(() => client.CreateDatabaseAsync(new Database { Id = databaseName }).Result);
      78             : 
      79             :                                 Logger.LogDebug(string.Format("Getting Azure database took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadDatabase");
      80             :                                 AzureDocumentDbConnectionCache.SetDatabase(databaseCacheKey, result);
      81             :                                 return result;
      82             :                         }
      83             :                         finally
      84             :                         {
      85             :                                 Logger.LogDebug("Getting Azure database... Done", "AzureDocumentDbHelper\\CreateOrReadDatabase");
      86             :                         }
      87             :                 }
      88             : 
      89           0 :                 public async Task<DocumentCollection> CreateOrReadCollection(DocumentClient client, Database database, string collectionName, string[] uniqiueIndexPropertyNames = null)
      90             :                 {
      91             :                         Logger.LogDebug("Getting Azure collection", "AzureDocumentDbHelper\\CreateOrReadCollection");
      92             :                         DateTime start = DateTime.Now;
      93             :                         DocumentCollection result;
      94             :                         string documentCollectionCacheKey = string.Format("AzureDocumentDbDocumentCollection::{0}", collectionName);
      95             :                         if (AzureDocumentDbConnectionCache.TryGetDocumentCollection(documentCollectionCacheKey, out result))
      96             :                         {
      97             :                                 Logger.LogDebug(string.Format("Returning cached collection took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadCollection");
      98             :                                 try
      99             :                                 {
     100             :                                         return result;
     101             :                                 }
     102             :                                 finally
     103             :                                 {
     104             :                                         Logger.LogDebug(string.Format("Returning cached collection took... Done"), "AzureDocumentDbHelper\\CreateOrReadCollection");
     105             :                                 }
     106             :                         }
     107             :                         try
     108             :                         {
     109             :                                 IEnumerable<DocumentCollection> query = client.CreateDocumentCollectionQuery(database.SelfLink)
     110             :                                         .Where(documentCollection => documentCollection.Id == collectionName)
     111             :                                         .AsEnumerable();
     112             :                                 Logger.LogDebug("Checking if the collection exists", "AzureDocumentDbHelper\\CreateOrReadCollection");
     113             :                                 start = DateTime.Now;
     114             :                                 result = query.SingleOrDefault();
     115             :                                 if (result != null)
     116             :                                 {
     117             :                                         Logger.LogDebug(string.Format("Returning the existing document collection took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadCollection");
     118             :                                         try
     119             :                                         {
     120             :                                                 // AzureDocumentDbConnectionCache.SetDocumentCollection(documentCollectionCacheKey, result);
     121             :                                                 return result;
     122             :                                         }
     123             :                                         finally
     124             :                                         {
     125             :                                                 Logger.LogDebug(string.Format("Returning the existing document collection... Done"), "AzureDocumentDbHelper\\CreateOrReadCollection");
     126             :                                         }
     127             :                                 }
     128             :                                 Logger.LogDebug("Creating and returning a new collection", "AzureDocumentDbHelper\\CreateOrReadCollection");
     129             :                                 start = DateTime.Now;
     130             :                                 result = ExecuteFaultTollerantFunction(() => client.CreateDocumentCollectionAsync(database.SelfLink, new DocumentCollection { Id = collectionName }).Result);
     131             :                                 if (uniqiueIndexPropertyNames != null)
     132             :                                 {
     133             :                                         StringBuilder body = new StringBuilder(@"
     134             : function()
     135             : {
     136             :         var context = getContext();
     137             :         var collection = context.getCollection();
     138             :         var request = context.getRequest();
     139             : 
     140             :         // document to be created in the current operation
     141             :         var documentToCreate = request.getBody();
     142             : 
     143             :         function lookForDuplicates(propertyNames, propertyValues, continuation)
     144             :         {
     145             :                 var queryString = 'SELECT * FROM c WHERE ';
     146             :                 var queryParameters = [];
     147             :                 for (index = 0; index < propertyNames.length; index++)
     148             :                 {
     149             :                         if (index > 0)
     150             :                                 queryString = queryString + ' AND';
     151             :                         queryString = queryString + ' c.' + propertyNames[index] + ' = @property' + index;
     152             :                         queryParameters.push({ name: '@property' + index, value: propertyValues[index] });
     153             :                 }
     154             :                 var query =
     155             :                 {
     156             :                         query: queryString,
     157             :                         parameters: queryParameters
     158             :                 };
     159             :                 var requestOptions =
     160             :                 {
     161             :                         continuation: continuation
     162             :                 };
     163             : 
     164             :                 var isAccepted = collection.queryDocuments(collection.getSelfLink(), query, requestOptions,
     165             :                         function(err, results, responseOptions)
     166             :                         {
     167             :                                 if (err)
     168             :                                 {
     169             :                                         throw new Error('Error querying for documents with duplicate: ' + err.message);
     170             :                                 }
     171             :                                 if (results.length > 0)
     172             :                                 {
     173             :                                         // At least one document with property exists.
     174             :                                         throw new Error('Document with the property: ' + JSON.stringify(propertyNames) + ' and value: ' + JSON.stringify(propertyValues) + ', already exists: ' + JSON.stringify(results[0]));
     175             :                                 }
     176             :                                 else if (responseOptions.continuation)
     177             :                                 {
     178             :                                         // Else if the query came back empty, but with a continuation token; repeat the query w/ the token.
     179             :                                         // This is highly unlikely; but is included to serve as an example for larger queries.
     180             :                                         lookForDuplicates(propertyNames, propertyValues, responseOptions.continuation);
     181             :                                 }
     182             :                                 else
     183             :                                 {
     184             :                                         // Success, no duplicates found! Do nothing.
     185             :                                 }
     186             :                         }
     187             :                 );
     188             : 
     189             :                 // If we hit execution bounds - throw an exception.
     190             :                 // This is highly unlikely; but is included to serve as an example for more complex operations.
     191             :                 if (!isAccepted)
     192             :                 {
     193             :                         throw new Error('Timeout querying for document with duplicates.');
     194             :                 }
     195             :         }
     196             : 
     197             : ");
     198             :                                         string propertyNames = uniqiueIndexPropertyNames.Aggregate("", (current, uniqiueIndexPropertyName) => string.Format("{0}{1}\"{2}\"", current, string.IsNullOrWhiteSpace(current) ? string.Empty : ", ", uniqiueIndexPropertyName));
     199             :                                         string propertyValues = uniqiueIndexPropertyNames.Aggregate("", (current, uniqiueIndexPropertyName) => string.Format("{0}{1}documentToCreate[\"{2}\"]", current, string.IsNullOrWhiteSpace(current) ? string.Empty : ", ", uniqiueIndexPropertyName));
     200             :                                         foreach (string uniqiueIndexPropertyName in uniqiueIndexPropertyNames)
     201             :                                         {
     202             :                                                 /*
     203             :                                                 if (uniqiueIndexPropertyName.Contains("::"))
     204             :                                                 {
     205             :                                                         string[] values = uniqiueIndexPropertyName.Split(new[] { "::" }, StringSplitOptions.RemoveEmptyEntries);
     206             :                                                         string preFilter = null;
     207             :                                                         string propertyName = values[0];
     208             :                                                         string subPropertyName = values[1];
     209             :                                                         if (values.Length == 3)
     210             :                                                         {
     211             :                                                                 preFilter = values[0];
     212             :                                                                 propertyName = values[1];
     213             :                                                                 subPropertyName = values[2];
     214             :                                                         }
     215             :                                                         body.Append(@"
     216             :         if (!(""" + propertyName + @""" in documentToCreate)) {
     217             :                 throw new Error('Document must include a """ + propertyName + @""" property.');
     218             :         }
     219             :         // get property
     220             :         var propertyData = documentToCreate[""" + propertyName + @"""];
     221             :         var property = JSON.parse(propertyData);
     222             : ");
     223             :                                                 }
     224             :                                                 else
     225             :                                                 {
     226             :                                                         body.Append(@"
     227             :         if (!(""" + uniqiueIndexPropertyName + @""" in documentToCreate)) {
     228             :                 throw new Error('Document must include a """ + uniqiueIndexPropertyName + @""" property.');
     229             :         lookForDuplicates(""" + uniqiueIndexPropertyName + @""", documentToCreate[""" + uniqiueIndexPropertyName + @"""]);");
     230             :                                                 }
     231             :                                                 */
     232             :                                                         body.Append(@"
     233             :         if (!(""" + uniqiueIndexPropertyName + @""" in documentToCreate))
     234             :                 throw new Error('Document must include a """ + uniqiueIndexPropertyName + @""" property.');");
     235             :                                         }
     236             : 
     237             :                                         body.Append(@"
     238             :         lookForDuplicates([" + propertyNames + @"], [" + propertyValues + @"]);
     239             : }");
     240             : 
     241             :                                         var trigger = new Trigger
     242             :                                         {
     243             :                                                 Id = "ValidateUniqueConstraints",
     244             :                                                 Body = body.ToString(),
     245             :                                                 TriggerOperation = TriggerOperation.Create,
     246             :                                                 TriggerType = TriggerType.Pre
     247             :                                         };
     248             :                                         ExecuteFaultTollerantFunction(() => client.CreateTriggerAsync(result.SelfLink, trigger).Result);
     249             :                                 }
     250             :                                 Logger.LogDebug(string.Format("Getting Azure document collection took {0}", DateTime.Now - start), "AzureDocumentDbHelper\\CreateOrReadCollection");
     251             :                                 // AzureDocumentDbConnectionCache.SetDocumentCollection(documentCollectionCacheKey, result);
     252             :                                 return result;
     253             :                         }
     254             :                         finally
     255             :                         {
     256             :                                 Logger.LogDebug("Getting Azure collection... Done", "AzureDocumentDbHelper\\CreateOrReadCollection");
     257             :                         }
     258             :                 }
     259             : 
     260           0 :                 protected virtual void ProcessFaultTollerantExceptions(DocumentClientException documentClientException)
     261             :                 {
     262             :                         var statusCode = (int)documentClientException.StatusCode;
     263             :                         if (statusCode == 429 || statusCode == 503)
     264             :                                 Thread.Sleep(documentClientException.RetryAfter);
     265             :                         else
     266             :                         {
     267             :                                 Logger.LogWarning("Non-fault tolerant exception raised via DocumentClientException.", "AzureDocumentDbDataStore\\ProcessFaultTollerantExceptions");
     268             :                                 if (documentClientException.Error.Message == "Resource with specified id or name already exists.")
     269             :                                         throw new DuplicateCreateCommandException(documentClientException);
     270             :                                 throw new DocumentDbException("Non-fault tolerant exception raised.", documentClientException);
     271             :                         }
     272             :                 }
     273             : 
     274           0 :                 public virtual T ExecuteFaultTollerantFunction<T>(Func<T> func)
     275             :                 {
     276             :                         while (true)
     277             :                         {
     278             :                                 try
     279             :                                 {
     280             :                                         return func();
     281             :                                 }
     282             :                                 catch (DocumentClientException documentClientException)
     283             :                                 {
     284             :                                         Logger.LogWarning("DocumentClientException thrown.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction");
     285             :                                         ProcessFaultTollerantExceptions(documentClientException);
     286             :                                 }
     287             :                                 catch (AggregateException aggregateException)
     288             :                                 {
     289             :                                         var documentClientException = aggregateException.InnerException as DocumentClientException;
     290             :                                         if (documentClientException != null)
     291             :                                         {
     292             :                                                 Logger.LogWarning("DocumentClientException thrown via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", documentClientException);
     293             :                                                 ProcessFaultTollerantExceptions(documentClientException);
     294             :                                         }
     295             :                                         else
     296             :                                                 Logger.LogWarning("Non DocumentClientException raised via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", aggregateException);
     297             :                                 }
     298             :                         }
     299             :                 }
     300             : 
     301           0 :                 public virtual void ExecuteFaultTollerantFunction(Action func)
     302             :                 {
     303             :                         while (true)
     304             :                         {
     305             :                                 try
     306             :                                 {
     307             :                                         func();
     308             :                                         return;
     309             :                                 }
     310             :                                 catch (DocumentClientException documentClientException)
     311             :                                 {
     312             :                                         ProcessFaultTollerantExceptions(documentClientException);
     313             :                                 }
     314             :                                 catch (AggregateException aggregateException)
     315             :                                 {
     316             :                                         var documentClientException = aggregateException.InnerException as DocumentClientException;
     317             :                                         if (documentClientException != null)
     318             :                                         {
     319             :                                                 Logger.LogWarning("DocumentClientException thrown via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", documentClientException);
     320             :                                                 ProcessFaultTollerantExceptions(documentClientException);
     321             :                                         }
     322             :                                         else
     323             :                                                 Logger.LogWarning("Non DocumentClientException raised via AggregateException.", "AzureDocumentDbDataStore\\ExecuteFaultTollerantFunction", aggregateException);
     324             :                                 }
     325             :                         }
     326             :                 }
     327             :         }
     328             : }

