Line data Source code
1 : using System;
2 : using System.Collections.Generic;
3 : using Cqrs.Bus;
4 : using Cqrs.Events;
5 : using Cqrs.Messages;
6 : using EventStore.ClientAPI;
7 :
8 : namespace Cqrs.EventStore.Bus
9 : {
10 : public class EventStoreEventPublisher<TAuthenticationToken> : IEventPublisher<TAuthenticationToken>
11 0 : {
12 : protected Dictionary<Type, List<Action<IMessage>>> Routes { get; private set; }
13 :
14 : protected IEventStoreConnection EventStoreConnection { get; private set; }
15 :
16 : protected IStoreLastEventProcessed LastEventProcessedStore { get; private set; }
17 :
18 0 : public EventStoreEventPublisher(IEventStoreConnectionHelper eventStoreConnectionHelper, IStoreLastEventProcessed lastEventProcessedStore)
19 : {
20 : EventStoreConnection = eventStoreConnectionHelper.GetEventStoreConnection();
21 : LastEventProcessedStore = lastEventProcessedStore;
22 : Routes = new Dictionary<Type, List<Action<IMessage>>>();
23 : }
24 :
25 : #region Implementation of IEventPublisher<TAuthenticationToken>
26 :
27 1 : public void Publish<TEvent>(TEvent @event)
28 : where TEvent : IEvent<TAuthenticationToken>
29 : {
30 : List<Action<IMessage>> handlers;
31 : if (!Routes.TryGetValue(@event.GetType(), out handlers))
32 : return;
33 : foreach (Action<IMessage> handler in handlers)
34 : handler(@event);
35 : }
36 :
37 1 : public void Publish<TEvent>(IEnumerable<TEvent> events)
38 : where TEvent : IEvent<TAuthenticationToken>
39 : {
40 : foreach (TEvent @event in events)
41 : Publish(@event);
42 : }
43 :
44 : #endregion
45 :
46 0 : protected void InitialiseCatchUpSubscription()
47 : {
48 : Position position = GetLastEventProcessedLocation();
49 :
50 : EventStoreConnection.SubscribeToAllFrom(position, false, OnEventAppeared, OnLiveProcessingStarted, OnSubscriptionDropped);
51 : }
52 :
53 : private Position GetLastEventProcessedLocation()
54 : {
55 : return EventStoreUtilities.FormattedStringToPosition(LastEventProcessedStore.EventLocation);
56 : }
57 :
58 : private void OnEventAppeared(EventStoreCatchUpSubscription catchUpSubscription, ResolvedEvent resolvedEvent)
59 : {
60 : if (resolvedEvent.OriginalEvent.EventStreamId != EventStoreBasedLastEventProcessedStore.EventsProcessedStreamName)
61 : {
62 : RecordedEvent receivedEvent = resolvedEvent.OriginalEvent;
63 : // this.logProvider.Log(string.Format("Event appeared: number {0}, position {1}, type {2}", receivedEvent.EventNumber, resolvedEvent.OriginalPosition, receivedEvent.EventType), LogMessageLevel.Info);
64 :
65 : var eventToSend = EventConverter.GetEventFromData<IEvent<TAuthenticationToken>>(resolvedEvent.Event.Data, resolvedEvent.Event.EventType);
66 :
67 : Publish(eventToSend);
68 :
69 : SaveLastEventProcessedLocation(resolvedEvent.OriginalPosition.Value);
70 : }
71 : }
72 :
73 : private void SaveLastEventProcessedLocation(Position position)
74 : {
75 : LastEventProcessedStore.EventLocation = EventStoreUtilities.PositionToFormattedString(position);
76 : }
77 :
78 : private void OnLiveProcessingStarted(EventStoreCatchUpSubscription catchUpSubscription)
79 : {
80 : // this.logProvider.Log("Subscription live processing started", LogMessageLevel.Info);
81 : }
82 :
83 : private void OnSubscriptionDropped(EventStoreCatchUpSubscription catchUpSubscription, SubscriptionDropReason dropReason, Exception exception)
84 : {
85 : string innerExceptionMessage = string.Empty;
86 : if (exception != null && exception.InnerException != null)
87 : {
88 : innerExceptionMessage = string.Format(" ({0})", exception.InnerException.Message);
89 : }
90 :
91 : // logProvider.Log("Subscription dropped (reason: " + SubscriptionDropReasonText(dropReason) + innerExceptionMessage + ")", LogMessageLevel.Info);
92 :
93 : if (dropReason == SubscriptionDropReason.ProcessingQueueOverflow)
94 : {
95 : // This happens when the server detects that _liveQueue.Count >= MaxPushQueueSize which defaults to 10,000
96 : // In the forum James Nugent suggests "Wait and reconnect probably with back off" https://gist.github.com/jen20/6092666
97 :
98 : // For now we will just re-subscribe
99 : InitialiseCatchUpSubscription();
100 : }
101 :
102 : if (SubscriptionDropMayBeRecoverable(dropReason))
103 : {
104 : InitialiseCatchUpSubscription();
105 : }
106 : }
107 :
108 : private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)
109 : {
110 : return !(dropReason == SubscriptionDropReason.AccessDenied ||
111 : dropReason == SubscriptionDropReason.NotAuthenticated ||
112 : dropReason == SubscriptionDropReason.UserInitiated);
113 : }
114 :
115 : private static string SubscriptionDropReasonText(SubscriptionDropReason reason)
116 : {
117 : string reasonText;
118 : switch (reason)
119 : {
120 : case SubscriptionDropReason.UserInitiated:
121 : reasonText = "User Initiated";
122 : break;
123 : case SubscriptionDropReason.NotAuthenticated:
124 : reasonText = "Not Authenticated";
125 : break;
126 : case SubscriptionDropReason.AccessDenied:
127 : reasonText = "Access Denied";
128 : break;
129 : case SubscriptionDropReason.SubscribingError:
130 : reasonText = "Subscribing Error";
131 : break;
132 : case SubscriptionDropReason.ServerError:
133 : reasonText = "Server Error";
134 : break;
135 : case SubscriptionDropReason.ConnectionClosed:
136 : reasonText = "Connection Closed";
137 : break;
138 : case SubscriptionDropReason.CatchUpError:
139 : reasonText = "CatchUp Error";
140 : break;
141 : case SubscriptionDropReason.ProcessingQueueOverflow:
142 : reasonText = "Processing Queue Overflow";
143 : break;
144 : case SubscriptionDropReason.EventHandlerException:
145 : reasonText = "Event Handler Exception";
146 : break;
147 : case SubscriptionDropReason.Unknown:
148 : default:
149 : reasonText = "Unknown";
150 : break;
151 : }
152 :
153 : return reasonText;
154 : }
155 : }
156 : }
|