LCOV - code coverage report
Current view: top level - Cqrs.EventStore/Bus - EventStoreEventPublisher.cs Hit Total Coverage
Test: doc-coverage.info Lines: 2 5 40.0 %
Date: 2017-07-26

          Line data    Source code
       1             : using System;
       2             : using System.Collections.Generic;
       3             : using Cqrs.Bus;
       4             : using Cqrs.Events;
       5             : using Cqrs.Messages;
       6             : using EventStore.ClientAPI;
       7             : 
       8             : namespace Cqrs.EventStore.Bus
       9             : {
      10             :         public class EventStoreEventPublisher<TAuthenticationToken> : IEventPublisher<TAuthenticationToken>
      11           0 :         {
      12             :                 protected Dictionary<Type, List<Action<IMessage>>> Routes { get; private set; }
      13             : 
      14             :                 protected IEventStoreConnection EventStoreConnection { get; private set; }
      15             : 
      16             :                 protected IStoreLastEventProcessed LastEventProcessedStore { get; private set; }
      17             : 
      18           0 :                 public EventStoreEventPublisher(IEventStoreConnectionHelper eventStoreConnectionHelper, IStoreLastEventProcessed lastEventProcessedStore)
      19             :                 {
      20             :                         EventStoreConnection = eventStoreConnectionHelper.GetEventStoreConnection();
      21             :                         LastEventProcessedStore = lastEventProcessedStore;
      22             :                         Routes = new Dictionary<Type, List<Action<IMessage>>>();
      23             :                 }
      24             : 
      25             :                 #region Implementation of IEventPublisher<TAuthenticationToken>
      26             : 
      27           1 :                 public void Publish<TEvent>(TEvent @event)
      28             :                         where TEvent : IEvent<TAuthenticationToken>
      29             :                 {
      30             :                         List<Action<IMessage>> handlers;
      31             :                         if (!Routes.TryGetValue(@event.GetType(), out handlers))
      32             :                                 return;
      33             :                         foreach (Action<IMessage> handler in handlers)
      34             :                                 handler(@event);
      35             :                 }
      36             : 
      37           1 :                 public void Publish<TEvent>(IEnumerable<TEvent> events)
      38             :                         where TEvent : IEvent<TAuthenticationToken>
      39             :                 {
      40             :                         foreach (TEvent @event in events)
      41             :                                 Publish(@event);
      42             :                 }
      43             : 
      44             :                 #endregion
      45             : 
      46           0 :                 protected void InitialiseCatchUpSubscription()
      47             :                 {
      48             :                         Position position = GetLastEventProcessedLocation();
      49             : 
      50             :                         EventStoreConnection.SubscribeToAllFrom(position, false, OnEventAppeared, OnLiveProcessingStarted, OnSubscriptionDropped);
      51             :                 }
      52             : 
      53             :                 private Position GetLastEventProcessedLocation()
      54             :                 {
      55             :                         return EventStoreUtilities.FormattedStringToPosition(LastEventProcessedStore.EventLocation);
      56             :                 }
      57             : 
      58             :                 private void OnEventAppeared(EventStoreCatchUpSubscription catchUpSubscription, ResolvedEvent resolvedEvent)
      59             :                 {
      60             :                         if (resolvedEvent.OriginalEvent.EventStreamId != EventStoreBasedLastEventProcessedStore.EventsProcessedStreamName)
      61             :                         {
      62             :                                 RecordedEvent receivedEvent = resolvedEvent.OriginalEvent;
      63             :                                 // this.logProvider.Log(string.Format("Event appeared: number {0}, position {1}, type {2}", receivedEvent.EventNumber, resolvedEvent.OriginalPosition, receivedEvent.EventType), LogMessageLevel.Info);
      64             : 
      65             :                                 var eventToSend = EventConverter.GetEventFromData<IEvent<TAuthenticationToken>>(resolvedEvent.Event.Data, resolvedEvent.Event.EventType);
      66             : 
      67             :                                 Publish(eventToSend);
      68             : 
      69             :                                 SaveLastEventProcessedLocation(resolvedEvent.OriginalPosition.Value);
      70             :                         }
      71             :                 }
      72             : 
      73             :                 private void SaveLastEventProcessedLocation(Position position)
      74             :                 {
      75             :                         LastEventProcessedStore.EventLocation = EventStoreUtilities.PositionToFormattedString(position);
      76             :                 }
      77             : 
      78             :                 private void OnLiveProcessingStarted(EventStoreCatchUpSubscription catchUpSubscription)
      79             :                 {
      80             :                         // this.logProvider.Log("Subscription live processing started", LogMessageLevel.Info);
      81             :                 }
      82             : 
      83             :                 private void OnSubscriptionDropped(EventStoreCatchUpSubscription catchUpSubscription, SubscriptionDropReason dropReason, Exception exception)
      84             :                 {
      85             :                         string innerExceptionMessage = string.Empty;
      86             :                         if (exception != null && exception.InnerException != null)
      87             :                         {
      88             :                                 innerExceptionMessage = string.Format(" ({0})", exception.InnerException.Message);
      89             :                         }
      90             : 
      91             :                         // logProvider.Log("Subscription dropped (reason: " + SubscriptionDropReasonText(dropReason) + innerExceptionMessage + ")", LogMessageLevel.Info);
      92             : 
      93             :                         if (dropReason == SubscriptionDropReason.ProcessingQueueOverflow)
      94             :                         {
      95             :                                 // This happens when the server detects that _liveQueue.Count >= MaxPushQueueSize which defaults to 10,000
      96             :                                 // In the forum James Nugent suggests "Wait and reconnect probably with back off" https://gist.github.com/jen20/6092666
      97             : 
      98             :                                 // For now we will just re-subscribe
      99             :                                 InitialiseCatchUpSubscription();
     100             :                         }
     101             : 
     102             :                         if (SubscriptionDropMayBeRecoverable(dropReason))
     103             :                         {
     104             :                                 InitialiseCatchUpSubscription();
     105             :                         }
     106             :                 }
     107             : 
     108             :                 private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)
     109             :                 {
     110             :                         return !(dropReason == SubscriptionDropReason.AccessDenied ||
     111             :                                          dropReason == SubscriptionDropReason.NotAuthenticated ||
     112             :                                          dropReason == SubscriptionDropReason.UserInitiated);
     113             :                 }
     114             : 
     115             :                 private static string SubscriptionDropReasonText(SubscriptionDropReason reason)
     116             :                 {
     117             :                         string reasonText;
     118             :                         switch (reason)
     119             :                         {
     120             :                                 case SubscriptionDropReason.UserInitiated:
     121             :                                         reasonText = "User Initiated";
     122             :                                         break;
     123             :                                 case SubscriptionDropReason.NotAuthenticated:
     124             :                                         reasonText = "Not Authenticated";
     125             :                                         break;
     126             :                                 case SubscriptionDropReason.AccessDenied:
     127             :                                         reasonText = "Access Denied";
     128             :                                         break;
     129             :                                 case SubscriptionDropReason.SubscribingError:
     130             :                                         reasonText = "Subscribing Error";
     131             :                                         break;
     132             :                                 case SubscriptionDropReason.ServerError:
     133             :                                         reasonText = "Server Error";
     134             :                                         break;
     135             :                                 case SubscriptionDropReason.ConnectionClosed:
     136             :                                         reasonText = "Connection Closed";
     137             :                                         break;
     138             :                                 case SubscriptionDropReason.CatchUpError:
     139             :                                         reasonText = "CatchUp Error";
     140             :                                         break;
     141             :                                 case SubscriptionDropReason.ProcessingQueueOverflow:
     142             :                                         reasonText = "Processing Queue Overflow";
     143             :                                         break;
     144             :                                 case SubscriptionDropReason.EventHandlerException:
     145             :                                         reasonText = "Event Handler Exception";
     146             :                                         break;
     147             :                                 case SubscriptionDropReason.Unknown:
     148             :                                 default:
     149             :                                         reasonText = "Unknown";
     150             :                                         break;
     151             :                         }
     152             : 
     153             :                         return reasonText;
     154             :                 }
     155             :         }
     156             : }

Generated by: LCOV version 1.10