Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Text;
13 : using System.Threading;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Bus;
17 : using Cqrs.Configuration;
18 : using Cqrs.Events;
19 : using Cqrs.Exceptions;
20 : using Cqrs.Messages;
21 : using Microsoft.ServiceBus.Messaging;
22 : using EventData = Microsoft.ServiceBus.Messaging.EventData;
23 :
24 : namespace Cqrs.Azure.ServiceBus
25 : {
26 : /// <summary>
27 : /// A <see cref="IEventReceiver{TAuthenticationToken}"/> that receives network messages, resolves handlers and executes the handler.
28 : /// </summary>
29 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
30 : public class AzureEventBusReceiver<TAuthenticationToken>
31 : : AzureEventHubBus<TAuthenticationToken>
32 : , IEventHandlerRegistrar
33 : , IEventReceiver<TAuthenticationToken>
34 2 : {
35 : /// <summary>
36 : /// The configuration key for
37 : /// the number of receiver <see cref="SubscriptionClient"/> instances to create
38 : /// as used by <see cref="IConfigurationManager"/>.
39 : /// </summary>
40 : protected virtual string NumberOfReceiversCountConfigurationKey
41 : {
42 : get { return "Cqrs.Azure.EventHub.EventBus.NumberOfReceiversCount"; }
43 : }
44 :
45 : // ReSharper disable StaticMemberInGenericType
46 : /// <summary>
47 : /// Gets the <see cref="RouteManager"/>.
48 : /// </summary>
49 : public static RouteManager Routes { get; private set; }
50 :
51 : /// <summary>
52 : /// The number of handles currently being executed.
53 : /// </summary>
54 : protected static long CurrentHandles { get; set; }
55 : // ReSharper restore StaticMemberInGenericType
56 :
57 : static AzureEventBusReceiver()
58 : {
59 : Routes = new RouteManager();
60 : }
61 :
62 : /// <summary>
63 : /// Instantiates a new instance of <see cref="AzureEventBusReceiver{TAuthenticationToken}"/>.
64 : /// </summary>
65 : public AzureEventBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IHashAlgorithmFactory hashAlgorithmFactory, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
66 : : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, hashAlgorithmFactory, azureBusHelper, false)
67 : {
68 : TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventHub.EventBus.Receiver.UseApplicationInsightTelemetryHelper", correlationIdHelper);
69 : }
70 :
71 : /// <summary>
72 : /// Register an event handler that will listen and respond to events.
73 : /// </summary>
74 : /// <remarks>
75 : /// In many cases the <paramref name="targetedType"/> will be the handler class itself, what you actually want is the target of what is being updated.
76 : /// </remarks>
77 : public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
78 : where TMessage : IMessage
79 : {
80 : AzureBusHelper.RegisterHandler(TelemetryHelper, Routes, handler, targetedType, holdMessageLock);
81 : }
82 :
83 : /// <summary>
84 : /// Register an event handler that will listen and respond to events.
85 : /// </summary>
86 : public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = false)
87 : where TMessage : IMessage
88 : {
89 : RegisterHandler(handler, null, holdMessageLock);
90 : }
91 :
92 : /// <summary>
93 : /// Register an event handler that will listen and respond to all events.
94 : /// </summary>
95 : public void RegisterGlobalEventHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
96 : where TMessage : IMessage
97 : {
98 : Routes.RegisterGlobalEventHandler(handler, holdMessageLock);
99 : }
100 :
101 : /// <summary>
102 : /// Receives a <see cref="BrokeredMessage"/> from the event bus.
103 : /// </summary>
104 : protected virtual void ReceiveEvent(PartitionContext context, EventData eventData)
105 : {
106 : DateTimeOffset startedAt = DateTimeOffset.UtcNow;
107 : Stopwatch mainStopWatch = Stopwatch.StartNew();
108 : string responseCode = "200";
109 : // Null means it was skipped
110 : bool? wasSuccessfull = true;
111 : string telemetryName = string.Format("Cqrs/Handle/Event/{0}", eventData.SequenceNumber);
112 : ISingleSignOnToken authenticationToken = null;
113 : Guid? guidAuthenticationToken = null;
114 : string stringAuthenticationToken = null;
115 : int? intAuthenticationToken = null;
116 :
117 : IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
118 : object value;
119 : if (eventData.Properties.TryGetValue("Type", out value))
120 : telemetryProperties.Add("MessageType", value.ToString());
121 : TelemetryHelper.TrackMetric("Cqrs/Handle/Event", CurrentHandles++, telemetryProperties);
122 : // Do a manual 10 try attempt with back-off
123 : for (int i = 0; i < 10; i++)
124 : {
125 : try
126 : {
127 : Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
128 : string messageBody = Encoding.UTF8.GetString(eventData.GetBytes());
129 :
130 : IEvent<TAuthenticationToken> @event = AzureBusHelper.ReceiveEvent(messageBody, ReceiveEvent,
131 : string.Format("partition key '{0}', sequence number '{1}' and offset '{2}'", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset),
132 : ExtractSignature(eventData),
133 : SigningTokenConfigurationKey,
134 : () =>
135 : {
136 : wasSuccessfull = null;
137 : telemetryName = string.Format("Cqrs/Handle/Event/Skipped/{0}", eventData.SequenceNumber);
138 : responseCode = "204";
139 : // Remove message from queue
140 : context.CheckpointAsync(eventData);
141 : Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but processing was skipped due to event settings.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
142 : TelemetryHelper.TrackEvent("Cqrs/Handle/Event/Skipped", telemetryProperties);
143 : }
144 : );
145 :
146 : if (wasSuccessfull != null)
147 : {
148 : if (@event != null)
149 : {
150 : telemetryName = string.Format("{0}/{1}/{2}", @event.GetType().FullName, @event.GetIdentity(), @event.Id);
151 : authenticationToken = @event.AuthenticationToken as ISingleSignOnToken;
152 : if (AuthenticationTokenIsGuid)
153 : guidAuthenticationToken = @event.AuthenticationToken as Guid?;
154 : if (AuthenticationTokenIsString)
155 : stringAuthenticationToken = @event.AuthenticationToken as string;
156 : if (AuthenticationTokenIsInt)
157 : intAuthenticationToken = @event.AuthenticationToken as int?;
158 :
159 : var telemeteredMessage = @event as ITelemeteredMessage;
160 : if (telemeteredMessage != null)
161 : telemetryName = telemeteredMessage.TelemetryName;
162 :
163 : telemetryName = string.Format("Cqrs/Handle/Event/{0}", telemetryName);
164 : }
165 : // Remove message from queue
166 : context.CheckpointAsync(eventData);
167 : }
168 : Logger.LogDebug(string.Format("An event message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
169 :
170 : IList<IEvent<TAuthenticationToken>> events;
171 : if (EventWaits.TryGetValue(@event.CorrelationId, out events))
172 : events.Add(@event);
173 :
174 : wasSuccessfull = true;
175 : responseCode = "200";
176 : return;
177 : }
178 : catch (UnAuthorisedMessageReceivedException exception)
179 : {
180 : TelemetryHelper.TrackException(exception, null, telemetryProperties);
181 : // Indicates a problem, unlock message in queue
182 : Logger.LogError(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but was not authorised.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
183 : wasSuccessfull = false;
184 : responseCode = "401";
185 : telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
186 : telemetryProperties.Add("ExceptionMessage", exception.Message);
187 : }
188 : catch (NoHandlersRegisteredException exception)
189 : {
190 : TelemetryHelper.TrackException(exception, null, telemetryProperties);
191 : // Indicates a problem, unlock message in queue
192 : Logger.LogError(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but no handlers were found to process it.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
193 : wasSuccessfull = false;
194 : responseCode = "501";
195 : telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
196 : telemetryProperties.Add("ExceptionMessage", exception.Message);
197 : }
198 : catch (NoHandlerRegisteredException exception)
199 : {
200 : TelemetryHelper.TrackException(exception, null, telemetryProperties);
201 : // Indicates a problem, unlock message in queue
202 : Logger.LogError(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'s but no handler was found to process it.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
203 : wasSuccessfull = false;
204 : responseCode = "501";
205 : telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
206 : telemetryProperties.Add("ExceptionMessage", exception.Message);
207 : }
208 : catch (Exception exception)
209 : {
210 : // Indicates a problem, unlock message in queue
211 : Logger.LogError(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
212 :
213 : switch (i)
214 : {
215 : case 0:
216 : case 1:
217 : // 10 seconds
218 : Thread.Sleep(10 * 1000);
219 : break;
220 : case 2:
221 : case 3:
222 : // 30 seconds
223 : Thread.Sleep(30 * 1000);
224 : break;
225 : case 4:
226 : case 5:
227 : case 6:
228 : // 1 minute
229 : Thread.Sleep(60 * 1000);
230 : break;
231 : case 7:
232 : case 8:
233 : // 3 minutes
234 : Thread.Sleep(3 * 60 * 1000);
235 : break;
236 : case 9:
237 : telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
238 : telemetryProperties.Add("ExceptionMessage", exception.Message);
239 : break;
240 : }
241 : wasSuccessfull = false;
242 : responseCode = "500";
243 : }
244 : finally
245 : {
246 : // Eventually just accept it
247 : context.CheckpointAsync(eventData);
248 :
249 : TelemetryHelper.TrackMetric("Cqrs/Handle/Event", CurrentHandles--, telemetryProperties);
250 :
251 : mainStopWatch.Stop();
252 : if (guidAuthenticationToken != null)
253 : TelemetryHelper.TrackRequest
254 : (
255 : telemetryName,
256 : guidAuthenticationToken,
257 : startedAt,
258 : mainStopWatch.Elapsed,
259 : responseCode,
260 : wasSuccessfull == null || wasSuccessfull.Value,
261 : telemetryProperties
262 : );
263 : else if (intAuthenticationToken != null)
264 : TelemetryHelper.TrackRequest
265 : (
266 : telemetryName,
267 : intAuthenticationToken,
268 : startedAt,
269 : mainStopWatch.Elapsed,
270 : responseCode,
271 : wasSuccessfull == null || wasSuccessfull.Value,
272 : telemetryProperties
273 : );
274 : else if (stringAuthenticationToken != null)
275 : TelemetryHelper.TrackRequest
276 : (
277 : telemetryName,
278 : stringAuthenticationToken,
279 : startedAt,
280 : mainStopWatch.Elapsed,
281 : responseCode,
282 : wasSuccessfull == null || wasSuccessfull.Value,
283 : telemetryProperties
284 : );
285 : else
286 : TelemetryHelper.TrackRequest
287 : (
288 : telemetryName,
289 : authenticationToken,
290 : startedAt,
291 : mainStopWatch.Elapsed,
292 : responseCode,
293 : wasSuccessfull == null || wasSuccessfull.Value,
294 : telemetryProperties
295 : );
296 :
297 : TelemetryHelper.Flush();
298 : }
299 : }
300 : }
301 :
302 : /// <summary>
303 : /// Receives a <see cref="IEvent{TAuthenticationToken}"/> from the event bus.
304 : /// </summary>
305 : public virtual bool? ReceiveEvent(IEvent<TAuthenticationToken> @event)
306 : {
307 : return AzureBusHelper.DefaultReceiveEvent(@event, Routes, "Azure-EventHub");
308 : }
309 :
310 : #region Overrides of AzureServiceBus<TAuthenticationToken>
311 :
312 : /// <summary>
313 : /// Returns <see cref="NumberOfReceiversCountConfigurationKey"/> from <see cref="IConfigurationManager"/>
314 : /// if no value is set, returns <see cref="AzureBus{TAuthenticationToken}.DefaultNumberOfReceiversCount"/>.
315 : /// </summary>
316 : protected override int GetCurrentNumberOfReceiversCount()
317 : {
318 : string numberOfReceiversCountValue;
319 : int numberOfReceiversCount;
320 : if (ConfigurationManager.TryGetSetting(NumberOfReceiversCountConfigurationKey, out numberOfReceiversCountValue) && !string.IsNullOrWhiteSpace(numberOfReceiversCountValue))
321 : {
322 : if (!int.TryParse(numberOfReceiversCountValue, out numberOfReceiversCount))
323 : numberOfReceiversCount = DefaultNumberOfReceiversCount;
324 : }
325 : else
326 : numberOfReceiversCount = DefaultNumberOfReceiversCount;
327 : return numberOfReceiversCount;
328 : }
329 :
330 : #endregion
331 :
332 : #region Implementation of IEventReceiver
333 :
334 : /// <summary>
335 : /// Starts listening and processing instances of <see cref="IEvent{TAuthenticationToken}"/> from the event bus.
336 : /// </summary>
337 : public void Start()
338 : {
339 : InstantiateReceiving();
340 :
341 : // Callback to handle received messages
342 : RegisterReceiverMessageHandler(ReceiveEvent);
343 : }
344 :
345 : #endregion
346 : }
347 : }
|