Line data Source code
1 : using System;
2 : using System.Collections.Concurrent;
3 : using System.Collections.Generic;
4 : using System.Linq;
5 : using System.Runtime.Caching;
6 : using Cqrs.Domain;
7 : using Cqrs.Events;
8 :
9 : namespace Cqrs.Cache
10 : {
/// <summary>
/// An <see cref="IRepository{TAuthenticationToken}"/> decorator that keeps aggregates in
/// <see cref="MemoryCache.Default"/>, keyed by the aggregate id rendered as a string, and
/// delegates loading and saving to the wrapped repository.
/// </summary>
/// <typeparam name="TAuthenticationToken">The type of authentication token carried by events and aggregates.</typeparam>
public class CacheRepository<TAuthenticationToken> : IRepository<TAuthenticationToken>
{
    private IRepository<TAuthenticationToken> Repository { get; set; }

    private IEventStore<TAuthenticationToken> EventStore { get; set; }

    private MemoryCache Cache { get; set; }

    private Func<CacheItemPolicy> PolicyFactory { get; set; }

    // One lock object per aggregate id string; entries are dropped again by the cache policy's RemovedCallback.
    private static readonly ConcurrentDictionary<string, object> Locks = new ConcurrentDictionary<string, object>();

    /// <summary>
    /// Creates a caching decorator around <paramref name="repository"/>.
    /// </summary>
    /// <param name="repository">The repository that actually loads and saves aggregates.</param>
    /// <param name="eventStore">The event store queried for events newer than a cached aggregate's version.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public CacheRepository(IRepository<TAuthenticationToken> repository, IEventStore<TAuthenticationToken> eventStore)
    {
        if (repository == null)
            throw new ArgumentNullException("repository");
        if (eventStore == null)
            throw new ArgumentNullException("eventStore");

        Repository = repository;
        EventStore = eventStore;
        Cache = MemoryCache.Default;
        PolicyFactory = () => new CacheItemPolicy
        {
            // Entries expire after 15 minutes without access.
            SlidingExpiration = TimeSpan.FromMinutes(15),
            // When an entry leaves the cache, discard the lock object registered under the same key.
            RemovedCallback = x =>
            {
                object o;
                Locks.TryRemove(x.CacheItem.Key, out o);
            }
        };
    }

    /// <summary>
    /// Adds <paramref name="aggregate"/> to the cache (unless its id is <see cref="Guid.Empty"/> or it is
    /// already cached) and then saves it through the wrapped repository. If anything throws, the cache
    /// entry for the aggregate is removed and the exception is rethrown.
    /// </summary>
    /// <param name="aggregate">The aggregate to save.</param>
    /// <param name="expectedVersion">Passed through unchanged to the wrapped repository.</param>
    /// <exception cref="ArgumentNullException"><paramref name="aggregate"/> is <c>null</c>.</exception>
    public void Save<TAggregateRoot>(TAggregateRoot aggregate, int? expectedVersion = null)
        where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
    {
        if (aggregate == null)
            throw new ArgumentNullException("aggregate");

        string idstring = aggregate.Id.ToString();
        try
        {
            lock (Locks.GetOrAdd(idstring, x => new object()))
            {
                if (aggregate.Id != Guid.Empty && !IsTracked(aggregate.Id))
                    Cache.Add(idstring, aggregate, PolicyFactory.Invoke());
                Repository.Save(aggregate, expectedVersion);
            }
        }
        catch (Exception)
        {
            // Never leave an aggregate cached whose save did not complete.
            Cache.Remove(idstring);
            throw;
        }
    }

    /// <summary>
    /// Returns the aggregate identified by <paramref name="aggregateId"/>. A cached instance is brought
    /// up to date with <paramref name="events"/> (or, when that is <c>null</c>, with the events the
    /// event store returns from the cached version onwards) and returned. If the first of those events
    /// is not the immediate successor of the cached version, the cached instance is discarded and the
    /// aggregate is reloaded through the wrapped repository and cached again.
    /// </summary>
    /// <param name="aggregateId">The id of the aggregate to load.</param>
    /// <param name="events">
    /// Optional events to apply instead of querying the event store; also handed to the wrapped
    /// repository when the aggregate has to be (re)loaded.
    /// </param>
    /// <returns>The cached or freshly loaded aggregate.</returns>
    public TAggregateRoot Get<TAggregateRoot>(Guid aggregateId, IList<IEvent<TAuthenticationToken>> events = null)
        where TAggregateRoot : IAggregateRoot<TAuthenticationToken>
    {
        string idstring = aggregateId.ToString();
        try
        {
            lock (Locks.GetOrAdd(idstring, _ => new object()))
            {
                TAggregateRoot aggregate;

                // A single lookup: a Contains-then-Get pair can see the entry expire in between
                // and hand back null.
                object cached = Cache.Get(idstring);
                if (cached != null)
                {
                    aggregate = (TAggregateRoot)cached;
                    IList<IEvent<TAuthenticationToken>> newEvents =
                        events ?? EventStore.Get<TAggregateRoot>(aggregateId, false, aggregate.Version).ToList();

                    if (newEvents.Count == 0 || newEvents[0].Version == aggregate.Version + 1)
                    {
                        aggregate.LoadFromHistory(newEvents);
                        return aggregate;
                    }

                    // Version gap between the cached aggregate and the next event: the cached copy is unusable.
                    Cache.Remove(idstring);
                }

                // Hand the repository the caller's events (or null), not the delta fetched above:
                // that delta only covers events after the cached version, so it is not a history
                // from which a fresh aggregate can be rebuilt.
                aggregate = Repository.Get<TAggregateRoot>(aggregateId, events);
                Cache.Add(idstring, aggregate, PolicyFactory.Invoke());
                return aggregate;
            }
        }
        catch (Exception)
        {
            // NOTE(review): when nothing was ever cached for this id, no RemovedCallback fires, so the
            // entry added to Locks above stays registered - confirm whether that growth matters.
            Cache.Remove(idstring);
            throw;
        }
    }

    /// <summary>
    /// Reports whether the cache currently holds an entry under the string form of <paramref name="id"/>.
    /// </summary>
    private bool IsTracked(Guid id)
    {
        return Cache.Contains(id.ToString());
    }
}
106 : }
|