Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Runtime.Caching;
13 : using System.Threading;
14 : using System.Threading.Tasks;
15 : using cdmdotnet.Logging;
16 : using Cqrs.Configuration;
17 : using Cqrs.Domain;
18 : using Cqrs.Messages;
19 :
20 : namespace Cqrs.Events
21 : {
22 : /// <summary>
23 : /// A, <see cref="EventStore{TAuthenticationToken}"/> that uses a <see cref="MemoryCache"/> implementation, flushing out data (I.E. it's not persisted)
24 : /// </summary>
25 : public class MemoryCacheEventStore<TAuthenticationToken>
26 : : EventStore<TAuthenticationToken>
27 1 : {
28 : /// <summary>
29 : /// Gets or sets the <see cref="IConfigurationManager"/>.
30 : /// </summary>
31 : protected IConfigurationManager ConfigurationManager { get; private set; }
32 :
33 : /// <summary>
34 : /// Gets or sets the <see cref="MemoryCache"/> of data grouped by event <see cref="Type"/>.
35 : /// </summary>
36 : protected MemoryCache EventStoreByType { get; private set; }
37 :
38 : /// <summary>
39 : /// Gets or sets the <see cref="MemoryCache"/> of data grouped by event <see cref="IMessage.CorrelationId"/>.
40 : /// </summary>
41 : protected MemoryCache EventStoreByCorrelationId { get; private set; }
42 :
43 : /// <summary>
44 : /// Gets of sets the SlidingExpirationValue, the value of "Cqrs.EventStore.MemoryCacheEventStore.SlidingExpiration" from <see cref="ConfigurationManager"/>.
45 : /// </summary>
46 : protected string SlidingExpirationValue { get; set; }
47 :
48 : /// <summary>
49 : /// Gets of sets the SlidingExpiration
50 : /// </summary>
51 : protected TimeSpan SlidingExpiration { get; set; }
52 :
53 : /// <summary>
54 : /// Instantiates a new instance of <see cref="MemoryCacheEventStore{TAuthenticationToken}"/> and calls <see cref="StartRefreshSlidingExpirationValue"/>.
55 : /// </summary>
56 1 : public MemoryCacheEventStore(IConfigurationManager configurationManager, IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger)
57 : : base(eventBuilder, eventDeserialiser, logger)
58 : {
59 : Guid id = Guid.NewGuid();
60 : ConfigurationManager = configurationManager;
61 : EventStoreByType = new MemoryCache(string.Format("EventStoreByType-{0:N}", id));
62 : EventStoreByCorrelationId = new MemoryCache(string.Format("EventStoreByCorrelationId-{0:N}", id));
63 :
64 : StartRefreshSlidingExpirationValue();
65 : }
66 :
67 : #region Overrides of EventStore<TAuthenticationToken>
68 :
69 : /// <summary>
70 : /// Gets a collection of <see cref="IEvent{TAuthenticationToken}"/> for the <see cref="IAggregateRoot{TAuthenticationToken}"/> of type <paramref name="aggregateRootType"/> with the ID matching the provided <paramref name="aggregateId"/>.
71 : /// </summary>
72 : /// <param name="aggregateRootType"> <see cref="Type"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/> the <see cref="IEvent{TAuthenticationToken}"/> was raised in.</param>
73 : /// <param name="aggregateId">The <see cref="IAggregateRoot{TAuthenticationToken}.Id"/> of the <see cref="IAggregateRoot{TAuthenticationToken}"/>.</param>
74 : /// <param name="useLastEventOnly">Loads only the last event<see cref="IEvent{TAuthenticationToken}"/>.</param>
75 : /// <param name="fromVersion">Load events starting from this version</param>
76 1 : public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
77 : {
78 : string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);
79 :
80 : if (!EventStoreByType.Contains(streamName))
81 : {
82 : Logger.LogDebug(string.Format("The event store has no items '{0}'.", streamName));
83 : return Enumerable.Empty<IEvent<TAuthenticationToken>>();
84 : }
85 :
86 : CacheItem item = EventStoreByType.GetCacheItem(streamName);
87 : if (item == null)
88 : {
89 : Logger.LogDebug(string.Format("The event store had an item '{0}' but doesn't now.", streamName));
90 : return Enumerable.Empty<IEvent<TAuthenticationToken>>();
91 : }
92 :
93 : var events = item.Value as IEnumerable<EventData>;
94 : if (events == null)
95 : {
96 : if (item.Value == null)
97 : Logger.LogDebug(string.Format("The event store had an item '{0}' but it was null.", streamName));
98 : else
99 : Logger.LogWarning(string.Format("The event store had an item '{0}' but it was of type {1}.", streamName, item.Value.GetType()));
100 : return Enumerable.Empty<IEvent<TAuthenticationToken>>();
101 : }
102 : IEnumerable<EventData> query = events
103 : .Where(eventData => eventData.Version > fromVersion)
104 : .OrderByDescending(eventData => eventData.Version);
105 :
106 : if (useLastEventOnly)
107 : query = query.AsQueryable().Take(1);
108 :
109 : return query
110 : .Select(EventDeserialiser.Deserialise)
111 : .ToList();
112 : }
113 :
114 : /// <summary>
115 : /// Get all <see cref="IEvent{TAuthenticationToken}"/> instances for the given <paramref name="correlationId"/>.
116 : /// </summary>
117 : /// <param name="correlationId">The <see cref="IMessage.CorrelationId"/> of the <see cref="IEvent{TAuthenticationToken}"/> instances to retrieve.</param>
118 1 : public override IEnumerable<EventData> Get(Guid correlationId)
119 : {
120 : if (!EventStoreByCorrelationId.Contains(correlationId.ToString("N")))
121 : {
122 : Logger.LogDebug(string.Format("The event store has no items by the correlationId '{0:N}'.", correlationId));
123 : return Enumerable.Empty<EventData>();
124 : }
125 :
126 : CacheItem item = EventStoreByCorrelationId.GetCacheItem(correlationId.ToString("N"));
127 : if (item == null)
128 : {
129 : Logger.LogDebug(string.Format("The event store had some items by the correlationId '{0:N}' but doesn't now.", correlationId));
130 : return Enumerable.Empty<EventData>();
131 : }
132 :
133 : var events = item.Value as IEnumerable<EventData>;
134 : if (events == null)
135 : {
136 : if (item.Value == null)
137 : Logger.LogDebug(string.Format("The event store had some items by the correlationId '{0:N}' but it was null.", correlationId));
138 : else
139 : Logger.LogWarning(string.Format("The event store had some items by the correlationId '{0:N}' but it was of type {1}.", correlationId, item.Value.GetType()));
140 : return Enumerable.Empty<EventData>();
141 : }
142 : IEnumerable<EventData> query = events.OrderBy(eventData => eventData.Timestamp);
143 :
144 : return query.ToList();
145 : }
146 :
147 : /// <summary>
148 : /// Persist the provided <paramref name="eventData"/> into storage.
149 : /// </summary>
150 : /// <param name="eventData">The <see cref="EventData"/> to persist.</param>
151 1 : protected override void PersistEvent(EventData eventData)
152 : {
153 : IList<EventData> events = new List<EventData>();
154 :
155 : // By correlationId first
156 : Guid correlationId = eventData.CorrelationId;
157 : object item = EventStoreByCorrelationId.AddOrGetExisting(correlationId.ToString("N"), events, GetDetaultCacheItemPolicy());
158 : if (item != null)
159 : {
160 : events = item as IList<EventData>;
161 : if (events == null)
162 : {
163 : Logger.LogWarning(string.Format("The event store had some items by the correlationId '{0:N}' but it doesn't now.", correlationId));
164 : throw new Exception(string.Format("The event store had some items by the correlationId '{0:N}' but it doesn't now.", correlationId));
165 : }
166 : }
167 :
168 : events.Add(eventData);
169 : // Reset the variable for it's next usage
170 : events = new List<EventData>();
171 :
172 : // By type next
173 : string streamName = eventData.AggregateId;
174 : item = EventStoreByType.AddOrGetExisting(streamName, events, GetDetaultCacheItemPolicy());
175 : if (item != null)
176 : {
177 : events = item as IList<EventData>;
178 : if (events == null)
179 : {
180 : Logger.LogWarning(string.Format("The event store had an item by id '{0}' but it doesn't now.", streamName));
181 : throw new Exception(string.Format("The event store had an item by id '{0}' but it doesn't now.", streamName));
182 : }
183 : }
184 :
185 : events.Add(eventData);
186 : }
187 :
188 : #endregion
189 :
190 : /// <summary>
191 : /// Reads "Cqrs.EventStore.MemoryCacheEventStore.SlidingExpiration" from <see cref="ConfigurationManager"/>, checks if it has changed and then
192 : /// Update <see cref="SlidingExpiration"/> with the new value.
193 : /// </summary>
194 1 : protected virtual void RefreshSlidingExpirationValue()
195 : {
196 : // First refresh the EventBlackListProcessing property
197 : string slidingExpirationValue;
198 : if (!ConfigurationManager.TryGetSetting("Cqrs.EventStore.MemoryCacheEventStore.SlidingExpiration", out slidingExpirationValue))
199 : slidingExpirationValue = "0, 15, 0";
200 :
201 : if (SlidingExpirationValue != slidingExpirationValue)
202 : {
203 : string[] slidingExpirationParts = slidingExpirationValue.Split(',');
204 : if (slidingExpirationParts.Length != 3 || slidingExpirationParts.Length != 4)
205 : return;
206 :
207 : int adjuster = slidingExpirationParts.Length == 3 ? 0 : 1;
208 : int days = 0;
209 : int hours;
210 : int minutes;
211 : int seconds;
212 : if (!int.TryParse(slidingExpirationParts[0 + adjuster].Trim(), out hours))
213 : return;
214 : if (!int.TryParse(slidingExpirationParts[1 + adjuster].Trim(), out minutes))
215 : return;
216 : if (!int.TryParse(slidingExpirationParts[2 + adjuster].Trim(), out seconds))
217 : return;
218 : if (slidingExpirationParts.Length == 4)
219 : if (!int.TryParse(slidingExpirationParts[0].Trim(), out days))
220 : return;
221 : SlidingExpirationValue = slidingExpirationValue;
222 : if (slidingExpirationParts.Length == 4)
223 : SlidingExpiration = new TimeSpan(days, hours, minutes, seconds);
224 : else
225 : SlidingExpiration = new TimeSpan(hours, minutes, seconds);
226 : }
227 : }
228 :
229 : /// <summary>
230 : /// Start a <see cref="Task"/> that will call <see cref="RefreshSlidingExpirationValue"/> in a loop with a 1 second wait time between loops.
231 : /// </summary>
232 1 : protected virtual void StartRefreshSlidingExpirationValue()
233 : {
234 : Task.Factory.StartNewSafely(() =>
235 : {
236 : long loop = 0;
237 : while (true)
238 : {
239 : RefreshSlidingExpirationValue();
240 :
241 : if (loop++ % 5 == 0)
242 : Thread.Yield();
243 : else
244 : Thread.Sleep(1000);
245 : if (loop == long.MaxValue)
246 : loop = long.MinValue;
247 : }
248 : });
249 : }
250 :
251 : /// <summary>
252 : /// Get's a <see cref="CacheItemPolicy"/> with the <see cref="CacheItemPolicy.SlidingExpiration"/> set to 15 minutes
253 : /// </summary>
254 1 : protected virtual CacheItemPolicy GetDetaultCacheItemPolicy()
255 : {
256 : return new CacheItemPolicy { SlidingExpiration = SlidingExpiration };
257 : }
258 : }
259 : }
|