Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Collections.ObjectModel;
12 : using System.Linq;
13 : using Akka.Actor;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Commands;
17 : using Cqrs.Domain;
18 : using Cqrs.Domain.Exceptions;
19 : using Cqrs.Events;
20 : using Cqrs.Infrastructure;
21 :
22 : namespace Cqrs.Akka.Domain
23 : {
24 : /// <summary>
25 : /// An <see cref="IAggregateRoot{TAuthenticationToken}"/> that is safe to use within Akka.NET
26 : /// </summary>
27 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of authentication token.</typeparam>
28 : public abstract class AkkaAggregateRoot<TAuthenticationToken>
29 : : ReceiveActor // PersistentActor
30 : , IAggregateRoot<TAuthenticationToken>
31 1 : {
32 : /// <summary>
33 : /// Gets or sets the <see cref="IUnitOfWork{TAuthenticationToken}"/>.
34 : /// </summary>
35 : protected IUnitOfWork<TAuthenticationToken> UnitOfWork { get; set; }
36 :
37 : /// <summary>
38 : /// Gets or sets the <see cref="IAkkaAggregateRepository{TAuthenticationToken}"/>.
39 : /// </summary>
40 : protected IAkkaAggregateRepository<TAuthenticationToken> Repository { get; set; }
41 :
42 : /// <summary>
43 : /// Gets or sets the <see cref="ILogger"/>.
44 : /// </summary>
45 : protected ILogger Logger { get; set; }
46 :
47 : /// <summary>
48 : /// Gets or sets the <see cref="ICorrelationIdHelper"/>.
49 : /// </summary>
50 : protected ICorrelationIdHelper CorrelationIdHelper { get; set; }
51 :
52 : /// <summary>
53 : /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>.
54 : /// </summary>
55 : protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; set; }
56 :
57 : private ICollection<IEvent<TAuthenticationToken>> Changes { get; set; }
58 :
59 : /// <summary>
60 : /// The identifier of this <see cref="IAggregateRoot{TAuthenticationToken}"/>.
61 : /// </summary>
62 : public Guid Id { get; protected set; }
63 :
64 : /// <summary>
65 : /// The current version of this <see cref="IAggregateRoot{TAuthenticationToken}"/>.
66 : /// </summary>
67 : public int Version { get; protected set; }
68 :
69 : /// <summary>
70 : /// Instantiates a new instance of <see cref="AkkaAggregateRoot{TAuthenticationToken}"/>
71 : /// </summary>
72 1 : protected AkkaAggregateRoot(IUnitOfWork<TAuthenticationToken> unitOfWork, ILogger logger, IAkkaAggregateRepository<TAuthenticationToken> repository, ICorrelationIdHelper correlationIdHelper, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper)
73 : {
74 : UnitOfWork = unitOfWork;
75 : Logger = logger;
76 : Repository = repository;
77 : CorrelationIdHelper = correlationIdHelper;
78 : AuthenticationTokenHelper = authenticationTokenHelper;
79 : Changes = new ReadOnlyCollection<IEvent<TAuthenticationToken>>(new List<IEvent<TAuthenticationToken>>());
80 : }
81 :
82 : #region Overrides of ActorBase
83 :
84 : /// <summary>
85 : /// User overridable callback.
86 : /// <p/>
87 : /// Is called when an Actor is started.
88 : /// Actors are automatically started asynchronously when created.
89 : /// Empty default implementation.
90 : /// </summary>
91 1 : protected override void PreStart()
92 : {
93 : base.PreStart();
94 : Repository.LoadAggregateHistory(this, throwExceptionOnNoEvents: false);
95 : }
96 :
97 : #endregion
98 :
99 : /// <summary>
100 : /// Executes the provided <paramref name="action"/> passing it the provided <paramref name="command"/>,
101 : /// then calls <see cref="AggregateRepository{TAuthenticationToken}.PublishEvent"/>
102 : /// </summary>
103 1 : protected virtual void Execute<TCommand>(Action<TCommand> action, TCommand command)
104 : where TCommand : ICommand<TAuthenticationToken>
105 : {
106 : UnitOfWork.Add(this);
107 : try
108 : {
109 : AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
110 : CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
111 : action(command);
112 :
113 : UnitOfWork.Commit();
114 :
115 : Sender.Tell(true, Self);
116 : }
117 : catch(Exception exception)
118 : {
119 : Logger.LogError("Executing an Akka.net request failed.", exception: exception, metaData: new Dictionary<string, object> { { "Type", GetType() }, { "Command", command } });
120 : Sender.Tell(false, Self);
121 : throw;
122 : }
123 : }
124 :
125 : /// <summary>
126 : /// Get all applied changes that haven't yet been committed.
127 : /// </summary>
128 1 : public IEnumerable<IEvent<TAuthenticationToken>> GetUncommittedChanges()
129 : {
130 : return Changes;
131 : }
132 :
133 : /// <summary>
134 : /// Mark all applied changes as committed, increment <see cref="Version"/> and flush the internal collection of changes.
135 : /// </summary>
136 1 : public virtual void MarkChangesAsCommitted()
137 : {
138 : Version = Version + Changes.Count;
139 : Changes = new ReadOnlyCollection<IEvent<TAuthenticationToken>>(new List<IEvent<TAuthenticationToken>>());
140 : }
141 :
142 : /// <summary>
143 : /// Apply all the <see cref="IEvent{TAuthenticationToken}">events</see> in <paramref name="history"/>
144 : /// using event replay to this instance.
145 : /// </summary>
146 1 : public virtual void LoadFromHistory(IEnumerable<IEvent<TAuthenticationToken>> history)
147 : {
148 : Type aggregateType = GetType();
149 : foreach (IEvent<TAuthenticationToken> @event in history.OrderBy(e =>e.Version))
150 : {
151 : if (@event.Version != Version + 1)
152 : throw new EventsOutOfOrderException(@event.Id, aggregateType, Version + 1, @event.Version);
153 : ApplyChange(@event, true);
154 : }
155 : }
156 :
157 : /// <summary>
158 : /// Call the "Apply" method with a signature matching the provided <paramref name="event"/> without using event replay to this instance.
159 : /// </summary>
160 : /// <remarks>
161 : /// This means a method named "Apply", with return type void and one parameter must exist to be applied.
162 : /// If no method exists, nothing is applied
163 : /// The parameter type must match exactly the <see cref="Type"/> of the provided <paramref name="event"/>.
164 : /// </remarks>
165 1 : protected virtual void ApplyChange(IEvent<TAuthenticationToken> @event)
166 : {
167 : ApplyChange(@event, false);
168 : }
169 :
170 : private void ApplyChange(IEvent<TAuthenticationToken> @event, bool isEventReplay)
171 : {
172 : this.AsDynamic().Apply(@event);
173 : if (!isEventReplay)
174 : {
175 : Changes = new ReadOnlyCollection<IEvent<TAuthenticationToken>>(new []{@event}.Concat(Changes).ToList());
176 : }
177 : else
178 : {
179 : Id = @event.Id;
180 : Version++;
181 : }
182 : }
183 : }
184 : }
|