Line data Source code
1 : #region IMPORTANT NOTE
2 : // This is copied almost exactly into the eventhub except for a string difference. Replicate changes there until a refactor is done.
3 : #endregion
4 :
5 : #region Copyright
6 : // // -----------------------------------------------------------------------
7 : // // <copyright company="cdmdotnet Limited">
8 : // // Copyright cdmdotnet Limited. All rights reserved.
9 : // // </copyright>
10 : // // -----------------------------------------------------------------------
11 : #endregion
12 :
13 : using System;
14 : using System.Collections.Generic;
15 : using System.Diagnostics;
16 : using System.Linq;
17 : using System.Text.RegularExpressions;
18 : using System.Threading;
19 : using System.Threading.Tasks;
20 : using cdmdotnet.Logging;
21 : using Cqrs.Authentication;
22 : using Cqrs.Bus;
23 : using Cqrs.Commands;
24 : using Cqrs.Configuration;
25 : using Cqrs.Events;
26 : using Cqrs.Messages;
27 : using Microsoft.ServiceBus.Messaging;
28 : using Newtonsoft.Json;
29 :
30 : namespace Cqrs.Azure.ServiceBus
31 : {
32 : public class AzureBusHelper<TAuthenticationToken> : IAzureBusHelper<TAuthenticationToken>
33 0 : {
/// <summary>
/// Instantiates a new <see cref="AzureBusHelper{TAuthenticationToken}"/>, keeping a reference to each supplied dependency.
/// </summary>
public AzureBusHelper(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IBusHelper busHelper, IConfigurationManager configurationManager, IDependencyResolver dependencyResolver)
{
    // Cross-cutting services.
    Logger = logger;
    ConfigurationManager = configurationManager;
    DependencyResolver = dependencyResolver;

    // Message related helpers.
    MessageSerialiser = messageSerialiser;
    BusHelper = busHelper;
    CorrelationIdHelper = correlationIdHelper;
    AuthenticationTokenHelper = authenticationTokenHelper;
}

/// <summary>
/// Gets the helper used to read and set the current authentication token.
/// </summary>
protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }

/// <summary>
/// Gets the helper used to read and set the current correlation id.
/// </summary>
protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }

/// <summary>
/// Gets the <see cref="ILogger"/> that diagnostic messages are written to.
/// </summary>
protected ILogger Logger { get; private set; }

/// <summary>
/// Gets the serialiser used to deserialise incoming command and event message bodies.
/// </summary>
protected IMessageSerialiser<TAuthenticationToken> MessageSerialiser { get; private set; }

/// <summary>
/// Gets the <see cref="IBusHelper"/> used to decide if a message type is required and to build handler delegates.
/// </summary>
protected IBusHelper BusHelper { get; private set; }

/// <summary>
/// Gets the <see cref="IConfigurationManager"/> that per-message-type settings are read from.
/// </summary>
protected IConfigurationManager ConfigurationManager { get; private set; }

