Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Text;
13 : using System.Threading;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Commands;
17 : using Cqrs.Configuration;
18 : using Microsoft.ServiceBus.Messaging;
19 : using SpinWait = Cqrs.Infrastructure.SpinWait;
20 :
21 : namespace Cqrs.Azure.ServiceBus
22 : {
/// <summary>
/// A command receiver that places each incoming command onto an in-memory queue named after the
/// command type (and the value of its "Rsn" property when it has one) and processes every queue
/// on its own dedicated thread.
/// </summary>
/// <typeparam name="TAuthenticationToken">The type of the authentication token carried by the commands.</typeparam>
public class AzureQueuedCommandBusReceiver<TAuthenticationToken> : AzureCommandBusReceiver<TAuthenticationToken>
{
    /// <summary>
    /// The number of times <see cref="ReceiveCommand(PartitionContext, EventData)"/> tries to handle a message before giving up.
    /// </summary>
    private const int MaximumReceiveAttempts = 10;

    /// <summary>
    /// The in-memory queues, keyed by queue name. This is static, so it is shared by every instance
    /// constructed with the same <typeparamref name="TAuthenticationToken"/>.
    /// </summary>
    protected static ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>> QueueTracker { get; private set; }

    /// <summary>
    /// A per-instance lock taken while adding to or reading from <see cref="QueueTracker"/>.
    /// </summary>
    protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }

    /// <summary>
    /// Creates the shared <see cref="QueueTracker"/> exactly once, so that constructing another
    /// receiver does not throw away queues that already have listener threads attached.
    /// </summary>
    static AzureQueuedCommandBusReceiver()
    {
        QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<ICommand<TAuthenticationToken>>>();
    }

    /// <summary>
    /// Instantiates a new receiver, passing all dependencies to the base class and creating the per-instance lock.
    /// </summary>
    public AzureQueuedCommandBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
        : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper)
    {
        QueueTrackerLock = new ReaderWriterLockSlim();
    }

    /// <summary>
    /// Deserialises the command held in <paramref name="eventData"/>, places it on its target in-memory
    /// queue and then checkpoints the message. Failures are logged and retried with an increasing delay;
    /// after the final attempt the message is checkpointed regardless.
    /// </summary>
    /// <param name="context">The partition context used to checkpoint the message.</param>
    /// <param name="eventData">The received message.</param>
    protected override void ReceiveCommand(PartitionContext context, EventData eventData)
    {
        // Do a manual retry with back-off
        for (int attempt = 0; attempt < MaximumReceiveAttempts; attempt++)
        {
            try
            {
                Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
                string messageBody = Encoding.UTF8.GetString(eventData.GetBytes());
                ICommand<TAuthenticationToken> command = MessageSerialiser.DeserialiseCommand(messageBody);

                CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
                Logger.LogInfo(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3}.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset, command.GetType().FullName));

                string targetQueueName = GetTargetQueueName(command, eventData);

                CreateQueueAndAttachListenerIfNotExist(targetQueueName);
                EnqueueCommand(targetQueueName, command);

                // remove the original message from the incoming queue
                // NOTE(review): the result of CheckpointAsync is not observed here, so a failed checkpoint goes unnoticed.
                context.CheckpointAsync(eventData);

                Logger.LogDebug(string.Format("A command message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
                return;
            }
            catch (Exception exception)
            {
                // Indicates a problem, unlock message in queue
                Logger.LogError(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);

                // There is nothing left to wait for once the final attempt has failed.
                if (attempt < MaximumReceiveAttempts - 1)
                {
                    Thread.Sleep(GetRetryDelayInMilliseconds(attempt));
                }
            }
        }
        // Eventually just accept it
        context.CheckpointAsync(eventData);
    }

    /// <summary>
    /// Returns how long to wait, in milliseconds, after the failed zero-based <paramref name="attempt"/>.
    /// </summary>
    private static int GetRetryDelayInMilliseconds(int attempt)
    {
        if (attempt <= 1)
        {
            // 10 seconds
            return 10 * 1000;
        }
        if (attempt <= 3)
        {
            // 30 seconds
            return 30 * 1000;
        }
        if (attempt <= 6)
        {
            // 1 minute
            return 60 * 1000;
        }
        // 3 minutes
        return 3 * 60 * 1000;
    }

    /// <summary>
    /// Builds the name of the in-memory queue for <paramref name="command"/>: the full type name,
    /// followed by the value of the "Rsn" property when the command type exposes one.
    /// </summary>
    private string GetTargetQueueName(ICommand<TAuthenticationToken> command, EventData eventData)
    {
        Type commandType = command.GetType();
        string targetQueueName = commandType.FullName;

        try
        {
            // GetProperty returns null when there is no such public property, so test for that
            // instead of relying on a NullReferenceException.
            var rsnProperty = commandType.GetProperty("Rsn");
            if (rsnProperty != null)
            {
                object rsn = rsnProperty.GetValue(command, null);
                return string.Format("{0}.{1}", targetQueueName, rsn);
            }
        }
        catch (Exception)
        {
            // Reading the Rsn is best-effort: fall through and use the command type name only.
        }

        Logger.LogDebug(string.Format("A command message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3} but with no Rsn property.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset, commandType));
        // Do nothing if there is no rsn. Just use command type name
        return targetQueueName;
    }

    /// <summary>
    /// Adds <paramref name="command"/> to the queue named <paramref name="targetQueueName"/>, creating the queue if it is missing.
    /// </summary>
    private void EnqueueCommand(string targetQueueName, ICommand<TAuthenticationToken> command)
    {
        // The factory overload only allocates a new queue when one is actually needed.
        var queue = QueueTracker.GetOrAdd(targetQueueName, name => new ConcurrentQueue<ICommand<TAuthenticationToken>>());
        queue.Enqueue(command);
    }

    /// <summary>
    /// Ensures a queue named <paramref name="queueName"/> exists and, when this call is the one that
    /// created it, starts a thread running <see cref="DequeuAndProcessCommand"/> for it.
    /// Any exception is logged and not rethrown.
    /// </summary>
    /// <param name="queueName">The name of the queue, also used as the name of the listener thread.</param>
    protected void CreateQueueAndAttachListenerIfNotExist(string queueName)
    {
        if (QueueTracker.ContainsKey(queueName))
        {
            return;
        }

        QueueTrackerLock.EnterWriteLock();
        try
        {
            // The tracker is static while the lock is per instance, so rely on the atomic TryAdd
            // to decide which caller starts the one and only listener thread for this queue.
            if (QueueTracker.TryAdd(queueName, new ConcurrentQueue<ICommand<TAuthenticationToken>>()))
            {
                new Thread(() =>
                {
                    Thread.CurrentThread.Name = queueName;
                    DequeuAndProcessCommand(queueName);
                }).Start();
            }
        }
        catch (Exception exception)
        {
            Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: exception);
        }
        finally
        {
            QueueTrackerLock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Repeatedly drains the queue named <paramref name="queueName"/>, setting the correlation id and
    /// authentication token from each command before handing it to <c>ReceiveCommand</c>.
    /// A command that fails is logged and put back on the queue. The polling callback always returns false.
    /// </summary>
    /// <param name="queueName">The name of the queue to process.</param>
    protected void DequeuAndProcessCommand(string queueName)
    {
        SpinWait.SpinUntil
        (
            () =>
            {
                try
                {
                    ConcurrentQueue<ICommand<TAuthenticationToken>> queue;
                    if (QueueTracker.TryGetValue(queueName, out queue))
                    {
                        while (!queue.IsEmpty)
                        {
                            ICommand<TAuthenticationToken> command;
                            if (queue.TryDequeue(out command))
                            {
                                try
                                {
                                    CorrelationIdHelper.SetCorrelationId(command.CorrelationId);
                                }
                                catch (Exception exception)
                                {
                                    Logger.LogError(string.Format("Trying to set the CorrelationId from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
                                }
                                try
                                {
                                    AuthenticationTokenHelper.SetAuthenticationToken(command.AuthenticationToken);
                                }
                                catch (Exception exception)
                                {
                                    Logger.LogError(string.Format("Trying to set the AuthenticationToken from the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
                                }
                                try
                                {
                                    ReceiveCommand(command);
                                }
                                catch (Exception exception)
                                {
                                    Logger.LogError(string.Format("Processing the command type {1} for a request for the queue '{0}' failed.", queueName, command.GetType()), exception: exception);
                                    queue.Enqueue(command);
                                    // Stop draining so the sleep below throttles the retry; otherwise a command
                                    // that keeps failing would be retried in a tight loop.
                                    break;
                                }
                            }
                            else
                            {
                                Logger.LogDebug(string.Format("Trying to dequeue a command from the queue '{0}' failed.", queueName));
                            }
                        }
                    }
                    else
                    {
                        Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));
                    }
                    Thread.Sleep(100);
                }
                catch (Exception exception)
                {
                    Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
                }

                // Always return false to keep this spinning.
                return false;
            },
            sleepInMilliseconds: 1000
        );
    }

    /// <summary>
    /// Gets the number of queues currently held in <see cref="QueueTracker"/>.
    /// </summary>
    public int QueueCount
    {
        get
        {
            QueueTrackerLock.EnterReadLock();
            try
            {
                return QueueTracker.Count;
            }
            finally
            {
                QueueTrackerLock.ExitReadLock();
            }
        }
    }

    /// <summary>
    /// Gets the names of the queues currently held in <see cref="QueueTracker"/>.
    /// </summary>
    public ICollection<string> QueueNames
    {
        get
        {
            QueueTrackerLock.EnterReadLock();
            try
            {
                return QueueTracker.Keys;
            }
            finally
            {
                QueueTrackerLock.ExitReadLock();
            }
        }
    }
}
238 : }
|