Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Diagnostics;
13 : using System.Linq;
14 : using System.Threading;
15 : using System.Threading.Tasks;
16 : using cdmdotnet.Logging;
17 : using Cqrs.Authentication;
18 : using Cqrs.Commands;
19 : using Cqrs.Configuration;
20 : using Cqrs.Events;
21 : using Cqrs.Messages;
22 :
namespace Cqrs.Bus
{
    /// <summary>
    /// A helper for command and event buses that also caches <see cref="IConfigurationManager"/> look ups.
    /// </summary>
    public class BusHelper : IBusHelper
    {
        /// <summary>
        /// The configuration key holding the black-list/white-list processing switch.
        /// </summary>
        private const string BlackListProcessingConfigurationKey = "Cqrs.MessageBus.BlackListProcessing";

        /// <summary>
        /// Instantiates a new instance of <see cref="BusHelper"/>, reads the initial value of
        /// <see cref="EventBlackListProcessing"/> and starts the background cache refresh via <see cref="StartRefreshCachedChecks"/>.
        /// </summary>
        /// <param name="configurationManager">The <see cref="IConfigurationManager"/> every setting is read from.</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="configurationManager"/> is <c>null</c>.</exception>
        public BusHelper(IConfigurationManager configurationManager)
        {
            // Fail with a descriptive exception rather than a NullReferenceException on the first setting look up.
            if (configurationManager == null)
                throw new ArgumentNullException("configurationManager");

            ConfigurationManager = configurationManager;
            CachedChecks = new ConcurrentDictionary<string, Tuple<bool, DateTime>>();
            EventBlackListProcessing = ReadEventBlackListProcessing();
            // NOTE: this is a virtual call made from the constructor; it is kept for backward compatibility with existing overrides.
            StartRefreshCachedChecks();
        }

        /// <summary>
        /// Gets or sets the <see cref="IConfigurationManager"/>.
        /// </summary>
        protected IConfigurationManager ConfigurationManager { get; private set; }

        /// <summary>
        /// A collection of <see cref="Tuple{T1, T2}"/> holding the configuration value (always a <see cref="bool"/>) and the <see cref="DateTime"/>
        /// the value was last checked, keyed by its configuration key.
        /// </summary>
        protected IDictionary<string, Tuple<bool, DateTime>> CachedChecks { get; private set; }

        /// <summary>
        /// The current value of "Cqrs.MessageBus.BlackListProcessing" from <see cref="ConfigurationManager"/>.
        /// </summary>
        protected bool EventBlackListProcessing { get; private set; }

        /// <summary>
        /// Reads "Cqrs.MessageBus.BlackListProcessing" from <see cref="ConfigurationManager"/>, defaulting to <c>true</c> when the setting cannot be obtained.
        /// </summary>
        private bool ReadEventBlackListProcessing()
        {
            bool isBlackListRequired;
            if (!ConfigurationManager.TryGetSetting(BlackListProcessingConfigurationKey, out isBlackListRequired))
                isBlackListRequired = true;
            return isBlackListRequired;
        }

        /// <summary>
        /// Refreshes <see cref="EventBlackListProcessing"/> and every item currently in <see cref="CachedChecks"/>.
        /// Entries whose setting can no longer be obtained, or whose value has been unchanged for more than 20 minutes, are removed;
        /// entries whose value has changed are replaced and re-stamped with the current UTC time.
        /// </summary>
        protected virtual void RefreshCachedChecks()
        {
            // First refresh the EventBlackListProcessing property
            EventBlackListProcessing = ReadEventBlackListProcessing();

            // Work from a copy of the keys so entries can be removed while looping.
            IList<string> keys = CachedChecks.Keys.ToList();
            foreach (string configurationKey in keys)
            {
                Tuple<bool, DateTime> pair;
                // The entry may have been removed since the keys were copied; there is nothing left to refresh in that case.
                if (!CachedChecks.TryGetValue(configurationKey, out pair))
                    continue;

                bool value;
                // If we can't get a value or there is no specific setting, remove it from the cache
                if (!ConfigurationManager.TryGetSetting(configurationKey, out value))
                    CachedChecks.Remove(configurationKey);
                // Refresh the value and reset its expiry if the value has changed
                else if (pair.Item1 != value)
                    CachedChecks[configurationKey] = new Tuple<bool, DateTime>(value, DateTime.UtcNow);
                // Check its age - entries are kept for 20 minutes from being obtained or refreshed, after which they are removed
                else if (pair.Item2.AddMinutes(20) < DateTime.UtcNow)
                    CachedChecks.Remove(configurationKey);
            }
        }

