Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Text;
13 : using System.Threading;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Commands;
17 : using Cqrs.Configuration;
18 : using Microsoft.ServiceBus.Messaging;
19 : using SpinWait = Cqrs.Infrastructure.SpinWait;
20 :
21 : namespace Cqrs.Azure.ServiceBus
22 : {
23 : /// <summary>
24 : /// A concurrent implementation of <see cref="AzureCommandBusReceiver{TAuthenticationToken}"/> that resides in memory.
25 : /// </summary>
26 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
27 : public class AzureQueuedCommandBusReceiver<TAuthenticationToken> : AzureCommandBusReceiver<TAuthenticationToken>
28 : {
/// <summary>
/// Tracks all queues. The tracker is static, so it is shared by every instance of the same closed generic type.
/// </summary>
protected static ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>> QueueTracker { get; private set; }

/// <summary>
/// The single lock guarding <see cref="QueueTracker"/>. It is static because the state it protects is static;
/// a per-instance lock would not serialise two receivers registering the same queue name.
/// </summary>
private static readonly ReaderWriterLockSlim SharedQueueTrackerLock = new ReaderWriterLockSlim();

/// <summary>
/// Gets the <see cref="ReaderWriterLockSlim"/> used to guard changes to <see cref="QueueTracker"/>.
/// </summary>
protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }

/// <summary>
/// Creates the shared <see cref="QueueTracker"/> exactly once per closed generic type.
/// </summary>
/// <remarks>
/// Previously the instance constructor replaced the static tracker, so constructing a second receiver
/// discarded every queue that was already registered (and any commands still waiting in them)
/// while the listener threads started earlier kept running.
/// </remarks>
static AzureQueuedCommandBusReceiver()
{
    QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>>();
}

/// <summary>
/// Instantiates a new instance of <see cref="AzureQueuedCommandBusReceiver{TAuthenticationToken}"/>.
/// All arguments are passed straight through to the base receiver.
/// </summary>
public AzureQueuedCommandBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
    : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper)
{
    // Every instance exposes the same lock because they all share the same static tracker.
    QueueTrackerLock = SharedQueueTrackerLock;
}
48 :
/// <summary>
/// Receives an <see cref="EventData"/> message from the command bus, identifies a key and queues it accordingly.
/// </summary>
/// <remarks>
/// The message is deserialised into a command and placed on an in-memory queue named after the command type
/// (suffixed with the value of its <c>Rsn</c> property when it has one). Processing is attempted up to ten times
/// with an increasing pause between attempts; after the final failure the message is checkpointed anyway.
/// </remarks>
/// <param name="context">The partition context used to checkpoint the message.</param>
/// <param name="eventData">The message that arrived.</param>
protected override void ReceiveCommand(PartitionContext context, EventData eventData)
{
    const int maximumAttempts = 10;

    // Do a manual 10 try attempt with back-off
    for (int attempt = 0; attempt < maximumAttempts; attempt++)
    {
        try
        {
            Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
            string messageBody = Encoding.UTF8.GetString(eventData.GetBytes());
            ICommand<TAuthenticationToken> command = MessageSerialiser.DeserialiseCommand(messageBody);

            CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
            Type commandType = command.GetType();
            Logger.LogInfo(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3}.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset, commandType.FullName));

            string targetQueueName = ResolveTargetQueueName(command, commandType, eventData);

            CreateQueueAndAttachListenerIfNotExist(targetQueueName);
            EnqueueCommand(targetQueueName, command);

            // remove the original message from the incoming queue
            // NOTE: the returned task is not awaited, so a checkpoint failure is not observed here.
            context.CheckpointAsync(eventData);

            Logger.LogDebug(string.Format("A command message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
            return;
        }
        catch (Exception exception)
        {
            // Indicates a problem, unlock message in queue
            Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);

            // There is nothing left to retry after the final attempt, so do not pause before giving up.
            if (attempt == maximumAttempts - 1)
            {
                break;
            }

            Thread.Sleep(GetRetryDelayInMilliseconds(attempt));
        }
    }
    // Eventually just accept it
    context.CheckpointAsync(eventData);
}

/// <summary>
/// Works out the name of the in-memory queue for <paramref name="command"/>: the full type name,
/// suffixed with the value of the command's <c>Rsn</c> property when that property exists and can be read.
/// </summary>
private string ResolveTargetQueueName(ICommand<TAuthenticationToken> command, Type commandType, EventData eventData)
{
    string targetQueueName = commandType.FullName;

    try
    {
        // GetProperty returns null when the type has no such public property; test for that
        // rather than relying on a NullReferenceException to signal it.
        var rsnProperty = commandType.GetProperty("Rsn");
        if (rsnProperty != null)
        {
            object rsn = rsnProperty.GetValue(command, null);
            return string.Format("{0}.{1}", targetQueueName, rsn);
        }
    }
    catch
    {
        // The lookup or the getter itself failed; fall back to the plain type name below.
    }

    Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3} but with no Rsn property.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset, commandType));
    // Do nothing if there is no rsn. Just use command type name
    return targetQueueName;
}

