Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Linq.Expressions;
13 : using System.Threading.Tasks;
14 : using cdmdotnet.Logging;
15 : using Cqrs.DataStores;
16 : using Microsoft.WindowsAzure.Storage;
17 : using Microsoft.WindowsAzure.Storage.Table;
18 :
19 : namespace Cqrs.Azure.BlobStorage
20 : {
21 : public abstract class TableStorageStore<TData, TCollectionItemData>
22 : : StorageStore<TData, CloudTable>
23 : , IDataStore<TCollectionItemData>
24 : where TData : TableEntity, new()
25 0 : {
26 : public TableQuery<TData> Collection { get; private set; }
27 :
28 : /// <summary>
29 : /// Initializes a new instance of the <see cref="TableStorageStore{TData,TCollectionItemData}"/> class using the specified container.
30 : /// </summary>
31 1 : protected TableStorageStore(ILogger logger)
32 : : base(logger)
33 : {
34 : }
35 :
36 : #region Overrides of StorageStore<TData,CloudTable>
37 :
38 0 : protected override void Initialise(IStorageStoreConnectionStringFactory tableStorageDataStoreConnectionStringFactory)
39 : {
40 : base.Initialise(tableStorageDataStoreConnectionStringFactory);
41 : Collection = new TableQuery<TData>();
42 : }
43 :
44 : #endregion
45 :
46 : #region Implementation of IEnumerable
47 :
48 : /// <summary>
49 : /// Returns an enumerator that iterates through the collection.
50 : /// </summary>
51 : /// <returns>
52 : /// A <see cref="T:System.Collections.Generic.IEnumerator`1"/> that can be used to iterate through the collection.
53 : /// </returns>
54 : IEnumerator<TCollectionItemData> IEnumerable<TCollectionItemData>.GetEnumerator()
55 : {
56 : throw new NotImplementedException("Use IEnumerable<TData>.GetEnumerator() directly");
57 : }
58 :
59 : /// <summary>
60 : /// Returns an enumerator that iterates through the collection.
61 : /// </summary>
62 : /// <returns>
63 : /// A <see cref="T:System.Collections.Generic.IEnumerator`1"/> that can be used to iterate through the collection.
64 : /// </returns>
65 1 : public override IEnumerator<TData> GetEnumerator()
66 : {
67 : return Collection.GetEnumerator();
68 : }
69 :
70 : #endregion
71 :
72 : #region Implementation of IQueryable
73 :
74 : /// <summary>
75 : /// Gets the expression tree that is associated with the instance of <see cref="T:System.Linq.IQueryable"/>.
76 : /// </summary>
77 : /// <returns>
78 : /// The <see cref="T:System.Linq.Expressions.Expression"/> that is associated with this instance of <see cref="T:System.Linq.IQueryable"/>.
79 : /// </returns>
80 : public override Expression Expression
81 : {
82 : get
83 : {
84 : return Collection.Expression;
85 : }
86 : }
87 :
88 : /// <summary>
89 : /// Gets the type of the element(s) that are returned when the expression tree associated with this instance of <see cref="T:System.Linq.IQueryable"/> is executed.
90 : /// </summary>
91 : /// <returns>
92 : /// A <see cref="T:System.Type"/> that represents the type of the element(s) that are returned when the expression tree associated with this object is executed.
93 : /// </returns>
94 : public override Type ElementType
95 : {
96 : get
97 : {
98 : return Collection.ElementType;
99 : }
100 : }
101 :
102 : /// <summary>
103 : /// Gets the query provider that is associated with this data source.
104 : /// </summary>
105 : /// <returns>
106 : /// The <see cref="T:System.Linq.IQueryProvider"/> that is associated with this data source.
107 : /// </returns>
108 : public override IQueryProvider Provider
109 : {
110 : get
111 : {
112 : return Collection.Provider;
113 : }
114 : }
115 :
116 : #endregion
117 :
118 0 : protected virtual void AsyncSaveData<TSaveData, TResult>(TSaveData data, Func<TSaveData, CloudTable, TResult> function, Func<TSaveData, string> customFilenameFunction = null)
119 : {
120 : IList<Task> persistTasks = new List<Task>();
121 : foreach (Tuple<CloudStorageAccount, CloudTable> tuple in WritableCollection)
122 : {
123 : TSaveData taskData = data;
124 : CloudTable table = tuple.Item2;
125 : Task task = Task.Factory.StartNewSafely
126 : (
127 : () =>
128 : {
129 : AzureStorageRetryPolicy.ExecuteAction(() => function(taskData, table));
130 : }
131 : );
132 : persistTasks.Add(task);
133 : }
134 :
135 : bool anyFailed = Task.Factory.ContinueWhenAll(persistTasks.ToArray(), tasks =>
136 : {
137 : return tasks.Any(task => task.IsFaulted);
138 : }).Result;
139 : if (anyFailed)
140 : throw new AggregateException("Persisting data to table storage failed. Check the logs for more details.");
141 : }
142 :
143 0 : protected abstract ITableEntity CreateTableEntity(TCollectionItemData data);
144 :
145 : #region Implementation of IDataStore<TData>
146 :
147 0 : public override void Add(TData data)
148 : {
149 : Add(data);
150 : }
151 :
152 0 : public virtual void Add(ITableEntity data)
153 : {
154 : AsyncSaveData
155 : (
156 : data,
157 : (taskData, table) =>
158 : {
159 : try
160 : {
161 : TableOperation insertOperation = TableOperation.Insert(taskData);
162 :
163 : // Execute the insert operation.
164 : return table.Execute(insertOperation);
165 : }
166 : catch (StorageException exception)
167 : {
168 : var extendedErrorInformation = exception.RequestInformation.ExtendedErrorInformation;
169 : if (extendedErrorInformation != null)
170 : Logger.LogError("There was an issue persisting data to table storage.", exception: exception);
171 : else
172 : Logger.LogError(string.Format("There was an issue persisting data to table storage. Specifically {0} :: {1}", exception.RequestInformation.ExtendedErrorInformation.ErrorCode, exception.RequestInformation.ExtendedErrorInformation.ErrorMessage), exception: exception);
173 : throw;
174 : }
175 : catch (Exception exception)
176 : {
177 : Logger.LogError("There was an issue persisting data to table storage.", exception: exception);
178 : throw;
179 : }
180 : }
181 : );
182 : }
183 :
184 0 : public override void Add(IEnumerable<TData> data)
185 : {
186 : Add(data);
187 : }
188 :
189 0 : public virtual void Add(IEnumerable<ITableEntity> data)
190 : {
191 : AsyncSaveData
192 : (
193 : data,
194 : (taskData, table) =>
195 : {
196 : try
197 : {
198 : // Create the batch operation.
199 : TableBatchOperation batchOperation = new TableBatchOperation();
200 :
201 : foreach (TData item in taskData)
202 : batchOperation.Insert(item);
203 :
204 : // Execute the insert operation.
205 : return table.ExecuteBatch(batchOperation);
206 : }
207 : catch (Exception exception)
208 : {
209 : Logger.LogError("There was an issue persisting data to table storage.", exception: exception);
210 : throw;
211 : }
212 : }
213 : );
214 : }
215 :
216 0 : public override void Destroy(TData data)
217 : {
218 : AsyncSaveData
219 : (
220 : data,
221 : (taskData, table) =>
222 : {
223 : try
224 : {
225 : // Create a retrieve operation that takes a customer entity.
226 : TableOperation retrieveOperation = GetUpdatableTableEntity(taskData);
227 :
228 : // Execute the operation.
229 : TableResult retrievedResult = table.Execute(retrieveOperation);
230 : ITableEntity tableEntity = (ITableEntity)retrievedResult.Result;
231 :
232 : TableOperation deleteOperation = TableOperation.Delete(tableEntity);
233 :
234 : // Execute the delete operation.
235 : return table.Execute(deleteOperation);
236 : }
237 : catch (Exception exception)
238 : {
239 : Logger.LogError("There was an issue deleting data from table storage.", exception: exception);
240 : throw;
241 : }
242 : }
243 : );
244 : }
245 :
246 : /*
247 : public virtual void Destroy(ITableEntity data)
248 : {
249 : AsyncSaveData
250 : (
251 : data,
252 : (taskData, table) =>
253 : {
254 : try
255 : {
256 : // Create a retrieve operation that takes a customer entity.
257 : TableOperation retrieveOperation = GetUpdatableTableEntity(taskData);
258 :
259 : // Execute the operation.
260 : TableResult retrievedResult = table.Execute(retrieveOperation);
261 : ITableEntity tableEntity = (ITableEntity)retrievedResult.Result;
262 :
263 : TableOperation deleteOperation = TableOperation.Delete(tableEntity);
264 :
265 : // Execute the delete operation.
266 : return table.Execute(deleteOperation);
267 : }
268 : catch (Exception exception)
269 : {
270 : Logger.LogError("There was an issue deleting data from table storage.", exception: exception);
271 : throw;
272 : }
273 : }
274 : );
275 : }
276 : */
277 :
278 0 : public virtual void Add(TCollectionItemData data)
279 : {
280 : // Create the TableOperation object that inserts the customer entity.
281 : Add(CreateTableEntity(data));
282 : }
283 :
284 0 : public virtual void Add(IEnumerable<TCollectionItemData> data)
285 : {
286 : // Create the TableOperation object that inserts the customer entity.
287 : Add(data.Select(tdata => (TData)CreateTableEntity(tdata)));
288 : }
289 :
290 : /// <summary>
291 : /// Will mark the <paramref name="data"/> as logically (or soft).
292 : /// </summary>
293 1 : public abstract void Remove(TCollectionItemData data);
294 :
295 0 : public virtual void Destroy(TCollectionItemData data)
296 : {
297 : Destroy((TData)CreateTableEntity(data));
298 : }
299 :
300 0 : public override void RemoveAll()
301 : {
302 : foreach (Tuple<CloudStorageAccount, CloudTable> tuple in WritableCollection)
303 : tuple.Item2.DeleteIfExists();
304 : }
305 :
306 0 : public virtual void Update(TCollectionItemData data)
307 : {
308 : Update((TData)CreateTableEntity(data));
309 : }
310 :
311 0 : public override void Update(TData data)
312 : {
313 : AsyncSaveData
314 : (
315 : data,
316 : (taskData, table) =>
317 : {
318 : try
319 : {
320 : // Create a retrieve operation that takes a customer entity.
321 : TableOperation retrieveOperation = GetUpdatableTableEntity(taskData);
322 :
323 : // Execute the operation.
324 : TableResult retrievedResult = table.Execute(retrieveOperation);
325 : ITableEntity tableEntity = ReplaceValues(retrievedResult, data);
326 :
327 : TableOperation updateOperation = TableOperation.Replace(tableEntity);
328 :
329 : // Execute the update operation.
330 : return table.Execute(updateOperation);
331 : }
332 : catch (Exception exception)
333 : {
334 : Logger.LogError("There was an issue updating data in table storage.", exception: exception);
335 : throw;
336 : }
337 : }
338 : );
339 : }
340 :
341 : #endregion
342 :
343 0 : protected abstract TableOperation GetUpdatableTableEntity(TCollectionItemData data);
344 :
345 0 : protected abstract TableOperation GetUpdatableTableEntity(TData data);
346 :
347 : /// <summary>
348 : /// Creates a <see cref="CloudTable"/> with the specified name <paramref name="sourceName"/> if it doesn't already exist.
349 : /// </summary>
350 : /// <param name="storageAccount">The storage account to create the <see cref="CloudTable"/> is</param>
351 : /// <param name="sourceName">The name of the <see cref="CloudTable"/>.</param>
352 : /// <param name="isPublic">Whether or not this <see cref="CloudTable"/> is publicly accessible.</param>
353 1 : protected override CloudTable CreateSource(CloudStorageAccount storageAccount, string sourceName, bool isPublic = true)
354 : {
355 : // Create the table client.
356 : CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
357 :
358 : // Retrieve a reference to the table.
359 : CloudTable table = tableClient.GetTableReference(GetSafeSourceName(sourceName));
360 :
361 : // Create the table if it doesn't exist.
362 : try
363 : {
364 : table.CreateIfNotExists();
365 : }
366 : catch (StorageException exception)
367 : {
368 : var extendedErrorInformation = exception.RequestInformation.ExtendedErrorInformation;
369 : if (extendedErrorInformation != null)
370 : Logger.LogError("There was an issue creating the table.", exception: exception);
371 : else
372 : Logger.LogError(string.Format("There was an issue creating the table. Specifically {0} :: {1}", exception.RequestInformation.ExtendedErrorInformation.ErrorCode, exception.RequestInformation.ExtendedErrorInformation.ErrorMessage), exception: exception);
373 : throw;
374 : }
375 : catch (Exception exception)
376 : {
377 : Logger.LogError("There was an issue creating the table.", exception: exception);
378 : throw;
379 : }
380 :
381 : return table;
382 : }
383 :
384 0 : public virtual TData GetByKeyAndRow(Guid rsn)
385 : {
386 : // Create the table query.
387 : var rangeQuery = Collection.Where
388 : (
389 : TableQuery.CombineFilters
390 : (
391 : TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(typeof(TCollectionItemData).FullName)),
392 : TableOperators.And,
393 : TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(rsn.ToString("N")))
394 : )
395 : );
396 :
397 : return ReadableSource.ExecuteQuery(rangeQuery).Single();
398 : }
399 :
400 0 : public virtual IEnumerable<TData> GetByKey()
401 : {
402 : // Create the table query.
403 : var rangeQuery = Collection.Where
404 : (
405 : TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, StorageStore<object, object>.GetSafeStorageKey(typeof(TCollectionItemData).FullName))
406 : );
407 :
408 : return ReadableSource.ExecuteQuery(rangeQuery);
409 : }
410 :
411 0 : protected virtual ITableEntity ReplaceValues(TableResult retrievedResult, TData data)
412 : {
413 : ITableEntity tableEntity = (ITableEntity)retrievedResult.Result;
414 : var eventTableEntity = tableEntity as IEventDataTableEntity<TData>;
415 : if (eventTableEntity != null)
416 : eventTableEntity.EventData = data;
417 : else
418 : ((IEntityTableEntity<TCollectionItemData>)tableEntity).Entity = ((IEntityTableEntity<TCollectionItemData>)data).Entity;
419 :
420 : return tableEntity;
421 : }
422 :
423 : #region Overrides of StorageStore<TData,CloudTable>
424 :
425 0 : protected override string GetSafeSourceName(string sourceName)
426 : {
427 : return GetSafeSourceName(sourceName, false);
428 : }
429 :
430 : #endregion
431 : }
432 : }
|