Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Globalization;
13 : using System.Linq;
14 : using System.Threading;
15 : using System.Threading.Tasks;
16 : using Cqrs.Authentication;
17 : using Cqrs.Bus;
18 : using Cqrs.Configuration;
19 : using Cqrs.Events;
20 : using cdmdotnet.Logging;
21 : using Cqrs.Messages;
22 : using Microsoft.ServiceBus;
23 : using Microsoft.ServiceBus.Messaging;
24 :
25 : namespace Cqrs.Azure.ServiceBus
26 : {
27 : /// <summary>
28 : /// A <see cref="IEventReceiver{TAuthenticationToken}"/> that receives network messages, resolves handlers and executes the handler.
29 : /// </summary>
30 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
31 : // The “,nq” suffix here just asks the expression evaluator to remove the quotes when displaying the final value (nq = no quotes).
32 : [DebuggerDisplay("{DebuggerDisplay,nq}")]
33 : public class AzureEventBusReceiver<TAuthenticationToken>
34 : : AzureEventBus<TAuthenticationToken>
35 : , IEventHandlerRegistrar
36 : , IEventReceiver<TAuthenticationToken>
37 2 : {
/// <summary>
/// Gets the key <see cref="IConfigurationManager"/> is queried with to find
/// how many receiver <see cref="SubscriptionClient"/> instances to create.
/// </summary>
/// <remarks>Virtual, so a derived receiver can read the count from a different setting.</remarks>
protected virtual string NumberOfReceiversCountConfigurationKey
{
    get
    {
        const string configurationKey = "Cqrs.Azure.EventBus.NumberOfReceiversCount";
        return configurationKey;
    }
}
47 :
/// <summary>
/// Gets the key <see cref="IConfigurationManager"/> is queried with to find
/// the value to use for <see cref="OnMessageOptions.MaxConcurrentCalls"/>.
/// </summary>
/// <remarks>Virtual, so a derived receiver can read the limit from a different setting.</remarks>
protected virtual string MaximumConcurrentReceiverProcessesCountConfigurationKey
{
    get
    {
        const string configurationKey = "Cqrs.Azure.EventBus.MaximumConcurrentReceiverProcessesCount";
        return configurationKey;
    }
}
57 :
/// <summary>
/// Gets the key <see cref="IConfigurationManager"/> is queried with to find
/// the <see cref="SqlFilter.SqlExpression"/> to apply to
/// the <see cref="SubscriptionClient"/> instances in the receivers.
/// </summary>
/// <remarks>Virtual, so a derived receiver can read the filter from a different setting.</remarks>
protected virtual string FilterKeyConfigurationKey
{
    get
    {
        const string configurationKey = "Cqrs.Azure.EventBus.TopicName.SubscriptionName.Filter";
        return configurationKey;
    }
}
68 :
/// <summary>
/// The <see cref="SqlFilter.SqlExpression"/> most recently read from configuration for
/// the <see cref="SubscriptionClient"/> instances in the receivers.
/// </summary>
/// <remarks>
/// Compared against the configured value each time receiving is instantiated, so the
/// subscription rule is only refreshed when the configured filter has changed.
/// </remarks>
protected string FilterKey
{
    get;
    set;
}
74 :
// ReSharper disable StaticMemberInGenericType
/// <summary>
/// Gets the <see cref="RouteManager"/> that handlers are registered into and resolved from.
/// </summary>
/// <remarks>
/// Static on a generic type: every closed <see cref="AzureEventBusReceiver{TAuthenticationToken}"/>
/// type has its own instance, shared by all receivers of that closed type.
/// </remarks>
public static RouteManager Routes
{
    get;
    private set;
}

/// <summary>
/// The number of handles currently being executed.
/// </summary>
/// <remarks>
/// NOTE(review): <see cref="ReceiveEvent(BrokeredMessage)"/> updates this with <c>++</c>/<c>--</c>,
/// which is not atomic on a property; confirm whether concurrent handlers can skew the value.
/// </remarks>
protected static long CurrentHandles
{
    get;
    set;
}
// ReSharper restore StaticMemberInGenericType

/// <summary>
/// Creates the shared <see cref="Routes"/> instance.
/// </summary>
static AzureEventBusReceiver()
{
    Routes = new RouteManager();
}
91 :
/// <summary>
/// Instantiates a new instance of <see cref="AzureEventBusReceiver{TAuthenticationToken}"/>
/// and creates the telemetry helper the receiver reports through.
/// </summary>
/// <param name="configurationManager">Passed to the base class and used to create the telemetry helper.</param>
/// <param name="messageSerialiser">Passed to the base class.</param>
/// <param name="authenticationTokenHelper">Passed to the base class.</param>
/// <param name="correlationIdHelper">Passed to the base class and to the telemetry helper factory.</param>
/// <param name="logger">Passed to the base class.</param>
/// <param name="azureBusHelper">Passed to the base class.</param>
/// <param name="busHelper">Passed to the base class.</param>
public AzureEventBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper)
    : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, busHelper, false)
{
    // The setting named here is handed to CreateTelemetryHelper, which decides which helper to build.
    const string telemetrySettingKey = "Cqrs.Azure.EventBus.Receiver.UseApplicationInsightTelemetryHelper";
    TelemetryHelper = configurationManager.CreateTelemetryHelper(telemetrySettingKey, correlationIdHelper);
}
100 :
/// <summary>
/// The text shown for this receiver in the debugger variable window:
/// the connection string details followed by the topic, subscription, filter and receiver count values.
/// </summary>
internal string DebuggerDisplay
{
    get
    {
        string connectionDetails = string.Format("ConnectionString : {0}", MessageBusConnectionStringConfigurationKey);
        try
        {
            connectionDetails = string.Concat(connectionDetails, "=", GetConnectionString());
        }
        catch
        {
            // Best effort only: if the connection string cannot be resolved the display
            // falls back to showing just the configuration key.
        }

        object[] displayValues =
        {
            connectionDetails,
            PrivateTopicName,
            PrivateTopicSubscriptionName,
            PublicTopicName,
            PublicTopicSubscriptionName,
            FilterKey,
            NumberOfReceiversCount
        };
        return string.Format
        (
            CultureInfo.InvariantCulture,
            "{0}, PrivateTopicName : {1}, PrivateTopicSubscriptionName : {2}, PublicTopicName : {3}, PublicTopicSubscriptionName : {4}, FilterKey : {5}, NumberOfReceiversCount : {6}",
            displayValues
        );
    }
}
118 :
119 : #region Overrides of AzureServiceBus<TAuthenticationToken>
120 :
/// <summary>
/// Calls <see cref="AzureServiceBus{TAuthenticationToken}.InstantiateReceiving()"/>
/// then uses a <see cref="Task"/> to apply the <see cref="FilterKey"/> as a <see cref="RuleDescription"/>
/// to the first <see cref="SubscriptionClient"/> (key 0) in <paramref name="serviceBusReceivers"/>.
/// </summary>
/// <remarks>
/// The background work does nothing when the filter setting is missing or unchanged since the last call.
/// Otherwise it keeps the subscription rules consistent with the configured filter:
/// a non-blank filter is stored in a rule named "CqrsConfiguredFilter" and any "$Default" rule is removed;
/// a blank filter removes an out-of-date "CqrsConfiguredFilter" rule and restores a match-all "$Default" rule
/// when no rules would otherwise remain.
/// </remarks>
/// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
/// <param name="serviceBusReceivers">The receivers collection to place <see cref="SubscriptionClient"/> instances into.</param>
/// <param name="topicName">The topic name.</param>
/// <param name="topicSubscriptionName">The topic subscription name.</param>
protected override void InstantiateReceiving(NamespaceManager namespaceManager, IDictionary<int, SubscriptionClient> serviceBusReceivers, string topicName, string topicSubscriptionName)
{
    base.InstantiateReceiving(namespaceManager, serviceBusReceivers, topicName, topicSubscriptionName);

    Task.Factory.StartNewSafely
    (() =>
    {
        const string configuredRuleName = "CqrsConfiguredFilter";
        const string defaultRuleName = "$Default";
        const int maximumAddAttempts = 10;

        // Because refreshing the rule can take a while, we only want to do this when the value changes
        string filter;
        if (!ConfigurationManager.TryGetSetting(FilterKeyConfigurationKey, out filter))
            return;
        if (FilterKey == filter)
            return;

        // Without a receiver there is nothing to apply the rule to. Leave FilterKey untouched so the
        // filter is applied the next time receiving is instantiated.
        SubscriptionClient client;
        if (!serviceBusReceivers.TryGetValue(0, out client) || client == null)
        {
            Logger.LogWarning("Setting the filter was skipped as there is no receiver to apply it to.");
            return;
        }
        FilterKey = filter;
        bool hasFilter = !string.IsNullOrWhiteSpace(filter);

        // https://docs.microsoft.com/en-us/azure/application-insights/app-insights-analytics-reference#summarize-operator
        // http://www.summa.com/blog/business-blog/everything-you-need-to-know-about-azure-service-bus-brokered-messaging-part-2#rulesfiltersactions
        // https://github.com/Azure-Samples/azure-servicebus-messaging-samples/tree/master/TopicFilters
        bool reAddRule = false;
        try
        {
            IList<RuleDescription> rules = namespaceManager.GetRules(client.TopicPath, client.Name).ToList();
            // 'rules' is a snapshot taken before any removal below, so track how many rules are really left.
            int remainingRuleCount = rules.Count;

            RuleDescription configuredRule = rules.SingleOrDefault(rule => rule.Name == configuredRuleName);
            if (configuredRule != null)
            {
                var sqlFilter = configuredRule.Filter as SqlFilter;
                // A non-SQL rule is out of date as soon as a filter is configured;
                // a SQL rule is out of date when its expression differs from the configured one.
                bool ruleIsOutOfDate = sqlFilter == null
                    ? hasFilter
                    : sqlFilter.SqlExpression != filter;
                if (ruleIsOutOfDate)
                {
                    // Always remove the old rule before adding the replacement, otherwise the add can never succeed.
                    client.RemoveRule(configuredRuleName);
                    remainingRuleCount--;
                    // Only a non-blank filter can be turned back into a rule.
                    reAddRule = hasFilter;
                }
            }
            else if (hasFilter)
                reAddRule = true;

            RuleDescription defaultRule = rules.SingleOrDefault(rule => rule.Name == defaultRuleName);
            // If there is a default rule and we have a rule, it will cause issues
            if (hasFilter && defaultRule != null)
                client.RemoveRule(defaultRuleName);
            // If we don't have a rule and there is no longer a default rule, it will cause issues
            else if (!hasFilter && remainingRuleCount == 0)
            {
                defaultRule = new RuleDescription
                (
                    defaultRuleName,
                    new SqlFilter("1=1")
                );
                client.AddRule(defaultRule);
            }
        }
        catch (MessagingEntityNotFoundException exception)
        {
            // Not fatal: carry on with whatever was decided before the entity went missing, but leave a trace.
            Logger.LogWarning("A messaging entity was not found while refreshing the subscription filter rules.", exception: exception);
        }

        if (!reAddRule)
            return;

        for (int attempt = 1; attempt <= maximumAddAttempts; attempt++)
        {
            try
            {
                RuleDescription ruleDescription = new RuleDescription
                (
                    configuredRuleName,
                    new SqlFilter(filter)
                );
                client.AddRule(ruleDescription);
                return;
            }
            catch (MessagingEntityAlreadyExistsException exception)
            {
                // Only report failure once the final attempt has actually failed.
                if (attempt == maximumAddAttempts)
                {
                    Logger.LogError("Setting the filter failed as it already exists.", exception: exception);
                    TelemetryHelper.TrackException(exception);
                    return;
                }
                // Still waiting for the delete to complete
                Thread.Sleep(1000);
            }
            catch (Exception exception)
            {
                Logger.LogError("Setting the filter failed.", exception: exception);
                TelemetryHelper.TrackException(exception);
                return;
            }
        }
    });
}
222 :
223 : #endregion
224 :
/// <summary>
/// Register an event handler that will listen and respond to events.
/// The registration is delegated to the Azure bus helper and recorded in <see cref="Routes"/>.
/// </summary>
/// <remarks>
/// In many cases the <paramref name="targetedType"/> will be the handler class itself, what you actually want is the target of what is being updated.
/// </remarks>
/// <typeparam name="TMessage">The <see cref="Type"/> of message the <paramref name="handler"/> accepts.</typeparam>
/// <param name="handler">The delegate to register.</param>
/// <param name="targetedType">The <see cref="Type"/> the handler is registered against.</param>
/// <param name="holdMessageLock">Forwarded unchanged to the Azure bus helper; defaults to true.</param>
public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
    where TMessage : IMessage
{
    AzureBusHelper.RegisterHandler
    (
        TelemetryHelper,
        Routes,
        handler,
        targetedType,
        holdMessageLock
    );
}
236 :
/// <summary>
/// Register an event handler that will listen and respond to events,
/// without specifying a targeted <see cref="Type"/>.
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of message the <paramref name="handler"/> accepts.</typeparam>
/// <param name="handler">The delegate to register.</param>
/// <param name="holdMessageLock">Forwarded unchanged to the three-argument overload; defaults to false here.</param>
public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = false)
    where TMessage : IMessage
{
    // No targeted type is supplied by this overload.
    Type noTargetedType = null;
    RegisterHandler(handler, noTargetedType, holdMessageLock);
}
245 :
/// <summary>
/// Register an event handler that will listen and respond to all events.
/// The registration is delegated to the Azure bus helper and recorded in <see cref="Routes"/>.
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of message the <paramref name="handler"/> accepts.</typeparam>
/// <param name="handler">The delegate to register.</param>
/// <param name="holdMessageLock">Forwarded unchanged to the Azure bus helper; defaults to true.</param>
public void RegisterGlobalEventHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
    where TMessage : IMessage
{
    AzureBusHelper.RegisterGlobalEventHandler
    (
        TelemetryHelper,
        Routes,
        handler,
        holdMessageLock
    );
}
254 :
/// <summary>
/// Receives a <see cref="BrokeredMessage"/> from the event bus, passes its body to the Azure bus helper for handling,
/// then completes, abandons or dead-letters the message and records telemetry for the request.
/// </summary>
/// <remarks>
/// Response codes reported to telemetry: "200" handled, "204" skipped, "599" message lock lost, "500" any other failure.
/// </remarks>
/// <param name="message">The <see cref="BrokeredMessage"/> to process.</param>
protected virtual void ReceiveEvent(BrokeredMessage message)
{
    DateTimeOffset startedAt = DateTimeOffset.UtcNow;
    Stopwatch mainStopWatch = Stopwatch.StartNew();
    string responseCode = "200";
    // Null means it was skipped
    bool? wasSuccessfull = true;
    string telemetryName = string.Format("Cqrs/Handle/Event/{0}", message.MessageId);
    ISingleSignOnToken authenticationToken = null;

    IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
    object value;
    // The property can be present with a null value; only record a message type when there is one.
    if (message.Properties.TryGetValue("Type", out value) && value != null)
        telemetryProperties.Add("MessageType", value.ToString());
    TelemetryHelper.TrackMetric("Cqrs/Handle/Event", CurrentHandles++, telemetryProperties);
    var brokeredMessageRenewCancellationTokenSource = new CancellationTokenSource();
    try
    {
        Logger.LogDebug(string.Format("An event message arrived with the id '{0}'.", message.MessageId));
        string messageBody = message.GetBody<string>();

        IEvent<TAuthenticationToken> @event = AzureBusHelper.ReceiveEvent(messageBody, ReceiveEvent,
            string.Format("id '{0}'", message.MessageId),
            () =>
            {
                wasSuccessfull = null;
                telemetryName = string.Format("Cqrs/Handle/Event/Skipped/{0}", message.MessageId);
                responseCode = "204";
                // Remove message from queue
                try
                {
                    message.Complete();
                }
                catch (MessageLockLostException exception)
                {
                    throw new MessageLockLostException(string.Format("The lock supplied for the skipped event message '{0}' is invalid.", message.MessageId), exception);
                }
                Logger.LogDebug(string.Format("An event message arrived with the id '{0}' but processing was skipped due to event settings.", message.MessageId));
                TelemetryHelper.TrackEvent("Cqrs/Handle/Event/Skipped", telemetryProperties);
            },
            () =>
            {
                AzureBusHelper.RefreshLock(brokeredMessageRenewCancellationTokenSource, message, "event");
            }
        );

        if (wasSuccessfull != null)
        {
            if (@event != null)
            {
                telemetryName = string.Format("{0}/{1}", @event.GetType().FullName, @event.Id);
                authenticationToken = @event.AuthenticationToken as ISingleSignOnToken;

                var telemeteredMessage = @event as ITelemeteredMessage;
                if (telemeteredMessage != null)
                    telemetryName = telemeteredMessage.TelemetryName;

                telemetryName = string.Format("Cqrs/Handle/Event/{0}", telemetryName);
            }
            // Remove message from queue
            try
            {
                message.Complete();
            }
            catch (MessageLockLostException exception)
            {
                // @event may be null here (see the null check above), so fall back to the message id
                // rather than masking the lost lock with a NullReferenceException.
                string lockLostMessage = @event == null
                    ? string.Format("The lock supplied for the event message '{0}' is invalid.", message.MessageId)
                    : string.Format("The lock supplied for event '{0}' of type {1} is invalid.", @event.Id, @event.GetType().Name);
                throw new MessageLockLostException(lockLostMessage, exception);
            }
        }
        Logger.LogDebug(string.Format("An event message arrived and was processed with the id '{0}'.", message.MessageId));

        // There is nothing to hand to waiting callers when no event was produced (e.g. the message was skipped).
        if (@event != null)
        {
            IList<IEvent<TAuthenticationToken>> events;
            if (EventWaits.TryGetValue(@event.CorrelationId, out events))
                events.Add(@event);
        }
    }
    catch (MessageLockLostException exception)
    {
        // Record the outcome first so telemetry stays correct even if releasing the message throws.
        responseCode = "599";
        IDictionary<string, string> subTelemetryProperties = new Dictionary<string, string>(telemetryProperties);
        subTelemetryProperties.Add("TimeTaken", mainStopWatch.Elapsed.ToString());
        TelemetryHelper.TrackException(exception, null, subTelemetryProperties);
        if (ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete)
        {
            Logger.LogError(exception.Message, exception: exception);
            wasSuccessfull = false;
            // Indicates a problem, unlock message in queue
            message.Abandon();
        }
        else
        {
            Logger.LogWarning(exception.Message, exception: exception);
            try
            {
                message.DeadLetter("LockLostButHandled", "The message was handled but the lock was lost.");
            }
            catch (Exception)
            {
                // Oh well, move on.
                message.Abandon();
            }
        }
    }
    catch (Exception exception)
    {
        TelemetryHelper.TrackException(exception, null, telemetryProperties);
        Logger.LogError(string.Format("An event message arrived with the id '{0}' but failed to be process.", message.MessageId), exception: exception);
        // Record the outcome first so telemetry stays correct even if releasing the message throws.
        wasSuccessfull = false;
        responseCode = "500";
        telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
        telemetryProperties.Add("ExceptionMessage", exception.Message);
        // Indicates a problem, unlock message in queue
        message.Abandon();
    }
    finally
    {
        // Cancel the lock of renewing the task
        brokeredMessageRenewCancellationTokenSource.Cancel();
        TelemetryHelper.TrackMetric("Cqrs/Handle/Event", CurrentHandles--, telemetryProperties);

        mainStopWatch.Stop();
        TelemetryHelper.TrackRequest
        (
            telemetryName,
            authenticationToken,
            startedAt,
            mainStopWatch.Elapsed,
            responseCode,
            // A skipped message (null) is reported as a success.
            wasSuccessfull == null || wasSuccessfull.Value,
            telemetryProperties
        );

        TelemetryHelper.Flush();
    }
}
392 :
/// <summary>
/// Receives a <see cref="IEvent{TAuthenticationToken}"/> from the event bus
/// by delegating to the Azure bus helper's default receive logic with <see cref="Routes"/>.
/// </summary>
/// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to handle.</param>
/// <returns>Whatever the Azure bus helper reports for the <paramref name="event"/>.</returns>
public virtual bool? ReceiveEvent(IEvent<TAuthenticationToken> @event)
{
    const string busIdentifier = "Azure-ServiceBus";
    bool? outcome = AzureBusHelper.DefaultReceiveEvent(@event, Routes, busIdentifier);
    return outcome;
}
400 :
401 : #region Overrides of AzureServiceBus<TAuthenticationToken>
402 :
/// <summary>
/// Reads the setting named by <see cref="NumberOfReceiversCountConfigurationKey"/> from <see cref="IConfigurationManager"/>.
/// </summary>
/// <returns>
/// The configured value when it is present, not blank and a valid <see cref="int"/>;
/// otherwise <see cref="AzureBus{TAuthenticationToken}.DefaultNumberOfReceiversCount"/>.
/// </returns>
protected override int GetCurrentNumberOfReceiversCount()
{
    string configuredValue;
    bool settingFound = ConfigurationManager.TryGetSetting(NumberOfReceiversCountConfigurationKey, out configuredValue);
    if (!settingFound || string.IsNullOrWhiteSpace(configuredValue))
        return DefaultNumberOfReceiversCount;

    int configuredCount;
    return int.TryParse(configuredValue, out configuredCount)
        ? configuredCount
        : DefaultNumberOfReceiversCount;
}
420 :
/// <summary>
/// Reads the setting named by <see cref="MaximumConcurrentReceiverProcessesCountConfigurationKey"/> from <see cref="IConfigurationManager"/>.
/// </summary>
/// <returns>
/// The configured value when it is present, not blank and a valid <see cref="int"/>;
/// otherwise <see cref="AzureBus{TAuthenticationToken}.DefaultMaximumConcurrentReceiverProcessesCount"/>.
/// </returns>
protected override int GetCurrentMaximumConcurrentReceiverProcessesCount()
{
    string configuredValue;
    bool settingFound = ConfigurationManager.TryGetSetting(MaximumConcurrentReceiverProcessesCountConfigurationKey, out configuredValue);
    if (!settingFound || string.IsNullOrWhiteSpace(configuredValue))
        return DefaultMaximumConcurrentReceiverProcessesCount;

    int configuredCount;
    return int.TryParse(configuredValue, out configuredCount)
        ? configuredCount
        : DefaultMaximumConcurrentReceiverProcessesCount;
}
438 :
439 : #endregion
440 :
441 : #region Implementation of IEventReceiver
442 :
/// <summary>
/// Starts listening and processing instances of <see cref="IEvent{TAuthenticationToken}"/> from the event bus:
/// instantiates the receivers, then registers <see cref="ReceiveEvent(BrokeredMessage)"/> as the message callback.
/// </summary>
public void Start()
{
    InstantiateReceiving();

    // Configure the callback options
    OnMessageOptions callbackOptions = new OnMessageOptions();
    // ReceiveEvent completes, abandons or dead-letters each message itself.
    callbackOptions.AutoComplete = false;
    callbackOptions.AutoRenewTimeout = TimeSpan.FromMinutes(1);
    callbackOptions.MaxConcurrentCalls = MaximumConcurrentReceiverProcessesCount;

    // Callback to handle received messages
    RegisterReceiverMessageHandler(ReceiveEvent, callbackOptions);
}
461 :
462 : #endregion
463 : }
464 : }
|