Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Linq.Expressions;
13 : using System.Threading.Tasks;
14 : using cdmdotnet.Logging;
15 : using Cqrs.DataStores;
16 : using Cqrs.Entities;
17 : using Microsoft.WindowsAzure.Storage;
18 : using Microsoft.WindowsAzure.Storage.Table;
19 :
20 : namespace Cqrs.Azure.BlobStorage
21 : {
22 : /// <summary>
23 : /// A <see cref="IDataStore{TData}"/> that uses Azure Storage for storage.
24 : /// </summary>
25 : /// <typeparam name="TData">The <see cref="Type"/> of <see cref="TableEntity"/> Azure Table Storage will contain.</typeparam>
26 : /// <typeparam name="TCollectionItemData">The <see cref="Type"/> of <see cref="IEntity"/> the <see cref="IDataStore{TData}"/> will contain.</typeparam>
27 : public abstract class TableStorageStore<TData, TCollectionItemData>
28 : : StorageStore<TData, CloudTable>
29 : , IDataStore<TCollectionItemData>
30 : where TData : TableEntity, new()
31 1 : {
32 : /// <summary>
33 : /// Gets the <see cref="TableQuery{TElement}"/> that the enumerator, expression, element type and query provider members of this store delegate to. It is assigned when the store is initialised.
34 : /// </summary>
35 : public TableQuery<TData> Collection { get; private set; }
36 :
/// <summary>
/// Instantiates a new instance of the <see cref="TableStorageStore{TData,TCollectionItemData}"/> class,
/// handing the provided <paramref name="logger"/> to the base store.
/// </summary>
/// <param name="logger">The <see cref="ILogger"/> forwarded to the base class constructor.</param>
protected TableStorageStore(ILogger logger)
    : base(logger)
{
    // Nothing to set up here; Collection is assigned later by Initialise.
}
44 :
45 : #region Overrides of StorageStore<TData,CloudTable>
46 :
/// <summary>
/// Runs the base initialisation and then assigns a new, unfiltered <see cref="TableQuery{TElement}"/> to <see cref="Collection"/>.
/// </summary>
/// <param name="tableStorageDataStoreConnectionStringFactory">The connection string factory forwarded to the base initialisation.</param>
protected override void Initialise(IStorageStoreConnectionStringFactory tableStorageDataStoreConnectionStringFactory)
{
    // The base class is initialised first, exactly as before.
    base.Initialise(tableStorageDataStoreConnectionStringFactory);

    TableQuery<TData> freshQuery = new TableQuery<TData>();
    Collection = freshQuery;
}
55 :
56 : #endregion
57 :
58 : #region Implementation of IEnumerable
59 :
/// <summary>
/// Not supported. Enumerating this store as <typeparamref name="TCollectionItemData"/> always throws;
/// the message directs callers to the <typeparamref name="TData"/> enumerator instead.
/// </summary>
/// <returns>Never returns.</returns>
/// <exception cref="NotImplementedException">Always thrown.</exception>
IEnumerator<TCollectionItemData> IEnumerable<TCollectionItemData>.GetEnumerator()
{
    const string message = "Use IEnumerable<TData>.GetEnumerator() directly";
    throw new NotImplementedException(message);
}
70 :
/// <summary>
/// Returns the enumerator obtained from <see cref="Collection"/>.
/// </summary>
/// <returns>
/// The <see cref="IEnumerator{T}"/> produced by <see cref="Collection"/>.
/// </returns>
public override IEnumerator<TData> GetEnumerator()
{
    IEnumerator<TData> enumerator = Collection.GetEnumerator();
    return enumerator;
}
81 :
82 : #endregion
83 :
84 : #region Implementation of IQueryable
85 :
/// <summary>
/// Gets the expression tree exposed by <see cref="Collection"/>.
/// </summary>
/// <returns>
/// The <see cref="System.Linq.Expressions.Expression"/> reported by <see cref="Collection"/>.
/// </returns>
public override Expression Expression
{
    get { return Collection.Expression; }
}
99 :
/// <summary>
/// Gets the element type exposed by <see cref="Collection"/>.
/// </summary>
/// <returns>
/// The <see cref="Type"/> reported by <see cref="Collection"/>.
/// </returns>
public override Type ElementType
{
    get { return Collection.ElementType; }
}
113 :
/// <summary>
/// Gets the query provider exposed by <see cref="Collection"/>.
/// </summary>
/// <returns>
/// The <see cref="IQueryProvider"/> reported by <see cref="Collection"/>.
/// </returns>
public override IQueryProvider Provider
{
    get { return Collection.Provider; }
}
127 :
128 : #endregion
129 :
/// <summary>
/// Runs <paramref name="function"/> once for every table in the writable collection, each on its own task,
/// and blocks the caller until all of those tasks have finished.
/// </summary>
/// <typeparam name="TSaveData">The type of the data handed to <paramref name="function"/>.</typeparam>
/// <typeparam name="TResult">The type returned by <paramref name="function"/>; the returned value is discarded.</typeparam>
/// <param name="data">The data to persist; the same value is passed to <paramref name="function"/> for every table.</param>
/// <param name="function">The operation to run against each <see cref="CloudTable"/>. It is invoked through <c>AzureStorageRetryPolicy.ExecuteAction</c>.</param>
/// <param name="customFilenameFunction">Not used by this implementation.</param>
/// <exception cref="AggregateException">
/// Thrown when at least one of the tasks faulted. The exceptions of the faulted tasks are attached as inner exceptions.
/// </exception>
protected virtual void AsyncSaveData<TSaveData, TResult>(TSaveData data, Func<TSaveData, CloudTable, TResult> function, Func<TSaveData, string> customFilenameFunction = null)
{
    IList<Task> persistTasks = new List<Task>();
    foreach (Tuple<CloudStorageAccount, CloudTable> tuple in WritableCollection)
    {
        // Copy into locals so each task's closure captures its own values.
        TSaveData taskData = data;
        CloudTable table = tuple.Item2;
        Task task = Task.Factory.StartNewSafely
        (
            () =>
            {
                AzureStorageRetryPolicy.ExecuteAction(() => function(taskData, table));
            }
        );
        persistTasks.Add(task);
    }

    // Wait for every task, then gather the exceptions of the faulted ones.
    // Previously only a boolean was kept, so the thrown AggregateException carried no root cause.
    Exception[] failures = Task.Factory.ContinueWhenAll(persistTasks.ToArray(), tasks =>
    {
        return tasks
            .Where(task => task.IsFaulted)
            .SelectMany(task => task.Exception.InnerExceptions)
            .ToArray();
    }).Result;

    if (failures.Length > 0)
        throw new AggregateException("Persisting data to table storage failed. Check the logs for more details.", failures);
}
157 :
158 : /// <summary>
159 : /// Creates the <see cref="ITableEntity"/> used to store the provided <paramref name="data"/>. Note that the Destroy and Update overloads taking <typeparamref name="TCollectionItemData"/> cast the returned value to <typeparamref name="TData"/>.
160 : /// </summary>
161 : /// <param name="data">The item to build a table entity from.</param>
162 1 : protected abstract ITableEntity CreateTableEntity(TCollectionItemData data);
163 :
164 : #region Implementation of IDataStore<TData>
165 :
/// <summary>
/// Add the provided <paramref name="data"/> to the data store and persist the change.
/// </summary>
/// <param name="data">The table entity to insert.</param>
public override void Add(TData data)
{
    // The cast makes the target overload explicit: this forwards to Add(ITableEntity).
    // Without it the call reads like self-recursion and only binds correctly because
    // C# leaves override members out of overload resolution.
    Add((ITableEntity)data);
}
173 :
/// <summary>
/// Add the provided <paramref name="data"/> to the data store and persist the change.
/// An insert operation for <paramref name="data"/> is executed against every writable table.
/// </summary>
/// <param name="data">The table entity to insert.</param>
public virtual void Add(ITableEntity data)
{
    AsyncSaveData
    (
        data,
        (taskData, table) =>
        {
            try
            {
                TableOperation insertOperation = TableOperation.Insert(taskData);

                // Execute the insert operation.
                return table.Execute(insertOperation);
            }
            catch (StorageException exception)
            {
                var extendedErrorInformation = exception.RequestInformation == null
                    ? null
                    : exception.RequestInformation.ExtendedErrorInformation;

                // Only read the error code and message when the extended information is present.
                // The check used to be inverted, so the detailed branch dereferenced null and
                // replaced the StorageException with a NullReferenceException.
                if (extendedErrorInformation == null)
                    Logger.LogError("There was an issue persisting data to table storage.", exception: exception);
                else
                    Logger.LogError(string.Format("There was an issue persisting data to table storage. Specifically {0} :: {1}", extendedErrorInformation.ErrorCode, extendedErrorInformation.ErrorMessage), exception: exception);
                throw;
            }
            catch (Exception exception)
            {
                Logger.LogError("There was an issue persisting data to table storage.", exception: exception);
                throw;
            }
        }
    );
}
208 :
/// <summary>
/// Add the provided <paramref name="data"/> to the data store and persist the change.
/// </summary>
/// <param name="data">The table entities to insert.</param>
public override void Add(IEnumerable<TData> data)
{
    // The cast makes the target overload explicit: this forwards to Add(IEnumerable<ITableEntity>).
    // Without it the call reads like self-recursion and only binds correctly because
    // C# leaves override members out of overload resolution.
    Add((IEnumerable<ITableEntity>)data);
}
216 :
/// <summary>
/// Add the provided <paramref name="data"/> to the data store and persist the change.
/// All entities are inserted through a single batch operation executed against every writable table.
/// </summary>
/// <param name="data">The table entities to insert.</param>
public virtual void Add(IEnumerable<ITableEntity> data)
{
    AsyncSaveData
    (
        data,
        (taskData, table) =>
        {
            try
            {
                // Create the batch operation.
                TableBatchOperation batchOperation = new TableBatchOperation();

                // Iterate as ITableEntity, which is what the batch accepts.
                // The loop used to be typed as TData, which hid a downcast that threw
                // InvalidCastException for entities this signature allows.
                foreach (ITableEntity item in taskData)
                    batchOperation.Insert(item);

                // Execute the insert operation.
                return table.ExecuteBatch(batchOperation);
            }
            catch (Exception exception)
            {
                Logger.LogError("There was an issue persisting data to table storage.", exception: exception);
                throw;
            }
        }
    );
}
246 :
/// <summary>
/// Remove the provided <paramref name="data"/> (normally by <see cref="IEntity.Rsn"/>) from the data store and persist the change.
/// For every writable table the stored entity is first retrieved and then deleted.
/// </summary>
/// <param name="data">The table entity identifying what to delete.</param>
public override void Destroy(TData data)
{
    Func<TData, CloudTable, TableResult> deleteFromTable = (entity, cloudTable) =>
    {
        try
        {
            // Look the stored entity up first; the delete is issued against what was retrieved.
            TableResult existing = cloudTable.Execute(GetUpdatableTableEntity(entity));
            ITableEntity stored = (ITableEntity)existing.Result;

            return cloudTable.Execute(TableOperation.Delete(stored));
        }
        catch (Exception exception)
        {
            Logger.LogError("There was an issue deleting data from table storage.", exception: exception);
            throw;
        }
    };

    AsyncSaveData(data, deleteFromTable);
}
279 :
280 : /*
281 : public virtual void Destroy(ITableEntity data)
282 : {
283 : AsyncSaveData
284 : (
285 : data,
286 : (taskData, table) =>
287 : {
288 : try
289 : {
290 : // Create a retrieve operation that takes a customer entity.
291 : TableOperation retrieveOperation = GetUpdatableTableEntity(taskData);
292 :
293 : // Execute the operation.
294 : TableResult retrievedResult = table.Execute(retrieveOperation);
295 : ITableEntity tableEntity = (ITableEntity)retrievedResult.Result;
296 :
297 : TableOperation deleteOperation = TableOperation.Delete(tableEntity);
298 :
299 : // Execute the delete operation.
300 : return table.Execute(deleteOperation);
301 : }
302 : catch (Exception exception)
303 : {
304 : Logger.LogError("There was an issue deleting data from table storage.", exception: exception);
305 : throw;
306 : }
307 : }
308 : );
309 : }
310 : */
311 :
/// <summary>
/// Add the provided <paramref name="data"/> to the data store and persist the change.
/// </summary>
/// <param name="data">The item to convert into a table entity and insert.</param>
public virtual void Add(TCollectionItemData data)
{
    // Build the table entity for the item, then hand it to the ITableEntity overload.
    ITableEntity entity = CreateTableEntity(data);
    Add(entity);
}
320 :
/// <summary>
/// Add the provided <paramref name="data"/> to the data store and persist the change.
/// </summary>
/// <param name="data">The items to convert into table entities and insert.</param>
public virtual void Add(IEnumerable<TCollectionItemData> data)
{
    // Pass the created entities through as ITableEntity, as the single-item overload does.
    // The earlier (TData) cast was not needed to reach Add(IEnumerable<ITableEntity>).
    // The projection is lazy, so entities are created when the sequence is enumerated.
    IEnumerable<ITableEntity> entities = data.Select(item => CreateTableEntity(item));
    Add(entities);
}
329 :
330 : /// <summary>
331 : /// Will mark the <paramref name="data"/> as logically (or soft) deleted. The behaviour is supplied by derived classes.
332 : /// </summary>
333 1 : public abstract void Remove(TCollectionItemData data);
334 :
/// <summary>
/// Remove the provided <paramref name="data"/> (normally by <see cref="IEntity.Rsn"/>) from the data store and persist the change.
/// </summary>
/// <param name="data">The item to convert into a table entity and delete.</param>
public virtual void Destroy(TCollectionItemData data)
{
    // The created entity must be a TData; the cast throws otherwise.
    TData entity = (TData)CreateTableEntity(data);
    Destroy(entity);
}
342 :
/// <summary>
/// Remove all contents from the data store by deleting every writable table that exists.
/// </summary>
public override void RemoveAll()
{
    foreach (Tuple<CloudStorageAccount, CloudTable> writable in WritableCollection)
    {
        CloudTable table = writable.Item2;
        table.DeleteIfExists();
    }
}
351 :
/// <summary>
/// Update the provided <paramref name="data"/> in the data store and persist the change.
/// </summary>
/// <param name="data">The item to convert into a table entity and update.</param>
public virtual void Update(TCollectionItemData data)
{
    // The created entity must be a TData; the cast throws otherwise.
    TData entity = (TData)CreateTableEntity(data);
    Update(entity);
}
359 :
/// <summary>
/// Update the provided <paramref name="data"/> in the data store and persist the change.
/// For every writable table the stored entity is retrieved, its values are replaced via
/// <see cref="ReplaceValues"/> and the result is written back with a replace operation.
/// </summary>
/// <param name="data">The table entity holding the new values.</param>
public override void Update(TData data)
{
    Func<TData, CloudTable, TableResult> replaceInTable = (entity, cloudTable) =>
    {
        try
        {
            // Fetch what is currently stored so the replace is issued against the retrieved entity.
            TableResult existing = cloudTable.Execute(GetUpdatableTableEntity(entity));

            // As before, the new values are taken from the method argument.
            ITableEntity replacement = ReplaceValues(existing, data);

            return cloudTable.Execute(TableOperation.Replace(replacement));
        }
        catch (Exception exception)
        {
            Logger.LogError("There was an issue updating data in table storage.", exception: exception);
            throw;
        }
    };

    AsyncSaveData(data, replaceInTable);
}
392 :
393 : #endregion
394 :
395 : /// <summary>
396 : /// Gets a <see cref="TableOperation"/> for updating, built from the provided <typeparamref name="TCollectionItemData"/>. The behaviour is supplied by derived classes.
397 : /// </summary>
398 1 : protected abstract TableOperation GetUpdatableTableEntity(TCollectionItemData data);
399 :
400 : /// <summary>
401 : /// Gets a <see cref="TableOperation"/> for updating. The Destroy and Update overloads taking <typeparamref name="TData"/> execute it and read the stored <see cref="ITableEntity"/> from its result before deleting or replacing.
402 : /// </summary>
403 1 : protected abstract TableOperation GetUpdatableTableEntity(TData data);
404 :
/// <summary>
/// Creates a <see cref="CloudTable"/> with the specified name <paramref name="sourceName"/> if it doesn't already exist.
/// The name is passed through <see cref="GetSafeSourceName(string)"/> before it is used.
/// </summary>
/// <param name="storageAccount">The storage account to create the <see cref="CloudTable"/> in.</param>
/// <param name="sourceName">The name of the <see cref="CloudTable"/>.</param>
/// <param name="isPublic">Whether or not this <see cref="CloudTable"/> is publicly accessible. Not used by this implementation.</param>
/// <returns>A reference to the <see cref="CloudTable"/>.</returns>
protected override CloudTable CreateSource(CloudStorageAccount storageAccount, string sourceName, bool isPublic = true)
{
    // Create the table client.
    CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

    // Retrieve a reference to the table.
    CloudTable table = tableClient.GetTableReference(GetSafeSourceName(sourceName));

    // Create the table if it doesn't exist.
    try
    {
        table.CreateIfNotExists();
    }
    catch (StorageException exception)
    {
        var extendedErrorInformation = exception.RequestInformation == null
            ? null
            : exception.RequestInformation.ExtendedErrorInformation;

        // Only read the error code and message when the extended information is present.
        // The check used to be inverted, so the detailed branch dereferenced null and
        // replaced the StorageException with a NullReferenceException.
        if (extendedErrorInformation == null)
            Logger.LogError("There was an issue creating the table.", exception: exception);
        else
            Logger.LogError(string.Format("There was an issue creating the table. Specifically {0} :: {1}", extendedErrorInformation.ErrorCode, extendedErrorInformation.ErrorMessage), exception: exception);
        throw;
    }
    catch (Exception exception)
    {
        Logger.LogError("There was an issue creating the table.", exception: exception);
        throw;
    }

    return table;
}
441 :
/// <summary>
/// Retrieves the single entity whose partition key is derived from the full name of
/// <typeparamref name="TCollectionItemData"/> and whose row key is derived from <paramref name="rsn"/>.
/// </summary>
/// <param name="rsn">The identifier used, formatted with the "N" specifier, to build the row key.</param>
/// <returns>The one matching entity; <c>Single()</c> throws when there is not exactly one match.</returns>
public virtual TData GetByKeyAndRow(Guid rsn)
{
    string partitionFilter = TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(typeof(TCollectionItemData).FullName));
    string rowFilter = TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(rsn.ToString("N")));
    string combinedFilter = TableQuery.CombineFilters(partitionFilter, TableOperators.And, rowFilter);

    // NOTE(review): TableQuery.Where appears to set the filter on the instance it is called on,
    // i.e. on the shared Collection - confirm that this is intended.
    var filteredQuery = Collection.Where(combinedFilter);

    return ReadableSource.ExecuteQuery(filteredQuery).Single();
}
460 :
/// <summary>
/// Retrieves every entity whose partition key is derived from the full name of <typeparamref name="TCollectionItemData"/>.
/// </summary>
/// <returns>The result of executing the partition-key query against the readable source.</returns>
public virtual IEnumerable<TData> GetByKey()
{
    string partitionKey = StorageStore<object, object>.GetSafeStorageKey(typeof(TCollectionItemData).FullName);
    string partitionFilter = TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey);

    var filteredQuery = Collection.Where(partitionFilter);

    return ReadableSource.ExecuteQuery(filteredQuery);
}
474 :
/// <summary>
/// Takes the entity held in <see cref="TableResult.Result"/> of <paramref name="retrievedResult"/> and overwrites its payload.
/// When that entity is an <see cref="IEventDataTableEntity{TData}"/>, its
/// <see cref="IEventDataTableEntity{TEventData}.EventData"/> is set to <paramref name="data"/>.
/// Otherwise both the entity and <paramref name="data"/> are cast to <see cref="IEntityTableEntity{TCollectionItemData}"/>
/// and <see cref="IEntityTableEntity{TCollectionItemData}.Entity"/> is copied from <paramref name="data"/>.
/// </summary>
/// <param name="retrievedResult">The existing data to update.</param>
/// <param name="data">The new data to store.</param>
/// <returns>The retrieved entity, now carrying the new values.</returns>
protected virtual ITableEntity ReplaceValues(TableResult retrievedResult, TData data)
{
    ITableEntity existing = (ITableEntity)retrievedResult.Result;

    IEventDataTableEntity<TData> eventEntity = existing as IEventDataTableEntity<TData>;
    if (eventEntity == null)
    {
        // Not an event entity: copy the wrapped entity across instead.
        IEntityTableEntity<TCollectionItemData> target = (IEntityTableEntity<TCollectionItemData>)existing;
        IEntityTableEntity<TCollectionItemData> source = (IEntityTableEntity<TCollectionItemData>)data;
        target.Entity = source.Entity;
        return existing;
    }

    eventEntity.EventData = data;
    return existing;
}
495 :
496 : #region Overrides of StorageStore<TData,CloudTable>
497 :
/// <summary>
/// Gets the provided <paramref name="sourceName"/> in a safe to use format by calling the
/// two-argument overload with <c>false</c> as its second argument.
/// </summary>
/// <param name="sourceName">The name to make safe.</param>
/// <returns>The value returned by the two-argument overload.</returns>
protected override string GetSafeSourceName(string sourceName)
{
    string safeName = GetSafeSourceName(sourceName, false);
    return safeName;
}
506 :
507 : #endregion
508 : }
509 : }
|