        /// <summary>
        /// Starts <see cref="RefreshCachedChecks"/> in a <see cref="Task"/> that loops forever.
        /// After each refresh the loop sleeps for one second, except on every fifth pass (including the first) where it only yields the thread.
        /// </summary>
        protected virtual void StartRefreshCachedChecks()
        {
            Task.Factory.StartNewSafely(() =>
            {
                long loop = 0;
                while (true)
                {
                    RefreshCachedChecks();

                    if (loop++%5 == 0)
                        Thread.Yield();
                    else
                        Thread.Sleep(1000);
                    // Wrap the counter before it can overflow.
                    if (loop == long.MaxValue)
                        loop = long.MinValue;
                }
            });
        }

        /// <summary>
        /// Checks if a white-list or black-list approach is taken, then checks the <see cref="IConfigurationManager"/> to see if a key exists defining if the event is required or not.
        /// The configuration key checked is the full name of <paramref name="messageType"/> suffixed with ".IsRequired".
        /// </summary>
        /// <param name="messageType">The <see cref="Type"/> of the message being processed.</param>
        /// <returns>The result of <see cref="IsEventRequired(string)"/> for the derived configuration key.</returns>
        public virtual bool IsEventRequired(Type messageType)
        {
            return IsEventRequired(string.Format("{0}.IsRequired", messageType.FullName));
        }

        /// <summary>
        /// Checks if a white-list or black-list approach is taken, then checks the <see cref="IConfigurationManager"/> to see if a key exists defining if the event is required or not.
        /// The answer is served from <see cref="CachedChecks"/> when present; otherwise it is read from <see cref="ConfigurationManager"/>,
        /// falling back to <see cref="EventBlackListProcessing"/>, and then cached.
        /// </summary>
        /// <param name="configurationKey">The configuration key to check.</param>
        /// <returns>The cached or freshly read value for <paramref name="configurationKey"/>.</returns>
        public virtual bool IsEventRequired(string configurationKey)
        {
            Tuple<bool, DateTime> settings;
            // Don't refresh the expiry on a hit, the cache is updated every so often which is faster than constantly changing dictionary values.
            if (CachedChecks.TryGetValue(configurationKey, out settings))
                return settings.Item1;

            bool isRequired;
            // If we can't get a value or there is no specific setting, we default to EventBlackListProcessing
            if (!ConfigurationManager.TryGetSetting(configurationKey, out isRequired))
                isRequired = EventBlackListProcessing;

            // Now cache the response
            var cacheEntry = new Tuple<bool, DateTime>(isRequired, DateTime.UtcNow);
            var concurrentCache = CachedChecks as ConcurrentDictionary<string, Tuple<bool, DateTime>>;
            if (concurrentCache != null)
            {
                // Atomic and lock-free for the caller: if the key has been added since we checked, the existing entry is kept.
                // This avoids detecting the race by matching an exception message, which breaks on localised runtimes.
                concurrentCache.TryAdd(configurationKey, cacheEntry);
            }
            else if (!CachedChecks.ContainsKey(configurationKey))
                CachedChecks[configurationKey] = cacheEntry;

            return isRequired;
        }

