Documentation Coverage Report
Current view: top level - Cqrs.EventStore - EventStore.cs Hit Total Coverage
Version: 2.2 Artefacts: 8 8 100.0 %
Date: 2017-09-22

          Line data    Source code
       1             : #region Copyright
       2             : // // -----------------------------------------------------------------------
       3             : // // <copyright company="Chinchilla Software Limited">
       4             : // //   Copyright Chinchilla Software Limited. All rights reserved.
       5             : // // </copyright>
       6             : // // -----------------------------------------------------------------------
       7             : #endregion
       8             : 
       9             : using System;
      10             : using System.Collections.Generic;
      11             : using System.Linq;
      12             : using Cqrs.Domain;
      13             : using Cqrs.Events;
      14             : using Cqrs.Messages;
      15             : using EventStore.ClientAPI;
      16             : using EventData = EventStore.ClientAPI.EventData;
      17             : 
      18             : namespace Cqrs.EventStore
      19             : {
      20             :         /// <summary>
      21             :         /// A Greg Young Event Store based <see cref="EventStore{TAuthenticationToken}"/>.
      22             :         /// </summary>
      23             :         /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
      24             :         public class EventStore<TAuthenticationToken> : IEventStore<TAuthenticationToken>
      25           1 :         {
      26             :                 /// <summary>
      27             :                 /// The pattern used to create stream names.
      28             :                 /// </summary>
      29             :                 protected const string CqrsEventStoreStreamNamePattern = "{0}/{1}";
      30             : 
      31             :                 /// <summary>
      32             :                 /// The <see cref="IEventBuilder{TAuthenticationToken}"/> used to build events.
      33             :                 /// </summary>
      34             :                 protected IEventBuilder<TAuthenticationToken> EventBuilder { get; set; }
      35             : 
      36             :                 /// <summary>
      37             :                 /// The <see cref="IEventDeserialiser{TAuthenticationToken}"/> used to deserialise events.
      38             :                 /// </summary>
      39             :                 protected IEventDeserialiser<TAuthenticationToken> EventDeserialiser { get; set; }
      40             : 
      41             :                 /// <summary>
      42             :                 /// The <see cref="IEventStoreConnection"/> used to read and write streams in the Greg Young Event Store.
      43             :                 /// </summary>
      44             :                 protected IEventStoreConnection EventStoreConnection { get; set; }
      45             : 
      46             :                 /// <summary>
      47             :                 /// Instantiates a new instance of <see cref="EventStore{TAuthenticationToken}"/>.
      48             :                 /// </summary>
      49           1 :                 public EventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, IEventStoreConnectionHelper eventStoreConnectionHelper)
      50             :                 {
      51             :                         EventBuilder = eventBuilder;
      52             :                         EventDeserialiser = eventDeserialiser;
      53             :                         EventStoreConnection = eventStoreConnectionHelper.GetEventStoreConnection();
      54             :                 }
      55             : 
      56             :                 #region Implementation of IEventStore
      57             : 
      58             :                 /// <summary>
      59             :                 /// Saves the provided <paramref name="event"/>.
      60             :                 /// </summary>
      61             :                 /// <typeparam name="T">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</typeparam>
      62             :                 /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to be saved.</param>
      63           1 :                 public void Save<T>(IEvent<TAuthenticationToken> @event)
      64             :                 {
      65             :                         Save(typeof (T), @event);
      66             :                 }
      67             : 
      68             :                 /// <summary>
      69             :                 /// Saves the provided <paramref name="event"/>.
      70             :                 /// </summary>
      71             :                 /// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
      72             :                 /// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to be saved.</param>
      73           1 :                 public void Save(Type aggregateRootType, IEvent<TAuthenticationToken> @event)
      74             :                 {
      75             :                         EventData eventData = EventBuilder.CreateFrameworkEvent(@event);
      76             :                         string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, @event.Id);
      77             :                         using (EventStoreTransaction transaction = EventStoreConnection.StartTransactionAsync(streamName, ExpectedVersion.Any).Result)
      78             :                         {
      79             :                                 WriteResult saveResult = EventStoreConnection.AppendToStreamAsync(streamName, ExpectedVersion.Any, new[] {eventData}).Result;
      80             :                                 WriteResult commitResult = transaction.CommitAsync().Result;
      81             :                         }
      82             :                 }
      83             : 
      84             :                 /// <summary>
      85             :                 /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <typeparamref name="T">aggregate root</typeparamref> with the ID matching the provided <paramref name="aggregateId"/>.
      86             :                 /// </summary>
      87             :                 /// <typeparam name="T">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</typeparam>
      88             :                 /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
      89             :                 /// <param name="useLastEventOnly">Loads only the last event<see cref="IEvent{TAuthenticationToken}"/>.</param>
      90             :                 /// <param name="fromVersion">Load events starting from this version</param>
      91             :                 /// <remarks>
      92             :                 /// The value of <paramref name="fromVersion"/> is zero based but the internals indexing of the EventStore is offset by <see cref="StreamPosition.Start"/>.
      93             :                 /// Adjust the value of <paramref name="fromVersion"/> by <see cref="StreamPosition.Start"/>
      94             :                 /// </remarks>
      95           1 :                 public IEnumerable<IEvent<TAuthenticationToken>> Get<T>(Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
      96             :                 {
      97             :                         return Get(typeof (T), aggregateId, useLastEventOnly, fromVersion);
      98             :                 }
      99             : 
     100             :                 /// <summary>
     101             :                 /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>.
     102             :                 /// </summary>
     103             :                 /// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
     104             :                 /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
     105             :                 /// <param name="useLastEventOnly">Loads only the last event<see cref="IEvent{TAuthenticationToken}"/>.</param>
     106             :                 /// <param name="fromVersion">Load events starting from this version</param>
     107             :                 /// <remarks>
     108             :                 /// The value of <paramref name="fromVersion"/> is zero based but the internals indexing of the EventStore is offset by <see cref="StreamPosition.Start"/>.
     109             :                 /// Adjust the value of <paramref name="fromVersion"/> by <see cref="StreamPosition.Start"/>
     110             :                 /// </remarks>
     111           1 :                 public IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
     112             :                 {
     113             :                         int startPosition = StreamPosition.Start;
     114             :                         if (fromVersion > -1)
     115             :                                 startPosition = fromVersion + StreamPosition.Start;
     116             :                         StreamEventsSlice eventCollection;
     117             :                         string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);
     118             :                         if (useLastEventOnly)
     119             :                                 eventCollection = EventStoreConnection.ReadStreamEventsBackwardAsync(streamName, startPosition, 1, false).Result;
     120             :                         else
     121             :                                 eventCollection = EventStoreConnection.ReadStreamEventsForwardAsync(streamName, startPosition, 200, false).Result;
     122             :                         return eventCollection.Events.Select(EventDeserialiser.Deserialise);
     123             :                 }
     124             : 
     125             :                 /// <summary>
     126             :                 /// Get all <see cref="IEvent{TAuthenticationToken}"/> instances for the given <paramref name="correlationId"/>.
     127             :                 /// </summary>
     128             :                 /// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
     129           1 :                 public IEnumerable<Events.EventData> Get(Guid correlationId)
     130             :                 {
     131             :                         throw new NotImplementedException();
     132             :                 }
     133             : 
     134             :                 #endregion
     135             : 
     136             :                 /// <summary>
     137             :                 /// Requests the <paramref name="connection"/> responds to OnConnect client notifications.
     138             :                 /// </summary>
     139             :                 /// <param name="connection">The <see cref="IEventStoreConnection"/> that will be monitored.</param>
     140           1 :                 protected virtual void ListenForNotificationsOnConnection(IEventStoreConnection connection)
     141             :                 {
     142             :                         connection.SubscribeToAllAsync(true, DisplayNotificationArrival, DisplaySubscriptionDropped).RunSynchronously();
     143             :                 }
     144             : 
     145             :                 private void DisplayNotificationArrival(EventStoreSubscription subscription, ResolvedEvent notification)
     146             :                 {
     147             :                         RecordedEvent @event = notification.Event;
     148             :                         string eventTypePrefix = @event.Data.GetType().AssemblyQualifiedName;
     149             :                         if (string.IsNullOrWhiteSpace(@event.EventType) || @event.EventType != eventTypePrefix)
     150             :                                 return;
     151             :                         Console.WriteLine("{0} : {1}", eventTypePrefix, @event.EventType);
     152             :                 }
     153             : 
     154             :                 private void DisplaySubscriptionDropped(EventStoreSubscription subscription, SubscriptionDropReason reasonDropped, Exception exception)
     155             :                 {
     156             :                         Console.WriteLine("Opps");
     157             :                 }
     158             :         }
     159             : }

Generated by: LCOV version 1.10