Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using cdmdotnet.Logging;
13 : using Cqrs.Domain.Exceptions;
14 : using Cqrs.Domain.Factories;
15 : using Cqrs.Events;
16 :
17 : namespace Cqrs.Domain
18 : {
19 : /// <summary>
20 : /// Provides basic repository methods for operations with instances of <see cref="ISaga{TAuthenticationToken}"/>.
21 : /// </summary>
22 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of authentication token.</typeparam>
23 : public class SagaRepository<TAuthenticationToken> : ISagaRepository<TAuthenticationToken>
24 1 : {
25 : /// <summary>
26 : /// Gets or sets the <see cref="IEventStore{TAuthenticationToken}"/> used to store and retrieve events from.
27 : /// </summary>
28 : protected IEventStore<TAuthenticationToken> EventStore { get; private set; }
29 :
30 : /// <summary>
31 : /// Gets or sets the Publisher used to publish events on once saved into the <see cref="EventStore"/>.
32 : /// </summary>
33 : protected IEventPublisher<TAuthenticationToken> Publisher { get; private set; }
34 :
35 : /// <summary>
36 : /// Gets or set the <see cref="IAggregateFactory"/>.
37 : /// </summary>
38 : protected IAggregateFactory SagaFactory { get; private set; }
39 :
40 : /// <summary>
41 : /// Gets or set the <see cref="ICorrelationIdHelper"/>.
42 : /// </summary>
43 : protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
44 :
45 : /// <summary>
46 : /// Instantiates a new instance of <see cref="SagaRepository{TAuthenticationToken}"/>
47 : /// </summary>
48 1 : public SagaRepository(IAggregateFactory sagaFactory, IEventStore<TAuthenticationToken> eventStore, IEventPublisher<TAuthenticationToken> publisher, ICorrelationIdHelper correlationIdHelper)
49 : {
50 : EventStore = eventStore;
51 : Publisher = publisher;
52 : CorrelationIdHelper = correlationIdHelper;
53 : SagaFactory = sagaFactory;
54 : }
55 :
56 : /// <summary>
57 : /// Save and persist the provided <paramref name="saga"/>, optionally providing the version number the <see cref="ISaga{TAuthenticationToken}"/> is expected to be at.
58 : /// </summary>
59 : /// <typeparam name="TSaga">The <see cref="Type"/> of the <see cref="ISaga{TAuthenticationToken}"/>.</typeparam>
60 : /// <param name="saga">The <see cref="ISaga{TAuthenticationToken}"/> to save and persist.</param>
61 : /// <param name="expectedVersion">The version number the <see cref="ISaga{TAuthenticationToken}"/> is expected to be at.</param>
62 1 : public virtual void Save<TSaga>(TSaga saga, int? expectedVersion = null)
63 : where TSaga : ISaga<TAuthenticationToken>
64 : {
65 : IList<ISagaEvent<TAuthenticationToken>> uncommittedChanges = saga.GetUncommittedChanges().ToList();
66 : if (!uncommittedChanges.Any())
67 : return;
68 :
69 : if (expectedVersion != null)
70 : {
71 : IEnumerable<IEvent<TAuthenticationToken>> eventStoreResults = EventStore.Get(saga.GetType(), saga.Id, false, expectedVersion.Value);
72 : if (eventStoreResults.Any())
73 : throw new ConcurrencyException(saga.Id);
74 : }
75 :
76 : var eventsToPublish = new List<ISagaEvent<TAuthenticationToken>>();
77 :
78 : int i = 0;
79 : int version = saga.Version;
80 : foreach (ISagaEvent<TAuthenticationToken> @event in uncommittedChanges)
81 : {
82 : if (@event.Id == Guid.Empty)
83 : @event.Id = saga.Id;
84 : if (@event.Id == Guid.Empty)
85 : throw new AggregateOrEventMissingIdException(saga.GetType(), @event.GetType());
86 :
87 : i++;
88 : version++;
89 :
90 : @event.Version = version;
91 : @event.TimeStamp = DateTimeOffset.UtcNow;
92 : @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
93 : EventStore.Save(saga.GetType(), @event);
94 : eventsToPublish.Add(@event);
95 : }
96 :
97 : saga.MarkChangesAsCommitted();
98 : foreach (ISagaEvent<TAuthenticationToken> @event in eventsToPublish)
99 : PublishEvent(@event);
100 : }
101 :
102 : /// <summary>
103 : /// Publish the saved <paramref name="event"/>.
104 : /// </summary>
105 1 : protected virtual void PublishEvent(ISagaEvent<TAuthenticationToken> @event)
106 : {
107 : Publisher.Publish(@event);
108 : }
109 :
110 : /// <summary>
111 : /// Retrieves an <see cref="ISaga{TAuthenticationToken}"/> of type <typeparamref name="TSaga"/>.
112 : /// </summary>
113 : /// <typeparam name="TSaga">The <see cref="Type"/> of the <see cref="ISaga{TAuthenticationToken}"/>.</typeparam>
114 : /// <param name="sagaId">The identifier of the <see cref="ISaga{TAuthenticationToken}"/> to retrieve.</param>
115 : /// <param name="events">
116 : /// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="ISaga{TAuthenticationToken}"/>.
117 : /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
118 : /// </param>
119 1 : public virtual TSaga Get<TSaga>(Guid sagaId, IList<ISagaEvent<TAuthenticationToken>> events = null)
120 : where TSaga : ISaga<TAuthenticationToken>
121 : {
122 : return LoadSaga<TSaga>(sagaId, events);
123 : }
124 :
125 : /// <summary>
126 : /// Calls <see cref="IAggregateFactory.Create"/> to get a, <typeparamref name="TSaga"/>.
127 : /// </summary>
128 : /// <typeparam name="TSaga">The <see cref="Type"/> of <see cref="ISaga{TAuthenticationToken}"/>.</typeparam>
129 : /// <param name="id">The id of the <typeparamref name="TSaga"/> to create.</param>
130 1 : protected virtual TSaga CreateSaga<TSaga>(Guid id)
131 : where TSaga : ISaga<TAuthenticationToken>
132 : {
133 : var saga = SagaFactory.Create<TSaga>(id);
134 :
135 : return saga;
136 : }
137 :
138 : /// <summary>
139 : /// Calls <see cref="IAggregateFactory.Create"/> to get a, <typeparamref name="TSaga"/> and then calls <see cref="LoadSagaHistory{TSaga}"/>.
140 : /// </summary>
141 : /// <typeparam name="TSaga">The <see cref="Type"/> of <see cref="ISaga{TAuthenticationToken}"/>.</typeparam>
142 : /// <param name="id">The id of the <typeparamref name="TSaga"/> to create.</param>
143 : /// <param name="events">
144 : /// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="ISaga{TAuthenticationToken}"/>.
145 : /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
146 : /// </param>
147 1 : protected virtual TSaga LoadSaga<TSaga>(Guid id, IList<ISagaEvent<TAuthenticationToken>> events = null)
148 : where TSaga : ISaga<TAuthenticationToken>
149 : {
150 : var saga = SagaFactory.Create<TSaga>(id);
151 :
152 : LoadSagaHistory(saga, events);
153 : return saga;
154 : }
155 :
156 : /// <summary>
157 : /// If <paramref name="events"/> is null, loads the events from <see cref="EventStore"/>, checks for duplicates and then
158 : /// rehydrates the <paramref name="saga"/> with the events.
159 : /// </summary>
160 : /// <typeparam name="TSaga">The <see cref="Type"/> of <see cref="ISaga{TAuthenticationToken}"/>.</typeparam>
161 : /// <param name="saga">The <typeparamref name="TSaga"/> to rehydrate.</param>
162 : /// <param name="events">
163 : /// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="ISaga{TAuthenticationToken}"/>.
164 : /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
165 : /// </param>
166 : /// <param name="throwExceptionOnNoEvents">If true will throw an instance of <see cref="SagaNotFoundException{TSaga,TAuthenticationToken}"/> if no aggregate events or provided or found in the <see cref="EventStore"/>.</param>
167 1 : public virtual void LoadSagaHistory<TSaga>(TSaga saga, IList<ISagaEvent<TAuthenticationToken>> events = null, bool throwExceptionOnNoEvents = true)
168 : where TSaga : ISaga<TAuthenticationToken>
169 : {
170 : IList<ISagaEvent<TAuthenticationToken>> theseEvents = events ?? EventStore.Get<TSaga>(saga.Id).Cast<ISagaEvent<TAuthenticationToken>>().ToList();
171 : if (!theseEvents.Any())
172 : {
173 : if (throwExceptionOnNoEvents)
174 : throw new SagaNotFoundException<TSaga, TAuthenticationToken>(saga.Id);
175 : return;
176 : }
177 :
178 : var duplicatedEvents =
179 : theseEvents.GroupBy(x => x.Version)
180 : .Select(x => new { Version = x.Key, Total = x.Count() })
181 : .FirstOrDefault(x => x.Total > 1);
182 : if (duplicatedEvents != null)
183 : throw new DuplicateSagaEventException<TSaga, TAuthenticationToken>(saga.Id, duplicatedEvents.Version);
184 :
185 : saga.LoadFromHistory(theseEvents);
186 : }
187 : }
188 : }
|