Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Collections.ObjectModel;
12 : using System.Linq;
13 : using Akka.Actor;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Commands;
17 : using Cqrs.Domain;
18 : using Cqrs.Domain.Exceptions;
19 : using Cqrs.Events;
20 : using Cqrs.Infrastructure;
21 :
namespace Cqrs.Akka.Domain
{
    /// <summary>
    /// An <see cref="ISaga{TAuthenticationToken}"/> implemented as an Akka.NET <see cref="ReceiveActor"/>,
    /// intended to be safe to use within Akka.NET.
    /// </summary>
    /// <remarks>
    /// The saga keeps a list of events that have been applied but not yet committed, replays its stored
    /// history through <see cref="Repository"/> when the actor starts, and dispatches events to "Apply"
    /// methods using dynamic invocation.
    /// </remarks>
    /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of authentication token.</typeparam>
    public abstract class AkkaSaga<TAuthenticationToken>
        : ReceiveActor // PersistentActor
        , ISaga<TAuthenticationToken>
    {
        /// <summary>
        /// Gets or sets the <see cref="ISagaUnitOfWork{TAuthenticationToken}"/> this saga is added to and committed through in <see cref="Execute{TEvent}"/>.
        /// </summary>
        protected ISagaUnitOfWork<TAuthenticationToken> UnitOfWork { get; set; }

        /// <summary>
        /// Gets or sets the <see cref="IAkkaSagaRepository{TAuthenticationToken}"/> used to load this saga's history in <see cref="PreStart"/>.
        /// </summary>
        protected IAkkaSagaRepository<TAuthenticationToken> Repository { get; set; }

        /// <summary>
        /// Gets or sets the <see cref="ILogger"/> used to report failures.
        /// </summary>
        protected ILogger Logger { get; set; }

        /// <summary>
        /// Gets or sets the <see cref="ICorrelationIdHelper"/> that receives the correlation identifier of each executed event.
        /// </summary>
        protected ICorrelationIdHelper CorrelationIdHelper { get; set; }

        /// <summary>
        /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/> that receives the authentication token of each executed event.
        /// </summary>
        protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; set; }

        /// <summary>
        /// The events applied to this saga that have not yet been marked as committed.
        /// Always replaced wholesale with a new read-only collection rather than mutated in place.
        /// </summary>
        private ICollection<ISagaEvent<TAuthenticationToken>> Changes { get; set; }

        /// <summary>
        /// The identifier of the <see cref="ISaga{TAuthenticationToken}"/>.
        /// </summary>
        public Guid Id { get; protected set; }

        /// <summary>
        /// The current version of this <see cref="ISaga{TAuthenticationToken}"/>.
        /// </summary>
        public int Version { get; protected set; }

        /// <summary>
        /// Gets or sets the <see cref="ICommandPublisher{TAuthenticationToken}"/>.
        /// </summary>
        protected ICommandPublisher<TAuthenticationToken> CommandPublisher { get; set; }

        /// <summary>
        /// Instantiates a new instance of <see cref="AkkaSaga{TAuthenticationToken}"/>, storing the supplied
        /// dependencies and starting with an empty set of uncommitted changes.
        /// </summary>
        /// <param name="unitOfWork">Stored in <see cref="UnitOfWork"/>.</param>
        /// <param name="logger">Stored in <see cref="Logger"/>.</param>
        /// <param name="repository">Stored in <see cref="Repository"/>.</param>
        /// <param name="correlationIdHelper">Stored in <see cref="CorrelationIdHelper"/>.</param>
        /// <param name="authenticationTokenHelper">Stored in <see cref="AuthenticationTokenHelper"/>.</param>
        /// <param name="commandPublisher">Stored in <see cref="CommandPublisher"/>.</param>
        protected AkkaSaga(ISagaUnitOfWork<TAuthenticationToken> unitOfWork, ILogger logger, IAkkaSagaRepository<TAuthenticationToken> repository, ICorrelationIdHelper correlationIdHelper, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICommandPublisher<TAuthenticationToken> commandPublisher)
        {
            Changes = CreateEmptyChangeSet();

            UnitOfWork = unitOfWork;
            Repository = repository;
            CommandPublisher = commandPublisher;
            Logger = logger;
            CorrelationIdHelper = correlationIdHelper;
            AuthenticationTokenHelper = authenticationTokenHelper;
        }

