Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Runtime.Caching;
13 : using System.Threading;
14 : using System.Threading.Tasks;
15 : using cdmdotnet.Logging;
16 : using Cqrs.Configuration;
17 : using Cqrs.Domain;
18 : using Cqrs.Messages;
19 :
20 : namespace Cqrs.Events
21 : {
22 : /// <summary>
23 : /// An <see cref="EventStore{TAuthenticationToken}"/> that uses a <see cref="MemoryCache"/> implementation, flushing out data (i.e. it's not persisted)
24 : /// </summary>
25 : public class MemoryCacheEventStore<TAuthenticationToken>
26 : : EventStore<TAuthenticationToken>
27 1 : {
28 : /// <summary>
29 : /// Gets or sets the <see cref="IConfigurationManager"/>.
30 : /// </summary>
31 : protected IConfigurationManager ConfigurationManager { get; private set; }
32 :
33 : /// <summary>
34 : /// Gets or sets the <see cref="MemoryCache"/> of data grouped by event <see cref="Type"/>.
35 : /// </summary>
36 : protected MemoryCache EventStoreByType { get; private set; }
37 :
38 : /// <summary>
39 : /// Gets or sets the <see cref="MemoryCache"/> of data grouped by event <see cref="IMessage.CorrelationId"/>.
40 : /// </summary>
41 : protected MemoryCache EventStoreByCorrelationId { get; private set; }
42 :
43 : /// <summary>
44 : /// Gets or sets the SlidingExpirationValue, the value of "Cqrs.EventStore.MemoryCacheEventStore.SlidingExpiration" from <see cref="ConfigurationManager"/>.
45 : /// </summary>
46 : protected string SlidingExpirationValue { get; set; }
47 :
48 : /// <summary>
49 : /// Gets or sets the SlidingExpiration
50 : /// </summary>
51 : protected TimeSpan SlidingExpiration { get; set; }
52 :
/// <summary>
/// Instantiates a new instance of <see cref="MemoryCacheEventStore{TAuthenticationToken}"/>, creating the two uniquely named <see cref="MemoryCache"/> instances it stores data in, and calls <see cref="StartRefreshSlidingExpirationValue"/>.
/// </summary>
/// <param name="configurationManager">The <see cref="IConfigurationManager"/> stored in <see cref="ConfigurationManager"/>.</param>
/// <param name="eventBuilder">Passed through to the base class constructor.</param>
/// <param name="eventDeserialiser">Passed through to the base class constructor.</param>
/// <param name="logger">Passed through to the base class constructor.</param>
public MemoryCacheEventStore(IConfigurationManager configurationManager, IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger)
    : base(eventBuilder, eventDeserialiser, logger)
{
    ConfigurationManager = configurationManager;

    // One identifier is shared by both cache names so every store instance owns its own pair of caches.
    string uniqueSuffix = Guid.NewGuid().ToString("N");
    EventStoreByType = new MemoryCache("EventStoreByType-" + uniqueSuffix);
    EventStoreByCorrelationId = new MemoryCache("EventStoreByCorrelationId-" + uniqueSuffix);

    StartRefreshSlidingExpirationValue();
}
66 :
67 : #region Overrides of EventStore<TAuthenticationToken>
68 :
/// <summary>
/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>.
/// </summary>
/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="useLastEventOnly">Loads only the last event<see cref="IEvent{TAuthenticationToken}"/>, i.e. the one with the highest version.</param>
/// <param name="fromVersion">Only events with a version greater than this value are loaded.</param>
/// <returns>
/// The matching events, deserialised and ordered by version from highest to lowest;
/// an empty collection when the cache holds no usable entry for the stream.
/// </returns>
public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
{
    string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

    if (!EventStoreByType.Contains(streamName))
    {
        Logger.LogDebug(string.Format("The event store has no items '{0}'.", streamName));
        return Enumerable.Empty<IEvent<TAuthenticationToken>>();
    }

    // The entry can disappear between the Contains check and this read, hence the second null check.
    CacheItem item = EventStoreByType.GetCacheItem(streamName);
    if (item == null)
    {
        Logger.LogDebug(string.Format("The event store had an item '{0}' but doesn't now.", streamName));
        return Enumerable.Empty<IEvent<TAuthenticationToken>>();
    }

    var events = item.Value as IEnumerable<EventData>;
    if (events == null)
    {
        if (item.Value == null)
            Logger.LogDebug(string.Format("The event store had an item '{0}' but it was null.", streamName));
        else
            Logger.LogWarning(string.Format("The event store had an item '{0}' but it was of type {1}.", streamName, item.Value.GetType()));
        return Enumerable.Empty<IEvent<TAuthenticationToken>>();
    }

    IEnumerable<EventData> query = events
        .Where(eventData => eventData.Version > fromVersion)
        .OrderByDescending(eventData => eventData.Version);

    // The sequence is ordered newest first, so taking one element yields the last event.
    // This is an in-memory sequence, so Enumerable.Take is used directly; there is no need to go through AsQueryable().
    if (useLastEventOnly)
        query = query.Take(1);

    return query
        .Select(EventDeserialiser.Deserialise)
        .ToList();
}
113 :
/// <summary>
/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> up to and including the provided <paramref name="version"/>.
/// </summary>
/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="version">Load events up-to and including from this version</param>
/// <returns>
/// The matching events, deserialised and ordered by version from highest to lowest;
/// an empty collection when the cache holds no usable entry for the stream.
/// </returns>
public override IEnumerable<IEvent<TAuthenticationToken>> GetToVersion(Type aggregateRootType, Guid aggregateId, int version)
{
    string cacheKey = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

    if (!EventStoreByType.Contains(cacheKey))
    {
        Logger.LogDebug(string.Format("The event store has no items '{0}'.", cacheKey));
        return Enumerable.Empty<IEvent<TAuthenticationToken>>();
    }

    // The entry may have been removed since the Contains check above.
    CacheItem cacheEntry = EventStoreByType.GetCacheItem(cacheKey);
    if (cacheEntry == null)
    {
        Logger.LogDebug(string.Format("The event store had an item '{0}' but doesn't now.", cacheKey));
        return Enumerable.Empty<IEvent<TAuthenticationToken>>();
    }

    object cachedValue = cacheEntry.Value;
    var storedEvents = cachedValue as IEnumerable<EventData>;
    if (storedEvents == null)
    {
        // Either nothing was stored, or something of an unexpected type was.
        if (cachedValue != null)
        {
            Logger.LogWarning(string.Format("The event store had an item '{0}' but it was of type {1}.", cacheKey, cachedValue.GetType()));
        }
        else
        {
            Logger.LogDebug(string.Format("The event store had an item '{0}' but it was null.", cacheKey));
        }
        return Enumerable.Empty<IEvent<TAuthenticationToken>>();
    }

    return
        (
            from storedEvent in storedEvents
            where storedEvent.Version <= version
            orderby storedEvent.Version descending
            select EventDeserialiser.Deserialise(storedEvent)
        ).ToList();
}
154 :
/// <summary>
/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> up to and including the provided <paramref name="versionedDate"/>.
/// </summary>
/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="versionedDate">Load events up-to and including from this <see cref="DateTime"/></param>
/// <returns>
/// The matching events, deserialised and ordered by version from highest to lowest;
/// an empty collection when the cache holds no usable entry for the stream.
/// </returns>
public override IEnumerable<IEvent<TAuthenticationToken>> GetToDate(Type aggregateRootType, Guid aggregateId, DateTime versionedDate)
{
    string cacheKey = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

    if (!EventStoreByType.Contains(cacheKey))
    {
        Logger.LogDebug(string.Format("The event store has no items '{0}'.", cacheKey));
        return Enumerable.Empty<IEvent<TAuthenticationToken>>();
    }

    // The entry may have been removed since the Contains check above.
    CacheItem cacheEntry = EventStoreByType.GetCacheItem(cacheKey);
    if (cacheEntry == null)
    {
        Logger.LogDebug(string.Format("The event store had an item '{0}' but doesn't now.", cacheKey));
        return Enumerable.Empty<IEvent<TAuthenticationToken>>();
    }

    object cachedValue = cacheEntry.Value;
    var storedEvents = cachedValue as IEnumerable<EventData>;
    if (storedEvents == null)
    {
        // Either nothing was stored, or something of an unexpected type was.
        if (cachedValue != null)
        {
            Logger.LogWarning(string.Format("The event store had an item '{0}' but it was of type {1}.", cacheKey, cachedValue.GetType()));
        }
        else
        {
            Logger.LogDebug(string.Format("The event store had an item '{0}' but it was null.", cacheKey));
        }
        return Enumerable.Empty<IEvent<TAuthenticationToken>>();
    }

    // Filtering is by timestamp, ordering is by version.
    return
        (
            from storedEvent in storedEvents
            where storedEvent.Timestamp <= versionedDate
            orderby storedEvent.Version descending
            select EventDeserialiser.Deserialise(storedEvent)
        ).ToList();
}
195 :
/// <summary>
/// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/> from and including the provided <paramref name="fromVersionedDate"/> up to and including the provided <paramref name="toVersionedDate"/>.
/// </summary>
/// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
/// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
/// <param name="fromVersionedDate">Load events from and including from this <see cref="DateTime"/></param>
/// <param name="toVersionedDate">Load events up-to and including from this <see cref="DateTime"/></param>
/// <returns>
/// The matching events, deserialised and ordered by version from highest to lowest;
/// an empty collection when the cache holds no usable entry for the stream or when no event falls inside the date range.
/// </returns>
public override IEnumerable<IEvent<TAuthenticationToken>> GetBetweenDates(Type aggregateRootType, Guid aggregateId, DateTime fromVersionedDate, DateTime toVersionedDate)
{
    string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

    if (!EventStoreByType.Contains(streamName))
    {
        Logger.LogDebug(string.Format("The event store has no items '{0}'.", streamName));
        return Enumerable.Empty<IEvent<TAuthenticationToken>>();
    }

    // The entry can disappear between the Contains check and this read, hence the second null check.
    CacheItem item = EventStoreByType.GetCacheItem(streamName);
    if (item == null)
    {
        Logger.LogDebug(string.Format("The event store had an item '{0}' but doesn't now.", streamName));
        return Enumerable.Empty<IEvent<TAuthenticationToken>>();
    }

    var events = item.Value as IEnumerable<EventData>;
    if (events == null)
    {
        if (item.Value == null)
            Logger.LogDebug(string.Format("The event store had an item '{0}' but it was null.", streamName));
        else
            Logger.LogWarning(string.Format("The event store had an item '{0}' but it was of type {1}.", streamName, item.Value.GetType()));
        return Enumerable.Empty<IEvent<TAuthenticationToken>>();
    }

    // Both bounds are inclusive; ordering matches the other date/version based getters of this store.
    IEnumerable<EventData> query = events
        .Where(eventData => eventData.Timestamp >= fromVersionedDate && eventData.Timestamp <= toVersionedDate)
        .OrderByDescending(eventData => eventData.Version);

    return query
        .Select(EventDeserialiser.Deserialise)
        .ToList();
}
207 :
/// <summary>
/// Get all <see cref="EventData"/> instances stored for the given <paramref name="correlationId"/>.
/// </summary>
/// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
/// <returns>
/// The stored <see cref="EventData"/> ordered by timestamp from oldest to newest;
/// an empty collection when the cache holds no usable entry for the <paramref name="correlationId"/>.
/// </returns>
public override IEnumerable<EventData> Get(Guid correlationId)
{
    // Entries are keyed by the correlation id formatted without dashes.
    string cacheKey = correlationId.ToString("N");

    if (!EventStoreByCorrelationId.Contains(cacheKey))
    {
        Logger.LogDebug(string.Format("The event store has no items by the correlationId '{0:N}'.", correlationId));
        return Enumerable.Empty<EventData>();
    }

    // The entry may have been removed since the Contains check above.
    CacheItem cacheEntry = EventStoreByCorrelationId.GetCacheItem(cacheKey);
    if (cacheEntry == null)
    {
        Logger.LogDebug(string.Format("The event store had some items by the correlationId '{0:N}' but doesn't now.", correlationId));
        return Enumerable.Empty<EventData>();
    }

    object cachedValue = cacheEntry.Value;
    var storedEvents = cachedValue as IEnumerable<EventData>;
    if (storedEvents == null)
    {
        // Either nothing was stored, or something of an unexpected type was.
        if (cachedValue != null)
        {
            Logger.LogWarning(string.Format("The event store had some items by the correlationId '{0:N}' but it was of type {1}.", correlationId, cachedValue.GetType()));
        }
        else
        {
            Logger.LogDebug(string.Format("The event store had some items by the correlationId '{0:N}' but it was null.", correlationId));
        }
        return Enumerable.Empty<EventData>();
    }

    return storedEvents
        .OrderBy(storedEvent => storedEvent.Timestamp)
        .ToList();
}
240 :
/// <summary>
/// Persist the provided <paramref name="eventData"/> into the in-memory caches:
/// once in <see cref="EventStoreByCorrelationId"/> under its correlation id, and once in <see cref="EventStoreByType"/> under its aggregate id.
/// </summary>
/// <param name="eventData">The <see cref="EventData"/> to persist.</param>
/// <exception cref="Exception">Thrown when a cache entry already exists for a key but is not an <see cref="IList{T}"/> of <see cref="EventData"/>.</exception>
protected override void PersistEvent(EventData eventData)
{
    // By correlationId first
    Guid correlationId = eventData.CorrelationId;
    IList<EventData> correlatedEvents = new List<EventData>();
    // AddOrGetExisting returns null when the new list was added, otherwise the value already cached.
    object existingByCorrelationId = EventStoreByCorrelationId.AddOrGetExisting(correlationId.ToString("N"), correlatedEvents, GetDetaultCacheItemPolicy());
    if (existingByCorrelationId != null)
    {
        correlatedEvents = existingByCorrelationId as IList<EventData>;
        if (correlatedEvents == null)
        {
            string correlationFailure = string.Format("The event store had some items by the correlationId '{0:N}' but it doesn't now.", correlationId);
            Logger.LogWarning(correlationFailure);
            throw new Exception(correlationFailure);
        }
    }
    correlatedEvents.Add(eventData);

    // By type next
    string streamName = eventData.AggregateId;
    IList<EventData> streamEvents = new List<EventData>();
    object existingByStream = EventStoreByType.AddOrGetExisting(streamName, streamEvents, GetDetaultCacheItemPolicy());
    if (existingByStream != null)
    {
        streamEvents = existingByStream as IList<EventData>;
        if (streamEvents == null)
        {
            string streamFailure = string.Format("The event store had an item by id '{0}' but it doesn't now.", streamName);
            Logger.LogWarning(streamFailure);
            throw new Exception(streamFailure);
        }
    }
    streamEvents.Add(eventData);
}
281 :
282 : #endregion
283 :
/// <summary>
/// Reads "Cqrs.EventStore.MemoryCacheEventStore.SlidingExpiration" from <see cref="ConfigurationManager"/>, falling back to "0, 15, 0" when the setting cannot be read,
/// checks if it has changed since the last successful refresh and, if so, updates <see cref="SlidingExpirationValue"/> and <see cref="SlidingExpiration"/>.
/// </summary>
/// <remarks>
/// The value is a comma separated list of either three integers ("hours, minutes, seconds") or four integers ("days, hours, minutes, seconds").
/// A value with any other number of parts, or with a part that is not an integer, is ignored and the current values are left untouched.
/// </remarks>
protected virtual void RefreshSlidingExpirationValue()
{
    string slidingExpirationValue;
    if (!ConfigurationManager.TryGetSetting("Cqrs.EventStore.MemoryCacheEventStore.SlidingExpiration", out slidingExpirationValue))
        slidingExpirationValue = "0, 15, 0";

    // Nothing to do when the raw setting is the one that was last applied.
    if (SlidingExpirationValue == slidingExpirationValue)
        return;

    // A setting reported as present but null cannot be parsed; treat it like any other malformed value.
    if (slidingExpirationValue == null)
        return;

    string[] slidingExpirationParts = slidingExpirationValue.Split(',');

    // Reject the value only when it has neither three nor four parts.
    // The two comparisons must be combined with "&&"; with "||" the condition holds for every length and no value would ever be applied.
    bool hasDays = slidingExpirationParts.Length == 4;
    if (slidingExpirationParts.Length != 3 && !hasDays)
        return;

    // With four parts the first one is the day count, shifting hours/minutes/seconds by one position.
    int adjuster = hasDays ? 1 : 0;
    int days = 0;
    int hours;
    int minutes;
    int seconds;
    if (!int.TryParse(slidingExpirationParts[0 + adjuster].Trim(), out hours))
        return;
    if (!int.TryParse(slidingExpirationParts[1 + adjuster].Trim(), out minutes))
        return;
    if (!int.TryParse(slidingExpirationParts[2 + adjuster].Trim(), out seconds))
        return;
    if (hasDays && !int.TryParse(slidingExpirationParts[0].Trim(), out days))
        return;

    // Remember the raw value first so the same text is not re-parsed on every refresh.
    SlidingExpirationValue = slidingExpirationValue;
    if (hasDays)
        SlidingExpiration = new TimeSpan(days, hours, minutes, seconds);
    else
        SlidingExpiration = new TimeSpan(hours, minutes, seconds);
}
322 :
/// <summary>
/// Start a <see cref="Task"/> that will call <see cref="RefreshSlidingExpirationValue"/> in an endless loop.
/// After each call the loop sleeps for 1 second, except on every fifth iteration (starting with the first) where it only yields the thread.
/// </summary>
protected virtual void StartRefreshSlidingExpirationValue()
{
    Task.Factory.StartNewSafely(() =>
    {
        long iteration = 0;
        for (;;)
        {
            RefreshSlidingExpirationValue();

            // Decide using the counter value from before it is advanced.
            bool yieldOnly = iteration % 5 == 0;
            iteration++;

            if (yieldOnly)
            {
                Thread.Yield();
            }
            else
            {
                Thread.Sleep(1000);
            }

            // Wrap the counter explicitly before it can overflow.
            if (iteration == long.MaxValue)
            {
                iteration = long.MinValue;
            }
        }
    });
}
344 :
/// <summary>
/// Creates a new <see cref="CacheItemPolicy"/> with the <see cref="CacheItemPolicy.SlidingExpiration"/> set to the current value of <see cref="SlidingExpiration"/>.
/// </summary>
/// <returns>A new <see cref="CacheItemPolicy"/> instance on every call.</returns>
protected virtual CacheItemPolicy GetDetaultCacheItemPolicy()
{
    CacheItemPolicy policy = new CacheItemPolicy();
    policy.SlidingExpiration = SlidingExpiration;
    return policy;
}
352 : }
353 : }
|