Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using Cqrs.Bus;
12 : using Cqrs.Events;
13 : using Cqrs.Messages;
14 : using EventStore.ClientAPI;
15 :
namespace Cqrs.EventStore.Bus
{
    /// <summary>
    /// A <see cref="IEventPublisher{TAuthenticationToken}"/> that uses Greg Young's Event Store.
    /// Events arriving on a catch-up subscription are converted and dispatched to the handlers held in <see cref="Routes"/>.
    /// </summary>
    /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
    public class EventStoreEventPublisher<TAuthenticationToken> : IEventPublisher<TAuthenticationToken>
    {
        /// <summary>
        /// The actions to execute per <see cref="Type"/>
        /// </summary>
        protected Dictionary<Type, List<Action<IMessage>>> Routes { get; private set; }

        /// <summary>
        /// The <see cref="IEventStoreConnection"/> used to read and write streams in the Greg Young Event Store.
        /// </summary>
        protected IEventStoreConnection EventStoreConnection { get; private set; }

        /// <summary>
        /// The store that holds stream position information.
        /// </summary>
        protected IStoreLastEventProcessed LastEventProcessedStore { get; private set; }

        /// <summary>
        /// Instantiates a new instance of <see cref="EventStoreEventPublisher{TAuthenticationToken}"/>
        /// </summary>
        /// <param name="eventStoreConnectionHelper">The helper that supplies the <see cref="IEventStoreConnection"/> used to read and write streams in the Greg Young Event Store.</param>
        /// <param name="lastEventProcessedStore">The store that holds stream position information.</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="eventStoreConnectionHelper"/> is <c>null</c>.</exception>
        public EventStoreEventPublisher(IEventStoreConnectionHelper eventStoreConnectionHelper, IStoreLastEventProcessed lastEventProcessedStore)
        {
            // Fail with a descriptive exception rather than a NullReferenceException on the call below.
            if (eventStoreConnectionHelper == null)
            {
                throw new ArgumentNullException("eventStoreConnectionHelper");
            }

            EventStoreConnection = eventStoreConnectionHelper.GetEventStoreConnection();
            LastEventProcessedStore = lastEventProcessedStore;
            Routes = new Dictionary<Type, List<Action<IMessage>>>();
        }

        #region Implementation of IEventPublisher<TAuthenticationToken>

        /// <summary>
        /// Publishes the provided <paramref name="event"/> on the event bus by invoking, in order, every action
        /// registered in <see cref="Routes"/> for the runtime <see cref="Type"/> of the <paramref name="event"/>.
        /// Does nothing when no actions are registered for that <see cref="Type"/>.
        /// </summary>
        /// <typeparam name="TEvent">The <see cref="Type"/> of the event to publish.</typeparam>
        /// <param name="event">The event to publish.</param>
        public void Publish<TEvent>(TEvent @event)
            where TEvent : IEvent<TAuthenticationToken>
        {
            List<Action<IMessage>> handlers;
            if (!Routes.TryGetValue(@event.GetType(), out handlers))
            {
                return;
            }

            foreach (Action<IMessage> handler in handlers)
            {
                handler(@event);
            }
        }

        /// <summary>
        /// Publishes the provided <paramref name="events"/> on the event bus, one at a time in enumeration order.
        /// </summary>
        /// <typeparam name="TEvent">The <see cref="Type"/> of the events to publish.</typeparam>
        /// <param name="events">The events to publish.</param>
        public void Publish<TEvent>(IEnumerable<TEvent> events)
            where TEvent : IEvent<TAuthenticationToken>
        {
            foreach (TEvent @event in events)
            {
                Publish(@event);
            }
        }

        #endregion

        /// <summary>
        /// Reads the position the store was last within the stream and subscribes requesting all events prior to that position aren't replayed.
        /// </summary>
        protected void InitialiseCatchUpSubscription()
        {
            Position position = GetLastEventProcessedLocation();

            EventStoreConnection.SubscribeToAllFrom(position, false, OnEventAppeared, OnLiveProcessingStarted, OnSubscriptionDropped);
        }

        /// <summary>
        /// Converts the location persisted in <see cref="LastEventProcessedStore"/> into a <see cref="Position"/>.
        /// </summary>
        private Position GetLastEventProcessedLocation()
        {
            return EventStoreUtilities.FormattedStringToPosition(LastEventProcessedStore.EventLocation);
        }

