Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Collections.ObjectModel;
12 : using System.Linq;
13 : using Akka.Actor;
14 : using Chinchilla.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Commands;
17 : using Cqrs.Domain;
18 : using Cqrs.Domain.Exceptions;
19 : using Cqrs.Events;
20 : using Cqrs.Infrastructure;
21 :
22 : namespace Cqrs.Akka.Domain
23 : {
24 : /// <summary>
25 : /// A <see cref="ISaga{TAuthenticationToken}"/> that is safe to use within Akka.NET
26 : /// </summary>
27 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of authentication token.</typeparam>
28 : public abstract class AkkaSaga<TAuthenticationToken>
29 : : ReceiveActor // PersistentActor
30 : , ISaga<TAuthenticationToken>
31 1 : {
32 : /// <summary>
33 : /// Gets or sets the <see cref="ISagaUnitOfWork{TAuthenticationToken}"/>.
34 : /// </summary>
35 : protected ISagaUnitOfWork<TAuthenticationToken> UnitOfWork { get; set; }
36 :
37 : /// <summary>
38 : /// Gets or sets the <see cref="IAkkaSagaRepository{TAuthenticationToken}"/>.
39 : /// </summary>
40 : protected IAkkaSagaRepository<TAuthenticationToken> Repository { get; set; }
41 :
42 : /// <summary>
43 : /// Gets or sets the <see cref="ILogger"/>.
44 : /// </summary>
45 : protected ILogger Logger { get; set; }
46 :
47 : /// <summary>
48 : /// Gets or sets the <see cref="ICorrelationIdHelper"/>.
49 : /// </summary>
50 : protected ICorrelationIdHelper CorrelationIdHelper { get; set; }
51 :
52 : /// <summary>
53 : /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>.
54 : /// </summary>
55 : protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; set; }
56 :
57 : private ICollection<ISagaEvent<TAuthenticationToken>> Changes { get; set; }
58 :
59 : private ICollection<ICommand<TAuthenticationToken>> Commands { get; set; }
60 :
61 : /// <summary>
62 : /// The identifier of the <see cref="ISaga{TAuthenticationToken}"/>.
63 : /// </summary>
64 : public Guid Id { get; protected set; }
65 :
66 : /// <summary>
67 : /// The current version of this <see cref="ISaga{TAuthenticationToken}"/>.
68 : /// </summary>
69 : public int Version { get; protected set; }
70 :
71 : /// <summary>
72 : /// Gets or sets the <see cref="ICommandPublisher{TAuthenticationToken}"/>.
73 : /// </summary>
74 : protected ICommandPublisher<TAuthenticationToken> CommandPublisher { get; set; }
75 :
76 : /// <summary>
77 : /// Instantiates a new instance of <see cref="AkkaSaga{TAuthenticationToken}"/>
78 : /// </summary>
79 1 : protected AkkaSaga(ISagaUnitOfWork<TAuthenticationToken> unitOfWork, ILogger logger, IAkkaSagaRepository<TAuthenticationToken> repository, ICorrelationIdHelper correlationIdHelper, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICommandPublisher<TAuthenticationToken> commandPublisher)
80 : {
81 : UnitOfWork = unitOfWork;
82 : Logger = logger;
83 : Repository = repository;
84 : CorrelationIdHelper = correlationIdHelper;
85 : AuthenticationTokenHelper = authenticationTokenHelper;
86 : CommandPublisher = commandPublisher;
87 : Changes = new ReadOnlyCollection<ISagaEvent<TAuthenticationToken>>(new List<ISagaEvent<TAuthenticationToken>>());
88 : Commands = new ReadOnlyCollection<ICommand<TAuthenticationToken>>(new List<ICommand<TAuthenticationToken>>());
89 : }
90 :
91 : #region Overrides of ActorBase
92 :
93 : /// <summary>
94 : /// User overridable callback.
95 : /// <p/>
96 : /// Is called when an Actor is started.
97 : /// Actors are automatically started asynchronously when created.
98 : /// Empty default implementation.
99 : /// </summary>
100 1 : protected override void PreStart()
101 : {
102 : base.PreStart();
103 : Repository.LoadSagaHistory(this, throwExceptionOnNoEvents: false);
104 : }
105 :
106 : #endregion
107 :
108 : /// <summary>
109 : /// Executes the provided <paramref name="action"/> passing it the provided <paramref name="event"/>,
110 : /// then calls <see cref="AggregateRepository{TAuthenticationToken}.PublishEvent"/>
111 : /// </summary>
112 1 : protected virtual void Execute<TEvent>(Action<TEvent> action, TEvent @event)
113 : where TEvent : IEvent<TAuthenticationToken>
114 : {
115 : UnitOfWork.Add(this);
116 : try
117 : {
118 : AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
119 : CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
120 : action(@event);
121 :
122 : UnitOfWork.Commit();
123 :
124 : Sender.Tell(true, Self);
125 : }
126 : catch(Exception exception)
127 : {
128 : Logger.LogError("Executing an Akka.net request failed.", exception: exception, metaData: new Dictionary<string, object> { { "Type", GetType() }, { "Event", @event } });
129 : Sender.Tell(false, Self);
130 : throw;
131 : }
132 : }
133 :
134 : /// <summary>
135 : /// Get all applied changes that haven't yet been committed.
136 : /// </summary>
137 1 : public virtual IEnumerable<ISagaEvent<TAuthenticationToken>> GetUncommittedChanges()
138 : {
139 : return Changes;
140 : }
141 :
142 : /// <summary>
143 : /// Mark all applied changes as committed, increment <see cref="Version"/> and flush the internal collection of changes.
144 : /// </summary>
145 1 : public virtual void MarkChangesAsCommitted()
146 : {
147 : Version = Version + Changes.Count;
148 : Changes = new ReadOnlyCollection<ISagaEvent<TAuthenticationToken>>(new List<ISagaEvent<TAuthenticationToken>>());
149 : }
150 :
151 : /// <summary>
152 : /// Apply all the <see cref="IEvent{TAuthenticationToken}">events</see> in <paramref name="history"/>
153 : /// using event replay to this instance.
154 : /// </summary>
155 1 : public virtual void LoadFromHistory(IEnumerable<ISagaEvent<TAuthenticationToken>> history)
156 : {
157 : Type sagaType = GetType();
158 : foreach (ISagaEvent<TAuthenticationToken> @event in history.OrderBy(e => e.Version))
159 : {
160 : if (@event.Version != Version + 1)
161 : throw new EventsOutOfOrderException(@event.GetIdentity(), sagaType, Version + 1, @event.Version);
162 : ApplyChange(@event, true);
163 : }
164 : }
165 :
166 : /// <summary>
167 : /// Get all pending commands that haven't yet been published yet.
168 : /// </summary>
169 1 : public virtual IEnumerable<ICommand<TAuthenticationToken>> GetUnpublishedCommands()
170 : {
171 : return Commands;
172 : }
173 :
174 : /// <summary>
175 : /// Queue the provided <paramref name="command"/> for publishing.
176 : /// </summary>
177 1 : protected virtual void QueueCommand(ICommand<TAuthenticationToken> command)
178 : {
179 : Commands = new ReadOnlyCollection<ICommand<TAuthenticationToken>>(Commands.Concat(new[] { command }).ToList());
180 : }
181 :
182 : /// <summary>
183 : /// Mark all published commands as published and flush the internal collection of commands.
184 : /// </summary>
185 1 : public virtual void MarkCommandsAsPublished()
186 : {
187 : Commands = new ReadOnlyCollection<ICommand<TAuthenticationToken>>(new List<ICommand<TAuthenticationToken>>());
188 : }
189 :
190 : /// <summary>
191 : /// Call the "Apply" method with a signature matching the provided <paramref name="event"/> without using event replay to this instance.
192 : /// </summary>
193 : /// <remarks>
194 : /// This means a method named "Apply", with return type void and one parameter must exist to be applied.
195 : /// If no method exists, nothing is applied
196 : /// The parameter type must match exactly the <see cref="Type"/> of the provided <paramref name="event"/>.
197 : /// </remarks>
198 1 : protected virtual void ApplyChange(ISagaEvent<TAuthenticationToken> @event)
199 : {
200 : ApplyChange(@event, false);
201 : }
202 :
203 : /// <summary>
204 : /// Calls <see cref="SetId"/>, then <see cref="ApplyChange(Cqrs.Events.ISagaEvent{TAuthenticationToken})"/>.
205 : /// </summary>
206 1 : protected virtual void ApplyChange(IEvent<TAuthenticationToken> @event)
207 : {
208 : var sagaEvent = new SagaEvent<TAuthenticationToken>(@event);
209 : // Set ID
210 : SetId(sagaEvent);
211 : ApplyChange(sagaEvent);
212 : }
213 :
214 : /// <summary>
215 : /// Sets the <see cref="IEvent{TAuthenticationToken}.Id"/> from <see cref="ISagaEvent{TAuthenticationToken}.Event"/> back onto <paramref name="sagaEvent"/>.
216 : /// </summary>
217 1 : protected virtual void SetId(ISagaEvent<TAuthenticationToken> sagaEvent)
218 : {
219 : sagaEvent.Rsn = sagaEvent.Event.GetIdentity();
220 : }
221 :
222 : private void ApplyChange(ISagaEvent<TAuthenticationToken> @event, bool isEventReplay)
223 : {
224 : this.AsDynamic().Apply(@event);
225 : if (!isEventReplay)
226 : {
227 : Changes = new ReadOnlyCollection<ISagaEvent<TAuthenticationToken>>(new[] { @event }.Concat(Changes).ToList());
228 : }
229 : else
230 : {
231 : Id = @event.Rsn;
232 : Version++;
233 : }
234 : }
235 :
236 : /// <summary>
237 : /// Dynamically calls the "Apply" method, passing it the <see cref="ISagaEvent{TAuthenticationToken}.Event"/> of the provided <paramref name="sagaEvent"/>.
238 : /// </summary>
239 1 : protected virtual void Apply(ISagaEvent<TAuthenticationToken> sagaEvent)
240 : {
241 : this.AsDynamic().Apply(sagaEvent.Event);
242 : }
243 : }
244 : }
|