Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Linq;
13 : using System.Threading;
14 : using Cqrs.Authentication;
15 : using Cqrs.Commands;
16 : using cdmdotnet.Logging;
17 : using Cqrs.Configuration;
18 : using Cqrs.Messages;
19 :
20 : namespace Cqrs.Bus
21 : {
22 : public abstract class QueuedCommandBusReceiver<TAuthenticationToken> : ICommandReceiver<TAuthenticationToken>
23 0 : {
24 : protected static ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>> QueueTracker { get; private set; }
25 :
26 : protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }
27 :
28 : protected IAuthenticationTokenHelper<TAuthenticationToken> AuthenticationTokenHelper { get; private set; }
29 :
30 : protected ICorrelationIdHelper CorrelationIdHelper { get; private set; }
31 :
32 : protected ILogger Logger { get; private set; }
33 :
34 : protected IConfigurationManager ConfigurationManager { get; private set; }
35 :
36 : protected IBusHelper BusHelper { get; private set; }
37 :
38 : protected abstract IDictionary<Type, IList<Action<IMessage>>> Routes { get; }
39 :
40 0 : protected QueuedCommandBusReceiver(IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IConfigurationManager configurationManager, IBusHelper busHelper)
41 : {
42 : QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>>();
43 : QueueTrackerLock = new ReaderWriterLockSlim();
44 : AuthenticationTokenHelper = authenticationTokenHelper;
45 : CorrelationIdHelper = correlationIdHelper;
46 : Logger = logger;
47 : ConfigurationManager = configurationManager;
48 : BusHelper = busHelper;
49 : }
50 :
51 0 : protected virtual void EnqueueCommand(string targetQueueName, ICommand<TAuthenticationToken> command)
52 : {
53 : var queue = QueueTracker.GetOrAdd(targetQueueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>());
54 : queue.Enqueue(command);
55 : }
56 :
57 0 : protected virtual void CreateQueueAndAttachListenerIfNotExist(string queueName)
58 : {
59 : if (!QueueTracker.ContainsKey(queueName))
60 : {
61 : QueueTrackerLock.EnterWriteLock();
62 : try
63 : {
64 : if (!QueueTracker.ContainsKey(queueName))
65 : {
66 : QueueTracker.TryAdd(queueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>());
67 : new Thread(() =>
68 : {
69 : Thread.CurrentThread.Name = queueName;
70 : DequeuAndProcessCommand(queueName);
71 : }).Start();
72 : }
73 : }
74 : catch (Exception exception)
75 : {
76 : Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: exception);
77 : }
78 : finally
79 : {
80 : QueueTrackerLock.ExitWriteLock();
81 : }
82 : }
83 : }
84 :
85 0 : protected virtual void DequeuAndProcessCommand(string queueName)
86 : {
87 : long loop = long.MinValue;
88 : while (true)
89 : {
90 : try
91 : {
92 : ConcurrentQueue<ICommand<TAuthenticationToken>> queue;
93 : if (QueueTracker.TryGetValue(queueName, out queue))
94 : {
95 : while (!queue.IsEmpty)
96 : {
97 : ICommand<TAuthenticationToken> command;
98 : if (queue.TryDequeue(out command))
99 : {
100 : try
101 : {
102 : CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
103 : }
104 : catch (Exception exception)
105 : {
106 : Logger.LogError(string.Format("Trying to set the CorrelationId from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
107 : }
108 : try
109 : {
110 : AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
111 : }
112 : catch (Exception exception)
113 : {
114 : Logger.LogError(string.Format("Trying to set the AuthenticationToken from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
115 : }
116 : try
117 : {
118 : ReceiveCommand(command);
119 : }
120 : catch (Exception exception)
121 : {
122 : Logger.LogError(string.Format("Processing the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
123 : queue.Enqueue(command);
124 : }
125 : }
126 : else
127 : Logger.LogDebug(string.Format("Trying to dequeue a command from the queue '{0}' failed.", queueName));
128 : }
129 : }
130 : else
131 : Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));
132 :
133 : if (loop++ % 5 == 0)
134 : Thread.Yield();
135 : else
136 : Thread.Sleep(100);
137 : if (loop == long.MaxValue)
138 : loop = long.MinValue;
139 : }
140 : catch (Exception exception)
141 : {
142 : Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
143 : }
144 : }
145 : }
146 :
147 : public int QueueCount
148 : {
149 : get
150 : {
151 : QueueTrackerLock.EnterReadLock();
152 : try
153 : {
154 : return QueueTracker.Count;
155 : }
156 : finally
157 : {
158 : QueueTrackerLock.ExitReadLock();
159 : }
160 : }
161 : }
162 :
163 : public ICollection<string> QueueNames
164 : {
165 : get
166 : {
167 : QueueTrackerLock.EnterReadLock();
168 : try
169 : {
170 : return QueueTracker.Keys;
171 : }
172 : finally
173 : {
174 : QueueTrackerLock.ExitReadLock();
175 : }
176 : }
177 : }
178 :
179 0 : public virtual bool? ReceiveCommand(ICommand<TAuthenticationToken> command)
180 : {
181 : CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
182 : AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
183 :
184 : Type commandType = command.GetType();
185 :
186 : bool response = true;
187 : bool isRequired = BusHelper.IsEventRequired(commandType);
188 :
189 : IList<Action<IMessage>> handlers;
190 : if (Routes.TryGetValue(commandType, out handlers))
191 : {
192 : if (handlers != null)
193 : {
194 : if (handlers.Count != 1)
195 : throw new MultipleCommandHandlersRegisteredException(commandType);
196 : if (handlers.Count == 1)
197 : handlers.Single()(command);
198 : else if (isRequired)
199 : throw new NoCommandHandlerRegisteredException(commandType);
200 : else
201 : response = false;
202 : }
203 : else if (isRequired)
204 : throw new NoCommandHandlerRegisteredException(commandType);
205 : else
206 : response = false;
207 : }
208 : else if (isRequired)
209 : throw new NoCommandHandlerRegisteredException(commandType);
210 : else
211 : response = false;
212 : return response;
213 : }
214 :
215 : #region Implementation of ICommandReceiver
216 :
217 0 : public abstract void Start();
218 :
219 : #endregion
220 : }
221 : }
|