Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using Cqrs.Domain;
13 : using Cqrs.Events;
14 : using Cqrs.Messages;
15 : using EventStore.ClientAPI;
16 : using EventData = EventStore.ClientAPI.EventData;
17 :
namespace Cqrs.EventStore
{
	/// <summary>
	/// A Greg Young Event Store based <see cref="IEventStore{TAuthenticationToken}"/>.
	/// </summary>
	/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
	public class EventStore<TAuthenticationToken>
		: IEventStore<TAuthenticationToken>
	{
		/// <summary>
		/// The pattern used to create stream names.
		/// </summary>
		protected const string CqrsEventStoreStreamNamePattern = "{0}/{1}";

		/// <summary>
		/// The maximum number of events requested from the Event Store in a single read.
		/// Longer streams are read in consecutive slices of this size.
		/// </summary>
		private const int ReadPageSize = 200;

		/// <summary>
		/// The <see cref="IEventBuilder{TAuthenticationToken}"/> used to build events.
		/// </summary>
		protected IEventBuilder<TAuthenticationToken> EventBuilder { get; set; }

		/// <summary>
		/// The <see cref="IEventDeserialiser{TAuthenticationToken}"/> used to deserialise events.
		/// </summary>
		protected IEventDeserialiser<TAuthenticationToken> EventDeserialiser { get; set; }

		/// <summary>
		/// The <see cref="IEventStoreConnection"/> used to read and write streams in the Greg Young Event Store.
		/// </summary>
		protected IEventStoreConnection EventStoreConnection { get; set; }

		/// <summary>
		/// Instantiates a new instance of <see cref="EventStore{TAuthenticationToken}"/>.
		/// </summary>
		/// <param name="eventBuilder">The <see cref="IEventBuilder{TAuthenticationToken}"/> used to build events.</param>
		/// <param name="eventDeserialiser">The <see cref="IEventDeserialiser{TAuthenticationToken}"/> used to deserialise events.</param>
		/// <param name="eventStoreConnectionHelper">The helper the <see cref="IEventStoreConnection"/> is obtained from.</param>
		public EventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, IEventStoreConnectionHelper eventStoreConnectionHelper)
		{
			EventBuilder = eventBuilder;
			EventDeserialiser = eventDeserialiser;
			EventStoreConnection = eventStoreConnectionHelper.GetEventStoreConnection();
		}

		#region Implementation of IEventStore

		/// <summary>
		/// Saves the provided <paramref name="event"/>.
		/// </summary>
		/// <typeparam name="T">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</typeparam>
		/// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to be saved.</param>
		public void Save<T>(IEvent<TAuthenticationToken> @event)
		{
			Save(typeof (T), @event);
		}

		/// <summary>
		/// Saves the provided <paramref name="event"/>.
		/// </summary>
		/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
		/// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to be saved.</param>
		/// <remarks>
		/// The event is written through the transaction that is subsequently committed, so the write and the commit are one unit.
		/// This call blocks until the Event Store has acknowledged the commit.
		/// </remarks>
		public void Save(Type aggregateRootType, IEvent<TAuthenticationToken> @event)
		{
			EventData eventData = EventBuilder.CreateFrameworkEvent(@event);
			string streamName = BuildStreamName(aggregateRootType, @event.GetIdentity());
			using (EventStoreTransaction transaction = EventStoreConnection.StartTransactionAsync(streamName, ExpectedVersion.Any).Result)
			{
				// Write via the transaction itself; appending directly on the connection would bypass it and leave an empty transaction to commit.
				transaction.WriteAsync(eventData).Wait();
				transaction.CommitAsync().Wait();
			}
		}

