Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.IO;
12 : using System.Linq;
13 : using System.Linq.Expressions;
14 : using System.Threading.Tasks;
15 : using cdmdotnet.Logging;
16 : using Microsoft.WindowsAzure.Storage;
17 : using Microsoft.WindowsAzure.Storage.Blob;
18 :
19 : namespace Cqrs.Azure.BlobStorage
20 : {
21 : public class BlobStorageStore<TData>
22 : : StorageStore<TData, CloudBlobContainer>
23 0 : {
24 : internal Func<TData, string> GenerateFileName { get; set; }
25 :
26 : /// <summary>
27 : /// Initializes a new instance of the <see cref="BlobStorageStore{TData}"/> class using the specified container.
28 : /// </summary>
29 1 : public BlobStorageStore(ILogger logger)
30 : : base(logger)
31 : {
32 : }
33 :
34 : #region Implementation of IEnumerable
35 :
36 : /// <summary>
37 : /// Returns an enumerator that iterates through the collection.
38 : /// </summary>
39 : /// <returns>
40 : /// A <see cref="T:System.Collections.Generic.IEnumerator`1"/> that can be used to iterate through the collection.
41 : /// </returns>
42 1 : public override IEnumerator<TData> GetEnumerator()
43 : {
44 : return OpenStreamsForReading()
45 : .Select(Deserialise)
46 : .GetEnumerator();
47 : }
48 :
49 : #endregion
50 :
51 : #region Implementation of IQueryable
52 :
53 : /// <summary>
54 : /// Gets the expression tree that is associated with the instance of <see cref="T:System.Linq.IQueryable"/>.
55 : /// </summary>
56 : /// <returns>
57 : /// The <see cref="T:System.Linq.Expressions.Expression"/> that is associated with this instance of <see cref="T:System.Linq.IQueryable"/>.
58 : /// </returns>
59 : public override Expression Expression
60 : {
61 : get
62 : {
63 : return OpenStreamsForReading()
64 : .Select(Deserialise)
65 : .AsQueryable()
66 : .Expression;
67 : }
68 : }
69 :
70 : /// <summary>
71 : /// Gets the type of the element(s) that are returned when the expression tree associated with this instance of <see cref="T:System.Linq.IQueryable"/> is executed.
72 : /// </summary>
73 : /// <returns>
74 : /// A <see cref="T:System.Type"/> that represents the type of the element(s) that are returned when the expression tree associated with this object is executed.
75 : /// </returns>
76 : public override Type ElementType
77 : {
78 : get
79 : {
80 : return OpenStreamsForReading()
81 : .Select(Deserialise)
82 : .AsQueryable()
83 : .ElementType;
84 : }
85 : }
86 :
87 : /// <summary>
88 : /// Gets the query provider that is associated with this data source.
89 : /// </summary>
90 : /// <returns>
91 : /// The <see cref="T:System.Linq.IQueryProvider"/> that is associated with this data source.
92 : /// </returns>
93 : public override IQueryProvider Provider
94 : {
95 : get { return OpenStreamsForReading()
96 : .Select(Deserialise)
97 : .AsQueryable()
98 : .Provider;
99 : }
100 : }
101 :
102 : #endregion
103 :
104 0 : protected virtual void AsyncSaveData<TResult>(TData data, Func<TData, CloudBlockBlob, TResult> function, Func<TData, string> customFilenameFunction = null)
105 : {
106 : IList<Task> persistTasks = new List<Task>();
107 : foreach (Tuple<CloudStorageAccount, CloudBlobContainer> tuple in WritableCollection)
108 : {
109 : TData taskData = data;
110 : CloudBlobContainer container = tuple.Item2;
111 : Task task = Task.Factory.StartNewSafely
112 : (
113 : () =>
114 : {
115 : string fileName = string.Format("{0}.json", (customFilenameFunction ?? GenerateFileName)(taskData));
116 : CloudBlockBlob cloudBlockBlob = GetBlobReference(container, fileName);
117 : if (typeof(TResult) == typeof(Uri))
118 : {
119 : Uri uri = AzureStorageRetryPolicy.ExecuteAction(() => (Uri)(object)function(taskData, cloudBlockBlob));
120 :
121 : Logger.LogDebug(string.Format("The data entity '{0}' was persisted at uri '{1}'", fileName, uri));
122 : }
123 : else
124 : AzureStorageRetryPolicy.ExecuteAction(() => function(taskData, cloudBlockBlob));
125 : }
126 : );
127 : persistTasks.Add(task);
128 : }
129 :
130 : bool anyFailed = Task.Factory.ContinueWhenAll(persistTasks.ToArray(), tasks =>
131 : {
132 : return tasks.Any(task => task.IsFaulted);
133 : }).Result;
134 : if (anyFailed)
135 : throw new AggregateException("Persisting data to blob storage failed. Check the logs for more details.");
136 : }
137 :
138 : #region Implementation of IDataStore<TData>
139 :
140 0 : public override void Add(TData data)
141 : {
142 : AsyncSaveData
143 : (
144 : data,
145 : (taskData, cloudBlockBlob) =>
146 : {
147 : try
148 : {
149 : cloudBlockBlob.UploadFromStream(Serialise(taskData));
150 : cloudBlockBlob.Properties.ContentType = "application/json";
151 : cloudBlockBlob.SetProperties();
152 : return cloudBlockBlob.Uri;
153 : }
154 : catch (Exception exception)
155 : {
156 : Logger.LogError("There was an issue persisting data to blob storage.", exception: exception);
157 : throw;
158 : }
159 : }
160 : );
161 : }
162 :
163 0 : public override void Destroy(TData data)
164 : {
165 : AsyncSaveData
166 : (
167 : data,
168 : (taskData, cloudBlockBlob) =>
169 : {
170 : try
171 : {
172 : return cloudBlockBlob.DeleteIfExists(DeleteSnapshotsOption.IncludeSnapshots);
173 : }
174 : catch (Exception exception)
175 : {
176 : Logger.LogError("There was an issue deleting data from blob storage.", exception: exception);
177 : throw;
178 : }
179 : }
180 : );
181 : }
182 :
183 0 : public override void RemoveAll()
184 : {
185 : foreach (Tuple<CloudStorageAccount, CloudBlobContainer> tuple in WritableCollection)
186 : tuple.Item2.DeleteIfExists();
187 : }
188 :
189 0 : public override void Update(TData data)
190 : {
191 : Add(data);
192 : }
193 :
194 : #endregion
195 :
196 : /// <summary>
197 : /// Creates a <see cref="CloudBlobContainer"/> with the specified name <paramref name="containerName"/> if it doesn't already exist.
198 : /// </summary>
199 : /// <param name="storageAccount">The storage account to create the <see cref="CloudBlobContainer"/> is</param>
200 : /// <param name="containerName">The name of the <see cref="CloudBlobContainer"/>.</param>
201 : /// <param name="isPublic">Whether or not this <see cref="CloudBlobContainer"/> is publicly accessible.</param>
202 1 : protected override CloudBlobContainer CreateSource(CloudStorageAccount storageAccount, string containerName, bool isPublic = true)
203 : {
204 : CloudBlobContainer container = null;
205 : AzureStorageRetryPolicy.ExecuteAction
206 : (
207 : () =>
208 : {
209 : CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
210 : container = blobClient.GetContainerReference(GetSafeSourceName(containerName));
211 : container.CreateIfNotExists();
212 : if (isPublic)
213 : {
214 : container.SetPermissions(new BlobContainerPermissions
215 : {
216 : PublicAccess = BlobContainerPublicAccessType.Blob
217 : });
218 : }
219 : }
220 : );
221 :
222 : return container;
223 : }
224 :
225 : /// <summary>
226 : /// Opens stream for reading from a block blob.
227 : /// </summary>
228 1 : protected virtual IEnumerable<Stream> OpenStreamsForReading(Func<CloudBlockBlob, bool> predicate = null, string blobPrefix = null, string folderName = null)
229 : {
230 : IEnumerable<IListBlobItem> blobs;
231 : if (!string.IsNullOrWhiteSpace(folderName))
232 : {
233 : CloudBlobDirectory container = ReadableSource.GetDirectoryReference(folderName);
234 : blobs = container.ListBlobs(true);
235 : }
236 : else
237 : {
238 : blobs = ReadableSource.ListBlobs(blobPrefix, true);
239 : }
240 : IEnumerable<CloudBlockBlob> query = blobs
241 : .Where(x => x is CloudBlockBlob)
242 : .Cast<CloudBlockBlob>();
243 : if (predicate != null)
244 : query = query.Where(predicate);
245 : return query.Select(x => x.OpenRead());
246 : }
247 :
248 : /// <summary>
249 : /// Gets a reference to a block blob in the container.
250 : /// </summary>
251 : /// <param name="container">The container to get the reference from</param>
252 : /// <param name="blobName">The name of the blob.</param>
253 : /// <returns>A reference to a block blob.</returns>
254 1 : protected virtual CloudBlockBlob GetBlobReference(CloudBlobContainer container, string blobName)
255 : {
256 : return container.GetBlockBlobReference(blobName);
257 : }
258 :
259 0 : public virtual TData GetByName(string name)
260 : {
261 : return OpenStreamsForReading(blobPrefix: name.Replace("\\", "/"))
262 : .Select(Deserialise)
263 : .SingleOrDefault();
264 : /*
265 : return OpenStreamsForReading(x => x.Name == name)
266 : .Select(Deserialise)
267 : .SingleOrDefault();
268 : */
269 : }
270 :
271 0 : public virtual IEnumerable<TData> GetByFolder(string folderName)
272 : {
273 : string folder = new Uri(string.Format(folderName.StartsWith("..\\") ? "http://l/2/{0}" : "http://l/{0}", folderName)).AbsolutePath.Substring(1);
274 : return OpenStreamsForReading(folderName: folder)
275 : .Select(Deserialise);
276 : /*
277 : return OpenStreamsForReading(x => x.Parent.StorageUri.PrimaryUri.AbsolutePath.StartsWith(new Uri(string.Format(folderName.StartsWith("..\\") ? "http://l/{0}/2/{1}" : "http://l/{0}/{1}", GetContainerName(), folderName)).AbsolutePath, StringComparison.InvariantCultureIgnoreCase))
278 : .Select(Deserialise);
279 : */
280 : }
281 : }
282 : }
|