Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Threading;
13 : using cdmdotnet.Logging;
14 : using Cqrs.Authentication;
15 : using Cqrs.Bus;
16 : using Cqrs.Commands;
17 : using Cqrs.Configuration;
18 : using Microsoft.ServiceBus.Messaging;
19 : using SpinWait = Cqrs.Infrastructure.SpinWait;
20 :
21 : namespace Cqrs.Azure.ServiceBus
22 : {
23 : public class AzureQueuedCommandBusReceiver<TAuthenticationToken> : AzureCommandBusReceiver<TAuthenticationToken>
24 0 : {
25 : protected static ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>> QueueTracker { get; private set; }
26 :
27 : protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }
28 :
29 0 : public AzureQueuedCommandBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper)
30 : : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, busHelper)
31 : {
32 : QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>>();
33 : QueueTrackerLock = new ReaderWriterLockSlim();
34 : }
35 :
36 0 : protected override void ReceiveCommand(BrokeredMessage message)
37 : {
38 : try
39 : {
40 : Logger.LogDebug(string.Format("A command message arrived with the id '{0}'.", message.MessageId));
41 : string messageBody = message.GetBody<string>();
42 : ICommand<TAuthenticationToken> command = MessageSerialiser.DeserialiseCommand(messageBody);
43 :
44 : CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
45 : Logger.LogInfo(string.Format("A command message arrived with the id '{0}' was of type {1}.", message.MessageId, command.GetType().FullName));
46 :
47 : Type commandType = command.GetType();
48 :
49 : string targetQueueName = commandType.FullName;
50 :
51 : try
52 : {
53 : object rsn = commandType.GetProperty("Rsn").GetValue(command, null);
54 : targetQueueName = string.Format("{0}.{1}", targetQueueName, rsn);
55 : }
56 : catch
57 : {
58 : Logger.LogDebug(string.Format("A command message arrived with the id '{0}' was of type {1} but with no Rsn property.", message.MessageId, commandType));
59 : // Do nothing if there is no rsn. Just use command type name
60 : }
61 :
62 : CreateQueueAndAttachListenerIfNotExist(targetQueueName);
63 : EnqueueCommand(targetQueueName, command);
64 :
65 : // remove the original message from the incoming queue
66 : message.Complete();
67 :
68 : Logger.LogDebug(string.Format("A command message arrived and was processed with the id '{0}'.", message.MessageId));
69 : }
70 : catch (Exception exception)
71 : {
72 : // Indicates a problem, unlock message in queue
73 : Logger.LogError(string.Format("A command message arrived with the id '{0}' but failed to be process.", message.MessageId), exception: exception);
74 : message.Abandon();
75 : }
76 : }
77 :
78 : private void EnqueueCommand(string targetQueueName, ICommand<TAuthenticationToken> command)
79 : {
80 : var queue = QueueTracker.GetOrAdd(targetQueueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>());
81 : queue.Enqueue(command);
82 : }
83 :
84 0 : protected void CreateQueueAndAttachListenerIfNotExist(string queueName)
85 : {
86 : if (!QueueTracker.ContainsKey(queueName))
87 : {
88 : QueueTrackerLock.EnterWriteLock();
89 : try
90 : {
91 : if (!QueueTracker.ContainsKey(queueName))
92 : {
93 : QueueTracker.TryAdd(queueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>());
94 : new Thread(() =>
95 : {
96 : Thread.CurrentThread.Name = queueName;
97 : DequeuAndProcessCommand(queueName);
98 : }).Start();
99 : }
100 : }
101 : catch (Exception exception)
102 : {
103 : Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: exception);
104 : }
105 : finally
106 : {
107 : QueueTrackerLock.ExitWriteLock();
108 : }
109 : }
110 : }
111 :
112 0 : protected void DequeuAndProcessCommand(string queueName)
113 : {
114 : SpinWait.SpinUntil
115 : (
116 : () =>
117 : {
118 : try
119 : {
120 : ConcurrentQueue<ICommand<TAuthenticationToken>> queue;
121 : if (QueueTracker.TryGetValue(queueName, out queue))
122 : {
123 : while (!queue.IsEmpty)
124 : {
125 : ICommand<TAuthenticationToken> command;
126 : if (queue.TryDequeue(out command))
127 : {
128 : try
129 : {
130 : CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
131 : }
132 : catch (Exception exception)
133 : {
134 : Logger.LogError(string.Format("Trying to set the CorrelationId from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
135 : }
136 : try
137 : {
138 : AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
139 : }
140 : catch (Exception exception)
141 : {
142 : Logger.LogError(string.Format("Trying to set the AuthenticationToken from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
143 : }
144 : try
145 : {
146 : ReceiveCommand(command);
147 : }
148 : catch (Exception exception)
149 : {
150 : Logger.LogError(string.Format("Processing the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
151 : queue.Enqueue(command);
152 : }
153 : }
154 : else
155 : Logger.LogDebug(string.Format("Trying to dequeue a command from the queue '{0}' failed.", queueName));
156 : }
157 : }
158 : else
159 : Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));
160 :
161 : Thread.Sleep(100);
162 : }
163 : catch (Exception exception)
164 : {
165 : Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
166 : }
167 :
168 : // Always return false to keep this spinning.
169 : return false;
170 : },
171 : sleepInMilliseconds: 1000
172 : );
173 : }
174 :
175 : public int QueueCount
176 : {
177 : get
178 : {
179 : QueueTrackerLock.EnterReadLock();
180 : try
181 : {
182 : return QueueTracker.Count;
183 : }
184 : finally
185 : {
186 : QueueTrackerLock.ExitReadLock();
187 : }
188 : }
189 : }
190 :
191 : public ICollection<string> QueueNames
192 : {
193 : get
194 : {
195 : QueueTrackerLock.EnterReadLock();
196 : try
197 : {
198 : return QueueTracker.Keys;
199 : }
200 : finally
201 : {
202 : QueueTrackerLock.ExitReadLock();
203 : }
204 : }
205 : }
206 : }
207 : }
|