Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.IO;
12 : using System.Linq;
13 : using System.Linq.Expressions;
14 : using System.Threading.Tasks;
15 : using cdmdotnet.Logging;
16 : using Cqrs.Entities;
17 : using Microsoft.WindowsAzure.Storage;
18 : using Microsoft.WindowsAzure.Storage.Blob;
19 :
20 : namespace Cqrs.Azure.BlobStorage
21 : {
22 : /// <summary>
23 : /// A <see cref="IEnumerable{TData}"/> that uses Azure Blobl Storage for storage.
24 : /// </summary>
25 : public class BlobStorageStore<TData>
26 : : StorageStore<TData, CloudBlobContainer>
27 1 : {
28 : internal Func<TData, string> GenerateFileName { get; set; }
29 :
30 : /// <summary>
31 : /// Initializes a new instance of the <see cref="BlobStorageStore{TData}"/> class using the specified container.
32 : /// </summary>
33 1 : public BlobStorageStore(ILogger logger)
34 : : base(logger)
35 : {
36 : }
37 :
38 : #region Implementation of IEnumerable
39 :
40 : /// <summary>
41 : /// Returns an enumerator that iterates through the collection.
42 : /// </summary>
43 : /// <returns>
44 : /// A <see cref="T:System.Collections.Generic.IEnumerator`1"/> that can be used to iterate through the collection.
45 : /// </returns>
46 1 : public override IEnumerator<TData> GetEnumerator()
47 : {
48 : return OpenStreamsForReading()
49 : .Select(Deserialise)
50 : .GetEnumerator();
51 : }
52 :
53 : #endregion
54 :
55 : #region Implementation of IQueryable
56 :
57 : /// <summary>
58 : /// Gets the expression tree that is associated with the instance of <see cref="T:System.Linq.IQueryable"/>.
59 : /// </summary>
60 : /// <returns>
61 : /// The <see cref="T:System.Linq.Expressions.Expression"/> that is associated with this instance of <see cref="T:System.Linq.IQueryable"/>.
62 : /// </returns>
63 : public override Expression Expression
64 : {
65 : get
66 : {
67 : return OpenStreamsForReading()
68 : .Select(Deserialise)
69 : .AsQueryable()
70 : .Expression;
71 : }
72 : }
73 :
74 : /// <summary>
75 : /// Gets the type of the element(s) that are returned when the expression tree associated with this instance of <see cref="T:System.Linq.IQueryable"/> is executed.
76 : /// </summary>
77 : /// <returns>
78 : /// A <see cref="T:System.Type"/> that represents the type of the element(s) that are returned when the expression tree associated with this object is executed.
79 : /// </returns>
80 : public override Type ElementType
81 : {
82 : get
83 : {
84 : return OpenStreamsForReading()
85 : .Select(Deserialise)
86 : .AsQueryable()
87 : .ElementType;
88 : }
89 : }
90 :
91 : /// <summary>
92 : /// Gets the query provider that is associated with this data source.
93 : /// </summary>
94 : /// <returns>
95 : /// The <see cref="T:System.Linq.IQueryProvider"/> that is associated with this data source.
96 : /// </returns>
97 : public override IQueryProvider Provider
98 : {
99 : get { return OpenStreamsForReading()
100 : .Select(Deserialise)
101 : .AsQueryable()
102 : .Provider;
103 : }
104 : }
105 :
106 : #endregion
107 :
108 : /// <summary>
109 : /// Save the provided <paramref name="data"/> asynchronously.
110 : /// </summary>
111 1 : protected virtual void AsyncSaveData<TResult>(TData data, Func<TData, CloudBlockBlob, TResult> function, Func<TData, string> customFilenameFunction = null)
112 : {
113 : IList<Task> persistTasks = new List<Task>();
114 : foreach (Tuple<CloudStorageAccount, CloudBlobContainer> tuple in WritableCollection)
115 : {
116 : TData taskData = data;
117 : CloudBlobContainer container = tuple.Item2;
118 : Task task = Task.Factory.StartNewSafely
119 : (
120 : () =>
121 : {
122 : string fileName = string.Format("{0}.json", (customFilenameFunction ?? GenerateFileName)(taskData));
123 : CloudBlockBlob cloudBlockBlob = GetBlobReference(container, fileName);
124 : if (typeof(TResult) == typeof(Uri))
125 : {
126 : Uri uri = AzureStorageRetryPolicy.ExecuteAction(() => (Uri)(object)function(taskData, cloudBlockBlob));
127 :
128 : Logger.LogDebug(string.Format("The data entity '{0}' was persisted at uri '{1}'", fileName, uri));
129 : }
130 : else
131 : AzureStorageRetryPolicy.ExecuteAction(() => function(taskData, cloudBlockBlob));
132 : }
133 : );
134 : persistTasks.Add(task);
135 : }
136 :
137 : bool anyFailed = Task.Factory.ContinueWhenAll(persistTasks.ToArray(), tasks =>
138 : {
139 : return tasks.Any(task => task.IsFaulted);
140 : }).Result;
141 : if (anyFailed)
142 : throw new AggregateException("Persisting data to blob storage failed. Check the logs for more details.");
143 : }
144 :
145 : #region Implementation of IDataStore<TData>
146 :
147 : /// <summary>
148 : /// Add the provided <paramref name="data"/> to the data store and persist the change.
149 : /// </summary>
150 1 : public override void Add(TData data)
151 : {
152 : AsyncSaveData
153 : (
154 : data,
155 : (taskData, cloudBlockBlob) =>
156 : {
157 : try
158 : {
159 : cloudBlockBlob.UploadFromStream(Serialise(taskData));
160 : cloudBlockBlob.Properties.ContentType = "application/json";
161 : cloudBlockBlob.SetProperties();
162 : return cloudBlockBlob.Uri;
163 : }
164 : catch (Exception exception)
165 : {
166 : Logger.LogError("There was an issue persisting data to blob storage.", exception: exception);
167 : throw;
168 : }
169 : }
170 : );
171 : }
172 :
173 : /// <summary>
174 : /// Remove the provided <paramref name="data"/> (normally by <see cref="IEntity.Rsn"/>) from the data store and persist the change.
175 : /// </summary>
176 1 : public override void Destroy(TData data)
177 : {
178 : AsyncSaveData
179 : (
180 : data,
181 : (taskData, cloudBlockBlob) =>
182 : {
183 : try
184 : {
185 : return cloudBlockBlob.DeleteIfExists(DeleteSnapshotsOption.IncludeSnapshots);
186 : }
187 : catch (Exception exception)
188 : {
189 : Logger.LogError("There was an issue deleting data from blob storage.", exception: exception);
190 : throw;
191 : }
192 : }
193 : );
194 : }
195 :
196 : /// <summary>
197 : /// Remove all contents (normally by use of a truncate operation) from the data store and persist the change.
198 : /// </summary>
199 1 : public override void RemoveAll()
200 : {
201 : foreach (Tuple<CloudStorageAccount, CloudBlobContainer> tuple in WritableCollection)
202 : tuple.Item2.DeleteIfExists();
203 : }
204 :
205 : /// <summary>
206 : /// Update the provided <paramref name="data"/> in the data store and persist the change.
207 : /// </summary>
208 1 : public override void Update(TData data)
209 : {
210 : Add(data);
211 : }
212 :
213 : #endregion
214 :
215 : /// <summary>
216 : /// Creates a <see cref="CloudBlobContainer"/> with the specified name <paramref name="containerName"/> if it doesn't already exist.
217 : /// </summary>
218 : /// <param name="storageAccount">The storage account to create the <see cref="CloudBlobContainer"/> is</param>
219 : /// <param name="containerName">The name of the <see cref="CloudBlobContainer"/>.</param>
220 : /// <param name="isPublic">Whether or not this <see cref="CloudBlobContainer"/> is publicly accessible.</param>
221 1 : protected override CloudBlobContainer CreateSource(CloudStorageAccount storageAccount, string containerName, bool isPublic = true)
222 : {
223 : CloudBlobContainer container = null;
224 : AzureStorageRetryPolicy.ExecuteAction
225 : (
226 : () =>
227 : {
228 : CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
229 : container = blobClient.GetContainerReference(GetSafeSourceName(containerName));
230 : container.CreateIfNotExists();
231 : if (isPublic)
232 : {
233 : container.SetPermissions(new BlobContainerPermissions
234 : {
235 : PublicAccess = BlobContainerPublicAccessType.Blob
236 : });
237 : }
238 : }
239 : );
240 :
241 : return container;
242 : }
243 :
244 : /// <summary>
245 : /// Opens stream for reading from a block blob.
246 : /// </summary>
247 1 : protected virtual IEnumerable<Stream> OpenStreamsForReading(Func<CloudBlockBlob, bool> predicate = null, string blobPrefix = null, string folderName = null)
248 : {
249 : IEnumerable<IListBlobItem> blobs;
250 : if (!string.IsNullOrWhiteSpace(folderName))
251 : {
252 : CloudBlobDirectory container = ReadableSource.GetDirectoryReference(folderName);
253 : blobs = container.ListBlobs(true);
254 : }
255 : else
256 : {
257 : blobs = ReadableSource.ListBlobs(blobPrefix, true);
258 : }
259 : IEnumerable<CloudBlockBlob> query = blobs
260 : .Where(x => x is CloudBlockBlob)
261 : .Cast<CloudBlockBlob>();
262 : if (predicate != null)
263 : query = query.Where(predicate);
264 : return query.Select(x => x.OpenRead());
265 : }
266 :
267 : /// <summary>
268 : /// Gets a reference to a block blob in the container.
269 : /// </summary>
270 : /// <param name="container">The container to get the reference from</param>
271 : /// <param name="blobName">The name of the blob.</param>
272 : /// <returns>A reference to a block blob.</returns>
273 1 : protected virtual CloudBlockBlob GetBlobReference(CloudBlobContainer container, string blobName)
274 : {
275 : return container.GetBlockBlobReference(blobName);
276 : }
277 :
278 : /// <summary>
279 : /// Get <typeparamref name="TData"/> by its name.
280 : /// </summary>
281 1 : public virtual TData GetByName(string name)
282 : {
283 : return OpenStreamsForReading(blobPrefix: name.Replace("\\", "/"))
284 : .Select(Deserialise)
285 : .SingleOrDefault();
286 : /*
287 : return OpenStreamsForReading(x => x.Name == name)
288 : .Select(Deserialise)
289 : .SingleOrDefault();
290 : */
291 : }
292 :
293 : /// <summary>
294 : /// Get all <typeparamref name="TData"/> items in the folder.
295 : /// </summary>
296 1 : public virtual IEnumerable<TData> GetByFolder(string folderName)
297 : {
298 : string folder = new Uri(string.Format(folderName.StartsWith("..\\") ? "http://l/2/{0}" : "http://l/{0}", folderName)).AbsolutePath.Substring(1);
299 : return OpenStreamsForReading(folderName: folder)
300 : .Select(Deserialise);
301 : /*
302 : return OpenStreamsForReading(x => x.Parent.StorageUri.PrimaryUri.AbsolutePath.StartsWith(new Uri(string.Format(folderName.StartsWith("..\\") ? "http://l/{0}/2/{1}" : "http://l/{0}/{1}", GetContainerName(), folderName)).AbsolutePath, StringComparison.InvariantCultureIgnoreCase))
303 : .Select(Deserialise);
304 : */
305 : }
306 : }
307 : }
|