Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Linq;
13 : using System.Threading;
14 : using Cqrs.Authentication;
15 : using Cqrs.Commands;
16 : using cdmdotnet.Logging;
17 : using Cqrs.Configuration;
18 : using Cqrs.Exceptions;
19 : using Cqrs.Messages;
20 :
21 : namespace Cqrs.Bus
22 : {
/// <summary>
/// Receives instances of a <see cref="ICommand{TAuthenticationToken}"/> from the command bus, places them into one of several
/// named, in-memory concurrent queues and then processes the commands one at a time per queue, each queue being drained by its own <see cref="Thread"/>.
/// </summary>
/// <remarks>
/// NOTE(review): <see cref="QueueTracker"/> is static, yet every constructor call replaces it with a new, empty dictionary and
/// <see cref="QueueTrackerLock"/> is created per instance. Constructing a second receiver for the same <typeparamref name="TAuthenticationToken"/>
/// means listener threads started by the first instance will no longer find their queues in <see cref="QueueTracker"/>.
/// Confirm that only a single instance is ever created (e.g. a singleton registration).
/// </remarks>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of authentication token.</typeparam>
public abstract class QueuedCommandBusReceiver<TAuthenticationToken> : ICommandReceiver<TAuthenticationToken>
{
    /// <summary>
    /// Gets the queues, keyed by queue name.
    /// </summary>
    protected static ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>> QueueTracker { get; private set; }

    /// <summary>
    /// Gets the <see cref="ReaderWriterLockSlim"/> that provides a lock mechanism around the main <see cref="QueueTracker"/>.
    /// </summary>
    protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }

    /// <summary>
    /// Gets the <see cref="IAuthenticationTokenHelper{TAuthenticationToken}"/>
    /// </summary>
    protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }

    /// <summary>
    /// Gets the <see cref="ICorrelationIdHelper"/>
    /// </summary>
    protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }

    /// <summary>
    /// Gets the <see cref="ILogger"/>
    /// </summary>
    protected ILogger Logger { get; private set; }

    /// <summary>
    /// Gets the <see cref="IConfigurationManager"/>
    /// </summary>
    protected IConfigurationManager ConfigurationManager { get; private set; }

    /// <summary>
    /// Gets the <see cref="IBusHelper"/>
    /// </summary>
    protected IBusHelper BusHelper { get; private set; }

    /// <summary>
    /// Gets the routes or handlers that will be executed as the commands arrive, keyed by the <see cref="Type"/> of command.
    /// </summary>
    protected abstract IDictionary<Type, IList<Action<IMessage>>> Routes { get; }

    /// <summary>
    /// Instantiates a new instance of <see cref="QueuedCommandBusReceiver{TAuthenticationToken}"/>.
    /// </summary>
    /// <param name="authenticationTokenHelper">Stored in <see cref="AuthenticationTokenHelper"/>.</param>
    /// <param name="correlationIdHelper">Stored in <see cref="CorrelationIdHelper"/>.</param>
    /// <param name="logger">Stored in <see cref="Logger"/>.</param>
    /// <param name="configurationManager">Stored in <see cref="ConfigurationManager"/>.</param>
    /// <param name="busHelper">Stored in <see cref="BusHelper"/>.</param>
    protected QueuedCommandBusReceiver(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IConfigurationManager configurationManager, IBusHelper busHelper)
    {
        // NOTE(review): this assigns the STATIC tracker, discarding any queues registered by a previously constructed instance.
        QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>>();
        QueueTrackerLock = new ReaderWriterLockSlim();
        AuthenticationTokenHelper = authenticationTokenHelper;
        CorrelationIdHelper = correlationIdHelper;
        Logger = logger;
        ConfigurationManager = configurationManager;
        BusHelper = busHelper;
    }

    /// <summary>
    /// Places the provided <paramref name="command"/> into the appropriate queue in the <see cref="QueueTracker"/>,
    /// creating the queue if it does not exist yet.
    /// </summary>
    /// <remarks>
    /// This method does not start a listener; see <see cref="CreateQueueAndAttachListenerIfNotExist"/>.
    /// </remarks>
    /// <param name="targetQueueName">The name of the target queue to place the command into</param>
    /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to handle.</param>
    protected virtual void EnqueueCommand(string targetQueueName, ICommand<TAuthenticationToken> command)
    {
        // The factory overload only allocates a queue when the key is actually missing.
        ConcurrentQueue<ICommand<TAuthenticationToken>> targetQueue = QueueTracker.GetOrAdd(targetQueueName, name => new ConcurrentQueue<ICommand<TAuthenticationToken>>());
        targetQueue.Enqueue(command);
    }

    /// <summary>
    /// Checks if the queue exists, if it doesn't it creates a new queue in <see cref="QueueTracker"/> and then starts a separate <see cref="Thread"/> running <see cref="DequeuAndProcessCommand"/>.
    /// Any exception raised while doing so is logged and not re-thrown.
    /// </summary>
    /// <param name="queueName">The name of the queue.</param>
    protected virtual void CreateQueueAndAttachListenerIfNotExist(string queueName)
    {
        // Cheap check without the lock; the check is repeated below once the write lock is held.
        if (QueueTracker.ContainsKey(queueName))
            return;

        QueueTrackerLock.EnterWriteLock();
        try
        {
            if (!QueueTracker.ContainsKey(queueName))
            {
                QueueTracker.TryAdd(queueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>());

                // NOTE(review): if starting the thread fails the queue stays registered, so later calls will not attempt to attach a listener again.
                Thread listener = new Thread(() =>
                {
                    Thread.CurrentThread.Name = queueName;
                    DequeuAndProcessCommand(queueName);
                });
                listener.Start();
            }
        }
        catch (Exception exception)
        {
            Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: exception);
        }
        finally
        {
            QueueTrackerLock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Infinitely runs a loop checking if the queue exists in <see cref="QueueTracker"/>
    /// and then dequeues <see cref="ICommand{TAuthenticationToken}"/> one at a time.
    /// Between passes the thread sleeps for 0.1 seconds, except on every fifth pass where it only yields.
    /// </summary>
    /// <remarks>
    /// A command whose processing fails is logged and placed back at the end of the queue. The current pass then stops,
    /// so the retry happens after the regular pause instead of immediately, which would otherwise spin on a command that keeps failing.
    /// This method never returns.
    /// </remarks>
    /// <param name="queueName">The name of the queue.</param>
    protected virtual void DequeuAndProcessCommand(string queueName)
    {
        long loop = long.MinValue;
        while (true)
        {
            try
            {
                ConcurrentQueue<ICommand<TAuthenticationToken>> queue;
                if (QueueTracker.TryGetValue(queueName, out queue))
                {
                    while (!queue.IsEmpty)
                    {
                        ICommand<TAuthenticationToken> command;
                        if (!queue.TryDequeue(out command))
                        {
                            Logger.LogDebug(string.Format("Trying to dequeue a command from the queue '{0}' failed.", queueName));
                            continue;
                        }

                        if (!TryProcessDequeuedCommand(queueName, command))
                        {
                            // Keep the command for a later attempt, but leave the drain loop so the pause below applies before retrying.
                            queue.Enqueue(command);
                            break;
                        }
                    }
                }
                else
                    Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));

                // Yield on every fifth pass, sleep 100ms on all others.
                if (loop++ % 5 == 0)
                    Thread.Yield();
                else
                    Thread.Sleep(100);
                // Wrap the counter manually so the increment above can never overflow.
                if (loop == long.MaxValue)
                    loop = long.MinValue;
            }
            catch (Exception exception)
            {
                Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
            }
        }
    }

    /// <summary>
    /// Sets the correlation id and authentication token from the <paramref name="command"/> and then passes it to <see cref="ReceiveCommand"/>.
    /// Failures in any of the three steps are logged; failing to set the correlation id or authentication token does not stop processing.
    /// </summary>
    /// <param name="queueName">The name of the queue the command was taken from, used for log messages only.</param>
    /// <param name="command">The dequeued <see cref="ICommand{TAuthenticationToken}"/>.</param>
    /// <returns><c>true</c> if <see cref="ReceiveCommand"/> completed without throwing, otherwise <c>false</c>.</returns>
    private bool TryProcessDequeuedCommand(string queueName, ICommand<TAuthenticationToken> command)
    {
        try
        {
            CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
        }
        catch (Exception exception)
        {
            Logger.LogError(string.Format("Trying to set the CorrelationId from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
        }

        try
        {
            AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
        }
        catch (Exception exception)
        {
            Logger.LogError(string.Format("Trying to set the AuthenticationToken from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
        }

        try
        {
            ReceiveCommand(command);
            return true;
        }
        catch (Exception exception)
        {
            Logger.LogError(string.Format("Processing the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
            return false;
        }
    }

    /// <summary>
    /// The current number of queues in <see cref="QueueTracker"/>.
    /// </summary>
    public int QueueCount
    {
        get
        {
            QueueTrackerLock.EnterReadLock();
            try
            {
                return QueueTracker.Count;
            }
            finally
            {
                QueueTrackerLock.ExitReadLock();
            }
        }
    }

    /// <summary>
    /// Gets the names of all queues in <see cref="QueueTracker"/>.
    /// </summary>
    public ICollection<string> QueueNames
    {
        get
        {
            QueueTrackerLock.EnterReadLock();
            try
            {
                return QueueTracker.Keys;
            }
            finally
            {
                QueueTrackerLock.ExitReadLock();
            }
        }
    }

    #region Implementation of ICommandReceiver<TAuthenticationToken>

    /// <summary>
    /// Receives a <see cref="ICommand{TAuthenticationToken}"/> from the command bus and executes the single handler registered in <see cref="Routes"/> for its type.
    /// </summary>
    /// <param name="command">The <see cref="ICommand{TAuthenticationToken}"/> to handle.</param>
    /// <returns>
    /// <c>true</c> if the handler was executed; <c>false</c> if no handler is registered and <see cref="IBusHelper.IsEventRequired(Type)"/> reports the command type as not required.
    /// </returns>
    /// <exception cref="MultipleCommandHandlersRegisteredException">More than one handler is registered for the type of <paramref name="command"/>.</exception>
    /// <exception cref="NoCommandHandlerRegisteredException">No handler is registered (missing, <c>null</c> or empty route) and the command type is required.</exception>
    public virtual bool? ReceiveCommand(ICommand<TAuthenticationToken> command)
    {
        CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
        AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);

        Type commandType = command.GetType();
        bool isRequired = BusHelper.IsEventRequired(commandType);

        IList<Action<IMessage>> handlers;
        bool hasRoute = Routes.TryGetValue(commandType, out handlers) && handlers != null;

        // Only MORE than one handler is an ambiguity; an empty list is treated like a missing route below.
        if (hasRoute && handlers.Count > 1)
            throw new MultipleCommandHandlersRegisteredException(commandType);

        if (hasRoute && handlers.Count == 1)
        {
            handlers[0](command);
            return true;
        }

        if (isRequired)
            throw new NoCommandHandlerRegisteredException(commandType);
        return false;
    }

    #endregion

    #region Implementation of ICommandReceiver

    /// <summary>
    /// Starts listening and processing instances of <see cref="ICommand{TAuthenticationToken}"/> from the command bus.
    /// </summary>
    public abstract void Start();

    #endregion
}
283 : }
|