        /// <summary>
        /// Handles an event delivered by the catch-up subscription: converts it, publishes it and then records its position.
        /// Events from the stream named by <see cref="EventStoreBasedLastEventProcessedStore.EventsProcessedStreamName"/> are ignored.
        /// </summary>
        private void OnEventAppeared(EventStoreCatchUpSubscription catchUpSubscription, ResolvedEvent resolvedEvent)
        {
            RecordedEvent receivedEvent = resolvedEvent.OriginalEvent;
            if (receivedEvent.EventStreamId == EventStoreBasedLastEventProcessedStore.EventsProcessedStreamName)
            {
                return;
            }

            // this.logProvider.Log(string.Format("Event appeared: number {0}, position {1}, type {2}", receivedEvent.EventNumber, resolvedEvent.OriginalPosition, receivedEvent.EventType), LogMessageLevel.Info);

            var eventToSend = EventConverter.GetEventFromData<IEvent<TAuthenticationToken>>(resolvedEvent.Event.Data, resolvedEvent.Event.EventType);

            Publish(eventToSend);

            // The position is recorded only after publishing has returned, so an exception thrown by a handler
            // leaves the stored position unchanged.
            // OriginalPosition is nullable; only persist a checkpoint when one was actually supplied.
            Position? originalPosition = resolvedEvent.OriginalPosition;
            if (originalPosition.HasValue)
            {
                SaveLastEventProcessedLocation(originalPosition.Value);
            }
        }

        /// <summary>
        /// Persists the provided <paramref name="position"/> into <see cref="LastEventProcessedStore"/> as a formatted string.
        /// </summary>
        private void SaveLastEventProcessedLocation(Position position)
        {
            LastEventProcessedStore.EventLocation = EventStoreUtilities.PositionToFormattedString(position);
        }

        /// <summary>
        /// Callback passed to the catch-up subscription as its live-processing-started handler. Currently does nothing.
        /// </summary>
        private void OnLiveProcessingStarted(EventStoreCatchUpSubscription catchUpSubscription)
        {
            // this.logProvider.Log("Subscription live processing started", LogMessageLevel.Info);
        }

        /// <summary>
        /// Callback passed to the catch-up subscription as its dropped handler. Re-subscribes exactly once
        /// when <see cref="SubscriptionDropMayBeRecoverable"/> reports the <paramref name="dropReason"/> as recoverable.
        /// </summary>
        private void OnSubscriptionDropped(EventStoreCatchUpSubscription catchUpSubscription, SubscriptionDropReason dropReason, Exception exception)
        {
            // Only consumed by the (currently disabled) log statement below.
            string innerExceptionMessage = string.Empty;
            if (exception != null && exception.InnerException != null)
            {
                innerExceptionMessage = string.Format(" ({0})", exception.InnerException.Message);
            }

            // logProvider.Log("Subscription dropped (reason: " + SubscriptionDropReasonText(dropReason) + innerExceptionMessage + ")", LogMessageLevel.Info);

            // ProcessingQueueOverflow happens when the server detects that _liveQueue.Count >= MaxPushQueueSize which defaults to 10,000
            // In the forum James Nugent suggests "Wait and reconnect probably with back off" https://gist.github.com/jen20/6092666
            // For now we will just re-subscribe.
            //
            // ProcessingQueueOverflow is already covered by SubscriptionDropMayBeRecoverable, so it must not be
            // handled separately as well: doing both would start two subscriptions and publish every event twice.
            if (SubscriptionDropMayBeRecoverable(dropReason))
            {
                InitialiseCatchUpSubscription();
            }
        }

        /// <summary>
        /// Indicates if re-subscribing should be attempted for the provided <paramref name="dropReason"/>.
        /// </summary>
        /// <returns><c>false</c> for <see cref="SubscriptionDropReason.AccessDenied"/>, <see cref="SubscriptionDropReason.NotAuthenticated"/> and <see cref="SubscriptionDropReason.UserInitiated"/>; otherwise <c>true</c>.</returns>
        private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)
        {
            return !(dropReason == SubscriptionDropReason.AccessDenied ||
                     dropReason == SubscriptionDropReason.NotAuthenticated ||
                     dropReason == SubscriptionDropReason.UserInitiated);
        }

        /// <summary>
        /// Gets display text for the provided <paramref name="reason"/>; any unrecognised value yields "Unknown".
        /// Only referenced by the currently disabled logging in <see cref="OnSubscriptionDropped"/>.
        /// </summary>
        private static string SubscriptionDropReasonText(SubscriptionDropReason reason)
        {
            switch (reason)
            {
                case SubscriptionDropReason.UserInitiated:
                    return "User Initiated";
                case SubscriptionDropReason.NotAuthenticated:
                    return "Not Authenticated";
                case SubscriptionDropReason.AccessDenied:
                    return "Access Denied";
                case SubscriptionDropReason.SubscribingError:
                    return "Subscribing Error";
                case SubscriptionDropReason.ServerError:
                    return "Server Error";
                case SubscriptionDropReason.ConnectionClosed:
                    return "Connection Closed";
                case SubscriptionDropReason.CatchUpError:
                    return "CatchUp Error";
                case SubscriptionDropReason.ProcessingQueueOverflow:
                    return "Processing Queue Overflow";
                case SubscriptionDropReason.EventHandlerException:
                    return "Event Handler Exception";
                case SubscriptionDropReason.Unknown:
                default:
                    return "Unknown";
            }
        }
    }
}
|