Line data Source code
1 : using System;
2 : using System.Collections.Generic;
3 : using System.Linq;
4 : using Cqrs.Events;
5 : using EventStore.ClientAPI;
6 : using EventData = EventStore.ClientAPI.EventData;
7 :
8 : namespace Cqrs.EventStore
9 : {
10 : public class EventStore<TAuthenticationToken> : IEventStore<TAuthenticationToken>
11 0 : {
12 : protected const string CqrsEventStoreStreamNamePattern = "{0}/{1}";
13 :
14 : protected IEventBuilder<TAuthenticationToken> EventBuilder { get; set; }
15 :
16 : protected IEventDeserialiser<TAuthenticationToken> EventDeserialiser { get; set; }
17 :
18 : protected IEventStoreConnection EventStoreConnection { get; set; }
19 :
20 0 : public EventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, IEventStoreConnectionHelper eventStoreConnectionHelper)
21 : {
22 : EventBuilder = eventBuilder;
23 : EventDeserialiser = eventDeserialiser;
24 : EventStoreConnection = eventStoreConnectionHelper.GetEventStoreConnection();
25 : }
26 :
27 : #region Implementation of IEventStore
28 :
29 0 : public void Save<T>(IEvent<TAuthenticationToken> @event)
30 : {
31 : Save(typeof (T), @event);
32 : }
33 :
34 0 : public void Save(Type aggregateRootType, IEvent<TAuthenticationToken> @event)
35 : {
36 : EventData eventData = EventBuilder.CreateFrameworkEvent(@event);
37 : string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, @event.Id);
38 : using (EventStoreTransaction transaction = EventStoreConnection.StartTransactionAsync(streamName, ExpectedVersion.Any).Result)
39 : {
40 : WriteResult saveResult = EventStoreConnection.AppendToStreamAsync(streamName, ExpectedVersion.Any, new[] {eventData}).Result;
41 : WriteResult commitResult = transaction.CommitAsync().Result;
42 : }
43 : }
44 :
45 : /// <remarks>
46 : /// The value of <paramref name="fromVersion"/> is zero based but the internals indexing of the EventStore is offset by <see cref="StreamPosition.Start"/>.
47 : /// Adjust the value of <paramref name="fromVersion"/> by <see cref="StreamPosition.Start"/>
48 : /// </remarks>
49 1 : public IEnumerable<IEvent<TAuthenticationToken>> Get<T>(Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
50 : {
51 : return Get(typeof (T), aggregateId, useLastEventOnly, fromVersion);
52 : }
53 :
54 0 : public IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
55 : {
56 : int startPosition = StreamPosition.Start;
57 : if (fromVersion > -1)
58 : startPosition = fromVersion + StreamPosition.Start;
59 : StreamEventsSlice eventCollection;
60 : string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateType.FullName, aggregateId);
61 : if (useLastEventOnly)
62 : eventCollection = EventStoreConnection.ReadStreamEventsBackwardAsync(streamName, startPosition, 1, false).Result;
63 : else
64 : eventCollection = EventStoreConnection.ReadStreamEventsForwardAsync(streamName, startPosition, 200, false).Result;
65 : return eventCollection.Events.Select(EventDeserialiser.Deserialise);
66 : }
67 :
68 0 : public IEnumerable<Events.EventData> Get(Guid correlationId)
69 : {
70 : throw new NotImplementedException();
71 : }
72 :
73 : #endregion
74 :
75 0 : protected virtual void ListenForNotificationsOnConnection(IEventStoreConnection connection)
76 : {
77 : connection.SubscribeToAllAsync(true, DisplayNotificationArrival, DisplaySubscriptionDropped).RunSynchronously();
78 : }
79 :
80 : private void DisplayNotificationArrival(EventStoreSubscription subscription, ResolvedEvent notification)
81 : {
82 : RecordedEvent @event = notification.Event;
83 : string eventTypePrefix = @event.Data.GetType().AssemblyQualifiedName;
84 : if (string.IsNullOrWhiteSpace(@event.EventType) || @event.EventType != eventTypePrefix)
85 : return;
86 : Console.WriteLine("{0} : {1}", eventTypePrefix, @event.EventType);
87 : }
88 :
89 : private void DisplaySubscriptionDropped(EventStoreSubscription subscription, SubscriptionDropReason reasonDropped, Exception exception)
90 : {
91 : Console.WriteLine("Opps");
92 : }
93 : }
94 : }
|