		/// <summary>
		/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <typeparamref name="T">aggregate root</typeparamref> with the ID matching the provided <paramref name="aggregateId"/>.
		/// </summary>
		/// <typeparam name="T">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</typeparam>
		/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
		/// <param name="useLastEventOnly">Loads only the last event<see cref="IEvent{TAuthenticationToken}"/>.</param>
		/// <param name="fromVersion">Load events starting from this version</param>
		/// <remarks>
		/// The value of <paramref name="fromVersion"/> is zero based but the internals indexing of the EventStore is offset by <see cref="StreamPosition.Start"/>.
		/// Adjust the value of <paramref name="fromVersion"/> by <see cref="StreamPosition.Start"/>
		/// </remarks>
		public IEnumerable<IEvent<TAuthenticationToken>> Get<T>(Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
		{
			return Get(typeof (T), aggregateId, useLastEventOnly, fromVersion);
		}

		/// <summary>
		/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>.
		/// </summary>
		/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
		/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
		/// <param name="useLastEventOnly">Loads only the last event<see cref="IEvent{TAuthenticationToken}"/>.</param>
		/// <param name="fromVersion">Load events starting from this version</param>
		/// <remarks>
		/// The value of <paramref name="fromVersion"/> is zero based but the internals indexing of the EventStore is offset by <see cref="StreamPosition.Start"/>.
		/// Adjust the value of <paramref name="fromVersion"/> by <see cref="StreamPosition.Start"/>
		/// When <paramref name="useLastEventOnly"/> is set, the stream is read backwards from its end and the single newest event is returned,
		/// provided it is positioned at or after the position derived from <paramref name="fromVersion"/>.
		/// Otherwise the whole stream from the starting position is read, in consecutive slices, until the end of the stream is reached.
		/// </remarks>
		public IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
		{
			string streamName = BuildStreamName(aggregateRootType, aggregateId);
			int startPosition = StreamPosition.Start;
			if (fromVersion > -1)
				startPosition = fromVersion + StreamPosition.Start;

			if (useLastEventOnly)
			{
				// Reading backwards from the start position would yield the oldest event; the newest one sits at the end of the stream.
				StreamEventsSlice lastEventSlice = EventStoreConnection.ReadStreamEventsBackwardAsync(streamName, StreamPosition.End, 1, false).Result;
				return lastEventSlice.Events
					.Where(resolvedEvent => resolvedEvent.OriginalEventNumber >= startPosition)
					.Select(EventDeserialiser.Deserialise);
			}

			return ReadEventsForward(streamName, startPosition, int.MaxValue).Select(EventDeserialiser.Deserialise);
		}

		/// <summary>
		/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> up to and including the provided <paramref name="version"/>.
		/// </summary>
		/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
		/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
		/// <param name="version">Load events up-to and including from this version</param>
		/// <remarks>
		/// At most <paramref name="version"/> events are read from the start of the stream, in consecutive slices.
		/// A <paramref name="version"/> of zero or less yields no events.
		/// </remarks>
		public IEnumerable<IEvent<TAuthenticationToken>> GetToVersion(Type aggregateRootType, Guid aggregateId, int version)
		{
			string streamName = BuildStreamName(aggregateRootType, aggregateId);
			return ReadEventsForward(streamName, StreamPosition.Start, version).Select(EventDeserialiser.Deserialise);
		}

		/// <summary>
		/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <typeparamref name="T">aggregate root</typeparamref> with the ID matching the provided <paramref name="aggregateId"/> up to and including the provided <paramref name="version"/>.
		/// </summary>
		/// <typeparam name="T">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</typeparam>
		/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
		/// <param name="version">Load events up-to and including from this version</param>
		public IEnumerable<IEvent<TAuthenticationToken>> GetToVersion<T>(Guid aggregateId, int version)
		{
			return GetToVersion(typeof(T), aggregateId, version);
		}

		/// <summary>
		/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> up to and including the provided <paramref name="versionedDate"/>.
		/// </summary>
		/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
		/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
		/// <param name="versionedDate">Load events up-to and including from this <see cref="DateTime"/></param>
		/// <remarks>
		/// The whole stream is read before the date filter is applied, so events beyond the first slice are not missed.
		/// </remarks>
		public IEnumerable<IEvent<TAuthenticationToken>> GetToDate(Type aggregateRootType, Guid aggregateId, DateTime versionedDate)
		{
			string streamName = BuildStreamName(aggregateRootType, aggregateId);
			return ReadEventsForward(streamName, StreamPosition.Start, int.MaxValue)
				.Select(EventDeserialiser.Deserialise)
				.Where(x => x.TimeStamp <= versionedDate);
		}

		/// <summary>
		/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <typeparamref name="T">aggregate root</typeparamref> with the ID matching the provided <paramref name="aggregateId"/> up to and including the provided <paramref name="versionedDate"/>.
		/// </summary>
		/// <typeparam name="T">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</typeparam>
		/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
		/// <param name="versionedDate">Load events up-to and including from this <see cref="DateTime"/></param>
		public IEnumerable<IEvent<TAuthenticationToken>> GetToDate<T>(Guid aggregateId, DateTime versionedDate)
		{
			return GetToDate(typeof(T), aggregateId, versionedDate);
		}

		/// <summary>
		/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> from and including the provided <paramref name="fromVersionedDate"/> up to and including the provided <paramref name="toVersionedDate"/>.
		/// </summary>
		/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
		/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
		/// <param name="fromVersionedDate">Load events from and including from this <see cref="DateTime"/></param>
		/// <param name="toVersionedDate">Load events up-to and including from this <see cref="DateTime"/></param>
		/// <remarks>
		/// The whole stream is read before the date filter is applied, so events beyond the first slice are not missed.
		/// </remarks>
		public IEnumerable<IEvent<TAuthenticationToken>> GetBetweenDates(Type aggregateRootType, Guid aggregateId, DateTime fromVersionedDate, DateTime toVersionedDate)
		{
			string streamName = BuildStreamName(aggregateRootType, aggregateId);
			return ReadEventsForward(streamName, StreamPosition.Start, int.MaxValue)
				.Select(EventDeserialiser.Deserialise)
				.Where(eventData => eventData.TimeStamp >= fromVersionedDate && eventData.TimeStamp <= toVersionedDate);
		}

		/// <summary>
		/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <typeparamref name="T">aggregate root</typeparamref> with the ID matching the provided <paramref name="aggregateId"/> from and including the provided <paramref name="fromVersionedDate"/> up to and including the provided <paramref name="toVersionedDate"/>.
		/// </summary>
		/// <typeparam name="T">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</typeparam>
		/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
		/// <param name="fromVersionedDate">Load events from and including from this <see cref="DateTime"/></param>
		/// <param name="toVersionedDate">Load events up-to and including from this <see cref="DateTime"/></param>
		public IEnumerable<IEvent<TAuthenticationToken>> GetBetweenDates<T>(Guid aggregateId, DateTime fromVersionedDate, DateTime toVersionedDate)
		{
			return GetBetweenDates(typeof(T), aggregateId, fromVersionedDate, toVersionedDate);
		}

		/// <summary>
		/// Get all <see cref="IEvent{TAuthenticationToken}"/> instances for the given <paramref name="correlationId"/>.
		/// </summary>
		/// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
		/// <exception cref="NotImplementedException">Always thrown; lookup by correlation ID is not implemented by this store.</exception>
		public IEnumerable<Events.EventData> Get(Guid correlationId)
		{
			throw new NotImplementedException();
		}

		#endregion

		/// <summary>
		/// Requests the <paramref name="connection"/> responds to OnConnect client notifications.
		/// </summary>
		/// <param name="connection">The <see cref="IEventStoreConnection"/> that will be monitored.</param>
		/// <remarks>
		/// Blocks until the subscription has been established.
		/// </remarks>
		protected virtual void ListenForNotificationsOnConnection(IEventStoreConnection connection)
		{
			// The returned task is not bound to a delegate, so it cannot be started with RunSynchronously(); wait for it to complete instead.
			connection.SubscribeToAllAsync(true, DisplayNotificationArrival, DisplaySubscriptionDropped).Wait();
		}

		/// <summary>
		/// Writes the type details of the received <paramref name="notification"/> to the console when its event type matches the type name of its data.
		/// </summary>
		/// <param name="subscription">The subscription the <paramref name="notification"/> arrived on.</param>
		/// <param name="notification">The event that was received.</param>
		private void DisplayNotificationArrival(EventStoreSubscription subscription, ResolvedEvent notification)
		{
			RecordedEvent @event = notification.Event;
			// NOTE(review): this is the runtime type name of the Data member itself, which looks like the raw payload
			// rather than the domain event it carries - confirm this is the intended value to compare against.
			string eventTypePrefix = @event.Data.GetType().AssemblyQualifiedName;
			if (string.IsNullOrWhiteSpace(@event.EventType) || @event.EventType != eventTypePrefix)
				return;
			Console.WriteLine("{0} : {1}", eventTypePrefix, @event.EventType);
		}

		/// <summary>
		/// Writes a message to the console when the <paramref name="subscription"/> is dropped.
		/// </summary>
		/// <param name="subscription">The subscription that was dropped.</param>
		/// <param name="reasonDropped">The reason the subscription was dropped.</param>
		/// <param name="exception">The exception, if any, associated with the drop.</param>
		private void DisplaySubscriptionDropped(EventStoreSubscription subscription, SubscriptionDropReason reasonDropped, Exception exception)
		{
			Console.WriteLine("Opps");
		}

		/// <summary>
		/// Builds the name of the stream holding the events of one aggregate, using <see cref="CqrsEventStoreStreamNamePattern"/>.
		/// </summary>
		/// <param name="aggregateRootType">The <see cref="Type"/> of the aggregate root; its full name forms the first part of the stream name.</param>
		/// <param name="aggregateId">The identifier of the aggregate; it forms the second part of the stream name.</param>
		private static string BuildStreamName(Type aggregateRootType, object aggregateId)
		{
			return string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);
		}

