Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Text;
13 : using System.Threading;
14 : using Chinchilla.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Bus;
17 : using Cqrs.Configuration;
18 : using Cqrs.Events;
19 : using Cqrs.Exceptions;
20 : using Cqrs.Messages;
21 : #if NET452
22 : using Microsoft.ServiceBus.Messaging;
23 : using EventData = Microsoft.ServiceBus.Messaging.EventData;
24 : #endif
25 : #if NETCOREAPP3_0
26 : using Microsoft.Azure.EventHubs;
27 : using Microsoft.Azure.ServiceBus.Core;
28 : using Microsoft.Azure.EventHubs.Processor;
29 : using EventData = Microsoft.Azure.EventHubs.EventData;
30 : #endif
31 :
32 : namespace Cqrs.Azure.ServiceBus
33 : {
/// <summary>
/// An <see cref="IEventReceiver{TAuthenticationToken}"/> that receives network messages, resolves handlers and executes the handler.
/// </summary>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
public class AzureEventBusReceiver<TAuthenticationToken>
    : AzureEventHubBus<TAuthenticationToken>
    , IEventHandlerRegistrar
    , IEventReceiver<TAuthenticationToken>
{
#if NET452
    /// <summary>
    /// The configuration key for
    /// the number of receiver <see cref="SubscriptionClient"/> instances to create
    /// as used by <see cref="IConfigurationManager"/>.
    /// </summary>
#endif
#if NETCOREAPP3_0
    /// <summary>
    /// The configuration key for
    /// the number of receiver <see cref="IMessageReceiver"/> instances to create
    /// as used by <see cref="IConfigurationManager"/>.
    /// </summary>
#endif
    protected virtual string NumberOfReceiversCountConfigurationKey
    {
        get { return "Cqrs.Azure.EventHub.EventBus.NumberOfReceiversCount"; }
    }

    // ReSharper disable StaticMemberInGenericType
    /// <summary>
    /// Gets the <see cref="RouteManager"/> holding the handlers registered through this receiver.
    /// </summary>
    /// <remarks>
    /// This is a static member of a generic type, so each closed generic type gets its own instance.
    /// </remarks>
    public static RouteManager Routes { get; private set; }

    /// <summary>
    /// The number of handles currently being executed.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this is updated with <c>++</c>/<c>--</c>, which is not atomic - confirm whether
    /// receive callbacks can run concurrently before relying on the exact value.
    /// </remarks>
    protected static long CurrentHandles { get; set; }
    // ReSharper restore StaticMemberInGenericType

    static AzureEventBusReceiver()
    {
        Routes = new RouteManager();
    }

    /// <summary>
    /// Instantiates a new instance of <see cref="AzureEventBusReceiver{TAuthenticationToken}"/>.
    /// </summary>
    /// <remarks>
    /// All arguments are forwarded to the base class; the telemetry helper is created from the
    /// "Cqrs.Azure.EventHub.EventBus.Receiver.UseApplicationInsightTelemetryHelper" setting.
    /// NOTE(review): the trailing <c>false</c> passed to the base constructor presumably marks this
    /// instance as a receiver rather than a publisher - confirm against the base class.
    /// </remarks>
    public AzureEventBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IHashAlgorithmFactory hashAlgorithmFactory, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
        : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, hashAlgorithmFactory, azureBusHelper, false)
    {
        TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventHub.EventBus.Receiver.UseApplicationInsightTelemetryHelper", correlationIdHelper);
    }

    /// <summary>
    /// Register an event handler that will listen and respond to events.
    /// </summary>
    /// <remarks>
    /// In many cases the <paramref name="targetedType"/> will be the handler class itself, what you actually want is the target of what is being updated.
    /// </remarks>
    public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
        where TMessage : IMessage
    {
        AzureBusHelper.RegisterHandler(TelemetryHelper, Routes, handler, targetedType, holdMessageLock);
    }

    /// <summary>
    /// Register an event handler that will listen and respond to events.
    /// </summary>
    /// <remarks>
    /// Delegates to the overload taking a targeted <see cref="Type"/>, passing <c>null</c> for it.
    /// NOTE(review): <paramref name="holdMessageLock"/> defaults to <c>false</c> here but to <c>true</c>
    /// on the other registration methods - confirm this difference is intentional.
    /// </remarks>
    public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = false)
        where TMessage : IMessage
    {
        RegisterHandler(handler, null, holdMessageLock);
    }

    /// <summary>
    /// Register an event handler that will listen and respond to all events.
    /// </summary>
    public void RegisterGlobalEventHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
        where TMessage : IMessage
    {
        Routes.RegisterGlobalEventHandler(handler, holdMessageLock);
    }

    /// <summary>
    /// Receives a <see cref="EventData"/> from the event bus.
    /// </summary>
    /// <remarks>
    /// The message is decoded as UTF-8 and handed to <see cref="AzureBusHelper"/> for processing.
    /// Unexpected failures are retried up to ten times with a blocking back-off (10s, 10s, 30s, 30s, 60s, 60s, 60s, 3m, 3m).
    /// Unauthorised messages and messages without a handler are not retried as repeating the attempt cannot change the outcome.
    /// Every attempt checkpoints the message and emits request telemetry, whatever the outcome.
    /// </remarks>
    /// <param name="context">The <see cref="PartitionContext"/> used to checkpoint the <paramref name="eventData"/>.</param>
    /// <param name="eventData">The message that arrived.</param>
    protected virtual void ReceiveEvent(PartitionContext context, EventData eventData)
    {
        const int maximumAttempts = 10;

        DateTimeOffset startedAt = DateTimeOffset.UtcNow;
        Stopwatch mainStopWatch = Stopwatch.StartNew();
        string responseCode = "200";
        // Null means it was skipped
        bool? wasSuccessfull = true;
        string telemetryName = string.Format("Cqrs/Handle/Event/{0}", GetSequenceNumber(eventData));
        ISingleSignOnToken authenticationToken = null;
        Guid? guidAuthenticationToken = null;
        string stringAuthenticationToken = null;
        int? intAuthenticationToken = null;

        IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
        object value;
        // Guard against a property that is present but null rather than failing on ToString().
        if (eventData.Properties.TryGetValue("Type", out value) && value != null)
            telemetryProperties.Add("MessageType", value.ToString());

        // The payload does not change between attempts, so it is decoded once (inside the try so a failure is still handled).
        string messageBody = null;

        // Do a manual 10 try attempt with back-off
        for (int attempt = 0; attempt < maximumAttempts; attempt++)
        {
            // Counted once per attempt so it always pairs with the decrement in the finally block below.
            TelemetryHelper.TrackMetric("Cqrs/Handle/Event", CurrentHandles++, telemetryProperties);
            try
            {
                Logger.LogDebug(string.Format("An event message arrived with the {0}.", DescribePosition(eventData)));
                if (messageBody == null)
                    messageBody = GetMessageBody(eventData);

                IEvent<TAuthenticationToken> @event = AzureBusHelper.ReceiveEvent(messageBody, ReceiveEvent,
                    DescribePosition(eventData),
                    ExtractSignature(eventData),
                    SigningTokenConfigurationKey,
                    () =>
                    {
                        wasSuccessfull = null;
                        telemetryName = string.Format("Cqrs/Handle/Event/Skipped/{0}", GetSequenceNumber(eventData));
                        responseCode = "204";
                        // Remove message from queue
                        // NOTE(review): the returned task is not awaited, so a failed checkpoint goes unobserved.
                        context.CheckpointAsync(eventData);
                        Logger.LogDebug(string.Format("An event message arrived with the {0} but processing was skipped due to event settings.", DescribePosition(eventData)));
                        TelemetryHelper.TrackEvent("Cqrs/Handle/Event/Skipped", telemetryProperties);
                    }
                );

                if (wasSuccessfull != null)
                {
                    if (@event != null)
                    {
                        telemetryName = string.Format("{0}/{1}/{2}", @event.GetType().FullName, @event.GetIdentity(), @event.Id);
                        authenticationToken = @event.AuthenticationToken as ISingleSignOnToken;
                        if (AuthenticationTokenIsGuid)
                            guidAuthenticationToken = @event.AuthenticationToken as Guid?;
                        if (AuthenticationTokenIsString)
                            stringAuthenticationToken = @event.AuthenticationToken as string;
                        if (AuthenticationTokenIsInt)
                            intAuthenticationToken = @event.AuthenticationToken as int?;

                        var telemeteredMessage = @event as ITelemeteredMessage;
                        if (telemeteredMessage != null)
                            telemetryName = telemeteredMessage.TelemetryName;

                        telemetryName = string.Format("Cqrs/Handle/Event/{0}", telemetryName);
                    }
                    // Remove message from queue
                    context.CheckpointAsync(eventData);
                }
                Logger.LogDebug(string.Format("An event message arrived and was processed with the {0}.", DescribePosition(eventData)));

                // The event is treated as possibly null above, so it must not be dereferenced blindly here;
                // doing so would surface as a failure and trigger the full retry/back-off cycle.
                if (@event != null)
                {
                    IList<IEvent<TAuthenticationToken>> events;
                    if (EventWaits.TryGetValue(@event.CorrelationId, out events))
                        events.Add(@event);
                }

                // Keep the skipped outcome (null / "204") set by the callback above; otherwise clear any
                // failure state left behind by an earlier attempt.
                if (wasSuccessfull != null)
                {
                    wasSuccessfull = true;
                    responseCode = "200";
                }
                return;
            }
            catch (UnAuthorisedMessageReceivedException exception)
            {
                TelemetryHelper.TrackException(exception, null, telemetryProperties);
                // Indicates a problem, unlock message in queue
                Logger.LogError(string.Format("An event message arrived with the {0} but was not authorised.", DescribePosition(eventData)), exception: exception);
                wasSuccessfull = false;
                responseCode = "401";
                RecordException(telemetryProperties, exception);
                // Retrying cannot make the message authorised; the finally block still runs.
                return;
            }
            catch (NoHandlersRegisteredException exception)
            {
                TelemetryHelper.TrackException(exception, null, telemetryProperties);
                // Indicates a problem, unlock message in queue
                Logger.LogError(string.Format("An event message arrived with the {0} but no handlers were found to process it.", DescribePosition(eventData)), exception: exception);
                wasSuccessfull = false;
                responseCode = "501";
                RecordException(telemetryProperties, exception);
                // An immediate retry would find the same missing handlers; the finally block still runs.
                return;
            }
            catch (NoHandlerRegisteredException exception)
            {
                TelemetryHelper.TrackException(exception, null, telemetryProperties);
                // Indicates a problem, unlock message in queue
                // The stray "s" after the position is pre-existing and kept so the logged text is unchanged.
                Logger.LogError(string.Format("An event message arrived with the {0}s but no handler was found to process it.", DescribePosition(eventData)), exception: exception);
                wasSuccessfull = false;
                responseCode = "501";
                RecordException(telemetryProperties, exception);
                // An immediate retry would find the same missing handler; the finally block still runs.
                return;
            }
            catch (Exception exception)
            {
                // Indicates a problem, unlock message in queue
                Logger.LogError(string.Format("An event message arrived with the {0} but failed to be process.", DescribePosition(eventData)), exception: exception);

                // The back-off happens before the finally block, so it is part of this attempt's duration.
                switch (attempt)
                {
                    case 0:
                    case 1:
                        // 10 seconds
                        Thread.Sleep(10 * 1000);
                        break;
                    case 2:
                    case 3:
                        // 30 seconds
                        Thread.Sleep(30 * 1000);
                        break;
                    case 4:
                    case 5:
                    case 6:
                        // 1 minute
                        Thread.Sleep(60 * 1000);
                        break;
                    case 7:
                    case 8:
                        // 3 minutes
                        Thread.Sleep(3 * 60 * 1000);
                        break;
                    case maximumAttempts - 1:
                        // Last attempt, give up and report what went wrong
                        RecordException(telemetryProperties, exception);
                        break;
                }
                wasSuccessfull = false;
                responseCode = "500";
            }
            finally
            {
                // Eventually just accept it
                // NOTE(review): this checkpoints after every attempt, including failed ones that will be retried,
                // and the returned task is not awaited - confirm both are intended.
                context.CheckpointAsync(eventData);

                TelemetryHelper.TrackMetric("Cqrs/Handle/Event", CurrentHandles--, telemetryProperties);

                // Read the stopwatch without stopping it so a retried attempt reports the time elapsed
                // since startedAt rather than the duration frozen at the first attempt.
                TimeSpan elapsed = mainStopWatch.Elapsed;
                bool succeeded = wasSuccessfull == null || wasSuccessfull.Value;
                if (guidAuthenticationToken != null)
                    TelemetryHelper.TrackRequest(telemetryName, guidAuthenticationToken, startedAt, elapsed, responseCode, succeeded, telemetryProperties);
                else if (intAuthenticationToken != null)
                    TelemetryHelper.TrackRequest(telemetryName, intAuthenticationToken, startedAt, elapsed, responseCode, succeeded, telemetryProperties);
                else if (stringAuthenticationToken != null)
                    TelemetryHelper.TrackRequest(telemetryName, stringAuthenticationToken, startedAt, elapsed, responseCode, succeeded, telemetryProperties);
                else
                    TelemetryHelper.TrackRequest(telemetryName, authenticationToken, startedAt, elapsed, responseCode, succeeded, telemetryProperties);

                TelemetryHelper.Flush();
            }
        }
    }

    /// <summary>
    /// Receives a <see cref="IEvent{TAuthenticationToken}"/> from the event bus.
    /// </summary>
    /// <returns>The result of <c>AzureBusHelper.DefaultReceiveEvent</c> for the <paramref name="event"/> using <see cref="Routes"/>.</returns>
    public virtual bool? ReceiveEvent(IEvent<TAuthenticationToken> @event)
    {
        return AzureBusHelper.DefaultReceiveEvent(@event, Routes, "Azure-EventHub");
    }

    #region Message helpers

    /// <summary>
    /// Gets the sequence number of the <paramref name="eventData"/> for the SDK being compiled against.
    /// </summary>
    private static long GetSequenceNumber(EventData eventData)
    {
#if NET452
        return eventData.SequenceNumber;
#endif
#if NETCOREAPP3_0
        return eventData.SystemProperties.SequenceNumber;
#endif
    }

    /// <summary>
    /// Describes where the <paramref name="eventData"/> sits in the stream (partition key, sequence number and offset) for log messages.
    /// </summary>
    private static string DescribePosition(EventData eventData)
    {
#if NET452
        return string.Format("partition key '{0}', sequence number '{1}' and offset '{2}'", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset);
#endif
#if NETCOREAPP3_0
        return string.Format("partition key '{0}', sequence number '{1}' and offset '{2}'", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset);
#endif
    }

    /// <summary>
    /// Decodes the body of the <paramref name="eventData"/> as UTF-8 text.
    /// </summary>
    private static string GetMessageBody(EventData eventData)
    {
#if NET452
        return Encoding.UTF8.GetString(eventData.GetBytes());
#endif
#if NETCOREAPP3_0
        return Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
#endif
    }

    /// <summary>
    /// Stores the type and message of the <paramref name="exception"/> in the <paramref name="telemetryProperties"/>,
    /// replacing any values already stored so this is safe to call more than once.
    /// </summary>
    private static void RecordException(IDictionary<string, string> telemetryProperties, Exception exception)
    {
        telemetryProperties["ExceptionType"] = exception.GetType().FullName;
        telemetryProperties["ExceptionMessage"] = exception.Message;
    }

    #endregion

    #region Overrides of AzureServiceBus<TAuthenticationToken>

    /// <summary>
    /// Returns <see cref="NumberOfReceiversCountConfigurationKey"/> from <see cref="IConfigurationManager"/>
    /// if no value is set, returns <see cref="AzureBus{TAuthenticationToken}.DefaultNumberOfReceiversCount"/>.
    /// </summary>
    /// <remarks>
    /// A value that is blank or that cannot be parsed as an <see cref="int"/> also results in the default.
    /// </remarks>
    protected override int GetCurrentNumberOfReceiversCount()
    {
        string configuredValue;
        int numberOfReceiversCount;
        bool hasUsableSetting = ConfigurationManager.TryGetSetting(NumberOfReceiversCountConfigurationKey, out configuredValue)
            && !string.IsNullOrWhiteSpace(configuredValue)
            && int.TryParse(configuredValue, out numberOfReceiversCount);

        return hasUsableSetting ? numberOfReceiversCount : DefaultNumberOfReceiversCount;
    }

    #endregion

    #region Implementation of IEventReceiver

    /// <summary>
    /// Starts listening and processing instances of <see cref="IEvent{TAuthenticationToken}"/> from the event bus.
    /// </summary>
    public void Start()
    {
        InstantiateReceiving();

        // Callback to handle received messages
        RegisterReceiverMessageHandler(ReceiveEvent);
    }

    #endregion
}
419 : }
|