Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Data.Linq;
12 : using System.Linq;
13 : using System.Threading.Tasks;
14 : using System.Transactions;
15 : using cdmdotnet.Logging;
16 : using Cqrs.Configuration;
17 :
18 : namespace Cqrs.Events
19 : {
/// <summary>
/// A simplified SqlServer based <see cref="EventStore{TAuthenticationToken}"/> that uses LinqToSql and follows a rigid schema that also replicates to multiple connections, but only reads from one connection.
/// </summary>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
public class ReplicatedSqlEventStore<TAuthenticationToken> : SqlEventStore<TAuthenticationToken>
{
    /// <summary>
    /// A collection of connection strings that are used to write to the database.
    /// </summary>
    protected IEnumerable<string> WritableConnectionStrings { get; private set; }

    /// <summary>
    /// Instantiates and Initialises a new instance of the <see cref="ReplicatedSqlEventStore{TAuthenticationToken}"/> class.
    /// The application setting named by <c>SqlEventStoreConnectionNameApplicationKey</c> holds the name of the first connection string;
    /// further connection string names are read from the settings "{key}.1", "{key}.2", ... until one is missing or blank.
    /// </summary>
    /// <param name="eventBuilder">Passed through to the base class constructor.</param>
    /// <param name="eventDeserialiser">Passed through to the base class constructor.</param>
    /// <param name="logger">Passed through to the base class constructor.</param>
    /// <param name="configurationManager">Passed through to the base class constructor.</param>
    /// <exception cref="NullReferenceException">
    /// Thrown when the application setting holding the first connection string name is missing or blank,
    /// or when any configured name does not match a connection string in the configuration file.
    /// </exception>
    public ReplicatedSqlEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, IConfigurationManager configurationManager)
        : base(eventBuilder, eventDeserialiser, logger, configurationManager)
    {
        var writableConnectionStrings = new List<string>();

        string connectionStringkey;
        // The guard uses the same blank test as the loop below. Otherwise a whitespace-only setting would pass the guard,
        // skip the loop entirely and silently leave this store with no writable connections.
        if (!ConfigurationManager.TryGetSetting(SqlEventStoreConnectionNameApplicationKey, out connectionStringkey) || string.IsNullOrWhiteSpace(connectionStringkey))
            throw new NullReferenceException(string.Format("No application setting named '{0}' was found in the configuration file with the name of a connection string to look for.", SqlEventStoreConnectionNameApplicationKey));

        int writeIndex = 1;
        while (!string.IsNullOrWhiteSpace(connectionStringkey))
        {
            // Look the entry up explicitly instead of dereferencing it and catching the resulting NullReferenceException.
            System.Configuration.ConnectionStringSettings connectionStringSettings = System.Configuration.ConfigurationManager.ConnectionStrings[connectionStringkey];
            if (connectionStringSettings == null)
                throw new NullReferenceException(string.Format("No connection string setting named '{0}' was found in the configuration file with the SQL Event Store connection string.", connectionStringkey));
            writableConnectionStrings.Add(connectionStringSettings.ConnectionString);

            // Probe for the next replica: "{key}.1", "{key}.2", ... A missing or blank value ends the loop.
            if (!ConfigurationManager.TryGetSetting(string.Format("{0}.{1}", SqlEventStoreConnectionNameApplicationKey, writeIndex), out connectionStringkey) || string.IsNullOrWhiteSpace(connectionStringkey))
                connectionStringkey = null;
            writeIndex++;
        }

        WritableConnectionStrings = writableConnectionStrings;
    }

    /// <summary>
    /// Persist the provided <paramref name="eventData"/> into each SQL Server in <see cref="WritableConnectionStrings"/>.
    /// A single <see cref="TransactionScope"/> wraps all SQL servers, so all must complete successfully, or they will ALL roll back.
    /// </summary>
    /// <param name="eventData">The <see cref="EventData"/> to persist.</param>
    /// <exception cref="AggregateException">
    /// Thrown when one or more of the per-connection tasks faulted; the exceptions raised by those tasks are available via <see cref="AggregateException.InnerExceptions"/>.
    /// </exception>
    /// <exception cref="TransactionException">Logged and re-thrown when raised while persisting.</exception>
    protected override void PersistEvent(EventData eventData)
    {
        try
        {
            using (TransactionScope scope = new TransactionScope())
            {
                IList<Task> persistTasks = new List<Task>();
                foreach (string connectionString in WritableConnectionStrings)
                {
                    // Do not remove this variable copying or the parallel task stuff will bork.
                    var safeConnectionString = connectionString;
                    // Each task gets its own dependent clone; the ambient transaction's commit is blocked until every clone has been completed.
                    DependentTransaction subTransaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                    Task task = Task.Factory.StartNewSafely
                    (
                        (subTransactionObject) =>
                        {
                            var subTrx = (DependentTransaction) subTransactionObject;
                            //Pass the DependentTransaction to the scope, so that work done in the scope becomes part of the transaction passed to the worker thread
                            using (TransactionScope ts = new TransactionScope(subTrx))
                            {
                                using (DataContext dbDataContext = new DataContext(safeConnectionString))
                                    Add(dbDataContext, eventData);

                                //Call complete on the transaction scope
                                ts.Complete();
                            }

                            //Call complete on the dependent transaction
                            subTrx.Complete();
                        },
                        subTransaction
                    );
                    persistTasks.Add(task);
                }

                // Wait for every task to finish and gather the exceptions of those that faulted, so the
                // real causes travel with the exception thrown below instead of being discarded.
                Exception[] failures = Task.Factory.ContinueWhenAll(persistTasks.ToArray(), tasks =>
                {
                    return tasks
                        .Where(task => task.IsFaulted)
                        .SelectMany(task => task.Exception.InnerExceptions)
                        .ToArray();
                }).Result;
                if (failures.Length > 0)
                    throw new AggregateException("Persisting data to the SQL event store failed. Check the logs for more details.", failures);

                // Only reached when no task faulted; leaving the scope without calling Complete() rolls everything back.
                scope.Complete();
            }
        }
        catch (TransactionException exception)
        {
            Logger.LogError("There was an issue with the SQL transaction persisting data to the SQL event store.", exception: exception);
            throw;
        }
        catch (Exception exception)
        {
            Logger.LogError("There was an issue persisting data to the SQL event store.", exception: exception);
            throw;
        }
    }
}
124 : }
|