Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Text;
13 : using System.Threading;
14 : using Chinchilla.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Bus;
17 : using Cqrs.Commands;
18 : using Cqrs.Configuration;
19 : using Cqrs.Exceptions;
20 : using Cqrs.Messages;
21 : #if NET452
22 : using Microsoft.ServiceBus.Messaging;
23 : using EventData = Microsoft.ServiceBus.Messaging.EventData;
24 : #endif
25 : #if NETCOREAPP3_0
26 : using Microsoft.Azure.EventHubs;
27 : using Microsoft.Azure.EventHubs.Processor;
28 : using EventData = Microsoft.Azure.EventHubs.EventData;
29 : #endif
30 :
31 : namespace Cqrs.Azure.ServiceBus
32 : {
33 : /// <summary>
34 : /// A <see cref="ICommandReceiver{TAuthenticationToken}"/> that receives network messages, resolves handlers and executes the handler.
35 : /// </summary>
36 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
37 : public class AzureCommandBusReceiver<TAuthenticationToken>
38 : : AzureCommandBus<TAuthenticationToken>
39 : , ICommandHandlerRegistrar
40 : , ICommandReceiver<TAuthenticationToken>
41 2 : {
42 : // ReSharper disable StaticMemberInGenericType
43 : /// <summary>
44 : /// Gets the <see cref="RouteManager"/>.
45 : /// </summary>
46 : public static RouteManager Routes { get; private set; }
47 :
48 : /// <summary>
49 : /// The number of handles currently being executed.
50 : /// </summary>
51 : protected static long CurrentHandles { get; set; }
52 : // ReSharper restore StaticMemberInGenericType
53 :
54 : static AzureCommandBusReceiver()
55 : {
56 : Routes = new RouteManager();
57 : }
58 :
59 : /// <summary>
60 : /// Instantiates a new instance of <see cref="AzureCommandBusReceiver{TAuthenticationToken}"/>.
61 : /// </summary>
62 : public AzureCommandBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IHashAlgorithmFactory hashAlgorithmFactory, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
63 : : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, hashAlgorithmFactory, azureBusHelper, false)
64 : {
65 : TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventHub.CommandBus.Receiver.UseApplicationInsightTelemetryHelper", correlationIdHelper);
66 : }
67 :
68 : /// <summary>
69 : /// Register a command handler that will listen and respond to commands.
70 : /// </summary>
71 : /// <remarks>
72 : /// In many cases the <paramref name="targetedType"/> will be the handler class itself, what you actually want is the target of what is being updated.
73 : /// </remarks>
74 : public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
75 : where TMessage : IMessage
76 : {
77 : AzureBusHelper.RegisterHandler(TelemetryHelper, Routes, handler, targetedType, holdMessageLock);
78 : }
79 :
80 : /// <summary>
81 : /// Register a command handler that will listen and respond to commands.
82 : /// </summary>
83 : public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
84 : where TMessage : IMessage
85 : {
86 : RegisterHandler(handler, null, holdMessageLock);
87 : }
88 :
89 : /// <summary>
90 : /// Receives <see cref="EventData"/> from the command bus.
91 : /// </summary>
92 : protected virtual void ReceiveCommand(PartitionContext context, EventData eventData)
93 : {
94 : DateTimeOffset startedAt = DateTimeOffset.UtcNow;
95 : Stopwatch mainStopWatch = Stopwatch.StartNew();
96 : string responseCode = "200";
97 : // Null means it was skipped
98 : bool? wasSuccessfull = true;
99 : #if NET452
100 : string telemetryName = string.Format("Cqrs/Handle/Command/{0}", eventData.SequenceNumber);
101 : #endif
102 : #if NETCOREAPP3_0
103 : string telemetryName = string.Format("Cqrs/Handle/Command/{0}", eventData.SystemProperties.SequenceNumber);
104 : #endif
105 : ISingleSignOnToken authenticationToken = null;
106 : Guid? guidAuthenticationToken = null;
107 : string stringAuthenticationToken = null;
108 : int? intAuthenticationToken = null;
109 :
110 : IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
111 : object value;
112 : if (eventData.Properties.TryGetValue("Type", out value))
113 : telemetryProperties.Add("MessageType", value.ToString());
114 : TelemetryHelper.TrackMetric("Cqrs/Handle/Command", CurrentHandles++, telemetryProperties);
115 : // Do a manual 10 try attempt with back-off
116 : for (int i = 0; i < 10; i++)
117 : {
118 : try
119 : {
120 : #if NET452
121 : Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
122 : #endif
123 : #if NETCOREAPP3_0
124 : Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset));
125 : #endif
126 : #if NET452
127 : string messageBody = Encoding.UTF8.GetString(eventData.GetBytes());
128 : #endif
129 : #if NETCOREAPP3_0
130 : string messageBody = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
131 : #endif
132 :
133 : ICommand<TAuthenticationToken> command = AzureBusHelper.ReceiveCommand(messageBody, ReceiveCommand,
134 : #if NET452
135 : string.Format("partition key '{0}', sequence number '{1}' and offset '{2}'", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset),
136 : #endif
137 : #if NETCOREAPP3_0
138 : string.Format("partition key '{0}', sequence number '{1}' and offset '{2}'", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset),
139 : #endif
140 : ExtractSignature(eventData),
141 : SigningTokenConfigurationKey,
142 : () =>
143 : {
144 : wasSuccessfull = null;
145 : #if NET452
146 : telemetryName = string.Format("Cqrs/Handle/Command/Skipped/{0}", eventData.SequenceNumber);
147 : #endif
148 : #if NETCOREAPP3_0
149 : telemetryName = string.Format("Cqrs/Handle/Command/Skipped/{0}", eventData.SystemProperties.SequenceNumber);
150 : #endif
151 : responseCode = "204";
152 : // Remove message from queue
153 : context.CheckpointAsync(eventData);
154 : #if NET452
155 : Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but processing was skipped due to command settings.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
156 : #endif
157 : #if NETCOREAPP3_0
158 : Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but processing was skipped due to command settings.", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset));
159 : #endif
160 : TelemetryHelper.TrackEvent("Cqrs/Handle/Command/Skipped", telemetryProperties);
161 : }
162 : );
163 :
164 : if (wasSuccessfull != null)
165 : {
166 : if (command != null)
167 : {
168 : telemetryName = string.Format("{0}/{1}", command.GetType().FullName, command.Id);
169 : authenticationToken = command.AuthenticationToken as ISingleSignOnToken;
170 : if (AuthenticationTokenIsGuid)
171 : guidAuthenticationToken = command.AuthenticationToken as Guid?;
172 : if (AuthenticationTokenIsString)
173 : stringAuthenticationToken = command.AuthenticationToken as string;
174 : if (AuthenticationTokenIsInt)
175 : intAuthenticationToken = command.AuthenticationToken as int?;
176 :
177 : var telemeteredMessage = command as ITelemeteredMessage;
178 : if (telemeteredMessage != null)
179 : telemetryName = telemeteredMessage.TelemetryName;
180 :
181 : telemetryName = string.Format("Cqrs/Handle/Command/{0}", telemetryName);
182 : }
183 : // Remove message from queue
184 : context.CheckpointAsync(eventData);
185 : }
186 : #if NET452
187 : Logger.LogDebug(string.Format("A command message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
188 : #endif
189 : #if NETCOREAPP3_0
190 : Logger.LogDebug(string.Format("A command message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset));
191 : #endif
192 :
193 : wasSuccessfull = true;
194 : responseCode = "200";
195 : return;
196 : }
197 : catch (UnAuthorisedMessageReceivedException exception)
198 : {
199 : TelemetryHelper.TrackException(exception, null, telemetryProperties);
200 : // Indicates a problem, unlock message in queue
201 : #if NET452
202 : Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but was not authorised.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
203 : #endif
204 : #if NETCOREAPP3_0
205 : Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but was not authorised.", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset), exception: exception);
206 : #endif
207 : wasSuccessfull = false;
208 : responseCode = "401";
209 : telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
210 : telemetryProperties.Add("ExceptionMessage", exception.Message);
211 : }
212 : catch (NoHandlersRegisteredException exception)
213 : {
214 : TelemetryHelper.TrackException(exception, null, telemetryProperties);
215 : // Indicates a problem, unlock message in queue
216 : #if NET452
217 : Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but no handlers were found to process it.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
218 : #endif
219 : #if NETCOREAPP3_0
220 : Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but no handlers were found to process it.", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset), exception: exception);
221 : #endif
222 : wasSuccessfull = false;
223 : responseCode = "501";
224 : telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
225 : telemetryProperties.Add("ExceptionMessage", exception.Message);
226 : }
227 : catch (NoHandlerRegisteredException exception)
228 : {
229 : TelemetryHelper.TrackException(exception, null, telemetryProperties);
230 : // Indicates a problem, unlock message in queue
231 : #if NET452
232 : Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but no handler was found to process it.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
233 : #endif
234 : #if NETCOREAPP3_0
235 : Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but no handler was found to process it.", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset), exception: exception);
236 : #endif
237 : wasSuccessfull = false;
238 : responseCode = "501";
239 : telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
240 : telemetryProperties.Add("ExceptionMessage", exception.Message);
241 : }
242 : catch (Exception exception)
243 : {
244 : // Indicates a problem, unlock message in queue
245 : #if NET452
246 : Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
247 : #endif
248 : #if NETCOREAPP3_0
249 : Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset), exception: exception);
250 : #endif
251 :
252 : switch (i)
253 : {
254 : case 0:
255 : case 1:
256 : // 10 seconds
257 : Thread.Sleep(10 * 1000);
258 : break;
259 : case 2:
260 : case 3:
261 : // 30 seconds
262 : Thread.Sleep(30 * 1000);
263 : break;
264 : case 4:
265 : case 5:
266 : case 6:
267 : // 1 minute
268 : Thread.Sleep(60 * 1000);
269 : break;
270 : case 7:
271 : case 8:
272 : // 3 minutes
273 : Thread.Sleep(3 * 60 * 1000);
274 : break;
275 : case 9:
276 : telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
277 : telemetryProperties.Add("ExceptionMessage", exception.Message);
278 : break;
279 : }
280 : wasSuccessfull = false;
281 : responseCode = "500";
282 : }
283 : finally
284 : {
285 : // Eventually just accept it
286 : context.CheckpointAsync(eventData);
287 :
288 : TelemetryHelper.TrackMetric("Cqrs/Handle/Command", CurrentHandles--, telemetryProperties);
289 :
290 : mainStopWatch.Stop();
291 : if (guidAuthenticationToken != null)
292 : TelemetryHelper.TrackRequest
293 : (
294 : telemetryName,
295 : guidAuthenticationToken,
296 : startedAt,
297 : mainStopWatch.Elapsed,
298 : responseCode,
299 : wasSuccessfull == null || wasSuccessfull.Value,
300 : telemetryProperties
301 : );
302 : else if (intAuthenticationToken != null)
303 : TelemetryHelper.TrackRequest
304 : (
305 : telemetryName,
306 : intAuthenticationToken,
307 : startedAt,
308 : mainStopWatch.Elapsed,
309 : responseCode,
310 : wasSuccessfull == null || wasSuccessfull.Value,
311 : telemetryProperties
312 : );
313 : else if (stringAuthenticationToken != null)
314 : TelemetryHelper.TrackRequest
315 : (
316 : telemetryName,
317 : stringAuthenticationToken,
318 : startedAt,
319 : mainStopWatch.Elapsed,
320 : responseCode,
321 : wasSuccessfull == null || wasSuccessfull.Value,
322 : telemetryProperties
323 : );
324 : else
325 : TelemetryHelper.TrackRequest
326 : (
327 : telemetryName,
328 : authenticationToken,
329 : startedAt,
330 : mainStopWatch.Elapsed,
331 : responseCode,
332 : wasSuccessfull == null || wasSuccessfull.Value,
333 : telemetryProperties
334 : );
335 :
336 : TelemetryHelper.Flush();
337 : }
338 : }
339 : }
340 :
341 : /// <summary>
342 : /// Receives a <see cref="ICommand{TAuthenticationToken}"/> from the command bus.
343 : /// </summary>
344 : public virtual bool? ReceiveCommand(ICommand<TAuthenticationToken> command)
345 : {
346 : return AzureBusHelper.DefaultReceiveCommand(command, Routes, "Azure-EventHub");
347 : }
348 :
349 : #region Implementation of ICommandReceiver
350 :
351 : /// <summary>
352 : /// Starts listening and processing instances of <see cref="ICommand{TAuthenticationToken}"/> from the command bus.
353 : /// </summary>
354 : public void Start()
355 : {
356 : InstantiateReceiving();
357 :
358 : // Callback to handle received messages
359 : RegisterReceiverMessageHandler(ReceiveCommand);
360 : }
361 :
362 : #endregion
363 : }
364 : }
|