Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Text;
13 : using System.Threading;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Bus;
17 : using Cqrs.Commands;
18 : using Cqrs.Configuration;
19 : using Microsoft.ServiceBus.Messaging;
20 : using SpinWait = Cqrs.Infrastructure.SpinWait;
21 :
22 : namespace Cqrs.Azure.ServiceBus
23 : {
/// <summary>
/// An <see cref="AzureCommandBusReceiver{TAuthenticationToken}"/> that places every received command on a named,
/// in-memory <see cref="ConcurrentQueue{T}"/> and processes each queue on its own dedicated <see cref="Thread"/>.
/// The queue name is the full name of the command <see cref="Type"/>, suffixed with the value of the command's
/// <c>Rsn</c> property when the command has one.
/// </summary>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
public class AzureQueuedCommandBusReceiver<TAuthenticationToken> : AzureCommandBusReceiver<TAuthenticationToken>
{
    /// <summary>
    /// The number of times <see cref="ReceiveCommand(PartitionContext, EventData)"/> tries to queue a message before giving up.
    /// </summary>
    private const int MaximumReceiveAttempts = 10;

    /// <summary>
    /// Tracks all queues, keyed by queue name. Shared by every instance of this closed generic type.
    /// </summary>
    protected static ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>> QueueTracker { get; private set; }

    /// <summary>
    /// Gets the <see cref="ReaderWriterLockSlim"/> used by this instance when it adds to or reads from <see cref="QueueTracker"/>.
    /// </summary>
    protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }

    /// <summary>
    /// Creates the shared <see cref="QueueTracker"/> exactly once.
    /// It must not be re-created per instance: listener threads look their queue up by name on every poll,
    /// so replacing the dictionary would orphan every existing queue and any commands still waiting on it.
    /// </summary>
    static AzureQueuedCommandBusReceiver()
    {
        QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>>();
    }

    /// <summary>
    /// Instantiates a new instance of <see cref="AzureQueuedCommandBusReceiver{TAuthenticationToken}"/>.
    /// All arguments are passed straight through to the base class.
    /// </summary>
    public AzureQueuedCommandBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IHashAlgorithmFactory hashAlgorithmFactory, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
        : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, hashAlgorithmFactory, azureBusHelper)
    {
        QueueTrackerLock = new ReaderWriterLockSlim();
    }

    /// <summary>
    /// Receives an <see cref="EventData"/> from the command bus, deserialises the command it carries,
    /// works out the name of the target queue and places the command on that queue.
    /// Failures are retried up to <see cref="MaximumReceiveAttempts"/> times with an increasing delay;
    /// after the final failure the message is check-pointed anyway so the partition is not blocked.
    /// </summary>
    /// <param name="context">The <see cref="PartitionContext"/> used to check-point the <paramref name="eventData"/>.</param>
    /// <param name="eventData">The message holding the serialised command.</param>
    protected override void ReceiveCommand(PartitionContext context, EventData eventData)
    {
        for (int attempt = 0; attempt < MaximumReceiveAttempts; attempt++)
        {
            try
            {
                Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
                string messageBody = Encoding.UTF8.GetString(eventData.GetBytes());
                ICommand<TAuthenticationToken> command = MessageSerialiser.DeserialiseCommand(messageBody);

                CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
                Type commandType = command.GetType();
                Logger.LogInfo(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3}.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset, commandType.FullName));

                // Commands with an Rsn get one queue per Rsn; everything else shares one queue per command type.
                string targetQueueName = commandType.FullName;
                object rsn;
                if (TryGetRsn(command, out rsn))
                {
                    targetQueueName = string.Format("{0}.{1}", targetQueueName, rsn);
                }
                else
                {
                    Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3} but with no Rsn property.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset, commandType));
                }

                CreateQueueAndAttachListenerIfNotExist(targetQueueName);
                EnqueueCommand(targetQueueName, command);

                // The command now lives on the in-memory queue, so mark the incoming message as handled.
                // NOTE(review): the returned task is not observed, so a failed check-point goes unnoticed - confirm this is intended.
                context.CheckpointAsync(eventData);

                Logger.LogDebug(string.Format("A command message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
                return;
            }
            catch (Exception exception)
            {
                Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);

                // Only back off when another attempt will follow; sleeping after the last attempt achieves nothing.
                if (attempt < MaximumReceiveAttempts - 1)
                {
                    Thread.Sleep(GetRetryDelayInMilliseconds(attempt));
                }
            }
        }

        // Every attempt failed. Accept the message anyway so the partition can move on, but leave a trace of the drop.
        Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but could not be queued after {3} attempts and will be skipped.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset, MaximumReceiveAttempts));
        context.CheckpointAsync(eventData);
    }

    /// <summary>
    /// Reads the value of the public <c>Rsn</c> property of the provided <paramref name="command"/>, if it has one.
    /// </summary>
    /// <param name="command">The command to inspect.</param>
    /// <param name="rsn">The value of the <c>Rsn</c> property, or <c>null</c> when it could not be read.</param>
    /// <returns><c>true</c> if the property exists and was read; otherwise <c>false</c>.</returns>
    private static bool TryGetRsn(ICommand<TAuthenticationToken> command, out object rsn)
    {
        rsn = null;
        try
        {
            var rsnProperty = command.GetType().GetProperty("Rsn");
            if (rsnProperty == null)
            {
                return false;
            }

            rsn = rsnProperty.GetValue(command, null);
            return true;
        }
        catch (Exception)
        {
            // A property that cannot be resolved or read is treated the same as a missing one.
            return false;
        }
    }

    /// <summary>
    /// Gets how long to wait after the failed, zero-based <paramref name="attempt"/> before trying again:
    /// 10 seconds after attempts 0-1, 30 seconds after 2-3, 1 minute after 4-6 and 3 minutes after that.
    /// </summary>
    private static int GetRetryDelayInMilliseconds(int attempt)
    {
        if (attempt < 2)
        {
            return 10 * 1000;
        }
        if (attempt < 4)
        {
            return 30 * 1000;
        }
        if (attempt < 7)
        {
            return 60 * 1000;
        }
        return 3 * 60 * 1000;
    }

    /// <summary>
    /// Adds the provided <paramref name="command"/> to the <see cref="QueueTracker"/> of the queue <paramref name="targetQueueName"/>.
    /// </summary>
    private void EnqueueCommand(string targetQueueName, ICommand<TAuthenticationToken> command)
    {
        // The factory overload only allocates a queue when the name is not tracked yet.
        ConcurrentQueue<ICommand<TAuthenticationToken>> queue = QueueTracker.GetOrAdd(targetQueueName, name => new ConcurrentQueue<ICommand<TAuthenticationToken>>());
        queue.Enqueue(command);
    }

    /// <summary>
    /// Creates the queue of the name <paramref name="queueName"/> if it does not already exist,
    /// the queue is attached to <see cref="DequeuAndProcessCommand"/> using a <see cref="Thread"/>.
    /// Any failure while doing so is logged and not re-thrown.
    /// </summary>
    /// <param name="queueName">The name of the queue to check and create.</param>
    protected void CreateQueueAndAttachListenerIfNotExist(string queueName)
    {
        if (QueueTracker.ContainsKey(queueName))
        {
            return;
        }

        QueueTrackerLock.EnterWriteLock();
        try
        {
            // TryAdd is the authority on who created the queue: only the caller that actually added it starts
            // the listener, so a queue can never end up with two threads competing for its commands.
            if (QueueTracker.TryAdd(queueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>()))
            {
                // NOTE(review): this is a foreground thread that never finishes - confirm that is acceptable for host shutdown.
                new Thread(() =>
                {
                    Thread.CurrentThread.Name = queueName;
                    DequeuAndProcessCommand(queueName);
                }).Start();
            }
        }
        catch (Exception exception)
        {
            Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: exception);
        }
        finally
        {
            QueueTrackerLock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Polls the queue of <paramref name="queueName"/> forever. On every poll each waiting
    /// <see cref="ICommand{TAuthenticationToken}"/> is taken off the queue and passed to <see cref="ReceiveCommand"/>
    /// until the queue is empty or a command fails.
    /// </summary>
    /// <param name="queueName">The name of the queue process.</param>
    protected void DequeuAndProcessCommand(string queueName)
    {
        SpinWait.SpinUntil
        (
            () =>
            {
                try
                {
                    ConcurrentQueue<ICommand<TAuthenticationToken>> queue;
                    if (QueueTracker.TryGetValue(queueName, out queue))
                    {
                        DrainQueue(queueName, queue);
                    }
                    else
                    {
                        Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));
                    }
                    Thread.Sleep(100);
                }
                catch (Exception exception)
                {
                    Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
                }

                // Always return false to keep this spinning.
                return false;
            },
            sleepInMilliseconds: 1000
        );
    }

    /// <summary>
    /// Processes the commands waiting on the provided <paramref name="queue"/> one at a time.
    /// A command that fails is put back on the queue and draining stops, so the retry only happens on the next poll
    /// instead of immediately in a tight loop.
    /// </summary>
    /// <param name="queueName">The name of the queue, used for logging.</param>
    /// <param name="queue">The queue to take commands from.</param>
    private void DrainQueue(string queueName, ConcurrentQueue<ICommand<TAuthenticationToken>> queue)
    {
        while (!queue.IsEmpty)
        {
            ICommand<TAuthenticationToken> command;
            if (!queue.TryDequeue(out command))
            {
                Logger.LogDebug(string.Format("Trying to dequeue a command from the queue '{0}' failed.", queueName));
                continue;
            }

            // Restoring the ambient correlation id and authentication token is best effort; a failure must not stop the command.
            try
            {
                CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
            }
            catch (Exception exception)
            {
                Logger.LogError(string.Format("Trying to set the CorrelationId from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
            }
            try
            {
                AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
            }
            catch (Exception exception)
            {
                Logger.LogError(string.Format("Trying to set the AuthenticationToken from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
            }

            try
            {
                ReceiveCommand(command);
            }
            catch (Exception exception)
            {
                Logger.LogError(string.Format("Processing the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
                queue.Enqueue(command);
                // Stop draining so the poll delay applies before the failed command is attempted again.
                return;
            }
        }
    }

    /// <summary>
    /// The number of queues currently known.
    /// </summary>
    public int QueueCount
    {
        get
        {
            QueueTrackerLock.EnterReadLock();
            try
            {
                return QueueTracker.Count;
            }
            finally
            {
                QueueTrackerLock.ExitReadLock();
            }
        }
    }

    /// <summary>
    /// The name of all currently known queues.
    /// </summary>
    public ICollection<string> QueueNames
    {
        get
        {
            QueueTrackerLock.EnterReadLock();
            try
            {
                return QueueTracker.Keys;
            }
            finally
            {
                QueueTrackerLock.ExitReadLock();
            }
        }
    }
}
274 : }
|