Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Diagnostics;
13 : using System.Linq;
14 : using System.Threading;
15 : using System.Threading.Tasks;
16 : using cdmdotnet.Logging;
17 : using cdmdotnet.StateManagement;
18 : using Cqrs.Authentication;
19 : using Cqrs.Commands;
20 : using Cqrs.Configuration;
21 : using Cqrs.Events;
22 : using Cqrs.Messages;
23 :
24 : namespace Cqrs.Bus
25 : {
26 : /// <summary>
27 : /// A helper for command and event buses that also caches <see cref="IConfigurationManager"/> look ups.
28 : /// </summary>
29 : public class BusHelper : IBusHelper
30 1 : {
/// <summary>
/// Instantiates a new instance of <see cref="BusHelper"/>, creating empty caches, reading the initial
/// "Cqrs.MessageBus.BlackListProcessing" setting and starting the background cache refresh.
/// </summary>
/// <param name="configurationManager">The <see cref="IConfigurationManager"/> settings are read from.</param>
/// <param name="factory">The factory whose current context becomes <see cref="Cache"/>.</param>
public BusHelper(IConfigurationManager configurationManager, IContextItemCollectionFactory factory)
{
    Cache = factory.GetCurrentContext();
    ConfigurationManager = configurationManager;
    CachedChecks = new ConcurrentDictionary<string, Tuple<bool, DateTime>>();
    NullableCachedChecks = new ConcurrentDictionary<string, Tuple<bool?, DateTime>>();

    // When the setting cannot be read, black-list processing is assumed to be on.
    bool configuredValue;
    bool wasConfigured = ConfigurationManager.TryGetSetting("Cqrs.MessageBus.BlackListProcessing", out configuredValue);
    EventBlackListProcessing = !wasConfigured || configuredValue;

    StartRefreshCachedChecks();
}
46 :
/// <summary>
/// Gets the <see cref="IConfigurationManager"/> that settings are read from. It is assigned in the constructor.
/// </summary>
protected IConfigurationManager ConfigurationManager { get; private set; }

/// <summary>
/// A cache of <see cref="Tuple{T1, T2}"/> entries holding a configuration value (always a <see cref="bool"/>) and the
/// <see cref="DateTime"/> (UTC) the value was cached or last changed, keyed by its configuration key.
/// </summary>
protected IDictionary<string, Tuple<bool, DateTime>> CachedChecks { get; private set; }

/// <summary>
/// A cache of <see cref="Tuple{T1, T2}"/> entries holding a resolved value (a nullable <see cref="bool"/>, where
/// <c>null</c> means nothing was configured) and the <see cref="DateTime"/> (UTC) the value was cached, keyed by its configuration key.
/// </summary>
protected IDictionary<string, Tuple<bool?, DateTime>> NullableCachedChecks { get; private set; }

/// <summary>
/// The most recently read value of "Cqrs.MessageBus.BlackListProcessing" from <see cref="ConfigurationManager"/>.
/// It is <c>true</c> when the setting could not be read.
/// </summary>
protected bool EventBlackListProcessing { get; private set; }
68 :
/// <summary>
/// Refreshes <see cref="EventBlackListProcessing"/>, re-reads every item currently in <see cref="CachedChecks"/>
/// and removes entries older than 20 minutes from both <see cref="CachedChecks"/> and <see cref="NullableCachedChecks"/>.
/// </summary>
protected virtual void RefreshCachedChecks()
{
    // First refresh the EventBlackListProcessing property; it defaults to true when the setting cannot be read.
    bool isBlackListRequired;
    if (!ConfigurationManager.TryGetSetting("Cqrs.MessageBus.BlackListProcessing", out isBlackListRequired))
        isBlackListRequired = true;
    EventBlackListProcessing = isBlackListRequired;

    // Work on a snapshot of the keys so entries can be removed while looping.
    foreach (string configurationKey in CachedChecks.Keys.ToList())
    {
        Tuple<bool, DateTime> pair;
        // The entry may have been removed since the snapshot was taken; using TryGetValue
        // instead of the indexer avoids a KeyNotFoundException in that case.
        if (!CachedChecks.TryGetValue(configurationKey, out pair))
            continue;

        bool value;
        // If there is no longer a specific setting, remove it from the cache
        if (!ConfigurationManager.TryGetSetting(configurationKey, out value))
            CachedChecks.Remove(configurationKey);
        // Refresh the value and reset its expiry if the value has changed
        else if (pair.Item1 != value)
            CachedChecks[configurationKey] = new Tuple<bool, DateTime>(value, DateTime.UtcNow);
        // Unchanged: remove it once it is more than 20 minutes past being obtained or refreshed
        else if (pair.Item2.AddMinutes(20) < DateTime.UtcNow)
            CachedChecks.Remove(configurationKey);
    }

    // The nullable cache is not re-read here; entries are only expired after 20 minutes.
    foreach (string configurationKey in NullableCachedChecks.Keys.ToList())
    {
        Tuple<bool?, DateTime> pair;
        if (NullableCachedChecks.TryGetValue(configurationKey, out pair) && pair.Item2.AddMinutes(20) < DateTime.UtcNow)
            NullableCachedChecks.Remove(configurationKey);
    }
}
107 :
/// <summary>
/// Starts a <see cref="Task"/> that calls <see cref="RefreshCachedChecks"/> in an endless loop.
/// After each refresh the loop sleeps for one second, except on every fifth pass (beginning with the first)
/// where it only yields the thread.
/// </summary>
protected virtual void StartRefreshCachedChecks()
{
    Task.Factory.StartNewSafely(() =>
    {
        long pass = 0;
        for (;;)
        {
            RefreshCachedChecks();

            // Decide using the counter value from before it is incremented.
            bool yieldOnly = pass % 5 == 0;
            pass++;
            if (yieldOnly)
                Thread.Yield();
            else
                Thread.Sleep(1000);

            // Wrap the counter before it can overflow.
            if (pass == long.MaxValue)
                pass = long.MinValue;
        }
    });
}
129 :
/// <summary>
/// Checks whether the event is required by building the configuration key
/// "<see cref="Type.FullName"/>.IsRequired" for <paramref name="messageType"/> and passing it to
/// <see cref="IsEventRequired(string)"/>.
/// </summary>
/// <param name="messageType">The <see cref="Type"/> of the message being processed.</param>
public virtual bool IsEventRequired(Type messageType)
{
    string configurationKey = string.Concat(messageType.FullName, ".IsRequired");
    return IsEventRequired(configurationKey);
}
140 :
/// <summary>
/// Checks if the event identified by <paramref name="configurationKey"/> is required.
/// A cached answer in <see cref="CachedChecks"/> is used when present; otherwise the <see cref="IConfigurationManager"/>
/// is consulted, falling back to <see cref="EventBlackListProcessing"/> when there is no specific setting,
/// and the answer is cached.
/// </summary>
/// <param name="configurationKey">The configuration key to check.</param>
/// <returns>The cached or freshly resolved value for <paramref name="configurationKey"/>.</returns>
public virtual bool IsEventRequired(string configurationKey)
{
    // Don't refresh the expiry on a hit; the cache is refreshed periodically by RefreshCachedChecks.
    Tuple<bool, DateTime> settings;
    if (CachedChecks.TryGetValue(configurationKey, out settings))
        return settings.Item1;

    // If we can't find a value or there is no specific setting, we default to EventBlackListProcessing
    bool isRequired;
    if (!ConfigurationManager.TryGetSetting(configurationKey, out isRequired))
        isRequired = EventBlackListProcessing;

    // Now cache the response. Another thread may have added the key since we checked; that is harmless.
    // TryAdd leaves the existing entry in place without throwing, so no exception message (which is
    // localised and therefore unreliable to compare against) has to be inspected.
    var entry = new Tuple<bool, DateTime>(isRequired, DateTime.UtcNow);
    var concurrentCache = CachedChecks as ConcurrentDictionary<string, Tuple<bool, DateTime>>;
    if (concurrentCache != null)
        concurrentCache.TryAdd(configurationKey, entry);
    else
        CachedChecks[configurationKey] = entry;

    return isRequired;
}
175 :
/// <summary>
/// Checks if the private bus is required to send the message, by delegating to <see cref="IsABusRequired"/>.
/// Note, this does not imply the public bus is not required as well.
/// </summary>
/// <param name="messageType">The <see cref="Type"/> of the message being processed.</param>
/// <returns>Null for unconfigured, True for private bus transmission, false otherwise.</returns>
public virtual bool? IsPrivateBusRequired(Type messageType)
{
    bool? isPrivateBusRequired = IsABusRequired(messageType, checkPublic: false);
    return isPrivateBusRequired;
}
185 :
/// <summary>
/// Checks if the public bus is required to send the message, by delegating to <see cref="IsABusRequired"/>.
/// Note, this does not imply the private bus is not required as well.
/// </summary>
/// <param name="messageType">The <see cref="Type"/> of the message being processed.</param>
/// <returns>Null for unconfigured, True for public bus transmission, false otherwise.</returns>
public virtual bool? IsPublicBusRequired(Type messageType)
{
    bool? isPublicBusRequired = IsABusRequired(messageType, checkPublic: true);
    return isPublicBusRequired;
}
195 :
/// <summary>
/// Checks if the particular bus is required to send the message. Note, this does not imply the other bus is not required as well.
/// The answer is resolved from <see cref="NullableCachedChecks"/>, then the <see cref="IConfigurationManager"/>,
/// then the <see cref="PublicEventAttribute"/> or <see cref="PrivateEventAttribute"/> on <paramref name="messageType"/>.
/// For the private bus only, an unresolved answer falls back to <see cref="GetWasPrivateBusUsed"/>.
/// </summary>
/// <param name="messageType">The <see cref="Type"/> of the message being processed.</param>
/// <param name="checkPublic">Check for the public or private bus.</param>
/// <returns>Null for unconfigured, True for a particular bus transmission, false otherwise.</returns>
protected virtual bool? IsABusRequired(Type messageType, bool checkPublic)
{
    string configurationKey = string.Format(checkPublic ? "{0}.IsPublicBusRequired" : "{0}.IsPrivateBusRequired", messageType.FullName);

    bool? isRequired;
    Tuple<bool?, DateTime> settings;
    // Don't refresh the expiry on a hit; stale entries are removed periodically by RefreshCachedChecks.
    if (NullableCachedChecks.TryGetValue(configurationKey, out settings))
        isRequired = settings.Item1;
    else
    {
        bool configuredValue;
        // Check if there is a configured value
        if (ConfigurationManager.TryGetSetting(configurationKey, out configuredValue))
            isRequired = configuredValue;
        // If not, check for the attribute matching the bus being checked
        else
        {
            Type attributeType = checkPublic ? typeof(PublicEventAttribute) : typeof(PrivateEventAttribute);
            isRequired = Attribute.GetCustomAttribute(messageType, attributeType) == null ? (bool?)null : true;
        }

        // Now cache the response. Another thread may have added the key since we checked; that is harmless.
        // TryAdd leaves the existing entry in place without throwing, so no exception message (which is
        // localised and therefore unreliable to compare against) has to be inspected.
        var entry = new Tuple<bool?, DateTime>(isRequired, DateTime.UtcNow);
        var concurrentCache = NullableCachedChecks as ConcurrentDictionary<string, Tuple<bool?, DateTime>>;
        if (concurrentCache != null)
            concurrentCache.TryAdd(configurationKey, entry);
        else
            NullableCachedChecks[configurationKey] = entry;
    }

    // If all the above is still not definitive, react to the bus the originating message was received on, but we only need to check for private.
    // We do this here so caching is at least used, but this result cannot be cached as it depends on the current context.
    if (isRequired == null && !checkPublic && GetWasPrivateBusUsed())
        return true;

    return isRequired;
}
250 :
/// <summary>
/// Build a message handler that wraps <paramref name="handler"/> with telemetry capturing, then passes the wrapped
/// handler to <see cref="BuildActionHandler{TMessage}"/> for optional off thread handling.
/// </summary>
/// <param name="telemetryHelper">The <see cref="ITelemetryHelper"/> that events, exceptions and requests are tracked with.</param>
/// <param name="handler">The handler to invoke for each message.</param>
/// <param name="holdMessageLock">Passed through to <see cref="BuildActionHandler{TMessage}"/>.</param>
/// <param name="source">Recorded as the "Type" property on the tracked request.</param>
public virtual Action<TMessage> BuildTelemeteredActionHandler<TMessage, TAuthenticationToken>(ITelemetryHelper telemetryHelper, Action<TMessage> handler, bool holdMessageLock, string source)
    where TMessage : IMessage
{
    Action<TMessage> telemeteredHandler = message =>
    {
        DateTimeOffset startedAt = DateTimeOffset.UtcNow;
        Stopwatch stopwatch = Stopwatch.StartNew();
        bool succeeded = true;
        string responseCode = "200";

        string telemetryName = message.GetType().FullName;
        var telemeteredMessage = message as ITelemeteredMessage;
        string messagePrefix = null;
        object authenticationToken = null;

        // Events and commands contribute a prefix, an identity based name and their authentication token.
        var @event = message as IEvent<TAuthenticationToken>;
        var command = @event == null ? message as ICommand<TAuthenticationToken> : null;
        if (@event != null)
        {
            messagePrefix = "Event/";
            telemetryName = string.Format("{0}/{1}/{2}", telemetryName, @event.GetIdentity(), @event.Id);
            authenticationToken = @event.AuthenticationToken;
        }
        else if (command != null)
        {
            messagePrefix = "Command/";
            telemetryName = string.Format("{0}/{1}/{2}", telemetryName, command.GetIdentity(), command.Id);
            authenticationToken = command.AuthenticationToken;
        }

        // A message that names itself overrides whatever was built above.
        if (telemeteredMessage != null)
            telemetryName = telemeteredMessage.TelemetryName;

        string requestName = string.Format("Cqrs/Handle/{0}{1}", messagePrefix, telemetryName);

        telemetryHelper.TrackEvent(requestName + "/Started");

        try
        {
            handler(message);
        }
        catch (Exception exception)
        {
            telemetryHelper.TrackException(exception);
            succeeded = false;
            responseCode = "500";
            throw;
        }
        finally
        {
            telemetryHelper.TrackEvent(requestName + "/Finished");

            stopwatch.Stop();
            var properties = new Dictionary<string, string> { { "Type", source } };

            // Pick the TrackRequest overload matching the runtime type of the authentication token.
            var singleSignOnToken = authenticationToken as ISingleSignOnToken;
            if (singleSignOnToken != null)
            {
                telemetryHelper.TrackRequest(requestName, singleSignOnToken, startedAt, stopwatch.Elapsed, responseCode, succeeded, properties);
            }
            else if (authenticationToken is Guid)
            {
                telemetryHelper.TrackRequest(requestName, (Guid?)authenticationToken, startedAt, stopwatch.Elapsed, responseCode, succeeded, properties);
            }
            else if (authenticationToken is int)
            {
                telemetryHelper.TrackRequest(requestName, (int?)authenticationToken, startedAt, stopwatch.Elapsed, responseCode, succeeded, properties);
            }
            else
            {
                string token = authenticationToken == null ? null : authenticationToken.ToString();
                telemetryHelper.TrackRequest(requestName, token, startedAt, stopwatch.Elapsed, responseCode, succeeded, properties);
            }

            telemetryHelper.Flush();
        }
    };

    return BuildActionHandler(telemeteredHandler, holdMessageLock);
}
361 :
/// <summary>
/// Build a message handler that optionally runs <paramref name="handler"/> off thread.
/// </summary>
/// <param name="handler">The handler to invoke for each message.</param>
/// <param name="holdMessageLock">
/// When <c>true</c>, <paramref name="handler"/> is returned as is.
/// When <c>false</c>, the returned handler starts <paramref name="handler"/> in a <see cref="Task"/> and returns without waiting for it.
/// </param>
public virtual Action<TMessage> BuildActionHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock)
    where TMessage : IMessage
{
    if (holdMessageLock)
        return handler;

    Action<TMessage> offThreadHandler = message =>
    {
        Task.Factory.StartNewSafely(() =>
        {
            handler(message);
        });
    };
    return offThreadHandler;
}
384 :
/// <summary>
/// The key used to store, in the <see cref="Cache"/>, whether the message was received via the private bus.
/// </summary>
protected string CacheKey = "WasPrivateBusUsed";

/// <summary>
/// Gets the Cache; assigned in the constructor from the factory's current context.
/// </summary>
protected IContextItemCollection Cache { get; private set; }
394 :
/// <summary>
/// Indicates if the message was received via the private bus or not. If false, this implies the public bus was used.
/// Any failure reading the value from the <see cref="Cache"/> is treated as false.
/// </summary>
public bool GetWasPrivateBusUsed()
{
    bool wasPrivateBusUsed;
    try
    {
        wasPrivateBusUsed = Cache.GetData<bool>(CacheKey);
    }
    catch
    {
        // Deliberate best effort: an unreadable value means "not private".
        wasPrivateBusUsed = false;
    }
    return wasPrivateBusUsed;
}
409 :
/// <summary>
/// Set whether the message was received via the private bus or not. If false, this indicates the public bus was used.
/// </summary>
/// <param name="wasPrivate">The value to store in the <see cref="Cache"/> under <see cref="CacheKey"/>.</param>
/// <returns>The value returned by the <see cref="Cache"/> when storing <paramref name="wasPrivate"/>.</returns>
public bool SetWasPrivateBusUsed(bool wasPrivate)
{
    bool stored = Cache.SetData(CacheKey, wasPrivate);
    return stored;
}
417 : }
418 : }
|