Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Text;
13 : using System.Threading;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Bus;
17 : using Cqrs.Configuration;
18 : using Cqrs.Events;
19 : using Cqrs.Messages;
20 : using Microsoft.ServiceBus.Messaging;
21 : using EventData = Microsoft.ServiceBus.Messaging.EventData;
22 :
namespace Cqrs.Azure.ServiceBus
{
    /// <summary>
    /// Receives <see cref="EventData"/> messages via the <see cref="AzureEventHubBus{TAuthenticationToken}"/> base class,
    /// hands the message body to the <see cref="IAzureBusHelper{TAuthenticationToken}"/> for processing
    /// and records telemetry about each message handled.
    /// </summary>
    /// <typeparam name="TAuthenticationToken">The type of the authentication token carried by the events.</typeparam>
    public class AzureEventBusReceiver<TAuthenticationToken>
        : AzureEventHubBus<TAuthenticationToken>
        , IEventHandlerRegistrar
        , IEventReceiver<TAuthenticationToken>
    {
        /// <summary>
        /// The total number of times <see cref="ReceiveEvent(PartitionContext, EventData)"/> will try to process a single message.
        /// </summary>
        private const int MaximumReceiveAttempts = 10;

        /// <summary>
        /// The configuration key read by <see cref="GetCurrentNumberOfReceiversCount"/>.
        /// </summary>
        protected virtual string NumberOfReceiversCountConfigurationKey
        {
            get { return "Cqrs.Azure.EventHub.EventBus.NumberOfReceiversCount"; }
        }

        // ReSharper disable StaticMemberInGenericType
        /// <summary>
        /// The <see cref="RouteManager"/> that handlers are registered with and that received events are dispatched through.
        /// Being a static member of a generic type, there is one instance per closed <typeparamref name="TAuthenticationToken"/> type.
        /// </summary>
        protected static RouteManager Routes { get; private set; }

        /// <summary>
        /// A counter incremented when a message starts being handled and decremented when handling finishes; reported as the "Cqrs/Handle/Event" metric.
        /// NOTE(review): the increment/decrement are not atomic; if messages are handled concurrently this value may drift - confirm the threading model.
        /// </summary>
        protected static long CurrentHandles { get; set; }
        // ReSharper restore StaticMemberInGenericType

        /// <summary>
        /// Creates the shared <see cref="Routes"/>.
        /// </summary>
        static AzureEventBusReceiver()
        {
            Routes = new RouteManager();
        }

        /// <summary>
        /// Instantiates a new instance of <see cref="AzureEventBusReceiver{TAuthenticationToken}"/> and creates the telemetry helper
        /// from the "Cqrs.Azure.EventHub.EventBus.Receiver.UseApplicationInsightTelemetryHelper" setting.
        /// </summary>
        public AzureEventBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
            // NOTE(review): the trailing 'false' is a flag defined by the base constructor, which is not visible here - confirm its meaning.
            : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, false)
        {
            TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventHub.EventBus.Receiver.UseApplicationInsightTelemetryHelper", correlationIdHelper);
        }

        /// <summary>
        /// Instantiates receiving and registers <see cref="ReceiveEvent(PartitionContext, EventData)"/> as the callback for received messages.
        /// </summary>
        public void Start()
        {
            InstantiateReceiving();

            // Callback to handle received messages
            RegisterReceiverMessageHandler(ReceiveEvent);
        }

        /// <summary>
        /// Register an event or command handler that will listen and respond to events or commands.
        /// </summary>
        /// <param name="handler">The handler to register.</param>
        /// <param name="targetedType">Passed through to the bus helper along with the handler; may be null.</param>
        /// <param name="holdMessageLock">Passed through to the bus helper. Note the default here (true) differs from the overload without <paramref name="targetedType"/>.</param>
        public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
            where TMessage : IMessage
        {
            AzureBusHelper.RegisterHandler(TelemetryHelper, Routes, handler, targetedType, holdMessageLock);
        }

