Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Linq;
13 : using System.Threading;
14 : using System.Threading.Tasks;
15 : using Cqrs.Authentication;
16 : using Cqrs.Bus;
17 : using Cqrs.Configuration;
18 : using Cqrs.Events;
19 : using cdmdotnet.Logging;
20 : using Cqrs.Messages;
21 : using Microsoft.ServiceBus;
22 : using Microsoft.ServiceBus.Messaging;
23 :
24 : namespace Cqrs.Azure.ServiceBus
25 : {
26 : public class AzureEventBusReceiver<TAuthenticationToken>
27 : : AzureEventBus<TAuthenticationToken>
28 : , IEventHandlerRegistrar
29 : , IEventReceiver<TAuthenticationToken>
30 0 : {
31 : protected virtual string NumberOfReceiversCountConfigurationKey
32 : {
33 : get { return "Cqrs.Azure.EventBus.NumberOfReceiversCount"; }
34 : }
35 :
36 : protected virtual string MaximumConcurrentReceiverProcessesCountConfigurationKey
37 : {
38 : get { return "Cqrs.Azure.EventBus.MaximumConcurrentReceiverProcessesCount"; }
39 : }
40 :
41 : protected virtual string FilterKeyConfigurationKey
42 : {
43 : get { return "Cqrs.Azure.EventBus.TopicName.SubscriptionName.Filter"; }
44 : }
45 :
46 : protected string FilterKey { get; set; }
47 :
48 : // ReSharper disable StaticMemberInGenericType
49 : protected static RouteManager Routes { get; private set; }
50 :
51 : protected static long CurrentHandles { get; set; }
52 : // ReSharper restore StaticMemberInGenericType
53 :
54 : static AzureEventBusReceiver()
55 : {
56 : Routes = new RouteManager();
57 : }
58 :
59 0 : public AzureEventBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper)
60 : : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, busHelper, false)
61 : {
62 : TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventBus.Receiver.UseApplicationInsightTelemetryHelper", correlationIdHelper);
63 : }
64 :
65 0 : public void Start()
66 : {
67 : InstantiateReceiving();
68 :
69 : // Configure the callback options
70 : OnMessageOptions options = new OnMessageOptions
71 : {
72 : AutoComplete = false,
73 : AutoRenewTimeout = TimeSpan.FromMinutes(1),
74 : MaxConcurrentCalls = MaximumConcurrentReceiverProcessesCount
75 : };
76 :
77 : // Callback to handle received messages
78 : RegisterReceiverMessageHandler(ReceiveEvent, options);
79 : }
80 :
81 : #region Overrides of AzureServiceBus<TAuthenticationToken>
82 :
83 0 : protected override void InstantiateReceiving(NamespaceManager namespaceManager, IDictionary<int, SubscriptionClient> serviceBusReceivers, string topicName, string topicSubscriptionName)
84 : {
85 : base.InstantiateReceiving(namespaceManager, serviceBusReceivers, topicName, topicSubscriptionName);
86 :
87 : Task.Factory.StartNewSafely
88 : (() =>
89 : {
90 : // Because refreshing the rule can take a while, we only want to do this when the value changes
91 : string filter;
92 : if (!ConfigurationManager.TryGetSetting(FilterKeyConfigurationKey, out filter))
93 : return;
94 : if (FilterKey == filter)
95 : return;
96 : FilterKey = filter;
97 :
98 : // https://docs.microsoft.com/en-us/azure/application-insights/app-insights-analytics-reference#summarize-operator
99 : // http://www.summa.com/blog/business-blog/everything-you-need-to-know-about-azure-service-bus-brokered-messaging-part-2#rulesfiltersactions
100 : // https://github.com/Azure-Samples/azure-servicebus-messaging-samples/tree/master/TopicFilters
101 : SubscriptionClient client = serviceBusReceivers[0];
102 : bool reAddRule = false;
103 : try
104 : {
105 : IEnumerable<RuleDescription> rules = namespaceManager.GetRules(client.TopicPath, client.Name).ToList();
106 : RuleDescription ruleDescription = rules.SingleOrDefault(rule => rule.Name == "CqrsConfiguredFilter");
107 : if (ruleDescription != null)
108 : {
109 : var sqlFilter = ruleDescription.Filter as SqlFilter;
110 : if (sqlFilter == null && !string.IsNullOrWhiteSpace(filter))
111 : reAddRule = true;
112 : else if (sqlFilter != null && sqlFilter.SqlExpression != filter)
113 : reAddRule = true;
114 : if (sqlFilter != null && reAddRule)
115 : client.RemoveRule("CqrsConfiguredFilter");
116 : }
117 : else if (!string.IsNullOrWhiteSpace(filter))
118 : reAddRule = true;
119 :
120 : ruleDescription = rules.SingleOrDefault(rule => rule.Name == "$Default");
121 : // If there is a default rule and we have a rule, it will cause issues
122 : if (!string.IsNullOrWhiteSpace(filter) && ruleDescription != null)
123 : client.RemoveRule("$Default");
124 : // If we don't have a rule and there is no longer a default rule, it will cause issues
125 : else if (string.IsNullOrWhiteSpace(filter) && !rules.Any())
126 : {
127 : ruleDescription = new RuleDescription
128 : (
129 : "$Default",
130 : new SqlFilter("1=1")
131 : );
132 : client.AddRule(ruleDescription);
133 : }
134 : }
135 : catch (MessagingEntityNotFoundException)
136 : {
137 : }
138 :
139 : if (!reAddRule)
140 : return;
141 :
142 : int loopCounter = 0;
143 : while (loopCounter < 10)
144 : {
145 : try
146 : {
147 : RuleDescription ruleDescription = new RuleDescription
148 : (
149 : "CqrsConfiguredFilter",
150 : new SqlFilter(filter)
151 : );
152 : client.AddRule(ruleDescription);
153 : break;
154 : }
155 : catch (MessagingEntityAlreadyExistsException exception)
156 : {
157 : loopCounter++;
158 : // Still waiting for the delete to complete
159 : Thread.Sleep(1000);
160 : if (loopCounter == 9)
161 : {
162 : Logger.LogError("Setting the filter failed as it already exists.", exception: exception);
163 : TelemetryHelper.TrackException(exception);
164 : }
165 : }
166 : catch (Exception exception)
167 : {
168 : Logger.LogError("Setting the filter failed.", exception: exception);
169 : TelemetryHelper.TrackException(exception);
170 : break;
171 : }
172 : }
173 : });
174 : }
175 :
176 : #endregion
177 :
178 : /// <summary>
179 : /// Register an event or command handler that will listen and respond to events or commands.
180 : /// </summary>
181 2 : public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
182 : where TMessage : IMessage
183 : {
184 : AzureBusHelper.RegisterHandler(TelemetryHelper, Routes, handler, targetedType, holdMessageLock);
185 : }
186 :
187 : /// <summary>
188 : /// Register an event or command handler that will listen and respond to events or commands.
189 : /// </summary>
190 2 : public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = false)
191 : where TMessage : IMessage
192 : {
193 : RegisterHandler(handler, null, holdMessageLock);
194 : }
195 :
196 0 : protected virtual void ReceiveEvent(BrokeredMessage message)
197 : {
198 : DateTimeOffset startedAt = DateTimeOffset.UtcNow;
199 : Stopwatch mainStopWatch = Stopwatch.StartNew();
200 : string responseCode = "200";
201 : // Null means it was skipped
202 : bool? wasSuccessfull = true;
203 : string telemetryName = string.Format("Cqrs/Handle/Event/{0}", message.MessageId);
204 : ISingleSignOnToken authenticationToken = null;
205 :
206 : IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
207 : object value;
208 : if (message.Properties.TryGetValue("Type", out value))
209 : telemetryProperties.Add("MessageType", value.ToString());
210 : TelemetryHelper.TrackMetric("Cqrs/Handle/Event", CurrentHandles++, telemetryProperties);
211 : var brokeredMessageRenewCancellationTokenSource = new CancellationTokenSource();
212 : try
213 : {
214 : Logger.LogDebug(string.Format("An event message arrived with the id '{0}'.", message.MessageId));
215 : string messageBody = message.GetBody<string>();
216 :
217 : IEvent<TAuthenticationToken> @event = AzureBusHelper.ReceiveEvent(messageBody, ReceiveEvent,
218 : string.Format("id '{0}'", message.MessageId),
219 : () =>
220 : {
221 : wasSuccessfull = null;
222 : telemetryName = string.Format("Cqrs/Handle/Event/Skipped/{0}", message.MessageId);
223 : responseCode = "204";
224 : // Remove message from queue
225 : try
226 : {
227 : message.Complete();
228 : }
229 : catch (MessageLockLostException exception)
230 : {
231 : throw new MessageLockLostException(string.Format("The lock supplied for the skipped event message '{0}' is invalid.", message.MessageId), exception);
232 : }
233 : Logger.LogDebug(string.Format("An event message arrived with the id '{0}' but processing was skipped due to event settings.", message.MessageId));
234 : TelemetryHelper.TrackEvent("Cqrs/Handle/Event/Skipped", telemetryProperties);
235 : },
236 : () =>
237 : {
238 : AzureBusHelper.RefreshLock(brokeredMessageRenewCancellationTokenSource, message, "event");
239 : }
240 : );
241 :
242 : if (wasSuccessfull != null)
243 : {
244 : if (@event != null)
245 : {
246 : telemetryName = string.Format("{0}/{1}", @event.GetType().FullName, @event.Id);
247 : authenticationToken = @event.AuthenticationToken as ISingleSignOnToken;
248 :
249 : var telemeteredMessage = @event as ITelemeteredMessage;
250 : if (telemeteredMessage != null)
251 : telemetryName = telemeteredMessage.TelemetryName;
252 :
253 : telemetryName = string.Format("Cqrs/Handle/Event/{0}", telemetryName);
254 : }
255 : // Remove message from queue
256 : try
257 : {
258 : message.Complete();
259 : }
260 : catch (MessageLockLostException exception)
261 : {
262 : throw new MessageLockLostException(string.Format("The lock supplied for event '{0}' of type {1} is invalid.", @event.Id, @event.GetType().Name), exception);
263 : }
264 : }
265 : Logger.LogDebug(string.Format("An event message arrived and was processed with the id '{0}'.", message.MessageId));
266 :
267 : IList<IEvent<TAuthenticationToken>> events;
268 : if (EventWaits.TryGetValue(@event.CorrelationId, out events))
269 : events.Add(@event);
270 : }
271 : catch (MessageLockLostException exception)
272 : {
273 : IDictionary<string, string> subTelemetryProperties = new Dictionary<string, string>(telemetryProperties);
274 : subTelemetryProperties.Add("TimeTaken", mainStopWatch.Elapsed.ToString());
275 : TelemetryHelper.TrackException(exception, null, subTelemetryProperties);
276 : if (ThrowExceptionOnReceiverMessageLockLostExceptionDuringComplete)
277 : {
278 : Logger.LogError(exception.Message, exception: exception);
279 : // Indicates a problem, unlock message in queue
280 : message.Abandon();
281 : wasSuccessfull = false;
282 : }
283 : else
284 : {
285 : Logger.LogWarning(exception.Message, exception: exception);
286 : try
287 : {
288 : message.DeadLetter("LockLostButHandled", "The message was handled but the lock was lost.");
289 : }
290 : catch (Exception)
291 : {
292 : // Oh well, move on.
293 : message.Abandon();
294 : }
295 : }
296 : responseCode = "599";
297 : }
298 : catch (Exception exception)
299 : {
300 : TelemetryHelper.TrackException(exception, null, telemetryProperties);
301 : // Indicates a problem, unlock message in queue
302 : Logger.LogError(string.Format("An event message arrived with the id '{0}' but failed to be process.", message.MessageId), exception: exception);
303 : message.Abandon();
304 : wasSuccessfull = false;
305 : responseCode = "500";
306 : telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
307 : telemetryProperties.Add("ExceptionMessage", exception.Message);
308 : }
309 : finally
310 : {
311 : // Cancel the lock of renewing the task
312 : brokeredMessageRenewCancellationTokenSource.Cancel();
313 : TelemetryHelper.TrackMetric("Cqrs/Handle/Event", CurrentHandles--, telemetryProperties);
314 :
315 : mainStopWatch.Stop();
316 : TelemetryHelper.TrackRequest
317 : (
318 : telemetryName,
319 : authenticationToken,
320 : startedAt,
321 : mainStopWatch.Elapsed,
322 : responseCode,
323 : wasSuccessfull == null || wasSuccessfull.Value,
324 : telemetryProperties
325 : );
326 :
327 : TelemetryHelper.Flush();
328 : }
329 : }
330 :
331 0 : public virtual bool? ReceiveEvent(IEvent<TAuthenticationToken> @event)
332 : {
333 : return AzureBusHelper.DefaultReceiveEvent(@event, Routes, "Azure-ServiceBus");
334 : }
335 :
336 : #region Overrides of AzureServiceBus<TAuthenticationToken>
337 :
338 0 : protected override int GetCurrentNumberOfReceiversCount()
339 : {
340 : string numberOfReceiversCountValue;
341 : int numberOfReceiversCount;
342 : if (ConfigurationManager.TryGetSetting(NumberOfReceiversCountConfigurationKey, out numberOfReceiversCountValue) && !string.IsNullOrWhiteSpace(numberOfReceiversCountValue))
343 : {
344 : if (!int.TryParse(numberOfReceiversCountValue, out numberOfReceiversCount))
345 : numberOfReceiversCount = DefaultNumberOfReceiversCount;
346 : }
347 : else
348 : numberOfReceiversCount = DefaultNumberOfReceiversCount;
349 : return numberOfReceiversCount;
350 : }
351 :
352 0 : protected override int GetCurrentMaximumConcurrentReceiverProcessesCount()
353 : {
354 : string maximumConcurrentReceiverProcessesCountValue;
355 : int maximumConcurrentReceiverProcessesCount;
356 : if (ConfigurationManager.TryGetSetting(MaximumConcurrentReceiverProcessesCountConfigurationKey, out maximumConcurrentReceiverProcessesCountValue) && !string.IsNullOrWhiteSpace(maximumConcurrentReceiverProcessesCountValue))
357 : {
358 : if (!int.TryParse(maximumConcurrentReceiverProcessesCountValue, out maximumConcurrentReceiverProcessesCount))
359 : maximumConcurrentReceiverProcessesCount = DefaultMaximumConcurrentReceiverProcessesCount;
360 : }
361 : else
362 : maximumConcurrentReceiverProcessesCount = DefaultMaximumConcurrentReceiverProcessesCount;
363 : return maximumConcurrentReceiverProcessesCount;
364 : }
365 :
366 : #endregion
367 : }
368 : }
|