Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Globalization;
13 : using System.Linq;
14 : using System.Threading;
15 : using System.Threading.Tasks;
16 : using cdmdotnet.Logging;
17 : using Cqrs.Authentication;
18 : using Cqrs.Bus;
19 : using Cqrs.Commands;
20 : using Cqrs.Configuration;
21 : using Cqrs.Messages;
22 : using Microsoft.ServiceBus;
23 : using Microsoft.ServiceBus.Messaging;
24 :
25 : namespace Cqrs.Azure.ServiceBus
26 : {
27 : /// <summary>
28 : /// A <see cref="ICommandReceiver{TAuthenticationToken}"/> that receives network messages, resolves handlers and executes the handler.
29 : /// </summary>
30 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
31 : // The “,nq” suffix here just asks the expression evaluator to remove the quotes when displaying the final value (nq = no quotes).
32 : [DebuggerDisplay("{DebuggerDisplay,nq}")]
33 : public class AzureCommandBusReceiver<TAuthenticationToken>
34 : : AzureCommandBus<TAuthenticationToken>
35 : , ICommandHandlerRegistrar
36 : , ICommandReceiver<TAuthenticationToken>
37 2 : {
38 : /// <summary>
39 : /// The configuration key for
40 : /// the number of receiver <see cref="SubscriptionClient"/> instances to create
41 : /// as used by <see cref="IConfigurationManager"/>.
42 : /// </summary>
43 : protected virtual string NumberOfReceiversCountConfigurationKey
44 : {
45 : get { return "Cqrs.Azure.CommandBus.NumberOfReceiversCount"; }
46 : }
47 :
48 : /// <summary>
49 : /// The configuration key for
50 : /// <see cref="OnMessageOptions.MaxConcurrentCalls"/>.
51 : /// as used by <see cref="IConfigurationManager"/>.
52 : /// </summary>
53 : protected virtual string MaximumConcurrentReceiverProcessesCountConfigurationKey
54 : {
55 : get { return "Cqrs.Azure.CommandBus.MaximumConcurrentReceiverProcessesCount"; }
56 : }
57 :
58 : /// <summary>
59 : /// The configuration key for
60 : /// the <see cref="SqlFilter.SqlExpression"/> that can be applied to
61 : /// the <see cref="SubscriptionClient"/> instances in the receivers
62 : /// as used by <see cref="IConfigurationManager"/>.
63 : /// </summary>
64 : protected virtual string FilterKeyConfigurationKey
65 : {
66 : get { return "Cqrs.Azure.CommandBus.TopicName.SubscriptionName.Filter"; }
67 : }
68 :
69 : /// <summary>
70 : /// The <see cref="SqlFilter.SqlExpression"/> that can be applied to
71 : /// the <see cref="SubscriptionClient"/> instances in the receivers
72 : /// </summary>
73 : protected string FilterKey { get; set; }
74 :
75 : // ReSharper disable StaticMemberInGenericType
76 : /// <summary>
77 : /// Gets the <see cref="RouteManager"/>.
78 : /// </summary>
79 : public static RouteManager Routes { get; private set; }
80 :
81 : /// <summary>
82 : /// The number of handles currently being executed.
83 : /// </summary>
84 : protected static long CurrentHandles { get; set; }
85 : // ReSharper restore StaticMemberInGenericType
86 :
87 : static AzureCommandBusReceiver()
88 : {
89 : Routes = new RouteManager();
90 : }
91 :
92 : /// <summary>
93 : /// Instantiates a new instance of <see cref="AzureCommandBusReceiver{TAuthenticationToken}"/>.
94 : /// </summary>
95 2 : public AzureCommandBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper)
96 : : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, busHelper, false)
97 : {
98 : TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.CommandBus.Receiver.UseApplicationInsightTelemetryHelper", correlationIdHelper);
99 : }
100 :
101 : /// <summary>
102 : /// The debugger variable window value.
103 : /// </summary>
104 : internal string DebuggerDisplay
105 : {
106 : get
107 : {
108 : string connectionString = string.Format("ConnectionString : {0}", MessageBusConnectionStringConfigurationKey);
109 : try
110 : {
111 : connectionString = string.Concat(connectionString, "=", GetConnectionString());
112 : }
113 : catch { /* */ }
114 : return string.Format(CultureInfo.InvariantCulture, "{0}, PrivateTopicName : {1}, PrivateTopicSubscriptionName : {2}, PublicTopicName : {3}, PublicTopicSubscriptionName : {4}, FilterKey : {5}, NumberOfReceiversCount : {6}",
115 : connectionString, PrivateTopicName, PrivateTopicSubscriptionName, PublicTopicName, PublicTopicSubscriptionName, FilterKey, NumberOfReceiversCount);
116 : }
117 : }
118 :
119 : #region Overrides of AzureServiceBus<TAuthenticationToken>
120 :
121 : /// <summary>
122 : /// Calls <see cref="AzureServiceBus{TAuthenticationToken}.InstantiateReceiving()"/>
123 : /// then uses a <see cref="Task"/> to apply the <see cref="FilterKey"/> as a <see cref="RuleDescription"/>
124 : /// to the <see cref="SubscriptionClient"/> instances in <paramref name="serviceBusReceivers"/>.
125 : /// </summary>
126 : /// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
127 : /// <param name="serviceBusReceivers">The receivers collection to place <see cref="SubscriptionClient"/> instances into.</param>
128 : /// <param name="topicName">The topic name.</param>
129 : /// <param name="topicSubscriptionName">The topic subscription name.</param>
130 2 : protected override void InstantiateReceiving(NamespaceManager namespaceManager, IDictionary<int, SubscriptionClient> serviceBusReceivers, string topicName, string topicSubscriptionName)
131 : {
132 : base.InstantiateReceiving(namespaceManager, serviceBusReceivers, topicName, topicSubscriptionName);
133 :
134 : Task.Factory.StartNewSafely
135 : (() =>
136 : {
137 : // Because refreshing the rule can take a while, we only want to do this when the value changes
138 : string filter;
139 : if (!ConfigurationManager.TryGetSetting(FilterKeyConfigurationKey, out filter))
140 : return;
141 : if (FilterKey == filter)
142 : return;
143 : FilterKey = filter;
144 :
145 : // https://docs.microsoft.com/en-us/azure/application-insights/app-insights-analytics-reference#summarize-operator
146 : // http://www.summa.com/blog/business-blog/everything-you-need-to-know-about-azure-service-bus-brokered-messaging-part-2#rulesfiltersactions
147 : // https://github.com/Azure-Samples/azure-servicebus-messaging-samples/tree/master/TopicFilters
148 : SubscriptionClient client = serviceBusReceivers[0];
149 : bool reAddRule = false;
150 : try
151 : {
152 : IEnumerable<RuleDescription> rules = namespaceManager.GetRules(client.TopicPath, client.Name).ToList();
153 : RuleDescription ruleDescription = rules.SingleOrDefault(rule => rule.Name == "CqrsConfiguredFilter");
154 : if (ruleDescription != null)
155 : {
156 : var sqlFilter = ruleDescription.Filter as SqlFilter;
157 : if (sqlFilter == null && !string.IsNullOrWhiteSpace(filter))
158 : reAddRule = true;
159 : else if (sqlFilter != null && sqlFilter.SqlExpression != filter)
160 : reAddRule = true;
161 : if (sqlFilter != null && reAddRule)
162 : client.RemoveRule("CqrsConfiguredFilter");
163 : }
164 : else if (!string.IsNullOrWhiteSpace(filter))
165 : reAddRule = true;
166 :
167 : ruleDescription = rules.SingleOrDefault(rule => rule.Name == "$Default");
168 : // If there is a default rule and we have a rule, it will cause issues
169 : if (!string.IsNullOrWhiteSpace(filter) && ruleDescription != null)
170 : client.RemoveRule("$Default");
171 : // If we don't have a rule and there is no longer a default rule, it will cause issues
172 : else if (string.IsNullOrWhiteSpace(filter) && !rules.Any())
173 : {
174 : ruleDescription = new RuleDescription
175 : (
176 : "$Default",
177 : new SqlFilter("1=1")
178 : );
179 : client.AddRule(ruleDescription);
180 : }
181 : }
182 : catch (MessagingEntityNotFoundException)
183 : {
184 : }
185 :
186 : if (!reAddRule)
187 : return;
188 :
189 : int loopCounter = 0;
190 : while (loopCounter < 10)
191 : {
192 : try
193 : {
194 : RuleDescription ruleDescription = new RuleDescription
195 : (
196 : "CqrsConfiguredFilter",
197 : new SqlFilter(filter)
198 : );
199 : client.AddRule(ruleDescription);
200 : break;
201 : }
202 : catch (MessagingEntityAlreadyExistsException exception)
203 : {
204 : loopCounter++;
205 : // Still waiting for the delete to complete
206 : Thread.Sleep(1000);
207 : if (loopCounter == 9)
208 : {
209 : Logger.LogError("Setting the filter failed as it already exists.", exception: exception);
210 : TelemetryHelper.TrackException(exception);
211 : }
212 : }
213 : catch (Exception exception)
214 : {
215 : Logger.LogError("Setting the filter failed.", exception: exception);
216 : TelemetryHelper.TrackException(exception);
217 : break;
218 : }
219 : }
220 : });
221 :
222 : }
223 :
224 : #endregion
225 :
226 : /// <summary>
227 : /// Register a command handler that will listen and respond to commands.
228 : /// </summary>
229 : /// <remarks>
230 : /// In many cases the <paramref name="targetedType"/> will be the handler class itself, what you actually want is the target of what is being updated.
231 : /// </remarks>
232 2 : public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
233 : where TMessage : IMessage
234 : {
235 : AzureBusHelper.RegisterHandler(TelemetryHelper, Routes, handler, targetedType, holdMessageLock);
236 : }
237 :
238 : /// <summary>
239 : /// Register a command handler that will listen and respond to commands.
240 : /// </summary>
241 2 : public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
242 : where TMessage : IMessage
243 : {
244 : RegisterHandler(handler, null, holdMessageLock);
245 : }
246 :
247 : /// <summary>
248 : /// Receives a <see cref="BrokeredMessage"/> from the command bus.
249 : /// </summary>
250 2 : protected virtual void ReceiveCommand(BrokeredMessage message)
251 : {
252 : DateTimeOffset startedAt = DateTimeOffset.UtcNow;
253 : Stopwatch mainStopWatch = Stopwatch.StartNew();
254 : string responseCode = "200";
255 : // Null means it was skipped
256 : bool? wasSuccessfull = true;
257 : string telemetryName = string.Format("Cqrs/Handle/Command/{0}", message.MessageId);
258 : ISingleSignOnToken authenticationToken = null;
259 :
260 : IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
261 : object value;
262 : if (message.Properties.TryGetValue("Type", out value))
263 : telemetryProperties.Add("MessageType", value.ToString());
264 : TelemetryHelper.TrackMetric("Cqrs/Handle/Command", CurrentHandles++, telemetryProperties);
265 : var brokeredMessageRenewCancellationTokenSource = new CancellationTokenSource();
266 : try
267 : {
268 : Logger.LogDebug(string.Format("A command message arrived with the id '{0}'.", message.MessageId));
269 : string messageBody = message.GetBody<string>();
270 :
271 :
272 : ICommand<TAuthenticationToken> command = AzureBusHelper.ReceiveCommand(messageBody, ReceiveCommand,
273 : string.Format("id '{0}'", message.MessageId),
274 : () =>
275 : {
276 : wasSuccessfull = null;
277 : telemetryName = string.Format("Cqrs/Handle/Command/Skipped/{0}", message.MessageId);
278 : responseCode = "204";
279 : // Remove message from queue
280 : try
281 : {
282 : message.Complete();
283 : }
284 : catch (MessageLockLostException exception)
285 : {
286 : throw new MessageLockLostException(string.Format("The lock supplied for the skipped command message '{0}' is invalid.", message.MessageId), exception);
287 : }
288 : Logger.LogDebug(string.Format("A command message arrived with the id '{0}' but processing was skipped due to event settings.", message.MessageId));
289 : TelemetryHelper.TrackEvent("Cqrs/Handle/Command/Skipped", telemetryProperties);
290 : },
291 : () =>
292 : {
293 : AzureBusHelper.RefreshLock(brokeredMessageRenewCancellationTokenSource, message, "command");
294 : }
295 : );
296 :
297 : if (wasSuccessfull != null)
298 : {
299 : if (command != null)
300 : {
301 : telemetryName = string.Format("{0}/{1}", command.GetType().FullName, command.Id);
302 : authenticationToken = command.AuthenticationToken as ISingleSignOnToken;
303 :
304 : var telemeteredMessage = command as ITelemeteredMessage;
305 : if (telemeteredMessage != null)
306 : telemetryName = telemeteredMessage.TelemetryName;
307 :
308 : telemetryName = string.Format("Cqrs/Handle/Command/{0}", telemetryName);
309 : }
310 : // Remove message from queue
311 : try
312 : {
313 : message.Complete();
314 : }
315 : catch (MessageLockLostException exception)
316 : {
317 : throw new MessageLockLostException(string.Format("The lock supplied for command '{0}' of type {1} is invalid.", command.Id, command.GetType().Name), exception);
318 : }
319 : }
320 : Logger.LogDebug(string.Format("A command message arrived and was processed with the id '{0}'.", message.MessageId));
321 : }
322 : catch (MessageLockLostException exception)
323 : {
324 : IDictionary<string, string> subTelemetryProperties = new Dictionary<string, string>(telemetryProperties);
325 : subTelemetryProperties.Add("TimeTaken", mainStopWatch.Elapsed.ToString());
326 : TelemetryHelper.TrackException(exception, null, subTelemetryProperties);
327 : if (ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete)
328 : {
329 : Logger.LogError(exception.Message, exception: exception);
330 : // Indicates a problem, unlock message in queue
331 : message.Abandon();
332 : wasSuccessfull = false;
333 : }
334 : else
335 : {
336 : Logger.LogWarning(exception.Message, exception: exception);
337 : try
338 : {
339 : message.DeadLetter("LockLostButHandled", "The message was handled but the lock was lost.");
340 : }
341 : catch (Exception)
342 : {
343 : // Oh well, move on.
344 : message.Abandon();
345 : }
346 : }
347 : responseCode = "599";
348 : }
349 : catch (Exception exception)
350 : {
351 : TelemetryHelper.TrackException(exception, null, telemetryProperties);
352 : // Indicates a problem, unlock message in queue
353 : Logger.LogError(string.Format("A command message arrived with the id '{0}' but failed to be process.", message.MessageId), exception: exception);
354 : message.Abandon();
355 : wasSuccessfull = false;
356 : responseCode = "500";
357 : telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
358 : telemetryProperties.Add("ExceptionMessage", exception.Message);
359 : }
360 : finally
361 : {
362 : // Cancel the lock of renewing the task
363 : brokeredMessageRenewCancellationTokenSource.Cancel();
364 : TelemetryHelper.TrackMetric("Cqrs/Handle/Command", CurrentHandles--, telemetryProperties);
365 :
366 : mainStopWatch.Stop();
367 : TelemetryHelper.TrackRequest
368 : (
369 : telemetryName,
370 : authenticationToken,
371 : startedAt,
372 : mainStopWatch.Elapsed,
373 : responseCode,
374 : wasSuccessfull == null || wasSuccessfull.Value,
375 : telemetryProperties
376 : );
377 :
378 : TelemetryHelper.Flush();
379 : }
380 : }
381 :
382 : /// <summary>
383 : /// Receives a <see cref="ICommand{TAuthenticationToken}"/> from the command bus.
384 : /// </summary>
385 2 : public virtual bool? ReceiveCommand(ICommand<TAuthenticationToken> command)
386 : {
387 : return AzureBusHelper.DefaultReceiveCommand(command, Routes, "Azure-ServiceBus");
388 : }
389 :
390 : #region Overrides of AzureBus<TAuthenticationToken>
391 :
392 : /// <summary>
393 : /// Returns <see cref="NumberOfReceiversCountConfigurationKey"/> from <see cref="IConfigurationManager"/>
394 : /// if no value is set, returns <see cref="AzureBus{TAuthenticationToken}.DefaultNumberOfReceiversCount"/>.
395 : /// </summary>
396 2 : protected override int GetCurrentNumberOfReceiversCount()
397 : {
398 : string numberOfReceiversCountValue;
399 : int numberOfReceiversCount;
400 : if (ConfigurationManager.TryGetSetting(NumberOfReceiversCountConfigurationKey, out numberOfReceiversCountValue) && !string.IsNullOrWhiteSpace(numberOfReceiversCountValue))
401 : {
402 : if (!int.TryParse(numberOfReceiversCountValue, out numberOfReceiversCount))
403 : numberOfReceiversCount = DefaultNumberOfReceiversCount;
404 : }
405 : else
406 : numberOfReceiversCount = DefaultNumberOfReceiversCount;
407 : return numberOfReceiversCount;
408 : }
409 :
410 : /// <summary>
411 : /// Returns <see cref="MaximumConcurrentReceiverProcessesCountConfigurationKey"/> from <see cref="IConfigurationManager"/>
412 : /// if no value is set, returns <see cref="AzureBus{TAuthenticationToken}.DefaultMaximumConcurrentReceiverProcessesCount"/>.
413 : /// </summary>
414 2 : protected override int GetCurrentMaximumConcurrentReceiverProcessesCount()
415 : {
416 : string maximumConcurrentReceiverProcessesCountValue;
417 : int maximumConcurrentReceiverProcessesCount;
418 : if (ConfigurationManager.TryGetSetting(MaximumConcurrentReceiverProcessesCountConfigurationKey, out maximumConcurrentReceiverProcessesCountValue) && !string.IsNullOrWhiteSpace(maximumConcurrentReceiverProcessesCountValue))
419 : {
420 : if (!int.TryParse(maximumConcurrentReceiverProcessesCountValue, out maximumConcurrentReceiverProcessesCount))
421 : maximumConcurrentReceiverProcessesCount = DefaultMaximumConcurrentReceiverProcessesCount;
422 : }
423 : else
424 : maximumConcurrentReceiverProcessesCount = DefaultMaximumConcurrentReceiverProcessesCount;
425 : return maximumConcurrentReceiverProcessesCount;
426 : }
427 :
428 : #endregion
429 :
430 : #region Implementation of ICommandReceiver
431 :
432 : /// <summary>
433 : /// Starts listening and processing instances of <see cref="ICommand{TAuthenticationToken}"/> from the command bus.
434 : /// </summary>
435 2 : public void Start()
436 : {
437 : InstantiateReceiving();
438 :
439 : // Configure the callback options
440 : OnMessageOptions options = new OnMessageOptions
441 : {
442 : AutoComplete = false,
443 : AutoRenewTimeout = TimeSpan.FromMinutes(1)
444 : // I think this is intentionally left out
445 : // , MaxConcurrentCalls = MaximumConcurrentReceiverProcessesCount
446 : };
447 :
448 : // Callback to handle received messages
449 : RegisterReceiverMessageHandler(ReceiveCommand, options);
450 : }
451 :
452 : #endregion
453 : }
454 : }
|