Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using Chinchilla.Logging;
13 : using Cqrs.Commands;
14 : using Cqrs.Domain.Exceptions;
15 : using Cqrs.Domain.Factories;
16 : using Cqrs.Events;
17 :
namespace Cqrs.Domain
{
    /// <summary>
    /// Provides basic repository methods for operations with instances of <see cref="ISaga{TAuthenticationToken}"/>.
    /// </summary>
    /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of authentication token.</typeparam>
    public class SagaRepository<TAuthenticationToken> : ISagaRepository<TAuthenticationToken>
    {
        /// <summary>
        /// Gets the <see cref="IEventStore{TAuthenticationToken}"/> used to store and retrieve events from.
        /// </summary>
        protected IEventStore<TAuthenticationToken> EventStore { get; private set; }

        /// <summary>
        /// Gets the Publisher used to publish events on once saved into the <see cref="EventStore"/>.
        /// </summary>
        protected IEventPublisher<TAuthenticationToken> Publisher { get; private set; }

        /// <summary>
        /// Gets the Publisher used to publish an <see cref="ICommand{TAuthenticationToken}"/>
        /// </summary>
        protected ICommandPublisher<TAuthenticationToken> CommandPublisher { get; private set; }

        /// <summary>
        /// Gets the <see cref="IAggregateFactory"/> used to create saga instances.
        /// </summary>
        protected IAggregateFactory SagaFactory { get; private set; }

        /// <summary>
        /// Gets the <see cref="ICorrelationIdHelper"/> used to stamp the correlation id onto saved events.
        /// </summary>
        protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }

        /// <summary>
        /// Instantiates a new instance of <see cref="SagaRepository{TAuthenticationToken}"/>
        /// </summary>
        /// <param name="sagaFactory">Assigned to <see cref="SagaFactory"/>.</param>
        /// <param name="eventStore">Assigned to <see cref="EventStore"/>.</param>
        /// <param name="publisher">Assigned to <see cref="Publisher"/>.</param>
        /// <param name="commandPublisher">Assigned to <see cref="CommandPublisher"/>.</param>
        /// <param name="correlationIdHelper">Assigned to <see cref="CorrelationIdHelper"/>.</param>
        public SagaRepository(IAggregateFactory sagaFactory, IEventStore<TAuthenticationToken> eventStore, IEventPublisher<TAuthenticationToken> publisher, ICommandPublisher<TAuthenticationToken> commandPublisher, ICorrelationIdHelper correlationIdHelper)
        {
            EventStore = eventStore;
            Publisher = publisher;
            CorrelationIdHelper = correlationIdHelper;
            CommandPublisher = commandPublisher;
            SagaFactory = sagaFactory;
        }