/// <summary>
/// Returns how long to pause, in milliseconds, after the zero-based failed <paramref name="attempt"/>.
/// </summary>
private static int GetRetryDelayInMilliseconds(int attempt)
{
    if (attempt <= 1)
    {
        // 10 seconds
        return 10 * 1000;
    }
    if (attempt <= 3)
    {
        // 30 seconds
        return 30 * 1000;
    }
    if (attempt <= 6)
    {
        // 1 minute
        return 60 * 1000;
    }
    // 3 minutes
    return 3 * 60 * 1000;
}
125 :
/// <summary>
/// Adds the provided <paramref name="command"/> to the <see cref="QueueTracker"/> of the queue <paramref name="targetQueueName"/>,
/// creating that queue first if it is not yet tracked.
/// </summary>
/// <param name="targetQueueName">The name of the queue to add the command to.</param>
/// <param name="command">The command to enqueue.</param>
private void EnqueueCommand(string targetQueueName, ICommand<TAuthenticationToken> command)
{
    // Use the factory overload so a new queue is only allocated when the name is missing,
    // instead of building a throw-away queue on every call.
    var queue = QueueTracker.GetOrAdd(targetQueueName, name => new ConcurrentQueue<ICommand<TAuthenticationToken>>());
    queue.Enqueue(command);
}
134 :
/// <summary>
/// Ensures a queue named <paramref name="queueName"/> is tracked. When the queue has to be created,
/// a new <see cref="Thread"/> named after the queue is started to run <see cref="DequeuAndProcessCommand"/> for it.
/// </summary>
/// <remarks>
/// Any exception raised while registering the queue or starting the thread is logged and not rethrown.
/// </remarks>
/// <param name="queueName">The name of the queue to check and create.</param>
protected void CreateQueueAndAttachListenerIfNotExist(string queueName)
{
    // Cheap check first: nothing to do when the queue is already tracked.
    if (QueueTracker.ContainsKey(queueName))
    {
        return;
    }

    QueueTrackerLock.EnterWriteLock();
    try
    {
        // Check again now that the write lock is held; the queue may have been added in the meantime.
        bool alreadyTracked = QueueTracker.ContainsKey(queueName);
        if (alreadyTracked)
        {
            return;
        }

        QueueTracker.TryAdd(queueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>());

        Thread listener = new Thread(() =>
        {
            Thread.CurrentThread.Name = queueName;
            DequeuAndProcessCommand(queueName);
        });
        listener.Start();
    }
    catch (Exception exception)
    {
        Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: exception);
    }
    finally
    {
        QueueTrackerLock.ExitWriteLock();
    }
}
167 :
/// <summary>
/// Repeatedly drains the queue named <paramref name="queueName"/>: each <see cref="ICommand{TAuthenticationToken}"/>
/// taken off the queue has its correlation id and authentication token applied and is then handed to the
/// inherited single-argument ReceiveCommand overload.
/// </summary>
/// <remarks>
/// The spin condition always reports <c>false</c>, so this method keeps polling and does not return on its own.
/// </remarks>
/// <param name="queueName">The name of the queue process.</param>
protected void DequeuAndProcessCommand(string queueName)
{
    SpinWait.SpinUntil
    (
        () =>
        {
            try
            {
                ConcurrentQueue<ICommand<TAuthenticationToken>> pendingCommands;
                if (!QueueTracker.TryGetValue(queueName, out pendingCommands))
                {
                    Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));
                }
                else
                {
                    while (!pendingCommands.IsEmpty)
                    {
                        ICommand<TAuthenticationToken> nextCommand;
                        if (!pendingCommands.TryDequeue(out nextCommand))
                        {
                            Logger.LogDebug(string.Format("Trying to dequeue a command from the queue '{0}' failed.", queueName));
                            continue;
                        }

                        ProcessDequeuedCommand(queueName, pendingCommands, nextCommand);
                    }
                }

                // Short pause once the queue has been drained (or was not found) before the next poll.
                Thread.Sleep(100);
            }
            catch (Exception exception)
            {
                Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
            }

            // Always return false to keep this spinning.
            return false;
        },
        sleepInMilliseconds: 1000
    );
}

/// <summary>
/// Handles one command taken off <paramref name="queue"/>. Failures while setting the correlation id or the
/// authentication token are logged and processing continues; a failure while receiving the command is logged
/// and the command is put back on the queue.
/// </summary>
private void ProcessDequeuedCommand(string queueName, ConcurrentQueue<ICommand<TAuthenticationToken>> queue, ICommand<TAuthenticationToken> command)
{
    try
    {
        CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
    }
    catch (Exception exception)
    {
        Logger.LogError(string.Format("Trying to set the CorrelationId from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
    }

    try
    {
        AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
    }
    catch (Exception exception)
    {
        Logger.LogError(string.Format("Trying to set the AuthenticationToken from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
    }

    try
    {
        ReceiveCommand(command);
    }
    catch (Exception exception)
    {
        Logger.LogError(string.Format("Processing the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
        // Put the command back so it is attempted again.
        queue.Enqueue(command);
    }
}
234 :
/// <summary>
/// Gets how many queues are currently held in <see cref="QueueTracker"/>, read while holding the read lock.
/// </summary>
public int QueueCount
{
    get
    {
        ReaderWriterLockSlim trackerLock = QueueTrackerLock;
        trackerLock.EnterReadLock();
        try
        {
            int knownQueues = QueueTracker.Count;
            return knownQueues;
        }
        finally
        {
            trackerLock.ExitReadLock();
        }
    }
}
253 :
/// <summary>
/// Gets the names (keys) of every queue currently held in <see cref="QueueTracker"/>, read while holding the read lock.
/// </summary>
public ICollection<string> QueueNames
{
    get
    {
        ReaderWriterLockSlim trackerLock = QueueTrackerLock;
        trackerLock.EnterReadLock();
        try
        {
            ICollection<string> knownNames = QueueTracker.Keys;
            return knownNames;
        }
        finally
        {
            trackerLock.ExitReadLock();
        }
    }
}
272 : }
273 : }
|