Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Threading;
13 : using cdmdotnet.Logging;
14 : using Cqrs.Authentication;
15 : using Cqrs.Bus;
16 : using Cqrs.Commands;
17 : using Cqrs.Configuration;
18 : using Microsoft.ServiceBus.Messaging;
19 : using SpinWait = Cqrs.Infrastructure.SpinWait;
20 :
21 : namespace Cqrs.Azure.ServiceBus
22 : {
23 : /// <summary>
24 : /// A concurrent implementation of <see cref="AzureCommandBusReceiver{TAuthenticationToken}"/> that resides in memory.
25 : /// </summary>
26 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
27 : public class AzureQueuedCommandBusReceiver<TAuthenticationToken> : AzureCommandBusReceiver<TAuthenticationToken>
28 2 : {
29 : /// <summary>
30 : /// Tracks all queues.
31 : /// </summary>
32 : protected static ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>> QueueTracker { get; private set; }
33 :
34 : /// <summary>
35 : /// Gets the <see cref="ReaderWriterLockSlim"/>.
36 : /// </summary>
37 : protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }
38 :
39 : /// <summary>
40 : /// Instantiates a new instance of <see cref="AzureQueuedCommandBusReceiver{TAuthenticationToken}"/>.
41 : /// </summary>
42 2 : public AzureQueuedCommandBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper)
43 : : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, busHelper)
44 : {
45 : QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>>();
46 : QueueTrackerLock = new ReaderWriterLockSlim();
47 : }
48 :
49 : /// <summary>
50 : /// Receives a <see cref="BrokeredMessage"/> from the command bus, identifies a key and queues it accordingly.
51 : /// </summary>
52 2 : protected override void ReceiveCommand(BrokeredMessage message)
53 : {
54 : try
55 : {
56 : Logger.LogDebug(string.Format("A command message arrived with the id '{0}'.", message.MessageId));
57 : string messageBody = message.GetBody<string>();
58 : ICommand<TAuthenticationToken> command = MessageSerialiser.DeserialiseCommand(messageBody);
59 :
60 : CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
61 : Logger.LogInfo(string.Format("A command message arrived with the id '{0}' was of type {1}.", message.MessageId, command.GetType().FullName));
62 :
63 : Type commandType = command.GetType();
64 :
65 : string targetQueueName = commandType.FullName;
66 :
67 : try
68 : {
69 : object rsn = commandType.GetProperty("Rsn").GetValue(command, null);
70 : targetQueueName = string.Format("{0}.{1}", targetQueueName, rsn);
71 : }
72 : catch
73 : {
74 : Logger.LogDebug(string.Format("A command message arrived with the id '{0}' was of type {1} but with no Rsn property.", message.MessageId, commandType));
75 : // Do nothing if there is no rsn. Just use command type name
76 : }
77 :
78 : CreateQueueAndAttachListenerIfNotExist(targetQueueName);
79 : EnqueueCommand(targetQueueName, command);
80 :
81 : // remove the original message from the incoming queue
82 : message.Complete();
83 :
84 : Logger.LogDebug(string.Format("A command message arrived and was processed with the id '{0}'.", message.MessageId));
85 : }
86 : catch (Exception exception)
87 : {
88 : // Indicates a problem, unlock message in queue
89 : Logger.LogError(string.Format("A command message arrived with the id '{0}' but failed to be process.", message.MessageId), exception: exception);
90 : message.Abandon();
91 : }
92 : }
93 :
94 : /// <summary>
95 : /// Adds the provided <paramref name="command"/> to the <see cref="QueueTracker"/> of the queue <paramref name="targetQueueName"/>.
96 : /// </summary>
97 : private void EnqueueCommand(string targetQueueName, ICommand<TAuthenticationToken> command)
98 : {
99 : var queue = QueueTracker.GetOrAdd(targetQueueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>());
100 : queue.Enqueue(command);
101 : }
102 :
103 : /// <summary>
104 : /// Creates the queue of the name <paramref name="queueName"/> if it does not already exist,
105 : /// the queue is attached to <see cref="DequeuAndProcessCommand"/> using a <see cref="Thread"/>.
106 : /// </summary>
107 : /// <param name="queueName">The name of the queue to check and create.</param>
108 2 : protected void CreateQueueAndAttachListenerIfNotExist(string queueName)
109 : {
110 : if (!QueueTracker.ContainsKey(queueName))
111 : {
112 : QueueTrackerLock.EnterWriteLock();
113 : try
114 : {
115 : if (!QueueTracker.ContainsKey(queueName))
116 : {
117 : QueueTracker.TryAdd(queueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>());
118 : new Thread(() =>
119 : {
120 : Thread.CurrentThread.Name = queueName;
121 : DequeuAndProcessCommand(queueName);
122 : }).Start();
123 : }
124 : }
125 : catch (Exception exception)
126 : {
127 : Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: exception);
128 : }
129 : finally
130 : {
131 : QueueTrackerLock.ExitWriteLock();
132 : }
133 : }
134 : }
135 :
136 : /// <summary>
137 : /// Takes an <see cref="ICommand{TAuthenticationToken}"/> off the queue of <paramref name="queueName"/>
138 : /// and calls <see cref="ReceiveCommand"/>. Repeats in a loop until the queue is empty.
139 : /// </summary>
140 : /// <param name="queueName">The name of the queue process.</param>
141 2 : protected void DequeuAndProcessCommand(string queueName)
142 : {
143 : SpinWait.SpinUntil
144 : (
145 : () =>
146 : {
147 : try
148 : {
149 : ConcurrentQueue<ICommand<TAuthenticationToken>> queue;
150 : if (QueueTracker.TryGetValue(queueName, out queue))
151 : {
152 : while (!queue.IsEmpty)
153 : {
154 : ICommand<TAuthenticationToken> command;
155 : if (queue.TryDequeue(out command))
156 : {
157 : try
158 : {
159 : CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
160 : }
161 : catch (Exception exception)
162 : {
163 : Logger.LogError(string.Format("Trying to set the CorrelationId from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
164 : }
165 : try
166 : {
167 : AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
168 : }
169 : catch (Exception exception)
170 : {
171 : Logger.LogError(string.Format("Trying to set the AuthenticationToken from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
172 : }
173 : try
174 : {
175 : ReceiveCommand(command);
176 : }
177 : catch (Exception exception)
178 : {
179 : Logger.LogError(string.Format("Processing the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
180 : queue.Enqueue(command);
181 : }
182 : }
183 : else
184 : Logger.LogDebug(string.Format("Trying to dequeue a command from the queue '{0}' failed.", queueName));
185 : }
186 : }
187 : else
188 : Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));
189 :
190 : Thread.Sleep(100);
191 : }
192 : catch (Exception exception)
193 : {
194 : Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
195 : }
196 :
197 : // Always return false to keep this spinning.
198 : return false;
199 : },
200 : sleepInMilliseconds: 1000
201 : );
202 : }
203 :
204 : /// <summary>
205 : /// The number of queues currently known.
206 : /// </summary>
207 : public int QueueCount
208 : {
209 : get
210 : {
211 : QueueTrackerLock.EnterReadLock();
212 : try
213 : {
214 : return QueueTracker.Count;
215 : }
216 : finally
217 : {
218 : QueueTrackerLock.ExitReadLock();
219 : }
220 : }
221 : }
222 :
223 : /// <summary>
224 : /// The name of all currently known queues.
225 : /// </summary>
226 : public ICollection<string> QueueNames
227 : {
228 : get
229 : {
230 : QueueTrackerLock.EnterReadLock();
231 : try
232 : {
233 : return QueueTracker.Keys;
234 : }
235 : finally
236 : {
237 : QueueTrackerLock.ExitReadLock();
238 : }
239 : }
240 : }
241 : }
242 : }
|