        /// <summary>
        /// Builds a new, empty, read-only collection used to represent "no uncommitted changes".
        /// </summary>
        private static ICollection<ISagaEvent<TAuthenticationToken>> CreateEmptyChangeSet()
        {
            var backingList = new List<ISagaEvent<TAuthenticationToken>>();
            return new ReadOnlyCollection<ISagaEvent<TAuthenticationToken>>(backingList);
        }

        #region Overrides of ActorBase

        /// <summary>
        /// Called when the actor is started. Runs the base implementation and then asks
        /// <see cref="Repository"/> to load this saga's history, without raising an exception when
        /// there are no events.
        /// </summary>
        protected override void PreStart()
        {
            base.PreStart();
            Repository.LoadSagaHistory(this, throwExceptionOnNoEvents: false);
        }

        #endregion

        /// <summary>
        /// Adds this saga to <see cref="UnitOfWork"/>, executes the provided <paramref name="action"/> passing it
        /// the provided <paramref name="event"/>, then commits <see cref="UnitOfWork"/>.
        /// </summary>
        /// <remarks>
        /// Before <paramref name="action"/> runs, the authentication token and correlation identifier of
        /// <paramref name="event"/> are pushed into <see cref="AuthenticationTokenHelper"/> and
        /// <see cref="CorrelationIdHelper"/>. On success the sender is told <c>true</c>. On any exception the
        /// failure is logged, the sender is told <c>false</c>, and the exception is re-thrown.
        /// </remarks>
        /// <typeparam name="TEvent">The <see cref="Type"/> of event being handled.</typeparam>
        /// <param name="action">The handler to invoke with <paramref name="event"/>.</param>
        /// <param name="event">The event to hand to <paramref name="action"/>.</param>
        protected virtual void Execute<TEvent>(Action<TEvent> action, TEvent @event)
            where TEvent : IEvent<TAuthenticationToken>
        {
            // Registration happens outside the try block, so a failure here is neither logged nor answered.
            UnitOfWork.Add(this);

            try
            {
                AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
                CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);

                action(@event);
                UnitOfWork.Commit();

                // Only reached when both the handler and the commit succeeded.
                Sender.Tell(true, Self);
            }
            catch (Exception exception)
            {
                var failureDetails = new Dictionary<string, object>
                {
                    { "Type", GetType() },
                    { "Event", @event }
                };
                Logger.LogError("Executing an Akka.net request failed.", exception: exception, metaData: failureDetails);

                Sender.Tell(false, Self);
                throw;
            }
        }

        /// <summary>
        /// Get all applied changes that haven't yet been committed.
        /// </summary>
        /// <returns>The current collection of uncommitted changes; the most recently applied change is first.</returns>
        public IEnumerable<ISagaEvent<TAuthenticationToken>> GetUncommittedChanges()
        {
            return Changes;
        }

        /// <summary>
        /// Mark all applied changes as committed: advance <see cref="Version"/> by the number of pending
        /// changes and replace the internal collection of changes with an empty one.
        /// </summary>
        public virtual void MarkChangesAsCommitted()
        {
            Version += Changes.Count;
            Changes = CreateEmptyChangeSet();
        }

        /// <summary>
        /// Apply all the <see cref="IEvent{TAuthenticationToken}">events</see> in <paramref name="history"/>
        /// using event replay to this instance.
        /// </summary>
        /// <remarks>
        /// Events are processed in ascending <see cref="ISagaEvent{TAuthenticationToken}"/> version order and
        /// each one must carry exactly the next expected version (<see cref="Version"/> + 1).
        /// </remarks>
        /// <param name="history">The events to replay.</param>
        /// <exception cref="EventsOutOfOrderException">Thrown when an event's version is not the next expected version.</exception>
        public virtual void LoadFromHistory(IEnumerable<ISagaEvent<TAuthenticationToken>> history)
        {
            Type sagaType = GetType();
            IEnumerable<ISagaEvent<TAuthenticationToken>> orderedHistory = history.OrderBy(e => e.Version);

            foreach (ISagaEvent<TAuthenticationToken> historicalEvent in orderedHistory)
            {
                // Version is advanced by each replayed event, so this must be recomputed every iteration.
                int expectedVersion = Version + 1;
                if (historicalEvent.Version != expectedVersion)
                {
                    throw new EventsOutOfOrderException(historicalEvent.Id, sagaType, expectedVersion, historicalEvent.Version);
                }

                ApplyChange(historicalEvent, true);
            }
        }

        /// <summary>
        /// Call the "Apply" method with a signature matching the provided <paramref name="event"/> without using event replay to this instance.
        /// </summary>
        /// <remarks>
        /// This means a method named "Apply", with return type void and one parameter must exist to be applied.
        /// If no method exists, nothing is applied.
        /// The parameter type must match exactly the <see cref="Type"/> of the provided <paramref name="event"/>.
        /// (These rules describe the dynamic dispatch helper, whose implementation is not part of this class.)
        /// </remarks>
        /// <param name="event">The saga event to apply and record as an uncommitted change.</param>
        protected virtual void ApplyChange(ISagaEvent<TAuthenticationToken> @event)
        {
            ApplyChange(@event, false);
        }

        /// <summary>
        /// Wraps <paramref name="event"/> in a <see cref="SagaEvent{TAuthenticationToken}"/>, calls <see cref="SetId"/>
        /// on the wrapper, then <see cref="ApplyChange(Cqrs.Events.ISagaEvent{TAuthenticationToken})"/>.
        /// </summary>
        /// <param name="event">The event to wrap and apply.</param>
        protected virtual void ApplyChange(IEvent<TAuthenticationToken> @event)
        {
            SagaEvent<TAuthenticationToken> wrappedEvent = new SagaEvent<TAuthenticationToken>(@event);

            // Copy the identifier of the wrapped event onto the wrapper before applying it.
            SetId(wrappedEvent);
            ApplyChange(wrappedEvent);
        }

        /// <summary>
        /// Sets the <see cref="IEvent{TAuthenticationToken}.Id"/> from <see cref="ISagaEvent{TAuthenticationToken}.Event"/> back onto <paramref name="sagaEvent"/>.
        /// </summary>
        /// <param name="sagaEvent">The saga event whose identifier is overwritten with that of its inner event.</param>
        protected virtual void SetId(ISagaEvent<TAuthenticationToken> sagaEvent)
        {
            sagaEvent.Id = sagaEvent.Event.Id;
        }

        /// <summary>
        /// Dynamically invokes "Apply" with <paramref name="event"/> and then either records the event as an
        /// uncommitted change or, during replay, adopts its identifier and advances <see cref="Version"/>.
        /// </summary>
        /// <param name="event">The saga event to apply.</param>
        /// <param name="isEventReplay">
        /// <c>true</c> when the event comes from stored history; <c>false</c> when it is a new change.
        /// </param>
        private void ApplyChange(ISagaEvent<TAuthenticationToken> @event, bool isEventReplay)
        {
            this.AsDynamic().Apply(@event);

            if (isEventReplay)
            {
                // Replayed events are already persisted: just track identity and version.
                Id = @event.Id;
                Version++;
                return;
            }

            // The new event goes in FRONT of the existing changes, so the pending list is newest-first.
            var pendingChanges = new List<ISagaEvent<TAuthenticationToken>> { @event };
            pendingChanges.AddRange(Changes);
            Changes = new ReadOnlyCollection<ISagaEvent<TAuthenticationToken>>(pendingChanges);
        }

        /// <summary>
        /// Dynamically calls the "Apply" method, passing it the <see cref="ISagaEvent{TAuthenticationToken}.Event"/> of the provided <paramref name="sagaEvent"/>.
        /// </summary>
        /// <param name="sagaEvent">The saga event whose inner event is dispatched.</param>
        protected virtual void Apply(ISagaEvent<TAuthenticationToken> sagaEvent)
        {
            this.AsDynamic().Apply(sagaEvent.Event);
        }
    }
}
|