Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Text;
13 : using System.Threading;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Bus;
17 : using Cqrs.Configuration;
18 : using Cqrs.Events;
19 : using Microsoft.ServiceBus.Messaging;
20 : using EventData = Microsoft.ServiceBus.Messaging.EventData;
21 : using SpinWait = Cqrs.Infrastructure.SpinWait;
22 :
23 : namespace Cqrs.Azure.ServiceBus
24 : {
/// <summary>
/// An <see cref="AzureEventBusReceiver{TAuthenticationToken}"/> that does not handle incoming events inline.
/// Each received event is placed on an in-memory <see cref="ConcurrentQueue{T}"/> chosen by the event's type
/// (and its <c>Rsn</c> property when present); every queue is drained by its own dedicated <see cref="Thread"/>.
/// </summary>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
public class AzureQueuedEventBusReceiver<TAuthenticationToken> : AzureEventBusReceiver<TAuthenticationToken>
{
    /// <summary>
    /// Tracks all queues, keyed by queue name.
    /// This is static, so it is shared by every instance of the same closed generic type.
    /// </summary>
    protected static ConcurrentDictionary<string, ConcurrentQueue<IEvent<TAuthenticationToken>>> QueueTracker { get; private set; }

    /// <summary>
    /// Gets the <see cref="ReaderWriterLockSlim"/> this instance takes while reading from or adding to <see cref="QueueTracker"/>.
    /// </summary>
    protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }

    /// <summary>
    /// Creates the shared <see cref="QueueTracker"/> exactly once.
    /// Creating it per instance would replace the dictionary that already running listener threads
    /// and already queued events depend on.
    /// </summary>
    static AzureQueuedEventBusReceiver()
    {
        QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<IEvent<TAuthenticationToken>>>();
    }

    /// <summary>
    /// Instantiates a new instance of <see cref="AzureQueuedEventBusReceiver{TAuthenticationToken}"/>.
    /// All arguments are passed straight through to the base class constructor.
    /// </summary>
    /// <param name="configurationManager">Passed to the base constructor.</param>
    /// <param name="messageSerialiser">Passed to the base constructor.</param>
    /// <param name="authenticationTokenHelper">Passed to the base constructor.</param>
    /// <param name="correlationIdHelper">Passed to the base constructor.</param>
    /// <param name="logger">Passed to the base constructor.</param>
    /// <param name="hashAlgorithmFactory">Passed to the base constructor.</param>
    /// <param name="azureBusHelper">Passed to the base constructor.</param>
    public AzureQueuedEventBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IHashAlgorithmFactory hashAlgorithmFactory, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
        : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, hashAlgorithmFactory, azureBusHelper)
    {
        // QueueTracker is deliberately NOT assigned here; see the static constructor.
        QueueTrackerLock = new ReaderWriterLockSlim();
    }

    /// <summary>
    /// Receives an <see cref="EventData"/> from the event bus, deserialises it, works out the name of the
    /// in-memory queue it belongs to and enqueues it there. The actual handling happens later on the
    /// queue's listener thread (see <see cref="DequeuAndProcessEvent"/>).
    /// </summary>
    /// <remarks>
    /// Up to 10 attempts are made, sleeping between failed attempts (10s, 10s, 30s, 30s, 1m, 1m, 1m, 3m, 3m).
    /// If every attempt fails the message is check-pointed anyway so it does not block the partition.
    /// </remarks>
    /// <param name="context">The partition the message arrived on; used to checkpoint the message.</param>
    /// <param name="eventData">The raw message whose UTF-8 body holds the serialised event.</param>
    protected override void ReceiveEvent(PartitionContext context, EventData eventData)
    {
        const int maximumAttempts = 10;

        // Do a manual 10 try attempt with back-off
        for (int i = 0; i < maximumAttempts; i++)
        {
            try
            {
                Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
                string messageBody = Encoding.UTF8.GetString(eventData.GetBytes());
                IEvent<TAuthenticationToken> @event = MessageSerialiser.DeserialiseEvent(messageBody);

                CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
                Logger.LogInfo(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3}.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset, @event.GetType().FullName));

                Type eventType = @event.GetType();

                // Default to one queue per event type; narrowed to one queue per type + Rsn when the event exposes an Rsn.
                string targetQueueName = eventType.FullName;

                bool hasRsn = false;
                try
                {
                    var rsnProperty = eventType.GetProperty("Rsn");
                    if (rsnProperty != null)
                    {
                        object rsn = rsnProperty.GetValue(@event, null);
                        targetQueueName = string.Format("{0}.{1}", targetQueueName, rsn);
                        hasRsn = true;
                    }
                }
                catch (Exception)
                {
                    // A lookup or getter that throws is treated exactly like a missing property; it is logged just below.
                }

                if (!hasRsn)
                {
                    // Do nothing else if there is no rsn. Just use the @event type name
                    Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3} but with no Rsn property.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset, eventType));
                }

                CreateQueueAndAttachListenerIfNotExist(targetQueueName);
                EnqueueEvent(targetQueueName, @event);

                // Mark the original message as consumed on the incoming partition.
                // NOTE(review): the returned task is not observed, so a failed checkpoint goes unnoticed here - confirm this is intended.
                context.CheckpointAsync(eventData);

                Logger.LogDebug(string.Format("An event message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
                return;
            }
            catch (Exception exception)
            {
                // Indicates a problem with this attempt; log it and back off before trying again.
                Logger.LogError(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);

                // There is no further attempt after the last one, so sleeping would only delay giving up.
                if (i == maximumAttempts - 1)
                {
                    break;
                }

                Thread.Sleep(GetRetryDelayInMilliseconds(i));
            }
        }

        // Eventually just accept it
        context.CheckpointAsync(eventData);
    }

    /// <summary>
    /// Gets how long to wait, in milliseconds, after the zero-based failed <paramref name="attempt"/>
    /// before trying again: 10 seconds for attempts 0-1, 30 seconds for 2-3, 1 minute for 4-6 and 3 minutes after that.
    /// </summary>
    private static int GetRetryDelayInMilliseconds(int attempt)
    {
        if (attempt < 2)
        {
            return 10 * 1000;
        }
        if (attempt < 4)
        {
            return 30 * 1000;
        }
        if (attempt < 7)
        {
            return 60 * 1000;
        }
        return 3 * 60 * 1000;
    }

    /// <summary>
    /// Adds the provided <paramref name="event"/> to the <see cref="QueueTracker"/> of the queue <paramref name="targetQueueName"/>.
    /// The queue is created if it is not already tracked; no listener is attached by this method.
    /// </summary>
    private void EnqueueEvent(string targetQueueName, IEvent<TAuthenticationToken> @event)
    {
        // The factory overload only allocates a queue when the key is actually missing.
        var queue = QueueTracker.GetOrAdd(targetQueueName, name => new ConcurrentQueue<IEvent<TAuthenticationToken>>());
        queue.Enqueue(@event);
    }

    /// <summary>
    /// Creates the queue of the name <paramref name="queueName"/> if it does not already exist,
    /// the queue is attached to <see cref="DequeuAndProcessEvent"/> using a <see cref="Thread"/>.
    /// Any failure is logged and not re-thrown.
    /// </summary>
    /// <param name="queueName">The name of the queue to check and create.</param>
    protected void CreateQueueAndAttachListenerIfNotExist(string queueName)
    {
        // Cheap check first so the common case (queue already exists) never takes the write lock.
        if (QueueTracker.ContainsKey(queueName))
        {
            return;
        }

        QueueTrackerLock.EnterWriteLock();
        try
        {
            // QueueTracker is static while the lock belongs to this instance, so the lock alone cannot stop
            // another instance racing us. TryAdd is atomic: only the caller that really added the queue
            // goes on to start its listener.
            if (QueueTracker.TryAdd(queueName, new ConcurrentQueue<IEvent<TAuthenticationToken>>()))
            {
                var listener = new Thread(() => DequeuAndProcessEvent(queueName)) { Name = queueName };
                listener.Start();
            }
        }
        catch (Exception exception)
        {
            Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: exception);
        }
        finally
        {
            QueueTrackerLock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Takes an <see cref="IEvent{TAuthenticationToken}"/> off the queue of <paramref name="queueName"/>
    /// and passes it to the inherited <c>ReceiveEvent</c> overload that accepts an event. Repeats until the queue is empty,
    /// pauses, then starts over; this method is written to never return.
    /// </summary>
    /// <remarks>
    /// An event whose processing throws is logged and put back on the end of the queue. Draining then stops
    /// for this pass so the pause below applies before the event is retried, rather than retrying it immediately.
    /// </remarks>
    /// <param name="queueName">The name of the queue process.</param>
    protected void DequeuAndProcessEvent(string queueName)
    {
        SpinWait.SpinUntil
        (
            () =>
            {
                try
                {
                    ConcurrentQueue<IEvent<TAuthenticationToken>> queue;
                    if (QueueTracker.TryGetValue(queueName, out queue))
                    {
                        while (!queue.IsEmpty)
                        {
                            IEvent<TAuthenticationToken> @event;
                            if (!queue.TryDequeue(out @event))
                            {
                                Logger.LogDebug(string.Format("Trying to dequeue a event from the queue '{0}' failed.", queueName));
                                continue;
                            }

                            // Failing to set either piece of ambient context is logged but does not stop the event being processed.
                            try
                            {
                                CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
                            }
                            catch (Exception exception)
                            {
                                Logger.LogError(string.Format("Trying to set the CorrelationId from the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
                            }
                            try
                            {
                                AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
                            }
                            catch (Exception exception)
                            {
                                Logger.LogError(string.Format("Trying to set the AuthenticationToken from the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
                            }

                            try
                            {
                                ReceiveEvent(@event);
                            }
                            catch (Exception exception)
                            {
                                Logger.LogError(string.Format("Processing the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
                                // Put the event back so it is retried...
                                queue.Enqueue(@event);
                                // ...but leave the drain loop so the pause below happens first; otherwise a
                                // persistently failing event would be retried in a tight loop.
                                break;
                            }
                        }
                    }
                    else
                    {
                        Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));
                    }
                    Thread.Sleep(100);
                }
                catch (Exception exception)
                {
                    Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
                }

                // Always return false to keep this spinning.
                return false;
            },
            sleepInMilliseconds: 1000
        );
    }

    /// <summary>
    /// The number of queues currently known.
    /// </summary>
    public int QueueCount
    {
        get
        {
            QueueTrackerLock.EnterReadLock();
            try
            {
                return QueueTracker.Count;
            }
            finally
            {
                QueueTrackerLock.ExitReadLock();
            }
        }
    }

    /// <summary>
    /// The name of all currently known queues.
    /// </summary>
    public ICollection<string> QueueNames
    {
        get
        {
            QueueTrackerLock.EnterReadLock();
            try
            {
                return QueueTracker.Keys;
            }
            finally
            {
                QueueTrackerLock.ExitReadLock();
            }
        }
    }
}
275 : }
|