Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Linq;
13 : using System.Threading;
14 : using System.Threading.Tasks;
15 : using cdmdotnet.Logging;
16 : using Cqrs.Authentication;
17 : using Cqrs.Bus;
18 : using Cqrs.Commands;
19 : using Cqrs.Configuration;
20 : using Cqrs.Messages;
21 : using Microsoft.ServiceBus;
22 : using Microsoft.ServiceBus.Messaging;
23 :
24 : namespace Cqrs.Azure.ServiceBus
25 : {
26 : public class AzureCommandBusReceiver<TAuthenticationToken>
27 : : AzureCommandBus<TAuthenticationToken>
28 : , ICommandHandlerRegistrar
29 : , ICommandReceiver<TAuthenticationToken>
30 0 : {
31 : protected virtual string NumberOfReceiversCountConfigurationKey
32 : {
33 : get { return "Cqrs.Azure.CommandBus.NumberOfReceiversCount"; }
34 : }
35 :
36 : protected virtual string MaximumConcurrentReceiverProcessesCountConfigurationKey
37 : {
38 : get { return "Cqrs.Azure.CommandBus.MaximumConcurrentReceiverProcessesCount"; }
39 : }
40 :
41 : protected virtual string FilterKeyConfigurationKey
42 : {
43 : get { return "Cqrs.Azure.CommandBus.TopicName.SubscriptionName.Filter"; }
44 : }
45 :
46 : protected string FilterKey { get; set; }
47 :
48 : // ReSharper disable StaticMemberInGenericType
49 : protected static RouteManager Routes { get; private set; }
50 :
51 : protected static long CurrentHandles { get; set; }
52 : // ReSharper restore StaticMemberInGenericType
53 :
54 : static AzureCommandBusReceiver()
55 : {
56 : Routes = new RouteManager();
57 : }
58 :
59 0 : public AzureCommandBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper)
60 : : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, busHelper, false)
61 : {
62 : TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.CommandBus.Receiver.UseApplicationInsightTelemetryHelper", correlationIdHelper);
63 : }
64 :
65 : #region Overrides of AzureServiceBus<TAuthenticationToken>
66 :
67 0 : protected override void InstantiateReceiving(NamespaceManager namespaceManager, IDictionary<int, SubscriptionClient> serviceBusReceivers, string topicName, string topicSubscriptionName)
68 : {
69 : base.InstantiateReceiving(namespaceManager, serviceBusReceivers, topicName, topicSubscriptionName);
70 :
71 : Task.Factory.StartNewSafely
72 : (() =>
73 : {
74 : // Because refreshing the rule can take a while, we only want to do this when the value changes
75 : string filter;
76 : if (!ConfigurationManager.TryGetSetting(FilterKeyConfigurationKey, out filter))
77 : return;
78 : if (FilterKey == filter)
79 : return;
80 : FilterKey = filter;
81 :
82 : // https://docs.microsoft.com/en-us/azure/application-insights/app-insights-analytics-reference#summarize-operator
83 : // http://www.summa.com/blog/business-blog/everything-you-need-to-know-about-azure-service-bus-brokered-messaging-part-2#rulesfiltersactions
84 : // https://github.com/Azure-Samples/azure-servicebus-messaging-samples/tree/master/TopicFilters
85 : SubscriptionClient client = serviceBusReceivers[0];
86 : bool reAddRule = false;
87 : try
88 : {
89 : IEnumerable<RuleDescription> rules = namespaceManager.GetRules(client.TopicPath, client.Name).ToList();
90 : RuleDescription ruleDescription = rules.SingleOrDefault(rule => rule.Name == "CqrsConfiguredFilter");
91 : if (ruleDescription != null)
92 : {
93 : var sqlFilter = ruleDescription.Filter as SqlFilter;
94 : if (sqlFilter == null && !string.IsNullOrWhiteSpace(filter))
95 : reAddRule = true;
96 : else if (sqlFilter != null && sqlFilter.SqlExpression != filter)
97 : reAddRule = true;
98 : if (sqlFilter != null && reAddRule)
99 : client.RemoveRule("CqrsConfiguredFilter");
100 : }
101 : else if (!string.IsNullOrWhiteSpace(filter))
102 : reAddRule = true;
103 :
104 : ruleDescription = rules.SingleOrDefault(rule => rule.Name == "$Default");
105 : // If there is a default rule and we have a rule, it will cause issues
106 : if (!string.IsNullOrWhiteSpace(filter) && ruleDescription != null)
107 : client.RemoveRule("$Default");
108 : // If we don't have a rule and there is no longer a default rule, it will cause issues
109 : else if (string.IsNullOrWhiteSpace(filter) && !rules.Any())
110 : {
111 : ruleDescription = new RuleDescription
112 : (
113 : "$Default",
114 : new SqlFilter("1=1")
115 : );
116 : client.AddRule(ruleDescription);
117 : }
118 : }
119 : catch (MessagingEntityNotFoundException)
120 : {
121 : }
122 :
123 : if (!reAddRule)
124 : return;
125 :
126 : int loopCounter = 0;
127 : while (loopCounter < 10)
128 : {
129 : try
130 : {
131 : RuleDescription ruleDescription = new RuleDescription
132 : (
133 : "CqrsConfiguredFilter",
134 : new SqlFilter(filter)
135 : );
136 : client.AddRule(ruleDescription);
137 : break;
138 : }
139 : catch (MessagingEntityAlreadyExistsException exception)
140 : {
141 : loopCounter++;
142 : // Still waiting for the delete to complete
143 : Thread.Sleep(1000);
144 : if (loopCounter == 9)
145 : {
146 : Logger.LogError("Setting the filter failed as it already exists.", exception: exception);
147 : TelemetryHelper.TrackException(exception);
148 : }
149 : }
150 : catch (Exception exception)
151 : {
152 : Logger.LogError("Setting the filter failed.", exception: exception);
153 : TelemetryHelper.TrackException(exception);
154 : break;
155 : }
156 : }
157 : });
158 :
159 : }
160 :
161 : #endregion
162 :
163 2 : public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
164 : where TMessage : IMessage
165 : {
166 : AzureBusHelper.RegisterHandler(TelemetryHelper, Routes, handler, targetedType, holdMessageLock);
167 : }
168 :
169 : /// <summary>
170 : /// Register an event or command handler that will listen and respond to events or commands.
171 : /// </summary>
172 2 : public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
173 : where TMessage : IMessage
174 : {
175 : RegisterHandler(handler, null, holdMessageLock);
176 : }
177 :
178 0 : protected virtual void ReceiveCommand(BrokeredMessage message)
179 : {
180 : DateTimeOffset startedAt = DateTimeOffset.UtcNow;
181 : Stopwatch mainStopWatch = Stopwatch.StartNew();
182 : string responseCode = "200";
183 : // Null means it was skipped
184 : bool? wasSuccessfull = true;
185 : string telemetryName = string.Format("Cqrs/Handle/Command/{0}", message.MessageId);
186 : ISingleSignOnToken authenticationToken = null;
187 :
188 : IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
189 : object value;
190 : if (message.Properties.TryGetValue("Type", out value))
191 : telemetryProperties.Add("MessageType", value.ToString());
192 : TelemetryHelper.TrackMetric("Cqrs/Handle/Command", CurrentHandles++, telemetryProperties);
193 : var brokeredMessageRenewCancellationTokenSource = new CancellationTokenSource();
194 : try
195 : {
196 : Logger.LogDebug(string.Format("A command message arrived with the id '{0}'.", message.MessageId));
197 : string messageBody = message.GetBody<string>();
198 :
199 :
200 : ICommand<TAuthenticationToken> command = AzureBusHelper.ReceiveCommand(messageBody, ReceiveCommand,
201 : string.Format("id '{0}'", message.MessageId),
202 : () =>
203 : {
204 : wasSuccessfull = null;
205 : telemetryName = string.Format("Cqrs/Handle/Command/Skipped/{0}", message.MessageId);
206 : responseCode = "204";
207 : // Remove message from queue
208 : try
209 : {
210 : message.Complete();
211 : }
212 : catch (MessageLockLostException exception)
213 : {
214 : throw new MessageLockLostException(string.Format("The lock supplied for the skipped command message '{0}' is invalid.", message.MessageId), exception);
215 : }
216 : Logger.LogDebug(string.Format("A command message arrived with the id '{0}' but processing was skipped due to event settings.", message.MessageId));
217 : TelemetryHelper.TrackEvent("Cqrs/Handle/Command/Skipped", telemetryProperties);
218 : },
219 : () =>
220 : {
221 : AzureBusHelper.RefreshLock(brokeredMessageRenewCancellationTokenSource, message, "command");
222 : }
223 : );
224 :
225 : if (wasSuccessfull != null)
226 : {
227 : if (command != null)
228 : {
229 : telemetryName = string.Format("{0}/{1}", command.GetType().FullName, command.Id);
230 : authenticationToken = command.AuthenticationToken as ISingleSignOnToken;
231 :
232 : var telemeteredMessage = command as ITelemeteredMessage;
233 : if (telemeteredMessage != null)
234 : telemetryName = telemeteredMessage.TelemetryName;
235 :
236 : telemetryName = string.Format("Cqrs/Handle/Command/{0}", telemetryName);
237 : }
238 : // Remove message from queue
239 : try
240 : {
241 : message.Complete();
242 : }
243 : catch (MessageLockLostException exception)
244 : {
245 : throw new MessageLockLostException(string.Format("The lock supplied for command '{0}' of type {1} is invalid.", command.Id, command.GetType().Name), exception);
246 : }
247 : }
248 : Logger.LogDebug(string.Format("A command message arrived and was processed with the id '{0}'.", message.MessageId));
249 : }
250 : catch (MessageLockLostException exception)
251 : {
252 : IDictionary<string, string> subTelemetryProperties = new Dictionary<string, string>(telemetryProperties);
253 : subTelemetryProperties.Add("TimeTaken", mainStopWatch.Elapsed.ToString());
254 : TelemetryHelper.TrackException(exception, null, subTelemetryProperties);
255 : if (ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete)
256 : {
257 : Logger.LogError(exception.Message, exception: exception);
258 : // Indicates a problem, unlock message in queue
259 : message.Abandon();
260 : wasSuccessfull = false;
261 : }
262 : else
263 : {
264 : Logger.LogWarning(exception.Message, exception: exception);
265 : try
266 : {
267 : message.DeadLetter("LockLostButHandled", "The message was handled but the lock was lost.");
268 : }
269 : catch (Exception)
270 : {
271 : // Oh well, move on.
272 : message.Abandon();
273 : }
274 : }
275 : responseCode = "599";
276 : }
277 : catch (Exception exception)
278 : {
279 : TelemetryHelper.TrackException(exception, null, telemetryProperties);
280 : // Indicates a problem, unlock message in queue
281 : Logger.LogError(string.Format("A command message arrived with the id '{0}' but failed to be process.", message.MessageId), exception: exception);
282 : message.Abandon();
283 : wasSuccessfull = false;
284 : responseCode = "500";
285 : telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
286 : telemetryProperties.Add("ExceptionMessage", exception.Message);
287 : }
288 : finally
289 : {
290 : // Cancel the lock of renewing the task
291 : brokeredMessageRenewCancellationTokenSource.Cancel();
292 : TelemetryHelper.TrackMetric("Cqrs/Handle/Command", CurrentHandles--, telemetryProperties);
293 :
294 : mainStopWatch.Stop();
295 : TelemetryHelper.TrackRequest
296 : (
297 : telemetryName,
298 : authenticationToken,
299 : startedAt,
300 : mainStopWatch.Elapsed,
301 : responseCode,
302 : wasSuccessfull == null || wasSuccessfull.Value,
303 : telemetryProperties
304 : );
305 :
306 : TelemetryHelper.Flush();
307 : }
308 : }
309 :
310 0 : public virtual bool? ReceiveCommand(ICommand<TAuthenticationToken> command)
311 : {
312 : return AzureBusHelper.DefaultReceiveCommand(command, Routes, "Azure-ServiceBus");
313 : }
314 :
315 : #region Overrides of AzureBus<TAuthenticationToken>
316 :
317 0 : protected override int GetCurrentNumberOfReceiversCount()
318 : {
319 : string numberOfReceiversCountValue;
320 : int numberOfReceiversCount;
321 : if (ConfigurationManager.TryGetSetting(NumberOfReceiversCountConfigurationKey, out numberOfReceiversCountValue) && !string.IsNullOrWhiteSpace(numberOfReceiversCountValue))
322 : {
323 : if (!int.TryParse(numberOfReceiversCountValue, out numberOfReceiversCount))
324 : numberOfReceiversCount = DefaultNumberOfReceiversCount;
325 : }
326 : else
327 : numberOfReceiversCount = DefaultNumberOfReceiversCount;
328 : return numberOfReceiversCount;
329 : }
330 :
331 : #region Overrides of AzureBus<TAuthenticationToken>
332 :
333 0 : protected override int GetCurrentMaximumConcurrentReceiverProcessesCount()
334 : {
335 : string maximumConcurrentReceiverProcessesCountValue;
336 : int maximumConcurrentReceiverProcessesCount;
337 : if (ConfigurationManager.TryGetSetting(MaximumConcurrentReceiverProcessesCountConfigurationKey, out maximumConcurrentReceiverProcessesCountValue) && !string.IsNullOrWhiteSpace(maximumConcurrentReceiverProcessesCountValue))
338 : {
339 : if (!int.TryParse(maximumConcurrentReceiverProcessesCountValue, out maximumConcurrentReceiverProcessesCount))
340 : maximumConcurrentReceiverProcessesCount = DefaultMaximumConcurrentReceiverProcessesCount;
341 : }
342 : else
343 : maximumConcurrentReceiverProcessesCount = DefaultMaximumConcurrentReceiverProcessesCount;
344 : return maximumConcurrentReceiverProcessesCount;
345 : }
346 :
347 : #endregion
348 :
349 : #endregion
350 :
351 : #region Implementation of ICommandReceiver
352 :
353 0 : public void Start()
354 : {
355 : InstantiateReceiving();
356 :
357 : // Configure the callback options
358 : OnMessageOptions options = new OnMessageOptions
359 : {
360 : AutoComplete = false,
361 : AutoRenewTimeout = TimeSpan.FromMinutes(1)
362 : };
363 :
364 : // Callback to handle received messages
365 : RegisterReceiverMessageHandler(ReceiveCommand, options);
366 : }
367 :
368 : #endregion
369 : }
370 : }
|