Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Text;
13 : using System.Threading;
14 : using Cqrs.Authentication;
15 : using Cqrs.Bus;
16 : using Cqrs.Commands;
17 : using Cqrs.Configuration;
18 : using cdmdotnet.Logging;
19 : using Cqrs.Messages;
20 : using Microsoft.ServiceBus.Messaging;
21 :
22 : namespace Cqrs.Azure.ServiceBus
23 : {
24 : public class AzureCommandBusReceiver<TAuthenticationToken>
25 : : AzureCommandBus<TAuthenticationToken>
26 : , ICommandHandlerRegistrar
27 : , ICommandReceiver<TAuthenticationToken>
28 : {
29 : // ReSharper disable StaticMemberInGenericType
30 : private static RouteManager Routes { get; set; }
31 :
32 : protected static long CurrentHandles { get; set; }
33 : // ReSharper restore StaticMemberInGenericType
34 :
35 : static AzureCommandBusReceiver()
36 : {
37 : Routes = new RouteManager();
38 : }
39 :
40 0 : public AzureCommandBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
41 : : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, false)
42 : {
43 : TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventHub.CommandBus.Receiver.UseApplicationInsightTelemetryHelper", correlationIdHelper);
44 : }
45 :
46 2 : public virtual void RegisterHandler<TMessage>(Action<TMessage> handler, Type targetedType, bool holdMessageLock = true)
47 : where TMessage : IMessage
48 : {
49 : AzureBusHelper.RegisterHandler(TelemetryHelper, Routes, handler, targetedType, holdMessageLock);
50 : }
51 :
52 : /// <summary>
53 : /// Register an event or command handler that will listen and respond to events or commands.
54 : /// </summary>
55 2 : public void RegisterHandler<TMessage>(Action<TMessage> handler, bool holdMessageLock = true)
56 : where TMessage : IMessage
57 : {
58 : RegisterHandler(handler, null, holdMessageLock);
59 : }
60 :
61 0 : protected virtual void ReceiveCommand(PartitionContext context, EventData eventData)
62 : {
63 : DateTimeOffset startedAt = DateTimeOffset.UtcNow;
64 : Stopwatch mainStopWatch = Stopwatch.StartNew();
65 : string responseCode = "200";
66 : // Null means it was skipped
67 : bool? wasSuccessfull = true;
68 : string telemetryName = string.Format("Cqrs/Handle/Command/{0}", eventData.SequenceNumber);
69 : ISingleSignOnToken authenticationToken = null;
70 :
71 : IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
72 : object value;
73 : if (eventData.Properties.TryGetValue("Type", out value))
74 : telemetryProperties.Add("MessageType", value.ToString());
75 : TelemetryHelper.TrackMetric("Cqrs/Handle/Command", CurrentHandles++, telemetryProperties);
76 : // Do a manual 10 try attempt with back-off
77 : for (int i = 0; i < 10; i++)
78 : {
79 : try
80 : {
81 : Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
82 : string messageBody = Encoding.UTF8.GetString(eventData.GetBytes());
83 :
84 : ICommand<TAuthenticationToken> command = AzureBusHelper.ReceiveCommand(messageBody, ReceiveCommand,
85 : string.Format("partition key '{0}', sequence number '{1}' and offset '{2}'", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset),
86 : () =>
87 : {
88 : wasSuccessfull = null;
89 : telemetryName = string.Format("Cqrs/Handle/Command/Skipped/{0}", eventData.SequenceNumber);
90 : responseCode = "204";
91 : // Remove message from queue
92 : context.CheckpointAsync(eventData);
93 : Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but processing was skipped due to command settings.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
94 : TelemetryHelper.TrackEvent("Cqrs/Handle/Command/Skipped", telemetryProperties);
95 : }
96 : );
97 :
98 : if (wasSuccessfull != null)
99 : {
100 : if (command != null)
101 : {
102 : telemetryName = string.Format("{0}/{1}", command.GetType().FullName, command.Id);
103 : authenticationToken = command.AuthenticationToken as ISingleSignOnToken;
104 :
105 : var telemeteredMessage = command as ITelemeteredMessage;
106 : if (telemeteredMessage != null)
107 : telemetryName = telemeteredMessage.TelemetryName;
108 :
109 : telemetryName = string.Format("Cqrs/Handle/Command/{0}", telemetryName);
110 : }
111 : // Remove message from queue
112 : context.CheckpointAsync(eventData);
113 : }
114 : Logger.LogDebug(string.Format("A command message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
115 :
116 : wasSuccessfull = true;
117 : responseCode = "200";
118 : return;
119 : }
120 : catch (Exception exception)
121 : {
122 : // Indicates a problem, unlock message in queue
123 : Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
124 :
125 : switch (i)
126 : {
127 : case 0:
128 : case 1:
129 : // 10 seconds
130 : Thread.Sleep(10 * 1000);
131 : break;
132 : case 2:
133 : case 3:
134 : // 30 seconds
135 : Thread.Sleep(30 * 1000);
136 : break;
137 : case 4:
138 : case 5:
139 : case 6:
140 : // 1 minute
141 : Thread.Sleep(60 * 1000);
142 : break;
143 : case 7:
144 : case 8:
145 : // 3 minutes
146 : Thread.Sleep(3 * 60 * 1000);
147 : break;
148 : case 9:
149 : telemetryProperties.Add("ExceptionType", exception.GetType().FullName);
150 : telemetryProperties.Add("ExceptionMessage", exception.Message);
151 : break;
152 : }
153 : wasSuccessfull = false;
154 : responseCode = "500";
155 : }
156 : finally
157 : {
158 : // Eventually just accept it
159 : context.CheckpointAsync(eventData);
160 :
161 : TelemetryHelper.TrackMetric("Cqrs/Handle/Command", CurrentHandles--, telemetryProperties);
162 :
163 : mainStopWatch.Stop();
164 : TelemetryHelper.TrackRequest
165 : (
166 : telemetryName,
167 : authenticationToken,
168 : startedAt,
169 : mainStopWatch.Elapsed,
170 : responseCode,
171 : wasSuccessfull == null || wasSuccessfull.Value,
172 : telemetryProperties
173 : );
174 :
175 : TelemetryHelper.Flush();
176 : }
177 : }
178 : }
179 :
180 0 : public virtual bool? ReceiveCommand(ICommand<TAuthenticationToken> command)
181 : {
182 : return AzureBusHelper.DefaultReceiveCommand(command, Routes, "Azure-EventHub");
183 : }
184 :
185 : #region Implementation of ICommandReceiver
186 :
187 0 : public void Start()
188 : {
189 : InstantiateReceiving();
190 :
191 : // Callback to handle received messages
192 : RegisterReceiverMessageHandler(ReceiveCommand);
193 : }
194 :
195 : #endregion
196 : }
197 : }
|