Line data Source code
1 : using System;
2 : using System.Collections.Generic;
3 : using System.Data.Linq;
4 : using System.Linq;
5 : using System.Threading.Tasks;
6 : using System.Transactions;
7 : using cdmdotnet.Logging;
8 : using Cqrs.Configuration;
9 : using Cqrs.Events;
10 :
11 : namespace Cqrs.Sql.Events
12 : {
13 : /// <summary>
14 : /// A simplified SqlServer based <see cref="EventStore{TAuthenticationToken}"/> that uses LinqToSql and follows a rigid schema that also replicates to multiple connections, but only reads from one connection.
15 : /// </summary>
16 : public class ReplicatedSqlEventStore<TAuthenticationToken> : SqlEventStore<TAuthenticationToken>
17 1 : {
18 : protected IEnumerable<string> WritableConnectionStrings { get; private set; }
19 :
20 0 : public ReplicatedSqlEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, IConfigurationManager configurationManager)
21 : : base(eventBuilder, eventDeserialiser, logger, configurationManager)
22 : {
23 : var writableConnectionStrings = new List<string>();
24 :
25 : string connectionStringkey;
26 : if (!ConfigurationManager.TryGetSetting(SqlEventStoreConnectionNameApplicationKey, out connectionStringkey) || string.IsNullOrEmpty(connectionStringkey))
27 : throw new NullReferenceException(string.Format("No application setting named '{0}' was found in the configuration file with the name of a connection string to look for.", SqlEventStoreConnectionNameApplicationKey));
28 : string connectionString;
29 : int writeIndex = 1;
30 : while (!string.IsNullOrWhiteSpace(connectionStringkey))
31 : {
32 : try
33 : {
34 : connectionString = System.Configuration.ConfigurationManager.ConnectionStrings[connectionStringkey].ConnectionString;
35 : }
36 : catch (NullReferenceException exception)
37 : {
38 : throw new NullReferenceException(string.Format("No connection string setting named '{0}' was found in the configuration file with the SQL Event Store connection string.", connectionStringkey), exception);
39 : }
40 : writableConnectionStrings.Add(connectionString);
41 : if (!ConfigurationManager.TryGetSetting(string.Format("{0}.{1}", SqlEventStoreConnectionNameApplicationKey, writeIndex), out connectionStringkey) || string.IsNullOrEmpty(connectionStringkey))
42 : connectionStringkey = null;
43 : writeIndex++;
44 : }
45 :
46 : WritableConnectionStrings = writableConnectionStrings;
47 : }
48 :
49 0 : protected override void PersistEvent(EventData eventData)
50 : {
51 : try
52 : {
53 : using (TransactionScope scope = new TransactionScope())
54 : {
55 : IList<Task> persistTasks = new List<Task>();
56 : foreach (string connectionString in WritableConnectionStrings)
57 : {
58 : // Do not remove this variable copying or the parallel task stuff will bork.
59 : var safeConnectionString = connectionString;
60 : DependentTransaction subTransaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
61 : Task task = Task.Factory.StartNewSafely
62 : (
63 : (subTransactionObject) =>
64 : {
65 : var subTrx = (DependentTransaction) subTransactionObject;
66 : //Pass the DependentTransaction to the scope, so that work done in the scope becomes part of the transaction passed to the worker thread
67 : using (TransactionScope ts = new TransactionScope(subTrx))
68 : {
69 : using (DataContext dbDataContext = new DataContext(safeConnectionString))
70 : Add(dbDataContext, eventData);
71 :
72 : //Call complete on the transaction scope
73 : ts.Complete();
74 : }
75 :
76 : //Call complete on the dependent transaction
77 : subTrx.Complete();
78 : },
79 : subTransaction
80 : );
81 : persistTasks.Add(task);
82 : }
83 :
84 : bool anyFailed = Task.Factory.ContinueWhenAll(persistTasks.ToArray(), tasks =>
85 : {
86 : return tasks.Any(task => task.IsFaulted);
87 : }).Result;
88 : if (anyFailed)
89 : throw new AggregateException("Persisting data to the SQL event store failed. Check the logs for more details.");
90 : scope.Complete();
91 : }
92 : }
93 : catch (TransactionException exception)
94 : {
95 : Logger.LogError("There was an issue with the SQL transaction persisting data to the SQL event store.", exception: exception);
96 : throw;
97 : }
98 : catch (Exception exception)
99 : {
100 : Logger.LogError("There was an issue persisting data to the SQL event store.", exception: exception);
101 : throw;
102 : }
103 : }
104 : }
105 : }
|