Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Runtime.Caching;
13 : using System.Threading;
14 : using System.Threading.Tasks;
15 : using cdmdotnet.Logging;
16 : using Cqrs.Configuration;
17 :
18 : namespace Cqrs.Events
19 : {
/// <summary>
/// An <see cref="EventStore{TAuthenticationToken}"/> that keeps its data in <see cref="MemoryCache"/> instances.
/// Nothing is persisted: events only live in memory and are cached with a sliding expiration.
/// </summary>
/// <remarks>
/// NOTE(review): the cached values are plain <see cref="List{T}"/> instances that this class appends to and
/// enumerates without any locking - confirm callers do not read and write the same stream concurrently.
/// </remarks>
public class MemoryCacheEventStore<TAuthenticationToken>
    : EventStore<TAuthenticationToken>
{
    /// <summary>
    /// The configuration key holding the sliding expiration, formatted as
    /// "hours, minutes, seconds" or "days, hours, minutes, seconds".
    /// </summary>
    private const string SlidingExpirationSettingKey = "Cqrs.EventStore.MemoryCacheEventStore.SlidingExpiration";

    /// <summary>
    /// The value used when the setting is missing or blank: 0 hours, 15 minutes, 0 seconds.
    /// </summary>
    private const string DefaultSlidingExpirationValue = "0, 15, 0";

    /// <summary>
    /// The largest sliding expiration accepted; <see cref="MemoryCache"/> rejects policies with a larger (or negative) value.
    /// </summary>
    private static readonly TimeSpan MaximumSlidingExpiration = TimeSpan.FromDays(365);

    /// <summary>
    /// The <see cref="IConfigurationManager"/> the sliding expiration setting is read from.
    /// </summary>
    protected IConfigurationManager ConfigurationManager { get; private set; }

    /// <summary>
    /// The cache of event lists keyed by stream name (<see cref="EventData.AggregateId"/>).
    /// </summary>
    protected MemoryCache EventStoreByType { get; private set; }

    /// <summary>
    /// The cache of event lists keyed by correlation id, formatted with the "N" <see cref="Guid"/> format.
    /// </summary>
    protected MemoryCache EventStoreByCorrelationId { get; private set; }

    /// <summary>
    /// The raw setting text that <see cref="SlidingExpiration"/> was last parsed from; used to detect changes.
    /// </summary>
    protected string SlidingExpirationValue { get; set; }

    /// <summary>
    /// The sliding expiration applied to newly cached items by <see cref="GetDetaultCacheItemPolicy"/>.
    /// </summary>
    protected TimeSpan SlidingExpiration { get; set; }

    /// <summary>
    /// Instantiates a new instance of <see cref="MemoryCacheEventStore{TAuthenticationToken}"/>, creating two
    /// uniquely named <see cref="MemoryCache"/> instances and starting the refresh of the sliding expiration setting.
    /// </summary>
    public MemoryCacheEventStore(IConfigurationManager configurationManager, IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger)
        : base(eventBuilder, eventDeserialiser, logger)
    {
        ConfigurationManager = configurationManager;

        // Both caches share one unique suffix so that several stores can co-exist in the same process.
        Guid id = Guid.NewGuid();
        EventStoreByType = new MemoryCache(string.Format("EventStoreByType-{0:N}", id));
        EventStoreByCorrelationId = new MemoryCache(string.Format("EventStoreByCorrelationId-{0:N}", id));

        StartRefreshSlidingExpirationValue();
    }

    #region Overrides of EventStore<TAuthenticationToken>

    /// <summary>
    /// Gets the cached events of the stream identified by <paramref name="aggregateRootType"/> and <paramref name="aggregateId"/>,
    /// deserialised and ordered by version, highest first.
    /// </summary>
    /// <param name="aggregateRootType">The type whose <see cref="Type.FullName"/> forms the first part of the stream name.</param>
    /// <param name="aggregateId">The identifier forming the second part of the stream name.</param>
    /// <param name="useLastEventOnly">When true, only the event with the highest version (after filtering by <paramref name="fromVersion"/>) is returned.</param>
    /// <param name="fromVersion">Only events with a version greater than this value are returned.</param>
    /// <returns>The matching events, or an empty collection when the stream is not cached or holds an unexpected value.</returns>
    public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
    {
        string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

        if (!EventStoreByType.Contains(streamName))
        {
            Logger.LogDebug(string.Format("The event store has no items '{0}'.", streamName));
            return Enumerable.Empty<IEvent<TAuthenticationToken>>();
        }

        // The entry may have expired between the Contains check and this read.
        CacheItem item = EventStoreByType.GetCacheItem(streamName);
        if (item == null)
        {
            Logger.LogDebug(string.Format("The event store had an item '{0}' but doesn't now.", streamName));
            return Enumerable.Empty<IEvent<TAuthenticationToken>>();
        }

        var events = item.Value as IEnumerable<EventData>;
        if (events == null)
        {
            if (item.Value == null)
                Logger.LogDebug(string.Format("The event store had an item '{0}' but it was null.", streamName));
            else
                Logger.LogWarning(string.Format("The event store had an item '{0}' but it was of type {1}.", streamName, item.Value.GetType()));
            return Enumerable.Empty<IEvent<TAuthenticationToken>>();
        }

        IEnumerable<EventData> query = events
            .Where(eventData => eventData.Version > fromVersion)
            .OrderByDescending(eventData => eventData.Version);

        // Highest version comes first, so taking one element yields the last event.
        if (useLastEventOnly)
            query = query.Take(1);

        return query
            .Select(EventDeserialiser.Deserialise)
            .ToList();
    }

    /// <summary>
    /// Gets the cached <see cref="EventData"/> recorded against the provided <paramref name="correlationId"/>, ordered by timestamp, oldest first.
    /// </summary>
    /// <param name="correlationId">The correlation id to look events up by.</param>
    /// <returns>The matching events, or an empty collection when nothing is cached or the cache holds an unexpected value.</returns>
    public override IEnumerable<EventData> Get(Guid correlationId)
    {
        string cacheKey = correlationId.ToString("N");

        if (!EventStoreByCorrelationId.Contains(cacheKey))
        {
            Logger.LogDebug(string.Format("The event store has no items by the correlationId '{0:N}'.", correlationId));
            return Enumerable.Empty<EventData>();
        }

        // The entry may have expired between the Contains check and this read.
        CacheItem item = EventStoreByCorrelationId.GetCacheItem(cacheKey);
        if (item == null)
        {
            Logger.LogDebug(string.Format("The event store had some items by the correlationId '{0:N}' but doesn't now.", correlationId));
            return Enumerable.Empty<EventData>();
        }

        var events = item.Value as IEnumerable<EventData>;
        if (events == null)
        {
            if (item.Value == null)
                Logger.LogDebug(string.Format("The event store had some items by the correlationId '{0:N}' but it was null.", correlationId));
            else
                Logger.LogWarning(string.Format("The event store had some items by the correlationId '{0:N}' but it was of type {1}.", correlationId, item.Value.GetType()));
            return Enumerable.Empty<EventData>();
        }

        return events
            .OrderBy(eventData => eventData.Timestamp)
            .ToList();
    }

    /// <summary>
    /// Adds the provided <paramref name="eventData"/> to both caches: first to the list keyed by its correlation id,
    /// then to the list keyed by its <see cref="EventData.AggregateId"/>.
    /// </summary>
    /// <param name="eventData">The event to store.</param>
    /// <exception cref="Exception">Thrown when a cache already holds a value for the key that is not an <see cref="IList{T}"/> of <see cref="EventData"/>.</exception>
    protected override void PersistEvent(EventData eventData)
    {
        // By correlationId first
        Guid correlationId = eventData.CorrelationId;
        IList<EventData> events = GetOrAddEventList(EventStoreByCorrelationId, correlationId.ToString("N"));
        if (events == null)
        {
            Logger.LogWarning(string.Format("The event store had some items by the correlationId '{0:N}' but it doesn't now.", correlationId));
            throw new Exception(string.Format("The event store had some items by the correlationId '{0:N}' but it doesn't now.", correlationId));
        }
        events.Add(eventData);

        // By type next
        string streamName = eventData.AggregateId;
        events = GetOrAddEventList(EventStoreByType, streamName);
        if (events == null)
        {
            Logger.LogWarning(string.Format("The event store had an item by id '{0}' but it doesn't now.", streamName));
            throw new Exception(string.Format("The event store had an item by id '{0}' but it doesn't now.", streamName));
        }
        events.Add(eventData);
    }

    #endregion

    /// <summary>
    /// Gets the list cached in <paramref name="store"/> under <paramref name="key"/>, caching a new empty list first when there is none.
    /// </summary>
    /// <returns>The cached list, or null when the cache holds a value under the key that is not an <see cref="IList{T}"/> of <see cref="EventData"/>.</returns>
    private IList<EventData> GetOrAddEventList(MemoryCache store, string key)
    {
        IList<EventData> newList = new List<EventData>();

        // AddOrGetExisting returns null when the new list was inserted, otherwise the value already cached.
        object existing = store.AddOrGetExisting(key, newList, GetDetaultCacheItemPolicy());
        if (existing == null)
            return newList;

        return existing as IList<EventData>;
    }

    /// <summary>
    /// Reads the sliding expiration setting and, when it has changed and is valid, updates
    /// <see cref="SlidingExpirationValue"/> and <see cref="SlidingExpiration"/>.
    /// </summary>
    /// <remarks>
    /// The setting is expected as "hours, minutes, seconds" or "days, hours, minutes, seconds". A missing or blank setting
    /// falls back to "0, 15, 0". A value that cannot be parsed, is negative or exceeds 365 days is ignored and the
    /// previously applied value is kept.
    /// </remarks>
    protected virtual void RefreshSlidingExpirationValue()
    {
        string slidingExpirationValue;
        if (!ConfigurationManager.TryGetSetting(SlidingExpirationSettingKey, out slidingExpirationValue) || string.IsNullOrWhiteSpace(slidingExpirationValue))
            slidingExpirationValue = DefaultSlidingExpirationValue;

        // Nothing to do when the raw text is the same as what was last applied.
        if (SlidingExpirationValue == slidingExpirationValue)
            return;

        // Only 3 or 4 parts are usable. Both comparisons must hold before bailing out;
        // combining them with || would be true for every length and reject every value.
        string[] slidingExpirationParts = slidingExpirationValue.Split(',');
        if (slidingExpirationParts.Length != 3 && slidingExpirationParts.Length != 4)
            return;

        // With 4 parts the first one is the number of days and the remaining parts shift along by one.
        bool hasDays = slidingExpirationParts.Length == 4;
        int offset = hasDays ? 1 : 0;

        int days = 0;
        int hours;
        int minutes;
        int seconds;
        if (hasDays && !int.TryParse(slidingExpirationParts[0].Trim(), out days))
            return;
        if (!int.TryParse(slidingExpirationParts[offset].Trim(), out hours))
            return;
        if (!int.TryParse(slidingExpirationParts[offset + 1].Trim(), out minutes))
            return;
        if (!int.TryParse(slidingExpirationParts[offset + 2].Trim(), out seconds))
            return;

        TimeSpan slidingExpiration = new TimeSpan(days, hours, minutes, seconds);

        // A policy outside this range is rejected by MemoryCache when an item is added, so never apply one.
        if (slidingExpiration < TimeSpan.Zero || slidingExpiration > MaximumSlidingExpiration)
            return;

        SlidingExpirationValue = slidingExpirationValue;
        SlidingExpiration = slidingExpiration;
    }

    /// <summary>
    /// Starts a task that calls <see cref="RefreshSlidingExpirationValue"/> in an endless loop,
    /// sleeping for one second after most iterations and only yielding after every fifth one.
    /// </summary>
    protected virtual void StartRefreshSlidingExpirationValue()
    {
        Task.Factory.StartNewSafely(() =>
        {
            long loop = 0;
            while (true)
            {
                RefreshSlidingExpirationValue();

                bool yieldOnly = loop % 5 == 0;
                loop++;
                if (yieldOnly)
                    Thread.Yield();
                else
                    Thread.Sleep(1000);

                // Wrap the counter explicitly rather than relying on overflow behaviour.
                if (loop == long.MaxValue)
                    loop = long.MinValue;
            }
        });
    }

    /// <summary>
    /// Gets a new <see cref="CacheItemPolicy"/> with the <see cref="CacheItemPolicy.SlidingExpiration"/> set to the current value of <see cref="SlidingExpiration"/>.
    /// </summary>
    protected virtual CacheItemPolicy GetDetaultCacheItemPolicy()
    {
        return new CacheItemPolicy { SlidingExpiration = SlidingExpiration };
    }
}
217 : }
|