Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Text;
13 : using System.Threading;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Bus;
17 : using Cqrs.Configuration;
18 : using Cqrs.Events;
19 : using Cqrs.Messages;
20 : using Microsoft.ServiceBus.Messaging;
21 : using EventData = Microsoft.ServiceBus.Messaging.EventData;
22 :
23 : namespace Cqrs.Azure.ServiceBus
24 : {
25 : /// <summary>
26 : /// A <see cref="IEventReceiver{TAuthenticationToken}"/> that receives network messages, resolves handlers and executes the handler.
27 : /// </summary>
28 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
29 : public class AzureEventBusReceiver<TAuthenticationToken>
30 : : AzureEventHubBus<TAuthenticationToken>
31 : , IEventHandlerRegistrar
32 : , IEventReceiver<TAuthenticationToken>
33 : {
/// <summary>
/// The name of the <see cref="IConfigurationManager"/> setting that
/// <see cref="GetCurrentNumberOfReceiversCount"/> reads to decide how many receivers to use.
/// </summary>
protected virtual string NumberOfReceiversCountConfigurationKey
{
    get
    {
        const string configurationKey = "Cqrs.Azure.EventHub.EventBus.NumberOfReceiversCount";
        return configurationKey;
    }
}
43 :
// ReSharper disable StaticMemberInGenericType
/// <summary>
/// The <see cref="RouteManager"/> that event handlers are registered with.
/// As a static member of a generic class, one instance exists per closed <typeparamref name="TAuthenticationToken"/> type.
/// </summary>
public static RouteManager Routes { get; private set; }

/// <summary>
/// A running count of the events currently being handled; its value is reported with the "Cqrs/Handle/Event" metric.
/// </summary>
protected static long CurrentHandles { get; set; }
// ReSharper restore StaticMemberInGenericType

/// <summary>
/// Creates the shared <see cref="Routes"/> instance before any handler can be registered.
/// </summary>
static AzureEventBusReceiver()
{
    RouteManager routeManager = new RouteManager();
    Routes = routeManager;
}
60 :
/// <summary>
/// Instantiates a new instance of <see cref="AzureEventBusReceiver{TAuthenticationToken}"/>
/// and creates the telemetry helper used while receiving events.
/// </summary>
/// <remarks>
/// All arguments are forwarded to the base class; <paramref name="configurationManager"/> and
/// <paramref name="correlationIdHelper"/> are additionally used to create the telemetry helper.
/// </remarks>
public AzureEventBusReceiver(
    IConfigurationManager configurationManager,
    IMessageSerialiser<TAuthenticationToken> messageSerialiser,
    IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper,
    ICorrelationIdHelper correlationIdHelper,
    ILogger logger,
    IAzureBusHelper<TAuthenticationToken> azureBusHelper)
    : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, false)
{
    // Configuration key handed to CreateTelemetryHelper to select the telemetry helper for this receiver.
    const string telemetryHelperConfigurationKey = "Cqrs.Azure.EventHub.EventBus.Receiver.UseApplicationInsightTelemetryHelper";

    TelemetryHelper = configurationManager.CreateTelemetryHelper(telemetryHelperConfigurationKey, correlationIdHelper);
}
69 :
/// <summary>
/// Registers <paramref name="handler"/> against <see cref="Routes"/> so it is invoked for events of type <typeparamref name="TMessage"/>.
/// </summary>
/// <remarks>
/// In many cases the <paramref name="targetedType"/> will be the handler class itself, what you actually want is the target of what is being updated.
/// </remarks>
/// <typeparam name="TMessage">The <see cref="Type"/> of message the <paramref name="handler"/> accepts.</typeparam>
/// <param name="handler">The delegate to execute when a matching message arrives.</param>
/// <param name="targetedType">The <see cref="Type"/> the handler is registered for; see remarks.</param>
/// <param name="holdMessageLock">Passed through unchanged to the bus helper's registration call.</param>
public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
    where TMessage : IMessage
{
    // Registration itself is delegated; this class only supplies its telemetry helper and route table.
    AzureBusHelper.RegisterHandler
    (
        TelemetryHelper,
        Routes,
        handler,
        targetedType,
        holdMessageLock
    );
}
81 :
/// <summary>
/// Registers <paramref name="handler"/> for events of type <typeparamref name="TMessage"/> without specifying a targeted <see cref="Type"/>.
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of message the <paramref name="handler"/> accepts.</typeparam>
/// <param name="handler">The delegate to execute when a matching message arrives.</param>
/// <param name="holdMessageLock">Passed through unchanged to the three-argument overload.</param>
public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = false)
    where TMessage : IMessage
{
    // NOTE(review): this overload defaults holdMessageLock to false while the other registration methods
    // on this class default to true - confirm the difference is intentional.
    Type noTargetedType = null;
    RegisterHandler(handler, noTargetedType, holdMessageLock);
}
90 :
/// <summary>
/// Registers <paramref name="handler"/> with <see cref="Routes"/> as a global event handler, i.e. one that listens and responds to all events.
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of message the <paramref name="handler"/> accepts.</typeparam>
/// <param name="handler">The delegate to execute for every event.</param>
/// <param name="holdMessageLock">Passed through unchanged to the <see cref="RouteManager"/>.</param>
public void RegisterGlobalEventHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
    where TMessage : IMessage
{
    RouteManager routes = Routes;
    routes.RegisterGlobalEventHandler(handler, holdMessageLock);
}
99 :
/// <summary>
/// Receives an <see cref="EventData"/> from the event hub, passes the contained event to the registered handlers and records telemetry.
/// </summary>
/// <remarks>
/// Up to ten handling attempts are made. After a failed attempt the calling thread sleeps before retrying
/// (10 seconds twice, 30 seconds twice, 1 minute three times, then 3 minutes twice); the tenth failure is not retried.
/// Whatever the outcome, once handling has finished the <paramref name="eventData"/> is checkpointed,
/// <see cref="CurrentHandles"/> is decremented exactly once and a single request is tracked.
/// </remarks>
/// <param name="context">The <see cref="PartitionContext"/> used to checkpoint <paramref name="eventData"/>.</param>
/// <param name="eventData">The <see cref="EventData"/> whose UTF-8 body holds the serialised event.</param>
protected virtual void ReceiveEvent(PartitionContext context, EventData eventData)
{
    DateTimeOffset startedAt = DateTimeOffset.UtcNow;
    Stopwatch mainStopWatch = Stopwatch.StartNew();
    string responseCode = "200";
    // Null means it was skipped
    bool? wasSuccessfull = true;
    string telemetryName = string.Format("Cqrs/Handle/Event/{0}", eventData.SequenceNumber);
    ISingleSignOnToken authenticationToken = null;

    IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
    object value;
    // A "Type" property that is present but null must not throw before any error handling is in place.
    if (eventData.Properties.TryGetValue("Type", out value) && value != null)
        telemetryProperties.Add("MessageType", value.ToString());

    // NOTE(review): CurrentHandles is a plain static property, so ++/-- are not atomic across concurrent receivers.
    TelemetryHelper.TrackMetric("Cqrs/Handle/Event", CurrentHandles++, telemetryProperties);

    // The outer try/finally guarantees the clean-up below runs once per message, not once per attempt,
    // so the single increment above is matched by a single decrement.
    try
    {
        // Do a manual 10 try attempt with back-off
        for (int i = 0; i < 10; i++)
        {
            try
            {
                Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
                string messageBody = Encoding.UTF8.GetString(eventData.GetBytes());

                IEvent<TAuthenticationToken> @event = AzureBusHelper.ReceiveEvent(messageBody, ReceiveEvent,
                    string.Format("partition key '{0}', sequence number '{1}' and offset '{2}'", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset),
                    () =>
                    {
                        wasSuccessfull = null;
                        telemetryName = string.Format("Cqrs/Handle/Event/Skipped/{0}", eventData.SequenceNumber);
                        responseCode = "204";
                        // Remove message from queue
                        context.CheckpointAsync(eventData);
                        Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but processing was skipped due to event settings.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
                        TelemetryHelper.TrackEvent("Cqrs/Handle/Event/Skipped", telemetryProperties);
                    }
                );

                if (wasSuccessfull != null)
                {
                    if (@event != null)
                    {
                        telemetryName = string.Format("{0}/{1}", @event.GetType().FullName, @event.Id);
                        authenticationToken = @event.AuthenticationToken as ISingleSignOnToken;

                        var telemeteredMessage = @event as ITelemeteredMessage;
                        if (telemeteredMessage != null)
                            telemetryName = telemeteredMessage.TelemetryName;

                        telemetryName = string.Format("Cqrs/Handle/Event/{0}", telemetryName);
                    }
                    // Remove message from queue
                    context.CheckpointAsync(eventData);
                }
                Logger.LogDebug(string.Format("An event message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));

                // The event may be null (see the check above), so guard before reading its correlation id;
                // otherwise a NullReferenceException would push the message through the whole back-off cycle.
                IList<IEvent<TAuthenticationToken>> events;
                if (@event != null && EventWaits.TryGetValue(@event.CorrelationId, out events))
                    events.Add(@event);

                // Clear any failure recorded by an earlier attempt, but keep the skipped (null / "204") outcome intact.
                if (wasSuccessfull != null)
                {
                    wasSuccessfull = true;
                    responseCode = "200";
                }
                return;
            }
            catch (Exception exception)
            {
                // Indicates a problem, unlock message in queue
                Logger.LogError(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);

                wasSuccessfull = false;
                responseCode = "500";

                switch (i)
                {
                    case 0:
                    case 1:
                        // 10 seconds
                        Thread.Sleep(10 * 1000);
                        break;
                    case 2:
                    case 3:
                        // 30 seconds
                        Thread.Sleep(30 * 1000);
                        break;
                    case 4:
                    case 5:
                    case 6:
                        // 1 minute
                        Thread.Sleep(60 * 1000);
                        break;
                    case 7:
                    case 8:
                        // 3 minutes
                        Thread.Sleep(3 * 60 * 1000);
                        break;
                    case 9:
                        // Final attempt: no further retry, so attach the failure details to the telemetry instead.
                        telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
                        telemetryProperties.Add("ExceptionMessage", exception.Message);
                        break;
                }
            }
        }
    }
    finally
    {
        // Eventually just accept it
        context.CheckpointAsync(eventData);

        TelemetryHelper.TrackMetric("Cqrs/Handle/Event", CurrentHandles--, telemetryProperties);

        mainStopWatch.Stop();
        TelemetryHelper.TrackRequest
        (
            telemetryName,
            authenticationToken,
            startedAt,
            mainStopWatch.Elapsed,
            responseCode,
            wasSuccessfull == null || wasSuccessfull.Value,
            telemetryProperties
        );

        TelemetryHelper.Flush();
    }
}
225 :
/// <summary>
/// Receives a <see cref="IEvent{TAuthenticationToken}"/> and hands it, together with <see cref="Routes"/>, to the bus helper's default receive logic.
/// </summary>
/// <param name="event">The <see cref="IEvent{TAuthenticationToken}"/> to process.</param>
/// <returns>Whatever the bus helper's default receive logic returns for the <paramref name="event"/>.</returns>
public virtual bool? ReceiveEvent(IEvent<TAuthenticationToken> @event)
{
    // Name this receiver passes to the bus helper to identify itself.
    const string framework = "Azure-EventHub";

    bool? result = AzureBusHelper.DefaultReceiveEvent(@event, Routes, framework);
    return result;
}
233 :
234 : #region Overrides of AzureServiceBus<TAuthenticationToken>
235 :
/// <summary>
/// Reads the setting named by <see cref="NumberOfReceiversCountConfigurationKey"/> from the <see cref="IConfigurationManager"/>.
/// Returns <see cref="AzureBus{TAuthenticationToken}.DefaultNumberOfReceiversCount"/> when the setting is missing, blank or not a valid integer.
/// </summary>
/// <returns>The configured number of receivers, or the default.</returns>
protected override int GetCurrentNumberOfReceiversCount()
{
    string configuredValue;
    if (!ConfigurationManager.TryGetSetting(NumberOfReceiversCountConfigurationKey, out configuredValue))
        return DefaultNumberOfReceiversCount;

    if (string.IsNullOrWhiteSpace(configuredValue))
        return DefaultNumberOfReceiversCount;

    int parsedCount;
    return int.TryParse(configuredValue, out parsedCount)
        ? parsedCount
        : DefaultNumberOfReceiversCount;
}
253 :
254 : #endregion
255 :
256 : #region Implementation of IEventReceiver
257 :
/// <summary>
/// Starts listening for and processing instances of <see cref="IEvent{TAuthenticationToken}"/> from the event bus:
/// receiving is instantiated first, then the event-data handler of this class is registered as the message callback.
/// </summary>
public void Start()
{
    this.InstantiateReceiving();

    // Every received message is routed to ReceiveEvent(PartitionContext, EventData).
    this.RegisterReceiverMessageHandler(this.ReceiveEvent);
}
268 :
269 : #endregion
270 : }
271 : }
|