		/// <summary>
		/// Reads up to <paramref name="maximumCount"/> events forward from <paramref name="startPosition"/> in the stream named <paramref name="streamName"/>,
		/// requesting no more than <see cref="ReadPageSize"/> events per round trip.
		/// </summary>
		/// <param name="streamName">The name of the stream to read.</param>
		/// <param name="startPosition">The position of the first event to read.</param>
		/// <param name="maximumCount">The maximum number of events to return; zero or less returns no events.</param>
		/// <returns>The events read, in stream order. Empty when the stream holds no events from <paramref name="startPosition"/>.</returns>
		private List<ResolvedEvent> ReadEventsForward(string streamName, int startPosition, int maximumCount)
		{
			List<ResolvedEvent> events = new List<ResolvedEvent>();
			int position = startPosition;
			int remaining = maximumCount;
			while (remaining > 0)
			{
				int pageSize = Math.Min(ReadPageSize, remaining);
				StreamEventsSlice slice = EventStoreConnection.ReadStreamEventsForwardAsync(streamName, position, pageSize, false).Result;
				events.AddRange(slice.Events);
				remaining -= slice.Events.Length;

				// Stop on the last slice; also stop on an unsuccessful or empty read so a missing stream cannot cause an endless loop.
				if (slice.Status != SliceReadStatus.Success || slice.IsEndOfStream || slice.Events.Length == 0)
					break;

				// Convert keeps this working whether the client library exposes the position as a 32-bit or a 64-bit integer.
				position = Convert.ToInt32(slice.NextEventNumber);
			}
			return events;
		}
	}
}
|