Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Text;
13 : using System.Threading;
14 : using Cqrs.Authentication;
15 : using Cqrs.Bus;
16 : using Cqrs.Commands;
17 : using Cqrs.Configuration;
18 : using cdmdotnet.Logging;
19 : using Cqrs.Messages;
20 : using Microsoft.ServiceBus.Messaging;
21 : using Cqrs.Exceptions;
22 :
23 : namespace Cqrs.Azure.ServiceBus
24 : {
/// <summary>
/// A <see cref="ICommandReceiver{TAuthenticationToken}"/> that receives network messages, resolves handlers and executes the handler.
/// </summary>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
public class AzureCommandBusReceiver<TAuthenticationToken>
    : AzureCommandBus<TAuthenticationToken>
    , ICommandHandlerRegistrar
    , ICommandReceiver<TAuthenticationToken>
{
    /// <summary>
    /// The maximum number of times <see cref="ReceiveCommand(PartitionContext, EventData)"/> attempts to process a single message.
    /// </summary>
    private const int MaximumReceiveAttempts = 10;

    // ReSharper disable StaticMemberInGenericType
    /// <summary>
    /// Gets the <see cref="RouteManager"/>.
    /// </summary>
    public static RouteManager Routes { get; private set; }

    /// <summary>
    /// The number of handles currently being executed.
    /// </summary>
    /// <remarks>
    /// Incremented once when a message starts being received and decremented once when receiving finishes, regardless of how many attempts were made.
    /// </remarks>
    protected static long CurrentHandles { get; set; }
    // ReSharper restore StaticMemberInGenericType

    /// <summary>
    /// Creates the shared <see cref="Routes"/> instance.
    /// </summary>
    static AzureCommandBusReceiver()
    {
        Routes = new RouteManager();
    }

    /// <summary>
    /// Instantiates a new instance of <see cref="AzureCommandBusReceiver{TAuthenticationToken}"/>.
    /// </summary>
    /// <remarks>
    /// All arguments are forwarded to the base constructor. The telemetry helper is created from <paramref name="configurationManager"/>
    /// using the configuration key "Cqrs.Azure.EventHub.CommandBus.Receiver.UseApplicationInsightTelemetryHelper".
    /// </remarks>
    /// <param name="configurationManager">Forwarded to the base class and used to create the telemetry helper.</param>
    /// <param name="messageSerialiser">Forwarded to the base class.</param>
    /// <param name="authenticationTokenHelper">Forwarded to the base class.</param>
    /// <param name="correlationIdHelper">Forwarded to the base class and used to create the telemetry helper.</param>
    /// <param name="logger">Forwarded to the base class.</param>
    /// <param name="hashAlgorithmFactory">Forwarded to the base class.</param>
    /// <param name="azureBusHelper">Forwarded to the base class.</param>
    public AzureCommandBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IHashAlgorithmFactory hashAlgorithmFactory, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
        : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, hashAlgorithmFactory, azureBusHelper, false)
    {
        TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventHub.CommandBus.Receiver.UseApplicationInsightTelemetryHelper", correlationIdHelper);
    }

    /// <summary>
    /// Register a command handler that will listen and respond to commands.
    /// </summary>
    /// <remarks>
    /// In many cases the <paramref name="targetedType"/> will be the handler class itself, what you actually want is the target of what is being updated.
    /// </remarks>
    /// <typeparam name="TMessage">The <see cref="Type"/> of message the <paramref name="handler"/> accepts.</typeparam>
    /// <param name="handler">The handler to register against <see cref="Routes"/>.</param>
    /// <param name="targetedType">The <see cref="Type"/> the handler targets; passed through to the bus helper unchanged.</param>
    /// <param name="holdMessageLock">Passed through to the bus helper unchanged.</param>
    public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
        where TMessage : IMessage
    {
        AzureBusHelper.RegisterHandler(TelemetryHelper, Routes, handler, targetedType, holdMessageLock);
    }

    /// <summary>
    /// Register a command handler that will listen and respond to commands.
    /// </summary>
    /// <remarks>
    /// Equivalent to calling <see cref="RegisterHandler{TMessage}(Action{TMessage}, Type, bool)"/> with a <c>null</c> targeted type.
    /// </remarks>
    /// <typeparam name="TMessage">The <see cref="Type"/> of message the <paramref name="handler"/> accepts.</typeparam>
    /// <param name="handler">The handler to register.</param>
    /// <param name="holdMessageLock">Passed through to the bus helper unchanged.</param>
    public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
        where TMessage : IMessage
    {
        RegisterHandler(handler, null, holdMessageLock);
    }

    /// <summary>
    /// Receives <see cref="EventData"/> from the command bus.
    /// </summary>
    /// <remarks>
    /// Processing is attempted up to ten times. After an unexpected <see cref="Exception"/> the thread sleeps for an increasing
    /// delay before the next attempt. Every attempt, successful or not, checkpoints the message and tracks a request in telemetry.
    /// </remarks>
    /// <param name="context">The partition context used to checkpoint <paramref name="eventData"/>.</param>
    /// <param name="eventData">The message to deserialise and hand to the registered handlers.</param>
    protected virtual void ReceiveCommand(PartitionContext context, EventData eventData)
    {
        DateTimeOffset startedAt = DateTimeOffset.UtcNow;
        Stopwatch mainStopWatch = Stopwatch.StartNew();
        string responseCode = "200";
        // Null means it was skipped
        bool? wasSuccessfull = true;
        string telemetryName = string.Format("Cqrs/Handle/Command/{0}", eventData.SequenceNumber);
        ISingleSignOnToken authenticationToken = null;
        Guid? guidAuthenticationToken = null;
        string stringAuthenticationToken = null;
        int? intAuthenticationToken = null;

        IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
        object value;
        if (eventData.Properties.TryGetValue("Type", out value))
            telemetryProperties.Add("MessageType", value.ToString());
        TelemetryHelper.TrackMetric("Cqrs/Handle/Command", CurrentHandles++, telemetryProperties);

        // The outer try/finally pairs the increment above with exactly one decrement,
        // no matter how many attempts the retry loop makes or how it exits.
        try
        {
            // Do a manual 10 try attempt with back-off
            for (int i = 0; i < MaximumReceiveAttempts; i++)
            {
                try
                {
                    Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
                    string messageBody = Encoding.UTF8.GetString(eventData.GetBytes());

                    ICommand<TAuthenticationToken> command = AzureBusHelper.ReceiveCommand(messageBody, ReceiveCommand,
                        string.Format("partition key '{0}', sequence number '{1}' and offset '{2}'", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset),
                        ExtractSignature(eventData),
                        SigningTokenConfigurationKey,
                        () =>
                        {
                            wasSuccessfull = null;
                            telemetryName = string.Format("Cqrs/Handle/Command/Skipped/{0}", eventData.SequenceNumber);
                            responseCode = "204";
                            // Remove message from queue
                            context.CheckpointAsync(eventData);
                            Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but processing was skipped due to command settings.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
                            TelemetryHelper.TrackEvent("Cqrs/Handle/Command/Skipped", telemetryProperties);
                        }
                    );

                    if (wasSuccessfull != null)
                    {
                        if (command != null)
                        {
                            telemetryName = string.Format("{0}/{1}", command.GetType().FullName, command.Id);
                            authenticationToken = command.AuthenticationToken as ISingleSignOnToken;
                            if (AuthenticationTokenIsGuid)
                                guidAuthenticationToken = command.AuthenticationToken as Guid?;
                            if (AuthenticationTokenIsString)
                                stringAuthenticationToken = command.AuthenticationToken as string;
                            if (AuthenticationTokenIsInt)
                                intAuthenticationToken = command.AuthenticationToken as int?;

                            var telemeteredMessage = command as ITelemeteredMessage;
                            if (telemeteredMessage != null)
                                telemetryName = telemeteredMessage.TelemetryName;

                            telemetryName = string.Format("Cqrs/Handle/Command/{0}", telemetryName);
                        }
                        // Remove message from queue
                        context.CheckpointAsync(eventData);

                        Logger.LogDebug(string.Format("A command message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));

                        // Clears any failure recorded by an earlier attempt. Deliberately not done when the
                        // command was skipped, so the skipped state and its "204" reach the request telemetry.
                        wasSuccessfull = true;
                        responseCode = "200";
                    }
                    return;
                }
                catch (UnAuthorisedMessageReceivedException exception)
                {
                    TelemetryHelper.TrackException(exception, null, telemetryProperties);
                    // Indicates a problem, unlock message in queue
                    Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but was not authorised.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
                    wasSuccessfull = false;
                    responseCode = "401";
                    RecordExceptionDetails(telemetryProperties, exception);
                }
                catch (NoHandlersRegisteredException exception)
                {
                    TelemetryHelper.TrackException(exception, null, telemetryProperties);
                    // Indicates a problem, unlock message in queue
                    Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but no handlers were found to process it.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
                    wasSuccessfull = false;
                    responseCode = "501";
                    RecordExceptionDetails(telemetryProperties, exception);
                }
                catch (NoHandlerRegisteredException exception)
                {
                    TelemetryHelper.TrackException(exception, null, telemetryProperties);
                    // Indicates a problem, unlock message in queue
                    Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but no handler was found to process it.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
                    wasSuccessfull = false;
                    responseCode = "501";
                    RecordExceptionDetails(telemetryProperties, exception);
                }
                catch (Exception exception)
                {
                    // Indicates a problem, unlock message in queue
                    Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);

                    if (i == MaximumReceiveAttempts - 1)
                    {
                        // Final attempt: no further retry follows, so record the failure instead of sleeping.
                        RecordExceptionDetails(telemetryProperties, exception);
                    }
                    else
                    {
                        Thread.Sleep(GetRetryDelayInMilliseconds(i));
                    }
                    wasSuccessfull = false;
                    responseCode = "500";
                }
                finally
                {
                    // Eventually just accept it
                    context.CheckpointAsync(eventData);

                    mainStopWatch.Stop();
                    bool requestSucceeded = wasSuccessfull == null || wasSuccessfull.Value;
                    // The branches select the TrackRequest overload matching the type of token that was captured.
                    if (guidAuthenticationToken != null)
                        TelemetryHelper.TrackRequest(telemetryName, guidAuthenticationToken, startedAt, mainStopWatch.Elapsed, responseCode, requestSucceeded, telemetryProperties);
                    else if (intAuthenticationToken != null)
                        TelemetryHelper.TrackRequest(telemetryName, intAuthenticationToken, startedAt, mainStopWatch.Elapsed, responseCode, requestSucceeded, telemetryProperties);
                    else if (stringAuthenticationToken != null)
                        TelemetryHelper.TrackRequest(telemetryName, stringAuthenticationToken, startedAt, mainStopWatch.Elapsed, responseCode, requestSucceeded, telemetryProperties);
                    else
                        TelemetryHelper.TrackRequest(telemetryName, authenticationToken, startedAt, mainStopWatch.Elapsed, responseCode, requestSucceeded, telemetryProperties);

                    TelemetryHelper.Flush();
                }
            }
        }
        finally
        {
            TelemetryHelper.TrackMetric("Cqrs/Handle/Command", CurrentHandles--, telemetryProperties);
            TelemetryHelper.Flush();
        }
    }

    /// <summary>
    /// Stores the type name and message of <paramref name="exception"/> in <paramref name="telemetryProperties"/>.
    /// </summary>
    /// <remarks>
    /// Uses indexer assignment rather than <see cref="IDictionary{TKey,TValue}.Add(TKey,TValue)"/> so that a failure on a
    /// later attempt overwrites the values of an earlier one instead of throwing on the duplicate key.
    /// </remarks>
    private static void RecordExceptionDetails(IDictionary<string, string> telemetryProperties, Exception exception)
    {
        telemetryProperties["ExceptionType"] = exception.GetType().FullName;
        telemetryProperties["ExceptionMessage"] = exception.Message;
    }

    /// <summary>
    /// Gets how long to wait, in milliseconds, after the zero-based <paramref name="attempt"/> failed and before the next attempt starts.
    /// </summary>
    private static int GetRetryDelayInMilliseconds(int attempt)
    {
        switch (attempt)
        {
            case 0:
            case 1:
                // 10 seconds
                return 10 * 1000;
            case 2:
            case 3:
                // 30 seconds
                return 30 * 1000;
            case 4:
            case 5:
            case 6:
                // 1 minute
                return 60 * 1000;
            default:
                // 3 minutes
                return 3 * 60 * 1000;
        }
    }

    /// <summary>
    /// Receives a <see cref="ICommand{TAuthenticationToken}"/> from the command bus.
    /// </summary>
    /// <param name="command">The command to hand to the handlers registered in <see cref="Routes"/>.</param>
    /// <returns>The result of the bus helper's default receive operation for <paramref name="command"/>.</returns>
    public virtual bool? ReceiveCommand(ICommand<TAuthenticationToken> command)
    {
        return AzureBusHelper.DefaultReceiveCommand(command, Routes, "Azure-EventHub");
    }

    #region Implementation of ICommandReceiver

    /// <summary>
    /// Starts listening and processing instances of <see cref="ICommand{TAuthenticationToken}"/> from the command bus.
    /// </summary>
    public void Start()
    {
        InstantiateReceiving();

        // Callback to handle received messages
        RegisterReceiverMessageHandler(ReceiveCommand);
    }

    #endregion
}
301 : }
|