/// <summary>
/// Gets the <see cref="IDependencyResolver"/> used to locate optional services such as command validators.
/// </summary>
protected IDependencyResolver DependencyResolver { get; private set; }
58 :
/// <summary>
/// Stamps the provided <paramref name="command"/> ready for sending: fills in a missing authentication token,
/// sets the correlation id, records the originating framework if none is set and appends
/// <paramref name="framework"/> to the list of frameworks the command has passed through.
/// </summary>
/// <typeparam name="TCommand">The <see cref="Type"/> of the command being prepared.</typeparam>
/// <param name="command">The command to stamp. Its properties are modified in place.</param>
/// <param name="framework">The name of the framework doing the sending.</param>
public virtual void PrepareCommand<TCommand>(TCommand command, string framework)
    where TCommand : ICommand<TAuthenticationToken>
{
    if (command.AuthenticationToken == null || command.AuthenticationToken.Equals(default(TAuthenticationToken)))
        command.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
    command.CorrelationId = CorrelationIdHelper.GetCorrelationId();

    if (string.IsNullOrWhiteSpace(command.OriginatingFramework))
        command.OriginatingFramework = framework;

    // Record the framework that was actually passed in rather than a hard-coded name. The
    // "already processed" checks elsewhere in this class test Frameworks.Contains(framework),
    // and PrepareEvent stamps the same value, so the stamp must match the supplied name.
    var frameworks = new List<string>();
    if (command.Frameworks != null)
        frameworks.AddRange(command.Frameworks);
    frameworks.Add(framework);
    command.Frameworks = frameworks;
}
74 :
/// <summary>
/// Checks that the provided <paramref name="command"/> has not already been processed by
/// <paramref name="framework"/>, validates it with an <see cref="ICommandValidator{TAuthenticationToken,TCommand}"/>
/// when one can be resolved, and then prepares it via <see cref="PrepareCommand{TCommand}"/>.
/// </summary>
/// <typeparam name="TCommand">The <see cref="Type"/> of the command being prepared.</typeparam>
/// <param name="command">The command to validate and stamp.</param>
/// <param name="framework">The name of the framework doing the sending.</param>
/// <returns>
/// False when the command was already processed by <paramref name="framework"/> or failed validation;
/// otherwise true, in which case the command has been prepared.
/// </returns>
public virtual bool PrepareAndValidateCommand<TCommand>(TCommand command, string framework)
    where TCommand : ICommand<TAuthenticationToken>
{
    Type commandType = command.GetType();
    // The log container names this method; it previously (incorrectly) named PrepareAndValidateEvent.
    string container = string.Format("{0}\\PrepareAndValidateCommand({1})", GetType().FullName, commandType.FullName);

    // If this is the only framework in the list, then it's fine to handle as it's just pre-stamped.
    // If there is more than one framework, then the command has already been through here, so exit.
    if (command.Frameworks != null && command.Frameworks.Contains(framework) && command.Frameworks.Count() != 1)
    {
        Logger.LogInfo("The provided command has already been processed in Azure.", container);
        return false;
    }

    // A validator is optional: failure to resolve one is logged and otherwise ignored.
    ICommandValidator<TAuthenticationToken, TCommand> commandValidator = null;
    try
    {
        commandValidator = DependencyResolver.Resolve<ICommandValidator<TAuthenticationToken, TCommand>>();
    }
    catch (Exception exception)
    {
        Logger.LogDebug("Locating an ICommandValidator failed.", container, exception);
    }

    if (commandValidator != null && !commandValidator.IsCommandValid(command))
    {
        Logger.LogInfo("The provided command is not valid.", container);
        return false;
    }

    PrepareCommand(command, framework);
    return true;
}
109 :
/// <summary>
/// Deserialises <paramref name="messageBody"/> into a command, optionally triggers lock refreshing,
/// and passes the command to <paramref name="receiveCommandHandler"/>.
/// </summary>
/// <param name="messageBody">The serialised command.</param>
/// <param name="receiveCommandHandler">
/// Invoked with the deserialised command. A false result means the command wasn't handled but didn't
/// throw, which by convention means it was skipped.
/// </param>
/// <param name="messageId">The identifier of the message; only used in log messages.</param>
/// <param name="skippedAction">Optional. Invoked whenever the command is skipped.</param>
/// <param name="lockRefreshAction">
/// Optional. Invoked when the "{command type full name}.ShouldRefresh" setting is true.
/// </param>
/// <returns>
/// The deserialised command, or null when the body could not be deserialised because its assembly could
/// not be loaded and the type is not marked as required.
/// </returns>
public virtual ICommand<TAuthenticationToken> ReceiveCommand(string messageBody, Func<ICommand<TAuthenticationToken>, bool?> receiveCommandHandler, string messageId, Action skippedAction = null, Action lockRefreshAction = null)
{
    ICommand<TAuthenticationToken> command;
    try
    {
        command = MessageSerialiser.DeserialiseCommand(messageBody);
    }
    catch (JsonSerializationException exception)
    {
        // Walk the chain of JsonSerializationExceptions looking for a missing-assembly failure.
        // The text is machine generated, so compare ordinally rather than with the current culture.
        bool assemblyCouldNotBeLoaded = false;
        for (JsonSerializationException current = exception; current != null; current = current.InnerException as JsonSerializationException)
        {
            if (current.Message.StartsWith("Could not load assembly", StringComparison.Ordinal))
            {
                assemblyCouldNotBeLoaded = true;
                break;
            }
        }

        if (assemblyCouldNotBeLoaded)
        {
            // Pull the "Type, Assembly" name out of the outermost message.
            const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
            Match match = Regex.Match(exception.Message, pattern);
            if (match.Success)
            {
                string[] typeParts = match.Value.Split(',');
                // Only a plain "Type, Assembly" pair is understood; anything else is rethrown below.
                if (typeParts.Length == 2 && !BusHelper.IsEventRequired(string.Format("{0}.IsRequired", typeParts[0])))
                {
                    if (skippedAction != null)
                        skippedAction();
                    return null;
                }
            }
        }
        throw;
    }

    string commandTypeName = command.GetType().FullName;
    CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
    Logger.LogInfo(string.Format("A command message arrived with the {0} was of type {1}.", messageId, commandTypeName));

    // A missing setting is treated the same as an explicit false.
    bool canRefresh;
    if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", commandTypeName), out canRefresh))
        canRefresh = false;

    if (canRefresh)
    {
        if (lockRefreshAction == null)
            Logger.LogWarning(string.Format("A command message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, commandTypeName));
        else
            lockRefreshAction();
    }

    // A false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
    bool? result = receiveCommandHandler(command);
    if (result != null && !result.Value && skippedAction != null)
        skippedAction();

    return command;
}
177 :
/// <summary>
/// Routes the provided <paramref name="command"/> to its single handler obtained from <paramref name="routeManager"/>,
/// after setting the correlation id and authentication token carried by the command.
/// </summary>
/// <param name="command">The command to handle.</param>
/// <param name="routeManager">The <see cref="RouteManager"/> the handler is looked up in.</param>
/// <param name="framework">The name of the framework receiving the command.</param>
/// <returns>
/// True indicates the <paramref name="command"/> was successfully handled by a handler.
/// False indicates the <paramref name="command"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
/// Null indicates the <paramref name="command"/> wasn't handled as it was already handled.
/// </returns>
public virtual bool? DefaultReceiveCommand(ICommand<TAuthenticationToken> command, RouteManager routeManager, string framework)
{
    Type typeOfCommand = command.GetType();

    // A lone matching entry just means the command was pre-stamped, which is fine to handle.
    // A matching entry amongst several means the command has already been through this framework.
    bool stampedByThisFramework = command.Frameworks != null && command.Frameworks.Contains(framework);
    if (stampedByThisFramework && command.Frameworks.Count() != 1)
    {
        Logger.LogInfo("The provided command has already been processed in Azure.", string.Format("{0}\\DefaultReceiveCommand({1})", GetType().FullName, typeOfCommand.FullName));
        return null;
    }

    CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
    AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);

    bool handlerIsRequired = BusHelper.IsEventRequired(typeOfCommand);
    RouteHandlerDelegate route = routeManager.GetSingleHandler(command, handlerIsRequired);

    // No isRequired check is needed here: a missing required handler raises an exception in the lookup above.
    if (route != null)
    {
        Action<IMessage> invoke = route.Delegate;
        invoke(command);
        return true;
    }

    Logger.LogDebug(string.Format("The command handler for '{0}' is not required.", typeOfCommand.FullName));
    return false;
}
214 :
/// <summary>
/// Stamps the provided <paramref name="event"/> ready for publishing: fills in a missing authentication token,
/// sets the correlation id and the time stamp (current UTC time), records the originating framework if none
/// is set and appends <paramref name="framework"/> to the list of frameworks the event has passed through.
/// </summary>
/// <typeparam name="TEvent">The <see cref="Type"/> of the event being prepared.</typeparam>
/// <param name="event">The event to stamp. Its properties are modified in place.</param>
/// <param name="framework">The name of the framework doing the publishing.</param>
public virtual void PrepareEvent<TEvent>(TEvent @event, string framework)
    where TEvent : IEvent<TAuthenticationToken>
{
    bool tokenIsMissing = @event.AuthenticationToken == null || @event.AuthenticationToken.Equals(default(TAuthenticationToken));
    if (tokenIsMissing)
    {
        @event.AuthenticationToken = AuthenticationTokenHelper.GetAuthenticationToken();
    }

    @event.CorrelationId = CorrelationIdHelper.GetCorrelationId();
    @event.TimeStamp = DateTimeOffset.UtcNow;

    if (string.IsNullOrWhiteSpace(@event.OriginatingFramework))
    {
        @event.OriginatingFramework = framework;
    }

    // Copy whatever is already recorded (if anything), then add this framework to the end.
    List<string> visitedFrameworks = new List<string>(@event.Frameworks ?? Enumerable.Empty<string>());
    visitedFrameworks.Add(framework);
    @event.Frameworks = visitedFrameworks;
}
231 :
/// <summary>
/// Checks that the provided <paramref name="event"/> has not already been processed by
/// <paramref name="framework"/> and, if not, prepares it via <see cref="PrepareEvent{TEvent}"/>.
/// </summary>
/// <typeparam name="TEvent">The <see cref="Type"/> of the event being prepared.</typeparam>
/// <param name="event">The event to check and stamp.</param>
/// <param name="framework">The name of the framework doing the publishing.</param>
/// <returns>
/// False when the event was already processed by <paramref name="framework"/>; otherwise true,
/// in which case the event has been prepared.
/// </returns>
public virtual bool PrepareAndValidateEvent<TEvent>(TEvent @event, string framework)
    where TEvent : IEvent<TAuthenticationToken>
{
    Type typeOfEvent = @event.GetType();

    // A lone matching entry just means the event was pre-stamped, which is fine to handle.
    // A matching entry amongst several means the event has already been through this framework.
    bool stampedByThisFramework = @event.Frameworks != null && @event.Frameworks.Contains(framework);
    bool alreadyProcessed = stampedByThisFramework && @event.Frameworks.Count() != 1;

    if (!alreadyProcessed)
    {
        PrepareEvent(@event, framework);
        return true;
    }

    Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\PrepareAndValidateEvent({1})", GetType().FullName, typeOfEvent.FullName));
    return false;
}
250 :
/// <summary>
/// Deserialises <paramref name="messageBody"/> into an event, optionally triggers lock refreshing,
/// and passes the event to <paramref name="receiveEventHandler"/>.
/// </summary>
/// <param name="messageBody">The serialised event.</param>
/// <param name="receiveEventHandler">
/// Invoked with the deserialised event. A false result means the event wasn't handled but didn't
/// throw, which by convention means it was skipped.
/// </param>
/// <param name="messageId">The identifier of the message; only used in log messages.</param>
/// <param name="skippedAction">Optional. Invoked whenever the event is skipped.</param>
/// <param name="lockRefreshAction">
/// Optional. Invoked when the "{event type full name}.ShouldRefresh" setting is true.
/// </param>
/// <returns>
/// The deserialised event, or null when the body could not be deserialised because its assembly could
/// not be loaded and the type is not marked as required.
/// </returns>
public virtual IEvent<TAuthenticationToken> ReceiveEvent(string messageBody, Func<IEvent<TAuthenticationToken>, bool?> receiveEventHandler, string messageId, Action skippedAction = null, Action lockRefreshAction = null)
{
    IEvent<TAuthenticationToken> @event;
    try
    {
        @event = MessageSerialiser.DeserialiseEvent(messageBody);
    }
    catch (JsonSerializationException exception)
    {
        // Walk the chain of JsonSerializationExceptions looking for a missing-assembly failure.
        // The text is machine generated, so compare ordinally rather than with the current culture.
        bool assemblyCouldNotBeLoaded = false;
        for (JsonSerializationException current = exception; current != null; current = current.InnerException as JsonSerializationException)
        {
            if (current.Message.StartsWith("Could not load assembly", StringComparison.Ordinal))
            {
                assemblyCouldNotBeLoaded = true;
                break;
            }
        }

        if (assemblyCouldNotBeLoaded)
        {
            // Pull the "Type, Assembly" name out of the outermost message.
            const string pattern = @"(?<=^Error resolving type specified in JSON ').+?(?='\. Path '\$type')";
            Match match = Regex.Match(exception.Message, pattern);
            if (match.Success)
            {
                string[] typeParts = match.Value.Split(',');
                // Only a plain "Type, Assembly" pair is understood; anything else is rethrown below.
                if (typeParts.Length == 2 && !BusHelper.IsEventRequired(string.Format("{0}.IsRequired", typeParts[0])))
                {
                    if (skippedAction != null)
                        skippedAction();
                    return null;
                }
            }
        }
        throw;
    }

    string eventTypeName = @event.GetType().FullName;
    CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
    Logger.LogInfo(string.Format("An event message arrived with the {0} was of type {1}.", messageId, eventTypeName));

    // A missing setting is treated the same as an explicit false.
    bool canRefresh;
    if (!ConfigurationManager.TryGetSetting(string.Format("{0}.ShouldRefresh", eventTypeName), out canRefresh))
        canRefresh = false;

    if (canRefresh)
    {
        if (lockRefreshAction == null)
            Logger.LogWarning(string.Format("An event message arrived with the {0} was of type {1} and was destined to support lock renewal, but no action was provided.", messageId, eventTypeName));
        else
            lockRefreshAction();
    }

    // A false response means the action wasn't handled, but didn't throw an error, so we assume, by convention, that this means it was skipped.
    bool? result = receiveEventHandler(@event);
    if (result != null && !result.Value && skippedAction != null)
        skippedAction();

    return @event;
}
318 :
/// <summary>
/// Starts a task (via <c>Task.Factory.StartNewSafely</c>) that keeps renewing the lock on the provided
/// <paramref name="message"/> until cancellation is requested on
/// <paramref name="brokeredMessageRenewCancellationTokenSource"/>.
/// </summary>
/// <param name="brokeredMessageRenewCancellationTokenSource">
/// Signals the task to stop. It is disposed by the task once the renewal loop observes the cancellation;
/// it is not disposed when the task stops early because renewing failed.
/// </param>
/// <param name="message">The <see cref="BrokeredMessage"/> whose lock is renewed.</param>
/// <param name="type">A label for the kind of message; only used in log messages.</param>
public virtual void RefreshLock(CancellationTokenSource brokeredMessageRenewCancellationTokenSource, BrokeredMessage message, string type = "message")
{
    Task.Factory.StartNewSafely(() =>
    {
        // ObjectDisposedException is captured around everything because even the properties can throw it.
        try
        {
            long pass = long.MinValue;
            while (!brokeredMessageRenewCancellationTokenSource.Token.IsCancellationRequested)
            {
                // Renew once the lock is within 45 seconds of expiring (based on LockedUntilUtc);
                // the 45 second margin is there to mitigate any thread based issues.
                bool lockExpiresSoon = DateTime.UtcNow > message.LockedUntilUtc.AddSeconds(-45);
                if (lockExpiresSoon && !RenewLockWithRetries(message, type))
                    return;

                // Passes where the counter is a multiple of five only yield, every other pass sleeps for half a second.
                bool yieldOnly = pass % 5 == 0;
                pass++;
                if (yieldOnly)
                    Thread.Yield();
                else
                    Thread.Sleep(500);

                // Wrap the counter around before it can overflow.
                if (pass == long.MaxValue)
                    pass = long.MinValue;
            }

            try
            {
                brokeredMessageRenewCancellationTokenSource.Dispose();
            }
            catch (ObjectDisposedException) { }
        }
        catch (ObjectDisposedException) { }
    }, brokeredMessageRenewCancellationTokenSource.Token);
}

/// <summary>
/// Makes up to ten attempts to renew the lock on the provided <paramref name="message"/>, logging each outcome.
/// When the logger itself throws, the same text is written to <see cref="Trace"/> instead.
/// </summary>
/// <returns>
/// True when the lock was renewed and refreshing should carry on. False when refreshing should stop: the
/// message was disposed, the lock was lost, or the tenth attempt failed.
/// </returns>
private bool RenewLockWithRetries(BrokeredMessage message, string type)
{
    const int maximumAttempts = 10;
    for (int attempt = 1; attempt <= maximumAttempts; attempt++)
    {
        try
        {
            message.RenewLock();
            try
            {
                Logger.LogDebug(string.Format("Renewed the lock on {1} '{0}'.", message.MessageId, type));
            }
            catch
            {
                Trace.TraceError("Renewed the lock on {1} '{0}'.", message.MessageId, type);
            }

            return true;
        }
        catch (ObjectDisposedException)
        {
            return false;
        }
        catch (MessageLockLostException exception)
        {
            try
            {
                Logger.LogWarning(string.Format("Renewing the lock on {1} '{0}' failed as the message lock was lost.", message.MessageId, type), exception: exception);
            }
            catch
            {
                Trace.TraceError("Renewing the lock on {1} '{0}' failed as the message lock was lost.\r\n{2}", message.MessageId, type, exception.Message);
            }
            return false;
        }
        catch (Exception exception)
        {
            try
            {
                Logger.LogWarning(string.Format("Renewing the lock on {1} '{0}' failed.", message.MessageId, type), exception: exception);
            }
            catch
            {
                Trace.TraceError("Renewing the lock on {1} '{0}' failed.\r\n{2}", message.MessageId, type, exception.Message);
            }
            // Give up once the final attempt has failed, otherwise go around again.
            if (attempt == maximumAttempts)
                return false;
        }
    }

    // Every pass above either returns or retries, and the final retry returns, so this is not expected to be reached.
    return true;
}
398 :
/// <summary>
/// Routes the provided <paramref name="event"/> to every handler obtained from <paramref name="routeManager"/>,
/// after setting the correlation id and authentication token carried by the event.
/// </summary>
/// <param name="event">The event to handle.</param>
/// <param name="routeManager">The <see cref="RouteManager"/> the handlers are looked up in.</param>
/// <param name="framework">The name of the framework receiving the event.</param>
/// <returns>
/// True indicates the <paramref name="event"/> was successfully handled by a handler.
/// False indicates the <paramref name="event"/> wasn't handled, but didn't throw an error, so by convention, that means it was skipped.
/// Null indicates the <paramref name="event"/> wasn't handled as it was already handled.
/// </returns>
public virtual bool? DefaultReceiveEvent(IEvent<TAuthenticationToken> @event, RouteManager routeManager, string framework)
{
    Type typeOfEvent = @event.GetType();

    // A lone matching entry just means the event was pre-stamped, which is fine to handle.
    // A matching entry amongst several means the event has already been through this framework.
    bool stampedByThisFramework = @event.Frameworks != null && @event.Frameworks.Contains(framework);
    if (stampedByThisFramework && @event.Frameworks.Count() != 1)
    {
        Logger.LogInfo("The provided event has already been processed in Azure.", string.Format("{0}\\DefaultReceiveEvent({1})", GetType().FullName, typeOfEvent.FullName));
        return null;
    }

    CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
    AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);

    bool handlerIsRequired = BusHelper.IsEventRequired(typeOfEvent);

    // Materialise the delegates once so they can be counted and then invoked.
    List<Action<IMessage>> delegates = routeManager
        .GetHandlers(@event, handlerIsRequired)
        .Select(route => route.Delegate)
        .ToList();

    // No isRequired check is needed here: missing required handlers raise an exception in the lookup above.
    if (delegates.Count == 0)
    {
        Logger.LogDebug(string.Format("The event handler for '{0}' is not required.", typeOfEvent.FullName));
        return false;
    }

    foreach (Action<IMessage> invoke in delegates)
    {
        invoke(@event);
    }
    return true;
}
435 :
/// <summary>
/// Wraps the provided <paramref name="handler"/> using <see cref="BusHelper"/>, registers the wrapped handler
/// with <paramref name="routeManger"/> and then tracks the registration via <paramref name="telemetryHelper"/>.
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of message the handler accepts.</typeparam>
/// <param name="telemetryHelper">Receives a "Cqrs/RegisterHandler/{message type}" event and is then flushed.</param>
/// <param name="routeManger">The <see cref="RouteManager"/> the handler is registered with.</param>
/// <param name="handler">The handler to wrap and register.</param>
/// <param name="targetedType">Passed through to the <see cref="RouteManager"/> registration.</param>
/// <param name="holdMessageLock">Passed through to <see cref="IBusHelper"/> when the handler is wrapped.</param>
public virtual void RegisterHandler<TMessage>(ITelemetryHelper telemetryHelper, RouteManager routeManger, Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
    where TMessage : IMessage
{
    Action<TMessage> wrappedHandler = BusHelper.BuildActionHandler(handler, holdMessageLock);
    routeManger.RegisterHandler(wrappedHandler, targetedType);

    string telemetryName = string.Format("Cqrs/RegisterHandler/{0}", typeof(TMessage).FullName);
    var telemetryProperties = new Dictionary<string, string>
    {
        { "Type", "Azure/Bus" }
    };
    telemetryHelper.TrackEvent(telemetryName, telemetryProperties);
    telemetryHelper.Flush();
}
446 : }
447 : }
|