        /// <summary>
        /// Build a message handler that implements telemetry capturing as well as off thread handling.
        /// The returned handler tracks a "Started" and a "Finished" event around <paramref name="handler"/>, tracks any exception before rethrowing it,
        /// tracks a request with the elapsed time and a "200" or "500" response code, and then flushes <paramref name="telemetryHelper"/>.
        /// </summary>
        /// <typeparam name="TMessage">The <see cref="Type"/> of message being handled.</typeparam>
        /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of authentication token carried by events and commands.</typeparam>
        /// <param name="telemetryHelper">The <see cref="ITelemetryHelper"/> all telemetry is reported to.</param>
        /// <param name="handler">The handler to wrap.</param>
        /// <param name="holdMessageLock">Passed to <see cref="BuildActionHandler{TMessage}"/>; when <c>false</c> the handler is started in a separate <see cref="Task"/>.</param>
        /// <param name="source">Recorded as the "Type" property of the tracked request.</param>
        /// <returns>The telemetry wrapped handler, as returned by <see cref="BuildActionHandler{TMessage}"/>.</returns>
        public virtual Action<TMessage> BuildTelemeteredActionHandler<TMessage, TAuthenticationToken>(ITelemetryHelper telemetryHelper, Action<TMessage> handler, bool holdMessageLock, string source)
            where TMessage : IMessage
        {
            Action<TMessage> registerableMessageHandler = message =>
            {
                DateTimeOffset startedAt = DateTimeOffset.UtcNow;
                Stopwatch mainStopWatch = Stopwatch.StartNew();
                string responseCode = "200";
                bool wasSuccessfull = true;

                string telemetryName = message.GetType().FullName;
                var telemeteredMessage = message as ITelemeteredMessage;
                string messagePrefix = null;
                object authenticationToken = null;
                var @event = message as IEvent<TAuthenticationToken>;
                if (@event != null)
                {
                    messagePrefix = "Event/";
                    telemetryName = string.Format("{0}/{1}", telemetryName, @event.Id);
                    authenticationToken = @event.AuthenticationToken;
                }
                else
                {
                    var command = message as ICommand<TAuthenticationToken>;
                    if (command != null)
                    {
                        messagePrefix = "Command/";
                        telemetryName = string.Format("{0}/{1}", telemetryName, command.Id);
                        authenticationToken = command.AuthenticationToken;
                    }
                }

                // A message supplying its own telemetry name overrides the type/id based name.
                if (telemeteredMessage != null)
                    telemetryName = telemeteredMessage.TelemetryName;

                // The same request name is used for every kind of authentication token, so build it once.
                string requestName = string.Format("Cqrs/Handle/{0}{1}", messagePrefix, telemetryName);

                telemetryHelper.TrackEvent(string.Format("{0}/Started", requestName));

                try
                {
                    handler(message);
                }
                catch (Exception exception)
                {
                    telemetryHelper.TrackException(exception);
                    wasSuccessfull = false;
                    responseCode = "500";
                    throw;
                }
                finally
                {
                    telemetryHelper.TrackEvent(string.Format("{0}/Finished", requestName));

                    mainStopWatch.Stop();
                    var properties = new Dictionary<string, string> { { "Type", source } };

                    // Pick the TrackRequest overload matching the runtime type of the authentication token.
                    if (authenticationToken is ISingleSignOnToken)
                        telemetryHelper.TrackRequest(requestName, (ISingleSignOnToken)authenticationToken, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, properties);
                    else if (authenticationToken is Guid)
                        telemetryHelper.TrackRequest(requestName, (Guid?)authenticationToken, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, properties);
                    else if (authenticationToken is int)
                        telemetryHelper.TrackRequest(requestName, (int?)authenticationToken, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, properties);
                    else
                    {
                        // Anything else (including no token at all) is reported by its string representation.
                        string token = authenticationToken == null ? null : authenticationToken.ToString();
                        telemetryHelper.TrackRequest(requestName, token, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, properties);
                    }

                    telemetryHelper.Flush();
                }
            };

            return BuildActionHandler(registerableMessageHandler, holdMessageLock);
        }

        /// <summary>
        /// Build a message handler that implements off thread handling.
        /// </summary>
        /// <typeparam name="TMessage">The <see cref="Type"/> of message being handled.</typeparam>
        /// <param name="handler">The handler to wrap.</param>
        /// <param name="holdMessageLock">
        /// When <c>true</c> <paramref name="handler"/> is returned as is.
        /// When <c>false</c> the returned handler starts <paramref name="handler"/> in a separate <see cref="Task"/> and returns without waiting for it.
        /// </param>
        /// <returns>The handler to register.</returns>
        public virtual Action<TMessage> BuildActionHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock)
            where TMessage : IMessage
        {
            if (holdMessageLock)
                return handler;

            return message =>
            {
                Task.Factory.StartNewSafely(() =>
                {
                    handler(message);
                });
            };
        }
    }
}
|