Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Text;
13 : using System.Threading;
14 : using Chinchilla.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Bus;
17 : using Cqrs.Configuration;
18 : using Cqrs.Events;
19 : #if NET452
20 : using Microsoft.ServiceBus.Messaging;
21 : using EventData = Microsoft.ServiceBus.Messaging.EventData;
22 : #endif
23 : #if NETCOREAPP3_0
24 : using Microsoft.Azure.EventHubs;
25 : using Microsoft.Azure.EventHubs.Processor;
26 : using EventData = Microsoft.Azure.EventHubs.EventData;
27 : #endif
28 : using SpinWait = Cqrs.Infrastructure.SpinWait;
29 :
30 : namespace Cqrs.Azure.ServiceBus
31 : {
32 : /// <summary>
33 : /// A concurrent implementation of <see cref="AzureEventBusReceiver{TAuthenticationToken}"/> that resides in memory.
34 : /// </summary>
35 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
36 : public class AzureQueuedEventBusReceiver<TAuthenticationToken> : AzureEventBusReceiver<TAuthenticationToken>
37 2 : {
38 : /// <summary>
39 : /// Tracks all queues.
40 : /// </summary>
41 : protected static ConcurrentDictionary<string, ConcurrentQueue<IEvent<TAuthenticationToken>>> QueueTracker { get; private set; }
42 :
43 : /// <summary>
44 : /// Gets the <see cref="ReaderWriterLockSlim"/>.
45 : /// </summary>
46 : protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }
47 :
48 : /// <summary>
49 : /// Receives an <see cref="EventData"/> message from the event bus, identifies a key and queues it accordingly.
50 : /// </summary>
51 : public AzureQueuedEventBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IHashAlgorithmFactory hashAlgorithmFactory, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
52 : : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, hashAlgorithmFactory, azureBusHelper)
53 : {
54 : QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<IEvent<TAuthenticationToken>>>();
55 : QueueTrackerLock = new ReaderWriterLockSlim();
56 : }
57 :
58 : /// <summary>
59 : /// Receives an <see cref="EventData"/> message from the event bus, identifies a key and queues it accordingly.
60 : /// </summary>
61 : protected override void ReceiveEvent(PartitionContext context, EventData eventData)
62 : {
63 : // Do a manual 10 try attempt with back-off
64 : for (int i = 0; i < 10; i++)
65 : {
66 : try
67 : {
68 : #if NET452
69 : Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
70 : #endif
71 : #if NETCOREAPP3_0
72 : Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset));
73 : #endif
74 : #if NET452
75 : string messageBody = Encoding.UTF8.GetString(eventData.GetBytes());
76 : #endif
77 : #if NETCOREAPP3_0
78 : string messageBody = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
79 : #endif
80 : IEvent<TAuthenticationToken> @event = MessageSerialiser.DeserialiseEvent(messageBody);
81 :
82 : CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
83 : #if NET452
84 : Logger.LogInfo(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3}.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset, @event.GetType().FullName));
85 : #endif
86 : #if NETCOREAPP3_0
87 : Logger.LogInfo(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3}.", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset, @event.GetType().FullName));
88 : #endif
89 :
90 : Type eventType = @event.GetType();
91 :
92 : string targetQueueName = eventType.FullName;
93 :
94 : try
95 : {
96 : object rsn = eventType.GetProperty("Rsn").GetValue(@event, null);
97 : targetQueueName = string.Format("{0}.{1}", targetQueueName, rsn);
98 : }
99 : catch
100 : {
101 : #if NET452
102 : Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3} but with no Rsn property.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset, eventType));
103 : #endif
104 : #if NETCOREAPP3_0
105 : Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3} but with no Rsn property.", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset, eventType));
106 : #endif
107 : // Do nothing if there is no rsn. Just use @event type name
108 : }
109 :
110 : CreateQueueAndAttachListenerIfNotExist(targetQueueName);
111 : EnqueueEvent(targetQueueName, @event);
112 :
113 : // remove the original message from the incoming queue
114 : context.CheckpointAsync(eventData);
115 :
116 : #if NET452
117 : Logger.LogDebug(string.Format("An event message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
118 : #endif
119 : #if NETCOREAPP3_0
120 : Logger.LogDebug(string.Format("An event message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset));
121 : #endif
122 : return;
123 : }
124 : catch (Exception exception)
125 : {
126 : // Indicates a problem, unlock message in queue
127 : #if NET452
128 : Logger.LogError(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
129 : #endif
130 : #if NETCOREAPP3_0
131 : Logger.LogError(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.SystemProperties.PartitionKey, eventData.SystemProperties.SequenceNumber, eventData.SystemProperties.Offset), exception: exception);
132 : #endif
133 :
134 : switch (i)
135 : {
136 : case 0:
137 : case 1:
138 : // 10 seconds
139 : Thread.Sleep(10 * 1000);
140 : break;
141 : case 2:
142 : case 3:
143 : // 30 seconds
144 : Thread.Sleep(30 * 1000);
145 : break;
146 : case 4:
147 : case 5:
148 : case 6:
149 : // 1 minute
150 : Thread.Sleep(60 * 1000);
151 : break;
152 : case 7:
153 : case 8:
154 : case 9:
155 : // 3 minutes
156 : Thread.Sleep(3 * 60 * 1000);
157 : break;
158 : }
159 : }
160 : }
161 : // Eventually just accept it
162 : context.CheckpointAsync(eventData);
163 : }
164 :
165 : /// <summary>
166 : /// Adds the provided <paramref name="event"/> to the <see cref="QueueTracker"/> of the queue <paramref name="targetQueueName"/>.
167 : /// </summary>
168 : private void EnqueueEvent(string targetQueueName, IEvent<TAuthenticationToken> @event)
169 : {
170 : var queue = QueueTracker.GetOrAdd(targetQueueName, new ConcurrentQueue<IEvent<TAuthenticationToken>>());
171 : queue.Enqueue(@event);
172 : }
173 :
174 : /// <summary>
175 : /// Creates the queue of the name <paramref name="queueName"/> if it does not already exist,
176 : /// the queue is attached to <see cref="DequeuAndProcessEvent"/> using a <see cref="Thread"/>.
177 : /// </summary>
178 : /// <param name="queueName">The name of the queue to check and create.</param>
179 : protected void CreateQueueAndAttachListenerIfNotExist(string queueName)
180 : {
181 : if (!QueueTracker.ContainsKey(queueName))
182 : {
183 : QueueTrackerLock.EnterWriteLock();
184 : try
185 : {
186 : if (!QueueTracker.ContainsKey(queueName))
187 : {
188 : QueueTracker.TryAdd(queueName, new ConcurrentQueue<IEvent<TAuthenticationToken>>());
189 : new Thread(() =>
190 : {
191 : Thread.CurrentThread.Name = queueName;
192 : DequeuAndProcessEvent(queueName);
193 : }).Start();
194 : }
195 : }
196 : catch (Exception exception)
197 : {
198 : Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: exception);
199 : }
200 : finally
201 : {
202 : QueueTrackerLock.ExitWriteLock();
203 : }
204 : }
205 : }
206 :
207 : /// <summary>
208 : /// Takes an <see cref="IEvent{TAuthenticationToken}"/> off the queue of <paramref name="queueName"/>
209 : /// and calls <see cref="ReceiveEvent"/>. Repeats in a loop until the queue is empty.
210 : /// </summary>
211 : /// <param name="queueName">The name of the queue process.</param>
212 : protected void DequeuAndProcessEvent(string queueName)
213 : {
214 : SpinWait.SpinUntil
215 : (
216 : () =>
217 : {
218 : try
219 : {
220 : ConcurrentQueue<IEvent<TAuthenticationToken>> queue;
221 : if (QueueTracker.TryGetValue(queueName, out queue))
222 : {
223 : while (!queue.IsEmpty)
224 : {
225 : IEvent<TAuthenticationToken> @event;
226 : if (queue.TryDequeue(out @event))
227 : {
228 : try
229 : {
230 : CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
231 : }
232 : catch (Exception exception)
233 : {
234 : Logger.LogError(string.Format("Trying to set the CorrelationId from the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
235 : }
236 : try
237 : {
238 : AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
239 : }
240 : catch (Exception exception)
241 : {
242 : Logger.LogError(string.Format("Trying to set the AuthenticationToken from the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
243 : }
244 : try
245 : {
246 : ReceiveEvent(@event);
247 : }
248 : catch (Exception exception)
249 : {
250 : Logger.LogError(string.Format("Processing the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
251 : queue.Enqueue(@event);
252 : }
253 : }
254 : else
255 : Logger.LogDebug(string.Format("Trying to dequeue a event from the queue '{0}' failed.", queueName));
256 : }
257 : }
258 : else
259 : Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));
260 : Thread.Sleep(100);
261 : }
262 : catch (Exception exception)
263 : {
264 : Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
265 : }
266 :
267 : // Always return false to keep this spinning.
268 : return false;
269 : },
270 : sleepInMilliseconds: 1000
271 : );
272 : }
273 :
274 : /// <summary>
275 : /// The number of queues currently known.
276 : /// </summary>
277 : public int QueueCount
278 : {
279 : get
280 : {
281 : QueueTrackerLock.EnterReadLock();
282 : try
283 : {
284 : return QueueTracker.Count;
285 : }
286 : finally
287 : {
288 : QueueTrackerLock.ExitReadLock();
289 : }
290 : }
291 : }
292 :
293 : /// <summary>
294 : /// The name of all currently known queues.
295 : /// </summary>
296 : public ICollection<string> QueueNames
297 : {
298 : get
299 : {
300 : QueueTrackerLock.EnterReadLock();
301 : try
302 : {
303 : return QueueTracker.Keys;
304 : }
305 : finally
306 : {
307 : QueueTrackerLock.ExitReadLock();
308 : }
309 : }
310 : }
311 : }
312 : }
|