Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Linq;
13 : using System.Runtime.Caching;
14 : using Cqrs.Domain;
15 : using Cqrs.Events;
16 :
17 : namespace Cqrs.Cache
18 : {
19 : /// <summary>
20 : /// Uses <see cref="MemoryCache.Default"/> to provide a caching mechanism to improve performance of a <see cref="IAggregateRepository{TAuthenticationToken}"/>.
21 : /// </summary>
22 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of authentication token.</typeparam>
23 : public class CacheRepository<TAuthenticationToken> : IAggregateRepository<TAuthenticationToken>
24 1 : {
25 : /// <summary>
26 : /// Sets or set the <see cref="IAggregateRepository{TAuthenticationToken}"/> that will be used, and cached over.
27 : /// </summary>
28 : private IAggregateRepository<TAuthenticationToken> Repository { get; set; }
29 :
30 : /// <summary>
31 : /// Gets or sets the <see cref="IEventStore{TAuthenticationToken}"/> used to retrieve events from when a cache hit occurs.
32 : /// </summary>
33 : private IEventStore<TAuthenticationToken> EventStore { get; set; }
34 :
35 : /// <summary>
36 : /// Gets or sets the <see cref="MemoryCache"/> used.
37 : /// </summary>
38 : private MemoryCache Cache { get; set; }
39 :
40 : private Func<CacheItemPolicy> PolicyFactory { get; set; }
41 :
42 : private static readonly ConcurrentDictionary<string, object> Locks = new ConcurrentDictionary<string, object>();
43 :
44 : /// <summary>
45 : /// Instantiates a new instance of <see cref="CacheRepository{TAuthenticationToken}"/>.
46 : /// </summary>
47 1 : public CacheRepository(IAggregateRepository<TAuthenticationToken> repository, IEventStore<TAuthenticationToken> eventStore)
48 : {
49 : if(repository == null)
50 : throw new ArgumentNullException("repository");
51 : if(eventStore == null)
52 : throw new ArgumentNullException("eventStore");
53 :
54 : Repository = repository;
55 : EventStore = eventStore;
56 : Cache = MemoryCache.Default;
57 : PolicyFactory = () => new CacheItemPolicy
58 : {
59 : SlidingExpiration = new TimeSpan(0,0,15,0),
60 : RemovedCallback = x =>
61 : {
62 : object o;
63 : Locks.TryRemove(x.CacheItem.Key, out o);
64 : }
65 : };
66 : }
67 :
68 : /// <summary>
69 : /// Locks the cache, adds the provided <paramref name="aggregate"/> to the cache if not already in it, then calls <see cref="IAggregateRepository{TAuthenticationToken}.Save{TAggregateRoot}"/> on <see cref="Repository"/>.
70 : /// In the event of an <see cref="Exception"/> the <paramref name="aggregate"/> is always ejected out of the <see cref="Cache"/>.
71 : /// </summary>
72 : /// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
73 : /// <param name="aggregate">The <see cref="IAggregateRoot{TAuthenticationToken}"/> to save and persist.</param>
74 : /// <param name="expectedVersion">The version number the <see cref="IAggregateRoot{TAuthenticationToken}"/> is expected to be at.</param>
75 1 : public virtual void Save<TAggregateRoot>(TAggregateRoot aggregate, int? expectedVersion = null)
76 : where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
77 : {
78 : var idstring = aggregate.Id.ToString();
79 : try
80 : {
81 : lock (Locks.GetOrAdd(idstring, x => new object()))
82 : {
83 : if (aggregate.Id != Guid.Empty && !IsTracked(aggregate.Id))
84 : Cache.Add(idstring, aggregate, PolicyFactory.Invoke());
85 : Repository.Save(aggregate, expectedVersion);
86 : }
87 : }
88 : catch (Exception)
89 : {
90 : Cache.Remove(idstring);
91 : throw;
92 : }
93 : }
94 :
95 : /// <summary>
96 : /// Locks the cache, checks if the aggregate is tracked in the <see cref="Cache"/>, if it is
97 : /// retrieves the aggregate from the <see cref="Cache"/> and then uses either the provided <paramref name="events"/> or makes a call <see cref="IEventStore{TAuthenticationToken}.Get(System.Type,System.Guid,bool,int)"/> on the <see cref="EventStore"/>
98 : /// and rehydrates the cached aggregate with any new events from it's cached version.
99 : /// If the aggregate is not in the <see cref="Cache"/>
100 : /// <see cref="IAggregateRepository{TAuthenticationToken}.Get{TAggregateRoot}"/> is called on the <see cref="Repository"/>.
101 : /// </summary>
102 : /// <typeparam name="TAggregateRoot">The <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</typeparam>
103 : /// <param name="aggregateId">The ID of the <see cref="IAggregateRoot{TAuthenticationToken}"/> to get.</param>
104 : /// <param name="events">
105 : /// A collection of <see cref="IEvent{TAuthenticationToken}"/> to replay on the retrieved <see cref="IAggregateRoot{TAuthenticationToken}"/>.
106 : /// If null, the <see cref="IEventStore{TAuthenticationToken}"/> will be used to retrieve a list of <see cref="IEvent{TAuthenticationToken}"/> for you.
107 : /// </param>
108 1 : public virtual TAggregateRoot Get<TAggregateRoot>(Guid aggregateId, IList<IEvent<TAuthenticationToken>> events = null)
109 : where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
110 : {
111 : string idstring = aggregateId.ToString();
112 : try
113 : {
114 : IList<IEvent<TAuthenticationToken>> theseEvents = null;
115 : lock (Locks.GetOrAdd(idstring, _ => new object()))
116 : {
117 : TAggregateRoot aggregate;
118 : if (IsTracked(aggregateId))
119 : {
120 : aggregate = (TAggregateRoot)Cache.Get(idstring);
121 : theseEvents = events ?? EventStore.Get<TAggregateRoot>(aggregateId, false, aggregate.Version).ToList();
122 : if (theseEvents.Any() && theseEvents.First().Version != aggregate.Version + 1)
123 : {
124 : Cache.Remove(idstring);
125 : }
126 : else
127 : {
128 : aggregate.LoadFromHistory(theseEvents);
129 : return aggregate;
130 : }
131 : }
132 :
133 : aggregate = Repository.Get<TAggregateRoot>(aggregateId, theseEvents);
134 : Cache.Add(aggregateId.ToString(), aggregate, PolicyFactory.Invoke());
135 : return aggregate;
136 : }
137 : }
138 : catch (Exception)
139 : {
140 : Cache.Remove(idstring);
141 : throw;
142 : }
143 : }
144 :
145 : private bool IsTracked(Guid id)
146 : {
147 : return Cache.Contains(id.ToString());
148 : }
149 : }
150 : }
|