Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Collections.ObjectModel;
12 : using System.Linq;
13 : using Akka.Actor;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Akka.Snapshots;
16 : using Cqrs.Authentication;
17 : using Cqrs.Commands;
18 : using Cqrs.Domain;
19 : using Cqrs.Domain.Exceptions;
20 : using Cqrs.Events;
21 : using Cqrs.Infrastructure;
22 :
23 : namespace Cqrs.Akka.Domain
24 : {
/// <summary>
/// An <see cref="IAggregateRoot{TAuthenticationToken}"/> that is safe to use within Akka.NET
/// </summary>
/// <remarks>
/// The aggregate is hosted as a <see cref="ReceiveActor"/>. Its history is requested from the
/// <see cref="Repository"/> when the actor starts, and derived classes run their command handling
/// through <see cref="Execute{TCommand}"/>.
/// </remarks>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of authentication token.</typeparam>
public abstract class AkkaAggregateRoot<TAuthenticationToken>
    : ReceiveActor // PersistentActor
    , IAggregateRoot<TAuthenticationToken>
{
    /// <summary>
    /// Gets or sets the <see cref="IUnitOfWork{TAuthenticationToken}"/>.
    /// </summary>
    protected IUnitOfWork<TAuthenticationToken> UnitOfWork { get; set; }

    /// <summary>
    /// Gets or sets the <see cref="IAkkaAggregateRepository{TAuthenticationToken}"/>.
    /// </summary>
    protected IAkkaAggregateRepository<TAuthenticationToken> Repository { get; set; }

    /// <summary>
    /// Gets or sets the <see cref="ILogger"/>.
    /// </summary>
    protected ILogger Logger { get; set; }

    /// <summary>
    /// Gets or sets the <see cref="ICorrelationIdHelper"/>.
    /// </summary>
    protected ICorrelationIdHelper CorrelationIdHelper { get; set; }

    /// <summary>
    /// Gets or sets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>.
    /// </summary>
    protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; set; }

    /// <summary>
    /// The events applied to this instance that have not yet been marked as committed.
    /// Always replaced with a fresh read-only collection rather than mutated in place.
    /// </summary>
    private ICollection<IEvent<TAuthenticationToken>> Changes { get; set; }

    /// <summary>
    /// The identifier of this <see cref="IAggregateRoot{TAuthenticationToken}"/>.
    /// </summary>
    public Guid Id { get; protected set; }

    /// <summary>
    /// The current version of this <see cref="IAggregateRoot{TAuthenticationToken}"/>.
    /// </summary>
    public int Version { get; protected set; }

    /// <summary>
    /// Instantiates a new instance of <see cref="AkkaAggregateRoot{TAuthenticationToken}"/>
    /// with the provided collaborators and an empty set of uncommitted changes.
    /// </summary>
    /// <param name="unitOfWork">The unit of work this aggregate is added to and committed through in <see cref="Execute{TCommand}"/>.</param>
    /// <param name="logger">The logger used to report failures while executing a command.</param>
    /// <param name="repository">The repository used to load this aggregate's history when the actor starts.</param>
    /// <param name="correlationIdHelper">Receives the correlation id of each executed command.</param>
    /// <param name="authenticationTokenHelper">Receives the authentication token of each executed command.</param>
    protected AkkaAggregateRoot(IUnitOfWork<TAuthenticationToken> unitOfWork, ILogger logger, IAkkaAggregateRepository<TAuthenticationToken> repository, ICorrelationIdHelper correlationIdHelper, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper)
    {
        UnitOfWork = unitOfWork;
        Logger = logger;
        Repository = repository;
        CorrelationIdHelper = correlationIdHelper;
        AuthenticationTokenHelper = authenticationTokenHelper;
        Changes = CreateEmptyChanges();
    }

    #region Overrides of ActorBase

    /// <summary>
    /// Called when the actor is started. After the base implementation has run, asks the
    /// <see cref="Repository"/> to load this aggregate's history, passing
    /// <c>throwExceptionOnNoEvents: false</c>.
    /// </summary>
    protected override void PreStart()
    {
        base.PreStart();
        Repository.LoadAggregateHistory(this, throwExceptionOnNoEvents: false);
    }

    #endregion

    /// <summary>
    /// Adds this instance to the <see cref="UnitOfWork"/>, sets the authentication token and correlation id
    /// from the provided <paramref name="command"/>, executes the provided <paramref name="action"/> passing it
    /// the <paramref name="command"/>, commits the <see cref="UnitOfWork"/> and then tells the sender <c>true</c>.
    /// </summary>
    /// <remarks>
    /// If anything inside the guarded section throws, the failure is logged, the sender is told <c>false</c>
    /// and the original exception is re-thrown.
    /// </remarks>
    /// <typeparam name="TCommand">The <see cref="Type"/> of command being executed.</typeparam>
    /// <param name="action">The handler to invoke with the <paramref name="command"/>.</param>
    /// <param name="command">The command to execute.</param>
    protected virtual void Execute<TCommand>(Action<TCommand> action, TCommand command)
        where TCommand : ICommand<TAuthenticationToken>
    {
        // Snapshot usage is decided from the whole inheritance chain, so aggregates that derive
        // indirectly from the snapshot aggregate root are detected as well.
        UnitOfWork.Add(this, IsSnapshotAggregate(GetType()));
        try
        {
            AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
            CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
            action(command);

            UnitOfWork.Commit();

            Sender.Tell(true, Self);
        }
        catch (Exception exception)
        {
            Logger.LogError("Executing an Akka.net request failed.", exception: exception, metaData: new Dictionary<string, object> { { "Type", GetType() }, { "Command", command } });
            Sender.Tell(false, Self);
            throw;
        }
    }

    /// <summary>
    /// Get all applied changes that haven't yet been committed.
    /// </summary>
    /// <returns>The current read-only collection of uncommitted events.</returns>
    public IEnumerable<IEvent<TAuthenticationToken>> GetUncommittedChanges()
    {
        return Changes;
    }

    /// <summary>
    /// Mark all applied changes as committed, increase <see cref="Version"/> by the number of
    /// uncommitted changes and replace the internal collection of changes with an empty one.
    /// </summary>
    public virtual void MarkChangesAsCommitted()
    {
        Version += Changes.Count;
        Changes = CreateEmptyChanges();
    }

    /// <summary>
    /// Apply all the <see cref="IEvent{TAuthenticationToken}">events</see> in <paramref name="history"/>
    /// using event replay to this instance, in ascending order of their version.
    /// </summary>
    /// <param name="history">The events to replay.</param>
    /// <exception cref="EventsOutOfOrderException">
    /// Thrown when an event's version is not exactly one greater than the current <see cref="Version"/>.
    /// </exception>
    public virtual void LoadFromHistory(IEnumerable<IEvent<TAuthenticationToken>> history)
    {
        Type aggregateType = GetType();
        IEnumerable<IEvent<TAuthenticationToken>> orderedHistory = history.OrderBy(e => e.Version);
        foreach (IEvent<TAuthenticationToken> historicalEvent in orderedHistory)
        {
            int expectedVersion = Version + 1;
            if (historicalEvent.Version != expectedVersion)
            {
                throw new EventsOutOfOrderException(historicalEvent.GetIdentity(), aggregateType, expectedVersion, historicalEvent.Version);
            }

            ApplyChange(historicalEvent, true);
        }
    }

    /// <summary>
    /// Call the "Apply" method with a signature matching the provided <paramref name="event"/> without using event replay to this instance.
    /// </summary>
    /// <remarks>
    /// This means a method named "Apply", with return type void and one parameter must exist to be applied.
    /// If no method exists, nothing is applied
    /// The parameter type must match exactly the <see cref="Type"/> of the provided <paramref name="event"/>.
    /// </remarks>
    /// <param name="event">The event to apply and record as an uncommitted change.</param>
    protected virtual void ApplyChange(IEvent<TAuthenticationToken> @event)
    {
        ApplyChange(@event, false);
    }

    /// <summary>
    /// Dynamically invokes "Apply" with the provided <paramref name="event"/>, then either records the event
    /// as an uncommitted change or, when replaying, takes the identity from the event and increments <see cref="Version"/>.
    /// </summary>
    /// <param name="event">The event to apply.</param>
    /// <param name="isEventReplay"><c>true</c> when the event comes from history and must not be recorded as a new change.</param>
    private void ApplyChange(IEvent<TAuthenticationToken> @event, bool isEventReplay)
    {
        this.AsDynamic().Apply(@event);

        if (isEventReplay)
        {
            Id = @event.GetIdentity();
            Version++;
            return;
        }

        // NOTE(review): the new event is placed BEFORE the existing changes, so GetUncommittedChanges()
        // yields the most recently applied event first. Kept as-is; confirm the consumer expects this order.
        List<IEvent<TAuthenticationToken>> pendingChanges = new List<IEvent<TAuthenticationToken>> { @event };
        pendingChanges.AddRange(Changes);
        Changes = new ReadOnlyCollection<IEvent<TAuthenticationToken>>(pendingChanges);
    }

    /// <summary>
    /// Creates a new, empty, read-only collection of changes.
    /// </summary>
    private static ICollection<IEvent<TAuthenticationToken>> CreateEmptyChanges()
    {
        return new ReadOnlyCollection<IEvent<TAuthenticationToken>>(new List<IEvent<TAuthenticationToken>>());
    }

    /// <summary>
    /// Determines whether <paramref name="aggregateType"/> is, or derives at any depth from,
    /// a constructed <see cref="AkkaSnapshotAggregateRoot{TAuthenticationToken,TSnapshot}"/>.
    /// </summary>
    private static bool IsSnapshotAggregate(Type aggregateType)
    {
        Type snapshotRootDefinition = typeof(AkkaSnapshotAggregateRoot<,>);
        for (Type current = aggregateType; current != null; current = current.BaseType)
        {
            if (current.IsGenericType && current.GetGenericTypeDefinition() == snapshotRootDefinition)
            {
                return true;
            }
        }

        return false;
    }
}
186 : }
|