Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Text;
13 : using System.Threading;
14 : using Cqrs.Authentication;
15 : using Cqrs.Bus;
16 : using Cqrs.Commands;
17 : using Cqrs.Configuration;
18 : using cdmdotnet.Logging;
19 : using Cqrs.Messages;
20 : using Microsoft.ServiceBus.Messaging;
21 :
22 : namespace Cqrs.Azure.ServiceBus
23 : {
/// <summary>
/// A <see cref="ICommandReceiver{TAuthenticationToken}"/> that receives network messages, resolves handlers and executes the handler.
/// </summary>
/// <remarks>
/// Messages arrive as <see cref="EventData"/> via the callback registered in <see cref="Start"/>.
/// Each message is attempted up to ten times with a blocking back-off between attempts; after that the message is
/// checkpointed regardless of the outcome and the failure is reported through telemetry only.
/// </remarks>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
public class AzureCommandBusReceiver<TAuthenticationToken>
	: AzureCommandBus<TAuthenticationToken>
	, ICommandHandlerRegistrar
	, ICommandReceiver<TAuthenticationToken>
{
	// ReSharper disable StaticMemberInGenericType
	/// <summary>
	/// Gets the <see cref="RouteManager"/>.
	/// </summary>
	public static RouteManager Routes { get; private set; }

	/// <summary>
	/// Backing store for <see cref="CurrentHandles"/>; only ever touched through <see cref="Interlocked"/>
	/// so that handlers running on different threads cannot lose updates.
	/// </summary>
	private static long _currentHandles;

	/// <summary>
	/// The number of handles currently being executed.
	/// </summary>
	protected static long CurrentHandles
	{
		get { return Interlocked.Read(ref _currentHandles); }
		set { Interlocked.Exchange(ref _currentHandles, value); }
	}

	/// <summary>
	/// The pause, in seconds, taken after each failed attempt before the next attempt is made.
	/// There is one more attempt than there are entries: the final attempt is not followed by a pause.
	/// </summary>
	private static readonly int[] RetryDelaysInSeconds = { 10, 10, 30, 30, 60, 60, 60, 3 * 60, 3 * 60 };
	// ReSharper restore StaticMemberInGenericType

	static AzureCommandBusReceiver()
	{
		Routes = new RouteManager();
	}

	/// <summary>
	/// Instantiates a new instance of <see cref="AzureCommandBusReceiver{TAuthenticationToken}"/>.
	/// </summary>
	/// <remarks>
	/// All arguments are forwarded to the base class (with its final flag set to <c>false</c>). The telemetry helper is then created from
	/// <paramref name="configurationManager"/> using the
	/// <c>Cqrs.Azure.EventHub.CommandBus.Receiver.UseApplicationInsightTelemetryHelper</c> setting.
	/// </remarks>
	public AzureCommandBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
		: base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, false)
	{
		TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventHub.CommandBus.Receiver.UseApplicationInsightTelemetryHelper", correlationIdHelper);
	}

	/// <summary>
	/// Register a command handler that will listen and respond to commands.
	/// </summary>
	/// <remarks>
	/// In many cases the <paramref name="targetedType"/> will be the handler class itself, what you actually want is the target of what is being updated.
	/// </remarks>
	/// <typeparam name="TMessage">The <see cref="Type"/> of message the <paramref name="handler"/> accepts.</typeparam>
	/// <param name="handler">The delegate to register against the shared <see cref="Routes"/>.</param>
	/// <param name="targetedType">The <see cref="Type"/> the handler is registered for; may be <c>null</c>.</param>
	/// <param name="holdMessageLock">Passed through unchanged to the bus helper's registration call.</param>
	public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
		where TMessage : IMessage
	{
		AzureBusHelper.RegisterHandler(TelemetryHelper, Routes, handler, targetedType, holdMessageLock);
	}

	/// <summary>
	/// Register a command handler that will listen and respond to commands.
	/// </summary>
	/// <remarks>
	/// Equivalent to calling <see cref="RegisterHandler{TMessage}(Action{TMessage}, Type, bool)"/> with a <c>null</c> targeted type.
	/// </remarks>
	/// <typeparam name="TMessage">The <see cref="Type"/> of message the <paramref name="handler"/> accepts.</typeparam>
	/// <param name="handler">The delegate to register.</param>
	/// <param name="holdMessageLock">Passed through unchanged to the bus helper's registration call.</param>
	public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
		where TMessage : IMessage
	{
		RegisterHandler(handler, null, holdMessageLock);
	}

	/// <summary>
	/// Receives <see cref="EventData"/> from the command bus.
	/// </summary>
	/// <remarks>
	/// The message is attempted up to ten times. A failed attempt is logged and followed by a blocking pause
	/// (10s, 10s, 30s, 30s, 1m, 1m, 1m, 3m, 3m); the tenth failure records the exception type and message in the
	/// telemetry properties instead of pausing. Exceptions raised while handling are never re-thrown.
	/// Checkpointing, the "handles in flight" metric and the request telemetry are emitted exactly once per
	/// message, after the final attempt, whatever the outcome.
	/// </remarks>
	/// <param name="context">The partition the <paramref name="eventData"/> was read from; used to checkpoint it.</param>
	/// <param name="eventData">The raw message; its body is decoded as UTF-8 text.</param>
	protected virtual void ReceiveCommand(PartitionContext context, EventData eventData)
	{
		DateTimeOffset startedAt = DateTimeOffset.UtcNow;
		Stopwatch mainStopWatch = Stopwatch.StartNew();
		string responseCode = "200";
		// Null means it was skipped
		bool? wasSuccessfull = true;
		string telemetryName = string.Format("Cqrs/Handle/Command/{0}", eventData.SequenceNumber);
		ISingleSignOnToken authenticationToken = null;

		IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
		object value;
		// Guard against a property that is present but null so telemetry set-up cannot fail the message.
		if (eventData.Properties.TryGetValue("Type", out value) && value != null)
			telemetryProperties.Add("MessageType", value.ToString());

		// Built once; every log line and the helper call below identify the message the same way.
		string messageDescription = string.Format("partition key '{0}', sequence number '{1}' and offset '{2}'", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset);

		try
		{
			// Reports the count as it was before this handle started (post-increment semantics).
			// Kept inside the try so the decrement in the finally block always balances it.
			TelemetryHelper.TrackMetric("Cqrs/Handle/Command", Interlocked.Increment(ref _currentHandles) - 1, telemetryProperties);

			// Do a manual 10 try attempt with back-off
			for (int attempt = 0; attempt <= RetryDelaysInSeconds.Length; attempt++)
			{
				try
				{
					// Clear the outcome of any previous failed attempt; the skip callback below may override it.
					wasSuccessfull = true;
					responseCode = "200";

					Logger.LogDebug(string.Format("A command message arrived with the {0}.", messageDescription));
					string messageBody = Encoding.UTF8.GetString(eventData.GetBytes());

					ICommand<TAuthenticationToken> command = AzureBusHelper.ReceiveCommand(messageBody, ReceiveCommand,
						messageDescription,
						() =>
						{
							wasSuccessfull = null;
							telemetryName = string.Format("Cqrs/Handle/Command/Skipped/{0}", eventData.SequenceNumber);
							responseCode = "204";
							// The message is removed from the queue by the checkpoint in the outer finally block.
							Logger.LogDebug(string.Format("A command message arrived with the {0} but processing was skipped due to command settings.", messageDescription));
							TelemetryHelper.TrackEvent("Cqrs/Handle/Command/Skipped", telemetryProperties);
						}
					);

					// A null outcome means the skip callback ran; keep its 204/skipped telemetry untouched.
					if (wasSuccessfull != null)
					{
						if (command != null)
						{
							telemetryName = string.Format("{0}/{1}", command.GetType().FullName, command.Id);
							authenticationToken = command.AuthenticationToken as ISingleSignOnToken;

							var telemeteredMessage = command as ITelemeteredMessage;
							if (telemeteredMessage != null)
								telemetryName = telemeteredMessage.TelemetryName;

							telemetryName = string.Format("Cqrs/Handle/Command/{0}", telemetryName);
						}
						Logger.LogDebug(string.Format("A command message arrived and was processed with the {0}.", messageDescription));
					}

					return;
				}
				catch (Exception exception)
				{
					// Indicates a problem; the message is retried in-process rather than released.
					Logger.LogError(string.Format("A command message arrived with the {0} but failed to be process.", messageDescription), exception: exception);

					wasSuccessfull = false;
					responseCode = "500";

					if (attempt < RetryDelaysInSeconds.Length)
					{
						Thread.Sleep(TimeSpan.FromSeconds(RetryDelaysInSeconds[attempt]));
					}
					else
					{
						// Final attempt: give up and surface the cause through telemetry.
						telemetryProperties["ExceptionType"] = exception.GetType().FullName;
						telemetryProperties["ExceptionMessage"] = exception.Message;
					}
				}
			}
		}
		finally
		{
			// Runs once per message (success, skip, or all attempts exhausted).
			// Eventually just accept it
			context.CheckpointAsync(eventData);

			// Reports the count as it was before this handle finished (post-decrement semantics).
			TelemetryHelper.TrackMetric("Cqrs/Handle/Command", Interlocked.Decrement(ref _currentHandles) + 1, telemetryProperties);

			mainStopWatch.Stop();
			TelemetryHelper.TrackRequest
			(
				telemetryName,
				authenticationToken,
				startedAt,
				mainStopWatch.Elapsed,
				responseCode,
				wasSuccessfull == null || wasSuccessfull.Value,
				telemetryProperties
			);

			TelemetryHelper.Flush();
		}
	}

	/// <summary>
	/// Receives a <see cref="ICommand{TAuthenticationToken}"/> from the command bus.
	/// </summary>
	/// <param name="command">The command to hand to the bus helper's default receive logic.</param>
	/// <returns>Whatever the bus helper's default receive call returns for the <paramref name="command"/>, using the shared <see cref="Routes"/>.</returns>
	public virtual bool? ReceiveCommand(ICommand<TAuthenticationToken> command)
	{
		return AzureBusHelper.DefaultReceiveCommand(command, Routes, "Azure-EventHub");
	}

	#region Implementation of ICommandReceiver

	/// <summary>
	/// Starts listening and processing instances of <see cref="ICommand{TAuthenticationToken}"/> from the command bus.
	/// </summary>
	public void Start()
	{
		InstantiateReceiving();

		// Callback to handle received messages
		RegisterReceiverMessageHandler(ReceiveCommand);
	}

	#endregion
}
225 : }
|