        /// <summary>
        /// Save and persist the provided <paramref name="saga"/>, optionally providing the version number the <see cref="ISaga{TAuthenticationToken}"/> is expected to be at.
        /// </summary>
        /// <remarks>
        /// When the <paramref name="saga"/> has no uncommitted changes, only its unpublished commands are published.
        /// Otherwise every uncommitted event is stamped (Rsn, Version, TimeStamp, CorrelationId) and written to the
        /// <see cref="EventStore"/> one at a time; the changes are then marked as committed, the events are published
        /// and finally the commands are published. Events are saved individually, so an exception raised part way
        /// through the loop leaves the earlier events already saved.
        /// </remarks>
        /// <typeparam name="TSaga">The <see cref="Type"/> of the <see cref="ISaga{TAuthenticationToken}"/>.</typeparam>
        /// <param name="saga">The <see cref="ISaga{TAuthenticationToken}"/> to save and persist.</param>
        /// <param name="expectedVersion">The version number the <see cref="ISaga{TAuthenticationToken}"/> is expected to be at.</param>
        /// <exception cref="ConcurrencyException">
        /// Thrown when <paramref name="expectedVersion"/> is supplied and the <see cref="EventStore"/> returns any events for that version query.
        /// </exception>
        /// <exception cref="AggregateOrEventMissingIdException">
        /// Thrown when an event has no Rsn and the <paramref name="saga"/> has no Id to fall back on.
        /// </exception>
        public virtual void Save<TSaga>(TSaga saga, int? expectedVersion = null)
            where TSaga : ISaga<TAuthenticationToken>
        {
            IList<ISagaEvent<TAuthenticationToken>> uncommittedChanges = saga.GetUncommittedChanges().ToList();
            IEnumerable<ICommand<TAuthenticationToken>> commandsToPublish = saga.GetUnpublishedCommands();

            // Nothing to persist: the saga may still have raised commands that need to go out.
            if (uncommittedChanges.Count == 0)
            {
                PublishCommand(commandsToPublish);
                return;
            }

            if (expectedVersion != null)
            {
                // NOTE(review): presumably this asks the store for events newer than the expected version,
                // so any result means someone else has written in the meantime - confirm against IEventStore.Get.
                IEnumerable<IEvent<TAuthenticationToken>> eventStoreResults = EventStore.Get(saga.GetType(), saga.Id, false, expectedVersion.Value);
                if (eventStoreResults.Any())
                {
                    throw new ConcurrencyException(saga.Id);
                }
            }

            var eventsToPublish = new List<ISagaEvent<TAuthenticationToken>>();

            // Each saved event takes the next version number after the saga's current version.
            int version = saga.Version;
            foreach (ISagaEvent<TAuthenticationToken> @event in uncommittedChanges)
            {
                // Fall back to the saga's own id when the event was raised without one.
                if (@event.Rsn == Guid.Empty)
                {
                    @event.Rsn = saga.Id;
                }
                if (@event.Rsn == Guid.Empty)
                {
                    throw new AggregateOrEventMissingIdException(saga.GetType(), @event.GetType());
                }

                version++;

                @event.Version = version;
                @event.TimeStamp = DateTimeOffset.UtcNow;
                @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
                EventStore.Save(saga.GetType(), @event);
                eventsToPublish.Add(@event);
            }

            // Publishing only starts once every event has been saved and the saga marked as committed.
            saga.MarkChangesAsCommitted();
            foreach (ISagaEvent<TAuthenticationToken> @event in eventsToPublish)
            {
                PublishEvent(@event);
            }

            PublishCommand(commandsToPublish);
        }

        /// <summary>
        /// Publish the saved <paramref name="event"/>.
        /// </summary>
        /// <param name="event">The <see cref="ISagaEvent{TAuthenticationToken}"/> to hand to the <see cref="Publisher"/>.</param>
        protected virtual void PublishEvent(ISagaEvent<TAuthenticationToken> @event)
        {
            Publisher.Publish(@event);
        }

        /// <summary>
        /// Publish the <paramref name="commands"/>.
        /// </summary>
        /// <param name="commands">The collection of <see cref="ICommand{TAuthenticationToken}"/> to hand to the <see cref="CommandPublisher"/>.</param>
        protected virtual void PublishCommand(IEnumerable<ICommand<TAuthenticationToken>> commands)
        {
            CommandPublisher.Publish(commands);
        }

        /// <summary>
        /// Retrieves an <see cref="ISaga{TAuthenticationToken}"/> of type <typeparamref name="TSaga"/>.
        /// </summary>
        /// <typeparam name="TSaga">The <see cref="Type"/> of the <see cref="ISaga{TAuthenticationToken}"/>.</typeparam>
        /// <param name="sagaId">The identifier of the <see cref="ISaga{TAuthenticationToken}"/> to retrieve.</param>
        /// <param name="events">
        /// A collection of <see cref="ISagaEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="ISaga{TAuthenticationToken}"/>.
        /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of events for you.
        /// </param>
        /// <returns>The <typeparamref name="TSaga"/> produced by <see cref="LoadSaga{TSaga}"/>.</returns>
        public virtual TSaga Get<TSaga>(Guid sagaId, IList<ISagaEvent<TAuthenticationToken>> events = null)
            where TSaga : ISaga<TAuthenticationToken>
        {
            return LoadSaga<TSaga>(sagaId, events);
        }

