Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using Cqrs.Domain;
13 : using Cqrs.Events;
14 : using Cqrs.Messages;
15 : using EventStore.ClientAPI;
16 : using EventData = EventStore.ClientAPI.EventData;
17 :
18 : namespace Cqrs.EventStore
19 : {
20 : /// <summary>
21 : /// A Greg Young Event Store based <see cref="IEventStore{TAuthenticationToken}"/>.
22 : /// </summary>
23 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
24 : public class EventStore<TAuthenticationToken> : IEventStore<TAuthenticationToken>
25 1 : {
/// <summary>
/// The composite format string used to build stream names. Within this class <c>{0}</c> is filled with the
/// <see cref="Type.FullName"/> of the aggregate root and <c>{1}</c> with the identifier of the stream.
/// </summary>
protected const string CqrsEventStoreStreamNamePattern = "{0}/{1}";

/// <summary>
/// Gets or sets the <see cref="IEventBuilder{TAuthenticationToken}"/> that converts an
/// <see cref="IEvent{TAuthenticationToken}"/> into the event data written to a stream.
/// </summary>
protected IEventBuilder<TAuthenticationToken> EventBuilder { get; set; }

/// <summary>
/// Gets or sets the <see cref="IEventDeserialiser{TAuthenticationToken}"/> that converts events read from a stream
/// back into <see cref="IEvent{TAuthenticationToken}"/> instances.
/// </summary>
protected IEventDeserialiser<TAuthenticationToken> EventDeserialiser { get; set; }

/// <summary>
/// Gets or sets the <see cref="IEventStoreConnection"/> through which streams in the Greg Young Event Store
/// are read and written.
/// </summary>
protected IEventStoreConnection EventStoreConnection { get; set; }
45 :
/// <summary>
/// Instantiates a new instance of <see cref="EventStore{TAuthenticationToken}"/>, keeping the supplied builder and
/// deserialiser and obtaining the connection from the supplied <paramref name="eventStoreConnectionHelper"/>.
/// </summary>
/// <param name="eventBuilder">The <see cref="IEventBuilder{TAuthenticationToken}"/> used when saving events.</param>
/// <param name="eventDeserialiser">The <see cref="IEventDeserialiser{TAuthenticationToken}"/> used when reading events.</param>
/// <param name="eventStoreConnectionHelper">The helper asked for the <see cref="IEventStoreConnection"/> to use.</param>
public EventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, IEventStoreConnectionHelper eventStoreConnectionHelper)
{
    this.EventBuilder = eventBuilder;
    this.EventDeserialiser = eventDeserialiser;

    // The connection is resolved once, at construction time, and reused for every read and write.
    IEventStoreConnection connection = eventStoreConnectionHelper.GetEventStoreConnection();
    this.EventStoreConnection = connection;
}
55 :
56 : #region Implementation of IEventStore
57 :
/// <summary>
/// Saves the provided <paramref name="event"/> against the aggregate root type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</typeparam>
/// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to be saved.</param>
public void Save<T>(IEvent<TAuthenticationToken> @event)
{
    // Resolve the generic argument to a runtime type and defer to the non-generic overload.
    Type aggregateRootType = typeof (T);
    Save(aggregateRootType, @event);
}
67 :
/// <summary>
/// Saves the provided <paramref name="event"/> to the stream named after the
/// <paramref name="aggregateRootType"/> and the identifier of the <paramref name="event"/>.
/// </summary>
/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to be saved.</param>
/// <remarks>
/// The call blocks until the write has been committed. Failures of the underlying asynchronous operations
/// surface as <see cref="AggregateException"/>.
/// </remarks>
public void Save(Type aggregateRootType, IEvent<TAuthenticationToken> @event)
{
    EventData eventData = EventBuilder.CreateFrameworkEvent(@event);
    string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, @event.Id);
    using (EventStoreTransaction transaction = EventStoreConnection.StartTransactionAsync(streamName, ExpectedVersion.Any).Result)
    {
        // The event must be written through the transaction itself; appending directly on the connection
        // bypasses the transaction and leaves the subsequent commit with nothing to commit.
        transaction.WriteAsync(eventData).Wait();
        transaction.CommitAsync().Wait();
    }
}
83 :
/// <summary>
/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <typeparamref name="T">aggregate root</typeparamref> with the ID matching the provided <paramref name="aggregateId"/>.
/// </summary>
/// <typeparam name="T">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</typeparam>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="useLastEventOnly">Loads only the last <see cref="IEvent{TAuthenticationToken}"/>.</param>
/// <param name="fromVersion">Load events starting from this version.</param>
/// <returns>The events produced by the non-generic overload for the same arguments.</returns>
/// <remarks>
/// The value of <paramref name="fromVersion"/> is zero based and is offset by <see cref="StreamPosition.Start"/>
/// to obtain the position used within the EventStore.
/// </remarks>
public IEnumerable<IEvent<TAuthenticationToken>> Get<T>(Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
{
    // Resolve the generic argument to a runtime type and defer to the non-generic overload.
    Type aggregateRootType = typeof (T);
    return Get(aggregateRootType, aggregateId, useLastEventOnly, fromVersion);
}
99 :
/// <summary>
/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>.
/// </summary>
/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="useLastEventOnly">Loads only the last <see cref="IEvent{TAuthenticationToken}"/> in the stream.</param>
/// <param name="fromVersion">Load events starting from this version; values below zero mean "from the start of the stream".</param>
/// <returns>
/// The matching events, deserialised lazily as the result is enumerated. The reads against the EventStore
/// themselves are completed before this method returns.
/// </returns>
/// <remarks>
/// The value of <paramref name="fromVersion"/> is zero based and is offset by <see cref="StreamPosition.Start"/>
/// to obtain the position used within the EventStore.
/// Failures of the underlying asynchronous reads surface as <see cref="AggregateException"/>.
/// </remarks>
public IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
{
    // Number of events requested from the EventStore per forward read.
    const int pageSize = 200;

    int startPosition = StreamPosition.Start;
    if (fromVersion > -1)
        startPosition = fromVersion + StreamPosition.Start;
    string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

    if (useLastEventOnly)
    {
        // A backward read has to begin at the end of the stream; beginning at the start position would
        // yield the first event rather than the last one.
        StreamEventsSlice lastEventSlice = EventStoreConnection.ReadStreamEventsBackwardAsync(streamName, StreamPosition.End, 1, false).Result;
        // Honour fromVersion: the last event is only returned when it is not older than the requested position.
        return lastEventSlice.Events
            .Where(resolvedEvent => resolvedEvent.OriginalEventNumber >= startPosition)
            .Select(EventDeserialiser.Deserialise);
    }

    // Read page by page so streams holding more than one page of events are returned in full.
    List<ResolvedEvent> resolvedEvents = new List<ResolvedEvent>();
    int nextPosition = startPosition;
    StreamEventsSlice currentSlice;
    do
    {
        currentSlice = EventStoreConnection.ReadStreamEventsForwardAsync(streamName, nextPosition, pageSize, false).Result;
        resolvedEvents.AddRange(currentSlice.Events);
        nextPosition = currentSlice.NextEventNumber;
    }
    // The empty-page check is a safety net against looping forever if a slice never reports end of stream.
    while (!currentSlice.IsEndOfStream && currentSlice.Events.Length > 0);

    return resolvedEvents.Select(EventDeserialiser.Deserialise);
}
124 :
/// <summary>
/// Get all <see cref="IEvent{TAuthenticationToken}"/> instances for the given <paramref name="correlationId"/>.
/// This store does not implement the lookup.
/// </summary>
/// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public IEnumerable<Events.EventData> Get(Guid correlationId)
{
    // Retrieval by correlation ID is not provided by this implementation.
    throw new NotImplementedException();
}
133 :
134 : #endregion
135 :
/// <summary>
/// Subscribes to all events on the provided <paramref name="connection"/>, reporting arriving events and a
/// dropped subscription through the handlers of this class. Blocks until the subscription has been established.
/// </summary>
/// <param name="connection">The <see cref="IEventStoreConnection"/> that will be monitored.</param>
protected virtual void ListenForNotificationsOnConnection(IEventStoreConnection connection)
{
    // The task returned by an asynchronous API is not bound to a delegate, so Task.RunSynchronously()
    // throws InvalidOperationException on it; Wait() blocks until the subscription completes instead.
    connection.SubscribeToAllAsync(true, DisplayNotificationArrival, DisplaySubscriptionDropped).Wait();
}
144 :
/// <summary>
/// Writes a line to the console for an arriving event, but only when the event's type name is non-blank and
/// equal to the assembly qualified name of the runtime type of the event's data.
/// </summary>
/// <param name="subscription">The subscription the event arrived on; not used.</param>
/// <param name="notification">The event that arrived.</param>
private void DisplayNotificationArrival(EventStoreSubscription subscription, ResolvedEvent notification)
{
    RecordedEvent recordedEvent = notification.Event;
    // NOTE(review): this is the CLR type of the Data member itself, not of a deserialised payload - confirm intent.
    string dataTypeName = recordedEvent.Data.GetType().AssemblyQualifiedName;
    string eventType = recordedEvent.EventType;

    bool shouldDisplay = !string.IsNullOrWhiteSpace(eventType) && eventType == dataTypeName;
    if (shouldDisplay)
        Console.WriteLine("{0} : {1}", dataTypeName, eventType);
}
153 :
/// <summary>
/// Writes a line to the console when a subscription is dropped, including why it was dropped and the
/// exception, if any, that accompanied the drop.
/// </summary>
/// <param name="subscription">The subscription that was dropped; not used.</param>
/// <param name="reasonDropped">The reason the subscription was dropped.</param>
/// <param name="exception">The exception associated with the drop; formatted as empty text when <c>null</c>.</param>
private void DisplaySubscriptionDropped(EventStoreSubscription subscription, SubscriptionDropReason reasonDropped, Exception exception)
{
    // Report the reason and exception so a dropped subscription can actually be diagnosed from the output.
    Console.WriteLine("Oops: subscription dropped ({0}): {1}", reasonDropped, exception);
}
158 : }
159 : }
|