Line data Source code
1 : using System;
2 : using System.Collections.Concurrent;
3 : using System.Collections.Generic;
4 : using System.Diagnostics;
5 : using System.Linq;
6 : using System.Threading;
7 : using System.Threading.Tasks;
8 : using cdmdotnet.Logging;
9 : using Cqrs.Authentication;
10 : using Cqrs.Commands;
11 : using Cqrs.Configuration;
12 : using Cqrs.Events;
13 : using Cqrs.Messages;
14 :
15 : namespace Cqrs.Bus
16 : {
17 : public class BusHelper : IBusHelper
18 0 : {
19 0 : public BusHelper(IConfigurationManager configurationManager)
20 : {
21 : ConfigurationManager = configurationManager;
22 : CachedChecks = new ConcurrentDictionary<string, Tuple<bool, DateTime>>();
23 : bool isblackListRequired;
24 : if (!ConfigurationManager.TryGetSetting("Cqrs.MessageBus.BlackListProcessing", out isblackListRequired))
25 : isblackListRequired = true;
26 : EventBlackListProcessing = isblackListRequired;
27 : StartRefreshCachedChecks();
28 : }
29 :
30 : protected IConfigurationManager ConfigurationManager { get; private set; }
31 :
32 : protected IDictionary<string, Tuple<bool, DateTime>> CachedChecks { get; private set; }
33 :
34 : protected bool EventBlackListProcessing { get; private set; }
35 :
36 0 : protected virtual void RefreshCachedChecks()
37 : {
38 : // First refresh the EventBlackListProcessing property
39 : bool isblackListRequired;
40 : if (!ConfigurationManager.TryGetSetting("Cqrs.MessageBus.BlackListProcessing", out isblackListRequired))
41 : isblackListRequired = true;
42 : EventBlackListProcessing = isblackListRequired;
43 :
44 : // Now in a dictionary safe way check each key for a value.
45 : IList<string> keys = CachedChecks.Keys.ToList();
46 : foreach (string configurationKey in keys)
47 : {
48 : Tuple<bool, DateTime> pair = CachedChecks[configurationKey];
49 : bool value;
50 : // If we can't a value or there is no specific setting, remove it from the cache
51 : if (!ConfigurationManager.TryGetSetting(configurationKey, out value))
52 : CachedChecks.Remove(configurationKey);
53 : // Refresh the value and reset it's expiry if the value has changed
54 : else if (pair.Item1 != value)
55 : CachedChecks[configurationKey] = new Tuple<bool, DateTime>(value, DateTime.UtcNow);
56 : // Check it's age - by adding 20 minutes from being obtained or refreshed and if it's older than now remove it
57 : else if (pair.Item2.AddMinutes(20) < DateTime.UtcNow)
58 : CachedChecks.Remove(configurationKey);
59 : }
60 : }
61 :
62 0 : protected virtual void StartRefreshCachedChecks()
63 : {
64 : Task.Factory.StartNewSafely(() =>
65 : {
66 : long loop = 0;
67 : while (true)
68 : {
69 : RefreshCachedChecks();
70 :
71 : if (loop++%5 == 0)
72 : Thread.Yield();
73 : else
74 : Thread.Sleep(1000);
75 : if (loop == long.MaxValue)
76 : loop = long.MinValue;
77 : }
78 : });
79 : }
80 :
81 : /// <summary>
82 : /// Checks if a white-list or black-list approach is taken, then checks the <see cref="IConfigurationManager"/> to see if a key exists defining if the event is required or not.
83 : /// If the event is required and it cannot be resolved, an error will be raised.
84 : /// Otherwise the event will be marked as processed.
85 : /// </summary>
86 : /// <param name="messageType">The <see cref="Type"/> of the message being processed.</param>
87 1 : public virtual bool IsEventRequired(Type messageType)
88 : {
89 : return IsEventRequired(string.Format("{0}.IsRequired", messageType.FullName));
90 : }
91 :
92 : /// <summary>
93 : /// Checks if a white-list or black-list approach is taken, then checks the <see cref="IConfigurationManager"/> to see if a key exists defining if the event is required or not.
94 : /// If the event is required and it cannot be resolved, an error will be raised.
95 : /// Otherwise the event will be marked as processed.
96 : /// </summary>
97 : /// <param name="configurationKey">The configuration key to check.</param>
98 1 : public virtual bool IsEventRequired(string configurationKey)
99 : {
100 : Tuple<bool, DateTime> settings;
101 : bool isRequired;
102 : if (!CachedChecks.TryGetValue(configurationKey, out settings))
103 : {
104 : // If we can't a value or there is no specific setting, we default to EventBlackListProcessing
105 : if (!ConfigurationManager.TryGetSetting(configurationKey, out isRequired))
106 : isRequired = EventBlackListProcessing;
107 :
108 : // Now cache the response
109 : try
110 : {
111 : CachedChecks.Add(configurationKey, new Tuple<bool, DateTime>(isRequired, DateTime.UtcNow));
112 : }
113 : catch (ArgumentException exception)
114 : {
115 : if (exception.Message != "The key already existed in the dictionary.")
116 : throw;
117 : // It's been added since we checked... adding locks is slow, so just move on.
118 : }
119 : }
120 : // Don't refresh the expiry, we'll just update the cache every so often which is faster than constantly changing dictionary values.
121 : else
122 : isRequired = settings.Item1;
123 :
124 : return isRequired;
125 : }
126 :
127 : /// <summary>
128 : /// Build a message handler that implements telemetry capturing as well as off thread handling.
129 : /// </summary>
130 1 : public virtual Action<TMessage> BuildTelemeteredActionHandler<TMessage, TAuthenticationToken>(ITelemetryHelper telemetryHelper, Action<TMessage> handler, bool holdMessageLock, string source)
131 : where TMessage : IMessage
132 : {
133 : Action<TMessage> registerableMessageHandler = message =>
134 : {
135 : DateTimeOffset startedAt = DateTimeOffset.UtcNow;
136 : Stopwatch mainStopWatch = Stopwatch.StartNew();
137 : string responseCode = "200";
138 : bool wasSuccessfull = true;
139 :
140 : string telemetryName = message.GetType().FullName;
141 : var telemeteredMessage = message as ITelemeteredMessage;
142 : string messagePrefix = null;
143 : object authenticationToken = null;
144 : var @event = message as IEvent<TAuthenticationToken>;
145 : if (@event != null)
146 : {
147 : messagePrefix = "Event/";
148 : telemetryName = string.Format("{0}/{1}", telemetryName, @event.Id);
149 : authenticationToken = @event.AuthenticationToken;
150 : }
151 : else
152 : {
153 : var command = message as ICommand<TAuthenticationToken>;
154 : if (command != null)
155 : {
156 : messagePrefix = "Command/";
157 : telemetryName = string.Format("{0}/{1}", telemetryName, command.Id);
158 : authenticationToken = command.AuthenticationToken;
159 : }
160 : }
161 :
162 : if (telemeteredMessage != null)
163 : telemetryName = telemeteredMessage.TelemetryName;
164 :
165 : telemetryHelper.TrackEvent(string.Format("Cqrs/Handle/{0}{1}/Started", messagePrefix, telemetryName));
166 :
167 : try
168 : {
169 : handler(message);
170 : }
171 : catch (Exception exception)
172 : {
173 : telemetryHelper.TrackException(exception);
174 : wasSuccessfull = false;
175 : responseCode = "500";
176 : throw;
177 : }
178 : finally
179 : {
180 : telemetryHelper.TrackEvent(string.Format("Cqrs/Handle/{0}{1}/Finished", messagePrefix, telemetryName));
181 :
182 : mainStopWatch.Stop();
183 : if (authenticationToken is ISingleSignOnToken)
184 : telemetryHelper.TrackRequest
185 : (
186 : string.Format("Cqrs/Handle/{0}{1}", messagePrefix, telemetryName),
187 : (ISingleSignOnToken)authenticationToken,
188 : startedAt,
189 : mainStopWatch.Elapsed,
190 : responseCode,
191 : wasSuccessfull,
192 : new Dictionary<string, string> { { "Type", source } }
193 : );
194 : else if (authenticationToken is Guid)
195 : telemetryHelper.TrackRequest
196 : (
197 : string.Format("Cqrs/Handle/{0}{1}", messagePrefix, telemetryName),
198 : (Guid?)authenticationToken,
199 : startedAt,
200 : mainStopWatch.Elapsed,
201 : responseCode,
202 : wasSuccessfull,
203 : new Dictionary<string, string> { { "Type", source } }
204 : );
205 : else if (authenticationToken is int)
206 : telemetryHelper.TrackRequest
207 : (
208 : string.Format("Cqrs/Handle/{0}{1}", messagePrefix, telemetryName),
209 : (int?)authenticationToken,
210 : startedAt,
211 : mainStopWatch.Elapsed,
212 : responseCode,
213 : wasSuccessfull,
214 : new Dictionary<string, string> { { "Type", source } }
215 : );
216 : else
217 : {
218 : string token = authenticationToken == null ? null : authenticationToken.ToString();
219 : telemetryHelper.TrackRequest
220 : (
221 : string.Format("Cqrs/Handle/{0}{1}", messagePrefix, telemetryName),
222 : token,
223 : startedAt,
224 : mainStopWatch.Elapsed,
225 : responseCode,
226 : wasSuccessfull,
227 : new Dictionary<string, string> { { "Type", source } }
228 : );
229 : }
230 :
231 : telemetryHelper.Flush();
232 : }
233 : };
234 :
235 : return BuildActionHandler(registerableMessageHandler, holdMessageLock);
236 : }
237 :
238 : /// <summary>
239 : /// Build a message handler that implements telemetry capturing as well as off thread handling.
240 : /// </summary>
241 1 : public virtual Action<TMessage> BuildActionHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock)
242 : where TMessage : IMessage
243 : {
244 : Action<TMessage> registerableMessageHandler = handler;
245 :
246 : Action<TMessage> registerableHandler = registerableMessageHandler;
247 : if (!holdMessageLock)
248 : {
249 : registerableHandler = message =>
250 : {
251 : Task.Factory.StartNewSafely(() =>
252 : {
253 : registerableMessageHandler(message);
254 : });
255 : };
256 : }
257 :
258 : return registerableHandler;
259 : }
260 : }
261 : }
|