|           Line data    Source code 
       1             : using System;
       2             : using System.Collections.Generic;
       3             : using System.Linq;
       4             : using Cqrs.Events;
       5             : using EventStore.ClientAPI;
       6             : using EventData = EventStore.ClientAPI.EventData;
       7             : 
       8             : namespace Cqrs.EventStore
       9             : {
      10             :         public class EventStore<TAuthenticationToken> : IEventStore<TAuthenticationToken>
      11           0 :         {
      12             :                 protected const string CqrsEventStoreStreamNamePattern = "{0}/{1}";
      13             : 
      14             :                 protected IEventBuilder<TAuthenticationToken> EventBuilder { get; set; }
      15             : 
      16             :                 protected IEventDeserialiser<TAuthenticationToken> EventDeserialiser { get; set; }
      17             : 
      18             :                 protected IEventStoreConnection EventStoreConnection { get; set; }
      19             : 
      20           0 :                 public EventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, IEventStoreConnectionHelper eventStoreConnectionHelper)
      21             :                 {
      22             :                         EventBuilder = eventBuilder;
      23             :                         EventDeserialiser = eventDeserialiser;
      24             :                         EventStoreConnection = eventStoreConnectionHelper.GetEventStoreConnection();
      25             :                 }
      26             : 
      27             :                 #region Implementation of IEventStore
      28             : 
      29           0 :                 public void Save<T>(IEvent<TAuthenticationToken> @event)
      30             :                 {
      31             :                         Save(typeof (T), @event);
      32             :                 }
      33             : 
      34           0 :                 public void Save(Type aggregateRootType, IEvent<TAuthenticationToken> @event)
      35             :                 {
      36             :                         EventData eventData = EventBuilder.CreateFrameworkEvent(@event);
      37             :                         string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, @event.Id);
      38             :                         using (EventStoreTransaction transaction = EventStoreConnection.StartTransactionAsync(streamName, ExpectedVersion.Any).Result)
      39             :                         {
      40             :                                 WriteResult saveResult = EventStoreConnection.AppendToStreamAsync(streamName, ExpectedVersion.Any, new[] {eventData}).Result;
      41             :                                 WriteResult commitResult = transaction.CommitAsync().Result;
      42             :                         }
      43             :                 }
      44             : 
      45             :                 /// <remarks>
      46             :                 /// The value of <paramref name="fromVersion"/> is zero based but the internals indexing of the EventStore is offset by <see cref="StreamPosition.Start"/>.
      47             :                 /// Adjust the value of <paramref name="fromVersion"/> by <see cref="StreamPosition.Start"/>
      48             :                 /// </remarks>
      49           1 :                 public IEnumerable<IEvent<TAuthenticationToken>> Get<T>(Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
      50             :                 {
      51             :                         return Get(typeof (T), aggregateId, useLastEventOnly, fromVersion);
      52             :                 }
      53             : 
      54           0 :                 public IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
      55             :                 {
      56             :                         int startPosition = StreamPosition.Start;
      57             :                         if (fromVersion > -1)
      58             :                                 startPosition = fromVersion + StreamPosition.Start;
      59             :                         StreamEventsSlice eventCollection;
      60             :                         string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateType.FullName, aggregateId);
      61             :                         if (useLastEventOnly)
      62             :                                 eventCollection = EventStoreConnection.ReadStreamEventsBackwardAsync(streamName, startPosition, 1, false).Result;
      63             :                         else
      64             :                                 eventCollection = EventStoreConnection.ReadStreamEventsForwardAsync(streamName, startPosition, 200, false).Result;
      65             :                         return eventCollection.Events.Select(EventDeserialiser.Deserialise);
      66             :                 }
      67             : 
      68           0 :                 public IEnumerable<Events.EventData> Get(Guid correlationId)
      69             :                 {
      70             :                         throw new NotImplementedException();
      71             :                 }
      72             : 
      73             :                 #endregion
      74             : 
      75           0 :                 protected virtual void ListenForNotificationsOnConnection(IEventStoreConnection connection)
      76             :                 {
      77             :                         connection.SubscribeToAllAsync(true, DisplayNotificationArrival, DisplaySubscriptionDropped).RunSynchronously();
      78             :                 }
      79             : 
      80             :                 private void DisplayNotificationArrival(EventStoreSubscription subscription, ResolvedEvent notification)
      81             :                 {
      82             :                         RecordedEvent @event = notification.Event;
      83             :                         string eventTypePrefix = @event.Data.GetType().AssemblyQualifiedName;
      84             :                         if (string.IsNullOrWhiteSpace(@event.EventType) || @event.EventType != eventTypePrefix)
      85             :                                 return;
      86             :                         Console.WriteLine("{0} : {1}", eventTypePrefix, @event.EventType);
      87             :                 }
      88             : 
      89             :                 private void DisplaySubscriptionDropped(EventStoreSubscription subscription, SubscriptionDropReason reasonDropped, Exception exception)
      90             :                 {
      91             :                         Console.WriteLine("Opps");
      92             :                 }
      93             :         }
      94             : }
 |