|           Line data    Source code 
       1             : #region Copyright
       2             : // // -----------------------------------------------------------------------
       3             : // // <copyright company="Chinchilla Software Limited">
       4             : // //   Copyright Chinchilla Software Limited. All rights reserved.
       5             : // // </copyright>
       6             : // // -----------------------------------------------------------------------
       7             : #endregion
       8             : 
       9             : using System;
      10             : using System.Collections.Generic;
      11             : using System.Data.Linq;
      12             : using System.Linq;
      13             : using System.Threading.Tasks;
      14             : using System.Transactions;
      15             : using cdmdotnet.Logging;
      16             : using Cqrs.Configuration;
      17             : 
      18             : namespace Cqrs.Events
      19             : {
      20             :         /// <summary>
      21             :         /// A simplified SqlServer based <see cref="EventStore{TAuthenticationToken}"/> that uses LinqToSql and follows a rigid schema that also replicates to multiple connections, but only reads from one connection.
      22             :         /// </summary>
      23             :         /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
      24             :         public class ReplicatedSqlEventStore<TAuthenticationToken> : SqlEventStore<TAuthenticationToken>
      25           1 :         {
      26             :                 /// <summary>
      27             :                 /// A collection of connection strings that are used to write to the database.
      28             :                 /// </summary>
      29             :                 protected IEnumerable<string> WritableConnectionStrings { get; private set; }
      30             : 
      31             :                 /// <summary>
      32             :                 /// Instantiates and Initialises a new instance of the <see cref="ReplicatedSqlEventStore{TAuthenticationToken}"/> class.
      33             :                 /// </summary>
      34           1 :                 public ReplicatedSqlEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, IConfigurationManager configurationManager)
      35             :                         : base(eventBuilder, eventDeserialiser, logger, configurationManager)
      36             :                 {
      37             :                         var writableConnectionStrings = new List<string>();
      38             : 
      39             :                         string connectionStringkey;
      40             :                         if (!ConfigurationManager.TryGetSetting(SqlEventStoreConnectionNameApplicationKey, out connectionStringkey) || string.IsNullOrEmpty(connectionStringkey))
      41             :                                 throw new NullReferenceException(string.Format("No application setting named '{0}' was found in the configuration file with the name of a connection string to look for.", SqlEventStoreConnectionNameApplicationKey));
      42             :                         string connectionString;
      43             :                         int writeIndex = 1;
      44             :                         while (!string.IsNullOrWhiteSpace(connectionStringkey))
      45             :                         {
      46             :                                 try
      47             :                                 {
      48             :                                         connectionString = System.Configuration.ConfigurationManager.ConnectionStrings[connectionStringkey].ConnectionString;
      49             :                                 }
      50             :                                 catch (NullReferenceException exception)
      51             :                                 {
      52             :                                         throw new NullReferenceException(string.Format("No connection string setting named '{0}' was found in the configuration file with the SQL Event Store connection string.", connectionStringkey), exception);
      53             :                                 }
      54             :                                 writableConnectionStrings.Add(connectionString);
      55             :                                 if (!ConfigurationManager.TryGetSetting(string.Format("{0}.{1}", SqlEventStoreConnectionNameApplicationKey, writeIndex), out connectionStringkey) || string.IsNullOrEmpty(connectionStringkey))
      56             :                                         connectionStringkey = null;
      57             :                                 writeIndex++;
      58             :                         }
      59             : 
      60             :                         WritableConnectionStrings = writableConnectionStrings;
      61             :                 }
      62             : 
      63             :                 /// <summary>
      64             :                 /// Persist the provided <paramref name="eventData"/> into each SQL Server in <see cref="WritableConnectionStrings"/>.
      65             :                 /// A single <see cref="TransactionScope"/> wraps all SQL servers, so all must complete successfully, or they will ALL roll back.
      66             :                 /// </summary>
      67             :                 /// <param name="eventData">The <see cref="EventData"/> to persist.</param>
      68           1 :                 protected override void PersistEvent(EventData eventData)
      69             :                 {
      70             :                         try
      71             :                         {
      72             :                                 using (TransactionScope scope = new TransactionScope())
      73             :                                 {
      74             :                                         IList<Task> persistTasks = new List<Task>();
      75             :                                         foreach (string connectionString in WritableConnectionStrings)
      76             :                                         {
      77             :                                                 // Do not remove this variable copying or the parallel task stuff will bork.
      78             :                                                 var safeConnectionString = connectionString;
      79             :                                                 DependentTransaction subTransaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
      80             :                                                 Task task = Task.Factory.StartNewSafely
      81             :                                                 (
      82             :                                                         (subTransactionObject) =>
      83             :                                                         {
      84             :                                                                 var subTrx = (DependentTransaction) subTransactionObject;
      85             :                                                                 //Pass the DependentTransaction to the scope, so that work done in the scope becomes part of the transaction passed to the worker thread
      86             :                                                                 using (TransactionScope ts = new TransactionScope(subTrx))
      87             :                                                                 {
      88             :                                                                         using (DataContext dbDataContext = new DataContext(safeConnectionString))
      89             :                                                                                 Add(dbDataContext, eventData);
      90             : 
      91             :                                                                         //Call complete on the transaction scope
      92             :                                                                         ts.Complete();
      93             :                                                                 }
      94             : 
      95             :                                                                 //Call complete on the dependent transaction
      96             :                                                                 subTrx.Complete();
      97             :                                                         },
      98             :                                                         subTransaction
      99             :                                                 );
     100             :                                                 persistTasks.Add(task);
     101             :                                         }
     102             : 
     103             :                                         bool anyFailed = Task.Factory.ContinueWhenAll(persistTasks.ToArray(), tasks =>
     104             :                                         {
     105             :                                                 return tasks.Any(task => task.IsFaulted);
     106             :                                         }).Result;
     107             :                                         if (anyFailed)
     108             :                                                 throw new AggregateException("Persisting data to the SQL event store failed. Check the logs for more details.");
     109             :                                         scope.Complete();
     110             :                                 }
     111             :                         }
     112             :                         catch (TransactionException exception)
     113             :                         {
     114             :                                 Logger.LogError("There was an issue with the SQL transaction persisting data to the SQL event store.", exception: exception);
     115             :                                 throw;
     116             :                         }
     117             :                         catch (Exception exception)
     118             :                         {
     119             :                                 Logger.LogError("There was an issue persisting data to the SQL event store.", exception: exception);
     120             :                                 throw;
     121             :                         }
     122             :                 }
     123             :         }
     124             : }
 |