Documentation Coverage Report
Current view: top level - Cqrs.EventStore/Bus - EventStoreEventPublisher.cs Hit Total Coverage
Version: 4.0 Artefacts: 5 5 100.0 %
Date: 2019-11-24 03:15:41

          Line data    Source code
       1             : #region Copyright
       2             : // // -----------------------------------------------------------------------
       3             : // // <copyright company="Chinchilla Software Limited">
       4             : // //   Copyright Chinchilla Software Limited. All rights reserved.
       5             : // // </copyright>
       6             : // // -----------------------------------------------------------------------
       7             : #endregion
       8             : 
       9             : using System;
      10             : using System.Collections.Generic;
      11             : using Cqrs.Bus;
      12             : using Cqrs.Events;
      13             : using Cqrs.Messages;
      14             : using EventStore.ClientAPI;
      15             : 
      16             : namespace Cqrs.EventStore.Bus
      17             : {
      18             :         /// <summary>
      19             :         /// A <see cref="IEventPublisher{TAuthenticationToken}"/> that uses Greg Young's Event Store.
      20             :         /// </summary>
      21             :         /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
      22             :         public class EventStoreEventPublisher<TAuthenticationToken> : IEventPublisher<TAuthenticationToken>
      23           1 :         {
      24             :                 /// <summary>
      25             :                 /// The actions to execute per <see cref="Type"/>
      26             :                 /// </summary>
      27             :                 protected Dictionary<Type, List<Action<IMessage>>> Routes { get; private set; }
      28             : 
      29             :                 /// <summary>
      30             :                 /// The <see cref="IEventStoreConnection"/> used to read and write streams in the Greg Young Event Store.
      31             :                 /// </summary>
      32             :                 protected IEventStoreConnection EventStoreConnection { get; private set; }
      33             : 
      34             :                 /// <summary>
      35             :                 /// The store that hold stream position information.
      36             :                 /// </summary>
      37             :                 protected IStoreLastEventProcessed LastEventProcessedStore { get; private set; }
      38             : 
      39             :                 /// <summary>
      40             :                 /// Instantiates a new instance of <see cref="EventStoreEventPublisher{TAuthenticationToken}"/>
      41             :                 /// </summary>
      42             :                 /// <param name="eventStoreConnectionHelper">The <see cref="IEventStoreConnection"/> used to read and write streams in the Greg Young Event Store.</param>
      43             :                 /// <param name="lastEventProcessedStore">The store that hold stream position information.</param>
      44           1 :                 public EventStoreEventPublisher(IEventStoreConnectionHelper eventStoreConnectionHelper, IStoreLastEventProcessed lastEventProcessedStore)
      45             :                 {
      46             :                         EventStoreConnection = eventStoreConnectionHelper.GetEventStoreConnection();
      47             :                         LastEventProcessedStore = lastEventProcessedStore;
      48             :                         Routes = new Dictionary<Type, List<Action<IMessage>>>();
      49             :                 }
      50             : 
      51             :                 #region Implementation of IEventPublisher<TAuthenticationToken>
      52             : 
      53             :                 /// <summary>
      54             :                 /// Publishes the provided <paramref name="event"/> on the event bus.
      55             :                 /// </summary>
      56           1 :                 public void Publish<TEvent>(TEvent @event)
      57             :                         where TEvent : IEvent<TAuthenticationToken>
      58             :                 {
      59             :                         List<Action<IMessage>> handlers;
      60             :                         if (!Routes.TryGetValue(@event.GetType(), out handlers))
      61             :                                 return;
      62             :                         foreach (Action<IMessage> handler in handlers)
      63             :                                 handler(@event);
      64             :                 }
      65             : 
      66             :                 /// <summary>
      67             :                 /// Publishes the provided <paramref name="events"/> on the event bus.
      68             :                 /// </summary>
      69           1 :                 public void Publish<TEvent>(IEnumerable<TEvent> events)
      70             :                         where TEvent : IEvent<TAuthenticationToken>
      71             :                 {
      72             :                         foreach (TEvent @event in events)
      73             :                                 Publish(@event);
      74             :                 }
      75             : 
      76             :                 #endregion
      77             : 
      78             :                 /// <summary>
      79             :                 /// Reads the position the store was last within the stream and subscribes requesting all events prior to that position aren't replayed.
      80             :                 /// </summary>
      81           1 :                 protected void InitialiseCatchUpSubscription()
      82             :                 {
      83             :                         Position position = GetLastEventProcessedLocation();
      84             : 
      85             :                         EventStoreConnection.SubscribeToAllFrom(position, false, OnEventAppeared, OnLiveProcessingStarted, OnSubscriptionDropped);
      86             :                 }
      87             : 
      88             :                 private Position GetLastEventProcessedLocation()
      89             :                 {
      90             :                         return EventStoreUtilities.FormattedStringToPosition(LastEventProcessedStore.EventLocation);
      91             :                 }
      92             : 
      93             :                 private void OnEventAppeared(EventStoreCatchUpSubscription catchUpSubscription, ResolvedEvent resolvedEvent)
      94             :                 {
      95             :                         if (resolvedEvent.OriginalEvent.EventStreamId != EventStoreBasedLastEventProcessedStore.EventsProcessedStreamName)
      96             :                         {
      97             :                                 RecordedEvent receivedEvent = resolvedEvent.OriginalEvent;
      98             :                                 // this.logProvider.Log(string.Format("Event appeared: number {0}, position {1}, type {2}", receivedEvent.EventNumber, resolvedEvent.OriginalPosition, receivedEvent.EventType), LogMessageLevel.Info);
      99             : 
     100             :                                 var eventToSend = EventConverter.GetEventFromData<IEvent<TAuthenticationToken>>(resolvedEvent.Event.Data, resolvedEvent.Event.EventType);
     101             : 
     102             :                                 Publish(eventToSend);
     103             : 
     104             :                                 SaveLastEventProcessedLocation(resolvedEvent.OriginalPosition.Value);
     105             :                         }
     106             :                 }
     107             : 
     108             :                 private void SaveLastEventProcessedLocation(Position position)
     109             :                 {
     110             :                         LastEventProcessedStore.EventLocation = EventStoreUtilities.PositionToFormattedString(position);
     111             :                 }
     112             : 
     113             :                 private void OnLiveProcessingStarted(EventStoreCatchUpSubscription catchUpSubscription)
     114             :                 {
     115             :                         // this.logProvider.Log("Subscription live processing started", LogMessageLevel.Info);
     116             :                 }
     117             : 
     118             :                 private void OnSubscriptionDropped(EventStoreCatchUpSubscription catchUpSubscription, SubscriptionDropReason dropReason, Exception exception)
     119             :                 {
     120             :                         string innerExceptionMessage = string.Empty;
     121             :                         if (exception != null && exception.InnerException != null)
     122             :                         {
     123             :                                 innerExceptionMessage = string.Format(" ({0})", exception.InnerException.Message);
     124             :                         }
     125             : 
     126             :                         // logProvider.Log("Subscription dropped (reason: " + SubscriptionDropReasonText(dropReason) + innerExceptionMessage + ")", LogMessageLevel.Info);
     127             : 
     128             :                         if (dropReason == SubscriptionDropReason.ProcessingQueueOverflow)
     129             :                         {
     130             :                                 // This happens when the server detects that _liveQueue.Count >= MaxPushQueueSize which defaults to 10,000
     131             :                                 // In the forum James Nugent suggests "Wait and reconnect probably with back off" https://gist.github.com/jen20/6092666
     132             : 
     133             :                                 // For now we will just re-subscribe
     134             :                                 InitialiseCatchUpSubscription();
     135             :                         }
     136             : 
     137             :                         if (SubscriptionDropMayBeRecoverable(dropReason))
     138             :                         {
     139             :                                 InitialiseCatchUpSubscription();
     140             :                         }
     141             :                 }
     142             : 
     143             :                 private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)
     144             :                 {
     145             :                         return !(dropReason == SubscriptionDropReason.AccessDenied ||
     146             :                                          dropReason == SubscriptionDropReason.NotAuthenticated ||
     147             :                                          dropReason == SubscriptionDropReason.UserInitiated);
     148             :                 }
     149             : 
     150             :                 private static string SubscriptionDropReasonText(SubscriptionDropReason reason)
     151             :                 {
     152             :                         string reasonText;
     153             :                         switch (reason)
     154             :                         {
     155             :                                 case SubscriptionDropReason.UserInitiated:
     156             :                                         reasonText = "User Initiated";
     157             :                                         break;
     158             :                                 case SubscriptionDropReason.NotAuthenticated:
     159             :                                         reasonText = "Not Authenticated";
     160             :                                         break;
     161             :                                 case SubscriptionDropReason.AccessDenied:
     162             :                                         reasonText = "Access Denied";
     163             :                                         break;
     164             :                                 case SubscriptionDropReason.SubscribingError:
     165             :                                         reasonText = "Subscribing Error";
     166             :                                         break;
     167             :                                 case SubscriptionDropReason.ServerError:
     168             :                                         reasonText = "Server Error";
     169             :                                         break;
     170             :                                 case SubscriptionDropReason.ConnectionClosed:
     171             :                                         reasonText = "Connection Closed";
     172             :                                         break;
     173             :                                 case SubscriptionDropReason.CatchUpError:
     174             :                                         reasonText = "CatchUp Error";
     175             :                                         break;
     176             :                                 case SubscriptionDropReason.ProcessingQueueOverflow:
     177             :                                         reasonText = "Processing Queue Overflow";
     178             :                                         break;
     179             :                                 case SubscriptionDropReason.EventHandlerException:
     180             :                                         reasonText = "Event Handler Exception";
     181             :                                         break;
     182             :                                 case SubscriptionDropReason.Unknown:
     183             :                                 default:
     184             :                                         reasonText = "Unknown";
     185             :                                         break;
     186             :                         }
     187             : 
     188             :                         return reasonText;
     189             :                 }
     190             :         }
     191             : }

Generated by: LCOV version 1.13