|           Line data    Source code 
       1             : #region Copyright
       2             : // // -----------------------------------------------------------------------
       3             : // // <copyright company="Chinchilla Software Limited">
       4             : // //   Copyright Chinchilla Software Limited. All rights reserved.
       5             : // // </copyright>
       6             : // // -----------------------------------------------------------------------
       7             : #endregion
       8             : 
       9             : using System;
      10             : using System.Collections.Generic;
      11             : using System.IO;
      12             : using System.Linq;
      13             : using System.Linq.Expressions;
      14             : using System.Threading.Tasks;
      15             : using cdmdotnet.Logging;
      16             : using Cqrs.Entities;
      17             : using Microsoft.WindowsAzure.Storage;
      18             : using Microsoft.WindowsAzure.Storage.Blob;
      19             : 
      20             : namespace Cqrs.Azure.BlobStorage
      21             : {
      22             :         /// <summary>
      23             :         /// A <see cref="IEnumerable{TData}"/> that uses Azure Blob Storage for storage.
      24             :         /// </summary>
      25             :         public class BlobStorageStore<TData>
      26             :                 : StorageStore<TData, CloudBlobContainer>
      27           1 :         {
      28             :                 internal Func<TData, string> GenerateFileName { get; set; } // fallback blob-name generator: AsyncSaveData uses it when no customFilenameFunction is supplied
      29             : 
      30             :                 /// <summary>
      31             :                 /// Initializes a new instance of the <see cref="BlobStorageStore{TData}"/> class, passing the provided <paramref name="logger"/> to the base class constructor.
      32             :                 /// </summary>
      33           1 :                 public BlobStorageStore(ILogger logger)
      34             :                         : base(logger)
      35             :                 {
      36             :                 }
      37             : 
      38             :                 #region Implementation of IEnumerable
      39             : 
      40             :                 /// <summary>
      41             :                 /// Returns an enumerator that iterates through the collection, deserialising each stream produced by <see cref="OpenStreamsForReading"/>.
      42             :                 /// </summary>
      43             :                 /// <returns>
      44             :                 /// A <see cref="T:System.Collections.Generic.IEnumerator`1"/> that can be used to iterate through the collection.
      45             :                 /// </returns>
      46           1 :                 public override IEnumerator<TData> GetEnumerator()
      47             :                 {
      48             :                         return OpenStreamsForReading()
      49             :                                 .Select(Deserialise)
      50             :                                 .GetEnumerator();
      51             :                 }
      52             : 
      53             :                 #endregion
      54             : 
      55             :                 #region Implementation of IQueryable
      56             : 
      57             :                 /// <summary>
      58             :                 /// Gets the expression tree that is associated with the instance of <see cref="T:System.Linq.IQueryable"/>, taken from the deserialised results of <see cref="OpenStreamsForReading"/> wrapped with AsQueryable.
      59             :                 /// </summary>
      60             :                 /// <returns>
      61             :                 /// The <see cref="T:System.Linq.Expressions.Expression"/> that is associated with this instance of <see cref="T:System.Linq.IQueryable"/>.
      62             :                 /// </returns>
      63             :                 public override Expression Expression
      64             :                 {
      65             :                         get
      66             :                         {
      67             :                                 return OpenStreamsForReading()
      68             :                                         .Select(Deserialise)
      69             :                                         .AsQueryable()
      70             :                                         .Expression;
      71             :                         }
      72             :                 }
      73             : 
      74             :                 /// <summary>
      75             :                 /// Gets the type of the element(s) that are returned when the expression tree associated with this instance of <see cref="T:System.Linq.IQueryable"/> is executed, taken from the deserialised results of <see cref="OpenStreamsForReading"/> wrapped with AsQueryable.
      76             :                 /// </summary>
      77             :                 /// <returns>
      78             :                 /// A <see cref="T:System.Type"/> that represents the type of the element(s) that are returned when the expression tree associated with this object is executed.
      79             :                 /// </returns>
      80             :                 public override Type ElementType
      81             :                 {
      82             :                         get
      83             :                         {
      84             :                                 return OpenStreamsForReading()
      85             :                                         .Select(Deserialise)
      86             :                                         .AsQueryable()
      87             :                                         .ElementType;
      88             :                         }
      89             :                 }
      90             : 
      91             :                 /// <summary>
      92             :                 /// Gets the query provider that is associated with this data source, taken from the deserialised results of <see cref="OpenStreamsForReading"/> wrapped with AsQueryable.
      93             :                 /// </summary>
      94             :                 /// <returns>
      95             :                 /// The <see cref="T:System.Linq.IQueryProvider"/> that is associated with this data source.
      96             :                 /// </returns>
      97             :                 public override IQueryProvider Provider
      98             :                 {
      99             :                         get { return OpenStreamsForReading()
     100             :                                         .Select(Deserialise)
     101             :                                         .AsQueryable()
     102             :                                         .Provider;
     103             :                         }
     104             :                 }
     105             : 
     106             :                 #endregion
     107             : 
     108             :                 /// <summary>
     109             :                 /// Runs <paramref name="function"/> against a block blob named "{name}.json" in every container of <c>WritableCollection</c> (one task per container) and blocks until all tasks have finished; the name comes from <paramref name="customFilenameFunction"/> when supplied, otherwise from <see cref="GenerateFileName"/>. Throws an <see cref="AggregateException"/> if any task faulted.
     110             :                 /// </summary>
     111           1 :                 protected virtual void AsyncSaveData<TResult>(TData data, Func<TData, CloudBlockBlob, TResult> function, Func<TData, string> customFilenameFunction = null)
     112             :                 {
     113             :                         IList<Task> persistTasks = new List<Task>();
     114             :                         foreach (Tuple<CloudStorageAccount, CloudBlobContainer> tuple in WritableCollection)
     115             :                         {
     116             :                                 TData taskData = data; // local copy captured by the task's lambda
     117             :                                 CloudBlobContainer container = tuple.Item2;
     118             :                                 Task task = Task.Factory.StartNewSafely
     119             :                                 (
     120             :                                         () =>
     121             :                                         {
     122             :                                                 string fileName = string.Format("{0}.json", (customFilenameFunction ?? GenerateFileName)(taskData));
     123             :                                                 CloudBlockBlob cloudBlockBlob = GetBlobReference(container, fileName);
     124             :                                                 if (typeof(TResult) == typeof(Uri)) // only a Uri result is logged; any other result is discarded
     125             :                                                 {
     126             :                                                         Uri uri = AzureStorageRetryPolicy.ExecuteAction(() => (Uri)(object)function(taskData, cloudBlockBlob));
     127             : 
     128             :                                                         Logger.LogDebug(string.Format("The data entity '{0}' was persisted at uri '{1}'", fileName, uri));
     129             :                                                 }
     130             :                                                 else
     131             :                                                         AzureStorageRetryPolicy.ExecuteAction(() => function(taskData, cloudBlockBlob));
     132             :                                         }
     133             :                                 );
     134             :                                 persistTasks.Add(task);
     135             :                         }
     136             : 
     137             :                         bool anyFailed = Task.Factory.ContinueWhenAll(persistTasks.ToArray(), tasks =>
     138             :                         {
     139             :                                 return tasks.Any(task => task.IsFaulted);
     140             :                         }).Result; // blocks the calling thread until every persist task has completed
     141             :                         if (anyFailed)
     142             :                                 throw new AggregateException("Persisting data to blob storage failed. Check the logs for more details."); // NOTE(review): no inner exceptions are attached, so the underlying failures are not reachable from this exception
     143             :                 }
     144             : 
     145             :                 #region Implementation of IDataStore<TData>
     146             : 
     147             :                 /// <summary>
     148             :                 /// Add the provided <paramref name="data"/> to the data store and persist the change: the serialised data is uploaded to a block blob, its content type is set to "application/json" and the blob's <see cref="Uri"/> is returned; any exception is logged and rethrown.
     149             :                 /// </summary>
     150           1 :                 public override void Add(TData data)
     151             :                 {
     152             :                         AsyncSaveData
     153             :                         (
     154             :                                 data,
     155             :                                 (taskData, cloudBlockBlob) =>
     156             :                                 {
     157             :                                         try
     158             :                                         {
     159             :                                                 cloudBlockBlob.UploadFromStream(Serialise(taskData)); // NOTE(review): the result of Serialise is not disposed here - confirm ownership
     160             :                                                 cloudBlockBlob.Properties.ContentType = "application/json";
     161             :                                                 cloudBlockBlob.SetProperties(); // applies the ContentType assigned on the previous line
     162             :                                                 return cloudBlockBlob.Uri;
     163             :                                         }
     164             :                                         catch (Exception exception)
     165             :                                         {
     166             :                                                 Logger.LogError("There was an issue persisting data to blob storage.", exception: exception);
     167             :                                                 throw;
     168             :                                         }
     169             :                                 }
     170             :                         );
     171             :                 }
     172             : 
     173             :                 /// <summary>
     174             :                 /// Remove the provided <paramref name="data"/> (normally by <see cref="IEntity.Rsn"/>) from the data store and persist the change: the block blob and its snapshots are deleted if the blob exists; any exception is logged and rethrown.
     175             :                 /// </summary>
     176           1 :                 public override void Destroy(TData data)
     177             :                 {
     178             :                         AsyncSaveData
     179             :                         (
     180             :                                 data,
     181             :                                 (taskData, cloudBlockBlob) =>
     182             :                                 {
     183             :                                         try
     184             :                                         {
     185             :                                                 return cloudBlockBlob.DeleteIfExists(DeleteSnapshotsOption.IncludeSnapshots);
     186             :                                         }
     187             :                                         catch (Exception exception)
     188             :                                         {
     189             :                                                 Logger.LogError("There was an issue deleting data from blob storage.", exception: exception);
     190             :                                                 throw;
     191             :                                         }
     192             :                                 }
     193             :                         );
     194             :                 }
     195             : 
     196             :                 /// <summary>
     197             :                 /// Remove all contents (normally by use of a truncate operation) from the data store and persist the change. Here every container in <c>WritableCollection</c> is deleted if it exists.
     198             :                 /// </summary>
     199           1 :                 public override void RemoveAll()
     200             :                 {
     201             :                         foreach (Tuple<CloudStorageAccount, CloudBlobContainer> tuple in WritableCollection)
     202             :                                 tuple.Item2.DeleteIfExists();
     203             :                 }
     204             : 
     205             :                 /// <summary>
     206             :                 /// Update the provided <paramref name="data"/> in the data store and persist the change by delegating to <see cref="Add"/>.
     207             :                 /// </summary>
     208           1 :                 public override void Update(TData data)
     209             :                 {
     210             :                         Add(data);
     211             :                 }
     212             : 
     213             :                 #endregion
     214             : 
     215             :                 /// <summary>
     216             :                 /// Creates a <see cref="CloudBlobContainer"/> with the specified name <paramref name="containerName"/> if it doesn't already exist and returns a reference to it; the name is first passed through <c>GetSafeSourceName</c>.
     217             :                 /// </summary>
     218             :                 /// <param name="storageAccount">The storage account to create the <see cref="CloudBlobContainer"/> in.</param>
     219             :                 /// <param name="containerName">The name of the <see cref="CloudBlobContainer"/>.</param>
     220             :                 /// <param name="isPublic">Whether or not this <see cref="CloudBlobContainer"/> is publicly accessible (defaults to <c>true</c>); when <c>true</c>, public access is set to <see cref="BlobContainerPublicAccessType.Blob"/>.</param>
     221           1 :                 protected override CloudBlobContainer CreateSource(CloudStorageAccount storageAccount, string containerName, bool isPublic = true)
     222             :                 {
     223             :                         CloudBlobContainer container = null;
     224             :                         AzureStorageRetryPolicy.ExecuteAction
     225             :                         (
     226             :                                 () =>
     227             :                                 {
     228             :                                         CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
     229             :                                         container = blobClient.GetContainerReference(GetSafeSourceName(containerName));
     230             :                                         container.CreateIfNotExists();
     231             :                                         if (isPublic)
     232             :                                         {
     233             :                                                 container.SetPermissions(new BlobContainerPermissions
     234             :                                                 {
     235             :                                                         PublicAccess = BlobContainerPublicAccessType.Blob
     236             :                                                 });
     237             :                                         }
     238             :                                 }
     239             :                         );
     240             : 
     241             :                         return container;
     242             :                 }
     243             : 
     244             :                 /// <summary>
     245             :                 /// Opens a stream for reading from each block blob found. Blobs are listed from the directory <paramref name="folderName"/> when it is provided, otherwise from <c>ReadableSource</c> using <paramref name="blobPrefix"/>; items that are not a <see cref="CloudBlockBlob"/> are skipped and <paramref name="predicate"/>, when provided, filters the rest.
     246             :                 /// </summary>
     247           1 :                 protected virtual IEnumerable<Stream> OpenStreamsForReading(Func<CloudBlockBlob, bool> predicate = null, string blobPrefix = null, string folderName = null)
     248             :                 {
     249             :                         IEnumerable<IListBlobItem> blobs;
     250             :                         if (!string.IsNullOrWhiteSpace(folderName))
     251             :                         {
     252             :                                 CloudBlobDirectory container = ReadableSource.GetDirectoryReference(folderName);
     253             :                                 blobs = container.ListBlobs(true);
     254             :                         }
     255             :                         else
     256             :                         {
     257             :                                 blobs = ReadableSource.ListBlobs(blobPrefix, true);
     258             :                         }
     259             :                         IEnumerable<CloudBlockBlob> query = blobs
     260             :                                 .Where(x => x is CloudBlockBlob)
     261             :                                 .Cast<CloudBlockBlob>();
     262             :                         if (predicate != null)
     263             :                                 query = query.Where(predicate);
     264             :                         return query.Select(x => x.OpenRead()); // deferred: each stream is opened as the result is enumerated. NOTE(review): nothing visible in this class disposes these streams - confirm the consumer does
     265             :                 }
     266             : 
     267             :                 /// <summary>
     268             :                 /// Gets a reference to the block blob named <paramref name="blobName"/> in the provided <paramref name="container"/>.
     269             :                 /// </summary>
     270             :                 /// <param name="container">The container to get the reference from</param>
     271             :                 /// <param name="blobName">The name of the blob.</param>
     272             :                 /// <returns>A reference to a block blob.</returns>
     273           1 :                 protected virtual CloudBlockBlob GetBlobReference(CloudBlobContainer container, string blobName)
     274             :                 {
     275             :                         return container.GetBlockBlobReference(blobName);
     276             :                 }
     277             : 
     278             :                 /// <summary>
     279             :                 /// Get <typeparamref name="TData"/> by its name: backslashes in <paramref name="name"/> are replaced with forward slashes and the result is used as a blob prefix. Returns the default value when nothing matches and throws when more than one blob matches.
     280             :                 /// </summary>
     281           1 :                 public virtual TData GetByName(string name)
     282             :                 {
     283             :                         return OpenStreamsForReading(blobPrefix: name.Replace("\\", "/"))
     284             :                                 .Select(Deserialise)
     285             :                                 .SingleOrDefault();
     286             :                         /*
     287             :                         return OpenStreamsForReading(x => x.Name == name)
     288             :                                 .Select(Deserialise)
     289             :                                 .SingleOrDefault();
     290             :                         */
     291             :                 }
     292             : 
     293             :                 /// <summary>
     294             :                 /// Get all <typeparamref name="TData"/> items in the folder; <paramref name="folderName"/> is normalised through the <see cref="Uri.AbsolutePath"/> of a placeholder URI (leading '/' removed) before listing.
     295             :                 /// </summary>
     296           1 :                 public virtual IEnumerable<TData> GetByFolder(string folderName)
     297             :                 {
     298             :                         string folder = new Uri(string.Format(folderName.StartsWith("..\\") ? "http://l/2/{0}" : "http://l/{0}", folderName)).AbsolutePath.Substring(1); // NOTE(review): names starting with "..\" get an extra "2/" segment, presumably so the leading ".." has a segment to collapse - confirm
     299             :                         return OpenStreamsForReading(folderName: folder)
     300             :                                 .Select(Deserialise);
     301             :                         /*
     302             :                         return OpenStreamsForReading(x => x.Parent.StorageUri.PrimaryUri.AbsolutePath.StartsWith(new Uri(string.Format(folderName.StartsWith("..\\") ? "http://l/{0}/2/{1}" : "http://l/{0}/{1}", GetContainerName(), folderName)).AbsolutePath, StringComparison.InvariantCultureIgnoreCase))
     303             :                                 .Select(Deserialise);
     304             :                         */
     305             :                 }
     306             :         }
     307             : }
 |