        /// <summary>
        /// Calls <see cref="IAggregateFactory.Create"/> to get a <typeparamref name="TSaga"/>.
        /// </summary>
        /// <typeparam name="TSaga">The <see cref="Type"/> of <see cref="ISaga{TAuthenticationToken}"/>.</typeparam>
        /// <param name="id">The id of the <typeparamref name="TSaga"/> to create.</param>
        /// <returns>The instance returned by the <see cref="SagaFactory"/>; no history is loaded onto it here.</returns>
        protected virtual TSaga CreateSaga<TSaga>(Guid id)
            where TSaga : ISaga<TAuthenticationToken>
        {
            return SagaFactory.Create<TSaga>(id);
        }

        /// <summary>
        /// Calls <see cref="IAggregateFactory.Create"/> to get a <typeparamref name="TSaga"/> and then calls <see cref="LoadSagaHistory{TSaga}"/>.
        /// </summary>
        /// <typeparam name="TSaga">The <see cref="Type"/> of <see cref="ISaga{TAuthenticationToken}"/>.</typeparam>
        /// <param name="id">The id of the <typeparamref name="TSaga"/> to create.</param>
        /// <param name="events">
        /// A collection of <see cref="ISagaEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="ISaga{TAuthenticationToken}"/>.
        /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of events for you.
        /// </param>
        /// <returns>The created <typeparamref name="TSaga"/> after its history has been loaded.</returns>
        protected virtual TSaga LoadSaga<TSaga>(Guid id, IList<ISagaEvent<TAuthenticationToken>> events = null)
            where TSaga : ISaga<TAuthenticationToken>
        {
            // Unlike CreateSaga, the factory is called with an explicit 'false' second argument here.
            TSaga saga = SagaFactory.Create<TSaga>(id, false);

            LoadSagaHistory(saga, events);
            return saga;
        }

        /// <summary>
        /// If <paramref name="events"/> is null, loads the events from <see cref="EventStore"/>, checks for duplicates and then
        /// rehydrates the <paramref name="saga"/> with the events.
        /// </summary>
        /// <typeparam name="TSaga">The <see cref="Type"/> of <see cref="ISaga{TAuthenticationToken}"/>.</typeparam>
        /// <param name="saga">The <typeparamref name="TSaga"/> to rehydrate.</param>
        /// <param name="events">
        /// A collection of <see cref="ISagaEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="ISaga{TAuthenticationToken}"/>.
        /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of events for you.
        /// </param>
        /// <param name="throwExceptionOnNoEvents">If true will throw an instance of <see cref="SagaNotFoundException{TSaga,TAuthenticationToken}"/> if no events are provided or found in the <see cref="EventStore"/>.</param>
        /// <exception cref="SagaNotFoundException{TSaga,TAuthenticationToken}">
        /// Thrown when there are no events to replay and <paramref name="throwExceptionOnNoEvents"/> is true.
        /// </exception>
        /// <exception cref="DuplicateSagaEventException{TSaga,TAuthenticationToken}">
        /// Thrown when two or more of the events share the same version number.
        /// </exception>
        public virtual void LoadSagaHistory<TSaga>(TSaga saga, IList<ISagaEvent<TAuthenticationToken>> events = null, bool throwExceptionOnNoEvents = true)
            where TSaga : ISaga<TAuthenticationToken>
        {
            IList<ISagaEvent<TAuthenticationToken>> theseEvents = events ?? EventStore.Get<TSaga>(saga.Id).Cast<ISagaEvent<TAuthenticationToken>>().ToList();
            if (theseEvents.Count == 0)
            {
                if (throwExceptionOnNoEvents)
                {
                    throw new SagaNotFoundException<TSaga, TAuthenticationToken>(saga.Id);
                }
                return;
            }

            // A version number may appear only once in a saga's history; report the first one that repeats.
            var firstDuplicate = theseEvents
                .GroupBy(x => x.Version)
                .Select(x => new { Version = x.Key, Total = x.Count() })
                .FirstOrDefault(x => x.Total > 1);
            if (firstDuplicate != null)
            {
                throw new DuplicateSagaEventException<TSaga, TAuthenticationToken>(saga.Id, firstDuplicate.Version);
            }

            saga.LoadFromHistory(theseEvents);
        }
    }
}
|