        /// <summary>
        /// Register an event or command handler that will listen and respond to events or commands.
        /// Forwards to the three parameter overload with a null targeted type.
        /// </summary>
        /// <param name="handler">The handler to register.</param>
        /// <param name="holdMessageLock">Passed through to the three parameter overload. Defaults to false.</param>
        public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = false)
            where TMessage : IMessage
        {
            RegisterHandler(handler, null, holdMessageLock);
        }

        /// <summary>
        /// Handles a message received from the event hub: decodes the body as UTF-8, passes it to the bus helper for processing,
        /// retries with a back-off when processing throws, and finally checkpoints the message and records telemetry.
        /// </summary>
        /// <param name="context">The partition context used to checkpoint the message.</param>
        /// <param name="eventData">The received message.</param>
        /// <remarks>
        /// Processing is attempted up to <see cref="MaximumReceiveAttempts"/> times. The message is checkpointed exactly once,
        /// after processing succeeded, was skipped, or all attempts failed ("eventually just accept it").
        /// The handle counter and the request telemetry are likewise recorded once per message, not once per attempt,
        /// so the reported duration includes time spent waiting between attempts.
        /// </remarks>
        protected virtual void ReceiveEvent(PartitionContext context, EventData eventData)
        {
            DateTimeOffset startedAt = DateTimeOffset.UtcNow;
            Stopwatch mainStopWatch = Stopwatch.StartNew();
            string responseCode = "200";
            // Null means it was skipped
            bool? wasSuccessfull = true;
            string telemetryName = string.Format("Cqrs/Handle/Event/{0}", eventData.SequenceNumber);
            ISingleSignOnToken authenticationToken = null;

            IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
            object value;
            // Guard against a null property value so building telemetry cannot fail the message.
            if (eventData.Properties.TryGetValue("Type", out value) && value != null)
                telemetryProperties.Add("MessageType", value.ToString());
            TelemetryHelper.TrackMetric("Cqrs/Handle/Event", CurrentHandles++, telemetryProperties);

            try
            {
                // Do a manual 10 try attempt with back-off
                for (int attempt = 0; attempt < MaximumReceiveAttempts; attempt++)
                {
                    // Tracked per attempt so state left over from an earlier failed attempt cannot be mistaken for a skip.
                    bool wasSkipped = false;
                    try
                    {
                        Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
                        string messageBody = Encoding.UTF8.GetString(eventData.GetBytes());

                        IEvent<TAuthenticationToken> @event = AzureBusHelper.ReceiveEvent(messageBody, ReceiveEvent,
                            string.Format("partition key '{0}', sequence number '{1}' and offset '{2}'", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset),
                            () =>
                            {
                                wasSkipped = true;
                                wasSuccessfull = null;
                                telemetryName = string.Format("Cqrs/Handle/Event/Skipped/{0}", eventData.SequenceNumber);
                                responseCode = "204";
                                // The message is removed from the queue by the checkpoint in the finally block below.
                                Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but processing was skipped due to event settings.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
                                TelemetryHelper.TrackEvent("Cqrs/Handle/Event/Skipped", telemetryProperties);
                            }
                        );

                        if (!wasSkipped)
                        {
                            if (@event != null)
                            {
                                telemetryName = string.Format("{0}/{1}", @event.GetType().FullName, @event.Id);
                                authenticationToken = @event.AuthenticationToken as ISingleSignOnToken;

                                var telemeteredMessage = @event as ITelemeteredMessage;
                                if (telemeteredMessage != null)
                                    telemetryName = telemeteredMessage.TelemetryName;

                                telemetryName = string.Format("Cqrs/Handle/Event/{0}", telemetryName);
                            }
                            Logger.LogDebug(string.Format("An event message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
                        }

                        // The helper may not return an event; only record it for waiters when there is one.
                        if (@event != null)
                        {
                            IList<IEvent<TAuthenticationToken>> events;
                            if (EventWaits.TryGetValue(@event.CorrelationId, out events))
                                events.Add(@event);
                        }

                        // Leave the skipped state (null / "204") intact so the telemetry reflects the skip.
                        if (!wasSkipped)
                        {
                            wasSuccessfull = true;
                            responseCode = "200";
                        }
                        return;
                    }
                    catch (Exception exception)
                    {
                        // Indicates a problem
                        Logger.LogError(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);

                        wasSuccessfull = false;
                        responseCode = "500";

                        if (attempt == MaximumReceiveAttempts - 1)
                        {
                            // Final attempt: give up and record why.
                            telemetryProperties["ExceptionType"] = exception.GetType().FullName;
                            telemetryProperties["ExceptionMessage"] = exception.Message;
                        }
                        else
                            Thread.Sleep(GetRetryDelay(attempt));
                    }
                }
            }
            finally
            {
                // Eventually just accept it
                // NOTE(review): the returned task is not observed, so a failed checkpoint goes unnoticed - consider logging faults.
                context.CheckpointAsync(eventData);

                TelemetryHelper.TrackMetric("Cqrs/Handle/Event", CurrentHandles--, telemetryProperties);

                mainStopWatch.Stop();
                TelemetryHelper.TrackRequest
                (
                    telemetryName,
                    authenticationToken,
                    startedAt,
                    mainStopWatch.Elapsed,
                    responseCode,
                    wasSuccessfull == null || wasSuccessfull.Value,
                    telemetryProperties
                );

                TelemetryHelper.Flush();
            }
        }

        /// <summary>
        /// Gets how long to wait after the failed attempt with the provided zero-based index before trying again.
        /// </summary>
        private static TimeSpan GetRetryDelay(int attempt)
        {
            switch (attempt)
            {
                case 0:
                case 1:
                    return TimeSpan.FromSeconds(10);
                case 2:
                case 3:
                    return TimeSpan.FromSeconds(30);
                case 4:
                case 5:
                case 6:
                    return TimeSpan.FromMinutes(1);
                case 7:
                case 8:
                    return TimeSpan.FromMinutes(3);
                default:
                    return TimeSpan.Zero;
            }
        }

        /// <summary>
        /// Passes the provided <paramref name="event"/> to the bus helper's default receive logic using the registered <see cref="Routes"/>.
        /// </summary>
        /// <param name="event">The event to handle.</param>
        /// <returns>Whatever the bus helper returns for the event.</returns>
        public virtual bool? ReceiveEvent(IEvent<TAuthenticationToken> @event)
        {
            return AzureBusHelper.DefaultReceiveEvent(@event, Routes, "Azure-EventHub");
        }

        #region Overrides of AzureServiceBus<TAuthenticationToken>

        /// <summary>
        /// Reads the number of receivers from the setting named by <see cref="NumberOfReceiversCountConfigurationKey"/>.
        /// </summary>
        /// <returns>
        /// The configured value when the setting exists, is not blank and parses as an integer; otherwise the default number of receivers.
        /// </returns>
        protected override int GetCurrentNumberOfReceiversCount()
        {
            string configuredValue;
            bool hasSetting = ConfigurationManager.TryGetSetting(NumberOfReceiversCountConfigurationKey, out configuredValue)
                && !string.IsNullOrWhiteSpace(configuredValue);

            int configuredCount;
            if (hasSetting && int.TryParse(configuredValue, out configuredCount))
                return configuredCount;

            return DefaultNumberOfReceiversCount;
        }

        #endregion
    }
}
|