Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Collections.ObjectModel;
12 : using System.Linq;
13 : using Akka.Actor;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Commands;
17 : using Cqrs.Domain;
18 : using Cqrs.Domain.Exceptions;
19 : using Cqrs.Events;
20 : using Cqrs.Infrastructure;
21 :
22 : namespace Cqrs.Akka.Domain
23 : {
24 : public abstract class AkkaSaga<TAuthenticationToken>
25 : : ReceiveActor // PersistentActor
26 : , ISaga<TAuthenticationToken>
27 0 : {
28 : protected ISagaUnitOfWork<TAuthenticationToken> UnitOfWork { get; set; }
29 :
30 : protected IAkkaSagaRepository<TAuthenticationToken> Repository { get; set; }
31 :
32 : protected ILogger Logger { get; set; }
33 :
34 : protected ICorrelationIdHelper CorrelationIdHelper { get; set; }
35 :
36 : protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; set; }
37 :
38 : private ICollection<ISagaEvent<TAuthenticationToken>> Changes { get; set; }
39 :
40 : public Guid Id { get; protected set; }
41 :
42 : public int Version { get; protected set; }
43 :
44 : protected ICommandPublisher<TAuthenticationToken> CommandPublisher { get; set; }
45 :
46 0 : protected AkkaSaga(ISagaUnitOfWork<TAuthenticationToken> unitOfWork, ILogger logger, IAkkaSagaRepository<TAuthenticationToken> repository, ICorrelationIdHelper correlationIdHelper, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICommandPublisher<TAuthenticationToken> commandPublisher)
47 : {
48 : UnitOfWork = unitOfWork;
49 : Logger = logger;
50 : Repository = repository;
51 : CorrelationIdHelper = correlationIdHelper;
52 : AuthenticationTokenHelper = authenticationTokenHelper;
53 : CommandPublisher = commandPublisher;
54 : Changes = new ReadOnlyCollection<ISagaEvent<TAuthenticationToken>>(new List<ISagaEvent<TAuthenticationToken>>());
55 : }
56 :
57 : #region Overrides of ActorBase
58 :
59 : /// <summary>
60 : /// User overridable callback.
61 : /// <p/>
62 : /// Is called when an Actor is started.
63 : /// Actors are automatically started asynchronously when created.
64 : /// Empty default implementation.
65 : /// </summary>
66 1 : protected override void PreStart()
67 : {
68 : base.PreStart();
69 : Repository.LoadSagaHistory(this, throwExceptionOnNoEvents: false);
70 : }
71 :
72 : #endregion
73 :
74 0 : protected virtual void Execute<TEvent>(Action<TEvent> action, TEvent @event)
75 : where TEvent : IEvent<TAuthenticationToken>
76 : {
77 : UnitOfWork.Add(this);
78 : try
79 : {
80 : AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
81 : CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
82 : action(@event);
83 :
84 : UnitOfWork.Commit();
85 :
86 : Sender.Tell(true, Self);
87 : }
88 : catch(Exception exception)
89 : {
90 : Logger.LogError("Executing an Akka.net request failed.", exception: exception, metaData: new Dictionary<string, object> { { "Type", GetType() }, { "Event", @event } });
91 : Sender.Tell(false, Self);
92 : throw;
93 : }
94 : }
95 :
96 0 : public IEnumerable<ISagaEvent<TAuthenticationToken>> GetUncommittedChanges()
97 : {
98 : return Changes;
99 : }
100 :
101 0 : public virtual void MarkChangesAsCommitted()
102 : {
103 : Version = Version + Changes.Count;
104 : Changes = new ReadOnlyCollection<ISagaEvent<TAuthenticationToken>>(new List<ISagaEvent<TAuthenticationToken>>());
105 : }
106 :
107 0 : public virtual void LoadFromHistory(IEnumerable<ISagaEvent<TAuthenticationToken>> history)
108 : {
109 : Type sagaType = GetType();
110 : foreach (ISagaEvent<TAuthenticationToken> @event in history.OrderBy(e => e.Version))
111 : {
112 : if (@event.Version != Version + 1)
113 : throw new EventsOutOfOrderException(@event.Id, sagaType, Version + 1, @event.Version);
114 : ApplyChange(@event, true);
115 : }
116 : }
117 :
118 0 : protected virtual void ApplyChange(ISagaEvent<TAuthenticationToken> @event)
119 : {
120 : ApplyChange(@event, false);
121 : }
122 :
123 0 : protected virtual void ApplyChange(IEvent<TAuthenticationToken> @event)
124 : {
125 : var sagaEvent = new SagaEvent<TAuthenticationToken>(@event);
126 : // Set ID
127 : SetId(sagaEvent);
128 : ApplyChange(sagaEvent);
129 : }
130 :
131 0 : protected virtual void SetId(ISagaEvent<TAuthenticationToken> sagaEvent)
132 : {
133 : sagaEvent.Id = sagaEvent.Event.Id;
134 : }
135 :
136 :
137 : private void ApplyChange(ISagaEvent<TAuthenticationToken> @event, bool isEventReplay)
138 : {
139 : this.AsDynamic().Apply(@event);
140 : if (!isEventReplay)
141 : {
142 : Changes = new ReadOnlyCollection<ISagaEvent<TAuthenticationToken>>(new[] { @event }.Concat(Changes).ToList());
143 : }
144 : else
145 : {
146 : Id = @event.Id;
147 : Version++;
148 : }
149 : }
150 :
151 0 : protected virtual void Apply(ISagaEvent<TAuthenticationToken> sagaEvent)
152 : {
153 : this.AsDynamic().Apply(sagaEvent.Event);
154 : }
155 : }
156 : }
|