Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Linq;
13 : using System.Runtime.Caching;
14 : using Cqrs.Domain;
15 : using Cqrs.Events;
16 :
17 : namespace Cqrs.Cache
18 : {
19 : /// <summary>
20 : /// Uses <see cref="MemoryCache.Default"/> to provide a caching mechanism to improve performance of a <see cref="IAggregateRepository{TAuthenticationToken}"/>.
21 : /// </summary>
22 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of authentication token.</typeparam>
23 : public class CacheRepository<TAuthenticationToken>
24 : : IAggregateRepository<TAuthenticationToken>
25 1 : {
26 : /// <summary>
27 : /// Sets or set the <see cref="IAggregateRepository{TAuthenticationToken}"/> that will be used, and cached over.
28 : /// </summary>
29 : private IAggregateRepository<TAuthenticationToken> Repository { get; set; }
30 :
31 : /// <summary>
32 : /// Gets or sets the <see cref="IEventStore{TAuthenticationToken}"/> used to retrieve events from when a cache hit occurs.
33 : /// </summary>
34 : private IEventStore<TAuthenticationToken> EventStore { get; set; }
35 :
36 : /// <summary>
37 : /// Gets or sets the <see cref="MemoryCache"/> used.
38 : /// </summary>
39 : private MemoryCache Cache { get; set; }
40 :
41 : private Func<CacheItemPolicy> PolicyFactory { get; set; }
42 :
43 : private static readonly ConcurrentDictionary<string, object> Locks = new ConcurrentDictionary<string, object>();
44 :
45 : /// <summary>
46 : /// Instantiates a new instance of <see cref="CacheRepository{TAuthenticationToken}"/>.
47 : /// </summary>
48 1 : public CacheRepository(IAggregateRepository<TAuthenticationToken> repository, IEventStore<TAuthenticationToken> eventStore)
49 : {
50 : if(repository == null)
51 : throw new ArgumentNullException("repository");
52 : if(eventStore == null)
53 : throw new ArgumentNullException("eventStore");
54 :
55 : Repository = repository;
56 : EventStore = eventStore;
57 : Cache = MemoryCache.Default;
58 : PolicyFactory = () => new CacheItemPolicy
59 : {
60 : SlidingExpiration = new TimeSpan(0,0,15,0),
61 : RemovedCallback = x =>
62 : {
63 : object o;
64 : Locks.TryRemove(x.CacheItem.Key, out o);
65 : }
66 : };
67 : }
68 :
69 : /// <summary>
70 : /// Locks the cache, adds the provided <paramref name="aggregate"/> to the cache if not already in it, then calls <see cref="IAggregateRepository{TAuthenticationToken}.Save{TAggregateRoot}"/> on <see cref="Repository"/>.
71 : /// In the event of an <see cref="Exception"/> the <paramref name="aggregate"/> is always ejected out of the <see cref="Cache"/>.
72 : /// </summary>
73 : /// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
74 : /// <param name="aggregate">The <see cref="IAggregateRoot{TAuthenticationToken}"/> to save and persist.</param>
75 : /// <param name="expectedVersion">The version number the <see cref="IAggregateRoot{TAuthenticationToken}"/> is expected to be at.</param>
76 1 : public virtual void Save<TAggregateRoot>(TAggregateRoot aggregate, int? expectedVersion = null)
77 : where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
78 : {
79 : var idstring = aggregate.Id.ToString();
80 : try
81 : {
82 : lock (Locks.GetOrAdd(idstring, x => new object()))
83 : {
84 : if (aggregate.Id != Guid.Empty && !IsTracked(aggregate.Id))
85 : Cache.Add(idstring, aggregate, PolicyFactory.Invoke());
86 : Repository.Save(aggregate, expectedVersion);
87 : }
88 : }
89 : catch (Exception)
90 : {
91 : Cache.Remove(idstring);
92 : throw;
93 : }
94 : }
95 :
96 : /// <summary>
97 : /// Locks the cache, checks if the aggregate is tracked in the <see cref="Cache"/>, if it is
98 : /// retrieves the aggregate from the <see cref="Cache"/> and then uses either the provided <paramref name="events"/> or makes a call <see cref="IEventStore{TAuthenticationToken}.Get(System.Type,System.Guid,bool,int)"/> on the <see cref="EventStore"/>
99 : /// and rehydrates the cached aggregate with any new events from it's cached version.
100 : /// If the aggregate is not in the <see cref="Cache"/>
101 : /// <see cref="IAggregateRepository{TAuthenticationToken}.Get{TAggregateRoot}"/> is called on the <see cref="Repository"/>.
102 : /// </summary>
103 : /// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
104 : /// <param name="aggregateId">The ID of the <see cref="IAggregateRoot{TAuthenticationToken}"/> to get.</param>
105 : /// <param name="events">
106 : /// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
107 : /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
108 : /// </param>
109 1 : public virtual TAggregateRoot Get<TAggregateRoot>(Guid aggregateId, IList<IEvent<TAuthenticationToken>> events = null)
110 : where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
111 : {
112 : string idstring = aggregateId.ToString();
113 : try
114 : {
115 : IList<IEvent<TAuthenticationToken>> theseEvents = null;
116 : lock (Locks.GetOrAdd(idstring, _ => new object()))
117 : {
118 : TAggregateRoot aggregate;
119 : if (IsTracked(aggregateId))
120 : {
121 : aggregate = (TAggregateRoot)Cache.Get(idstring);
122 : theseEvents = events ?? EventStore.Get<TAggregateRoot>(aggregateId, false, aggregate.Version).ToList();
123 : if (theseEvents.Any() && theseEvents.First().Version != aggregate.Version + 1)
124 : {
125 : Cache.Remove(idstring);
126 : }
127 : else
128 : {
129 : aggregate.LoadFromHistory(theseEvents);
130 : return aggregate;
131 : }
132 : }
133 :
134 : aggregate = Repository.Get<TAggregateRoot>(aggregateId, theseEvents);
135 : Cache.Add(aggregateId.ToString(), aggregate, PolicyFactory.Invoke());
136 : return aggregate;
137 : }
138 : }
139 : catch (Exception)
140 : {
141 : Cache.Remove(idstring);
142 : throw;
143 : }
144 : }
145 :
146 : /// <summary>
147 : /// Retrieves an <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <typeparamref name="TAggregateRoot"/> up to and including the provided <paramref name="version"/>.
148 : /// </summary>
149 : /// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
150 : /// <param name="aggregateId">The identifier of the <see cref="IAggregateRoot{TAuthenticationToken}"/> to retrieve.</param>
151 : /// <param name="version">Load events up-to and including from this version</param>
152 : /// <param name="events">
153 : /// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
154 : /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
155 : /// </param>
156 1 : public TAggregateRoot GetToVersion<TAggregateRoot>(Guid aggregateId, int version, IList<IEvent<TAuthenticationToken>> events = null)
157 : where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
158 : {
159 : throw new InvalidOperationException("Verion replay is not appriopriate with caching.");
160 : }
161 :
162 : /// <summary>
163 : /// Retrieves an <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <typeparamref name="TAggregateRoot"/> up to and including the provided <paramref name="versionedDate"/>.
164 : /// </summary>
165 : /// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
166 : /// <param name="aggregateId">The identifier of the <see cref="IAggregateRoot{TAuthenticationToken}"/> to retrieve.</param>
167 : /// <param name="versionedDate">Load events up-to and including from this <see cref="DateTime"/></param>
168 : /// <param name="events">
169 : /// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
170 : /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
171 : /// </param>
172 1 : public TAggregateRoot GetToDate<TAggregateRoot>(Guid aggregateId, DateTime versionedDate, IList<IEvent<TAuthenticationToken>> events = null)
173 : where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
174 : {
175 : throw new InvalidOperationException("Verion replay is not appriopriate with caching.");
176 : }
177 :
178 : private bool IsTracked(Guid id)
179 : {
180 : return Cache.Contains(id.ToString());
181 : }
182 : }
183 : }
|