Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Text;
13 : using System.Threading;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Configuration;
17 : using Cqrs.Events;
18 : using Microsoft.ServiceBus.Messaging;
19 : using EventData = Microsoft.ServiceBus.Messaging.EventData;
20 : using SpinWait = Cqrs.Infrastructure.SpinWait;
21 :
22 : namespace Cqrs.Azure.ServiceBus
23 : {
24 : public class AzureQueuedEventBusReceiver<TAuthenticationToken> : AzureEventBusReceiver<TAuthenticationToken>
25 : {
26 : protected static ConcurrentDictionary<string, ConcurrentQueue<IEvent<TAuthenticationToken>>> QueueTracker { get; private set; }
27 :
28 : protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }
29 :
30 0 : public AzureQueuedEventBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
31 : : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper)
32 : {
33 : QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<IEvent<TAuthenticationToken>>>();
34 : QueueTrackerLock = new ReaderWriterLockSlim();
35 : }
36 :
37 0 : protected override void ReceiveEvent(PartitionContext context, EventData eventData)
38 : {
39 : // Do a manual 10 try attempt with back-off
40 : for (int i = 0; i < 10; i++)
41 : {
42 : try
43 : {
44 : Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
45 : string messageBody = Encoding.UTF8.GetString(eventData.GetBytes());
46 : IEvent<TAuthenticationToken> @event = MessageSerialiser.DeserialiseEvent(messageBody);
47 :
48 : CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
49 : Logger.LogInfo(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3}.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset, @event.GetType().FullName));
50 :
51 : Type eventType = @event.GetType();
52 :
53 : string targetQueueName = eventType.FullName;
54 :
55 : try
56 : {
57 : object rsn = eventType.GetProperty("Rsn").GetValue(@event, null);
58 : targetQueueName = string.Format("{0}.{1}", targetQueueName, rsn);
59 : }
60 : catch
61 : {
62 : Logger.LogDebug(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' was of type {3} but with no Rsn property.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset, eventType));
63 : // Do nothing if there is no rsn. Just use @event type name
64 : }
65 :
66 : CreateQueueAndAttachListenerIfNotExist(targetQueueName);
67 : EnqueueEvent(targetQueueName, @event);
68 :
69 : // remove the original message from the incoming queue
70 : context.CheckpointAsync(eventData);
71 :
72 : Logger.LogDebug(string.Format("An event message arrived and was processed with the partition key '{0}', sequence number '{1}' and offset '{2}'.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset));
73 : return;
74 : }
75 : catch (Exception exception)
76 : {
77 : // Indicates a problem, unlock message in queue
78 : Logger.LogError(string.Format("An event message arrived with the partition key '{0}', sequence number '{1}' and offset '{2}' but failed to be process.", eventData.PartitionKey, eventData.SequenceNumber, eventData.Offset), exception: exception);
79 :
80 : switch (i)
81 : {
82 : case 0:
83 : case 1:
84 : // 10 seconds
85 : Thread.Sleep(10 * 1000);
86 : break;
87 : case 2:
88 : case 3:
89 : // 30 seconds
90 : Thread.Sleep(30 * 1000);
91 : break;
92 : case 4:
93 : case 5:
94 : case 6:
95 : // 1 minute
96 : Thread.Sleep(60 * 1000);
97 : break;
98 : case 7:
99 : case 8:
100 : case 9:
101 : // 3 minutes
102 : Thread.Sleep(3 * 60 * 1000);
103 : break;
104 : }
105 : }
106 : }
107 : // Eventually just accept it
108 : context.CheckpointAsync(eventData);
109 : }
110 :
111 : private void EnqueueEvent(string targetQueueName, IEvent<TAuthenticationToken> @event)
112 : {
113 : var queue = QueueTracker.GetOrAdd(targetQueueName, new ConcurrentQueue<IEvent<TAuthenticationToken>>());
114 : queue.Enqueue(@event);
115 : }
116 :
117 0 : protected void CreateQueueAndAttachListenerIfNotExist(string queueName)
118 : {
119 : if (!QueueTracker.ContainsKey(queueName))
120 : {
121 : QueueTrackerLock.EnterWriteLock();
122 : try
123 : {
124 : if (!QueueTracker.ContainsKey(queueName))
125 : {
126 : QueueTracker.TryAdd(queueName, new ConcurrentQueue<IEvent<TAuthenticationToken>>());
127 : new Thread(() =>
128 : {
129 : Thread.CurrentThread.Name = queueName;
130 : DequeuAndProcessEvent(queueName);
131 : }).Start();
132 : }
133 : }
134 : catch (Exception exception)
135 : {
136 : Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: exception);
137 : }
138 : finally
139 : {
140 : QueueTrackerLock.ExitWriteLock();
141 : }
142 : }
143 : }
144 :
145 0 : protected void DequeuAndProcessEvent(string queueName)
146 : {
147 : SpinWait.SpinUntil
148 : (
149 : () =>
150 : {
151 : try
152 : {
153 : ConcurrentQueue<IEvent<TAuthenticationToken>> queue;
154 : if (QueueTracker.TryGetValue(queueName, out queue))
155 : {
156 : while (!queue.IsEmpty)
157 : {
158 : IEvent<TAuthenticationToken> @event;
159 : if (queue.TryDequeue(out @event))
160 : {
161 : try
162 : {
163 : CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
164 : }
165 : catch (Exception exception)
166 : {
167 : Logger.LogError(string.Format("Trying to set the CorrelationId from the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
168 : }
169 : try
170 : {
171 : AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
172 : }
173 : catch (Exception exception)
174 : {
175 : Logger.LogError(string.Format("Trying to set the AuthenticationToken from the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
176 : }
177 : try
178 : {
179 : ReceiveEvent(@event);
180 : }
181 : catch (Exception exception)
182 : {
183 : Logger.LogError(string.Format("Processing the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
184 : queue.Enqueue(@event);
185 : }
186 : }
187 : else
188 : Logger.LogDebug(string.Format("Trying to dequeue a event from the queue '{0}' failed.", queueName));
189 : }
190 : }
191 : else
192 : Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));
193 : Thread.Sleep(100);
194 : }
195 : catch (Exception exception)
196 : {
197 : Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
198 : }
199 :
200 : // Always return false to keep this spinning.
201 : return false;
202 : },
203 : sleepInMilliseconds: 1000
204 : );
205 : }
206 :
207 : public int QueueCount
208 : {
209 : get
210 : {
211 : QueueTrackerLock.EnterReadLock();
212 : try
213 : {
214 : return QueueTracker.Count;
215 : }
216 : finally
217 : {
218 : QueueTrackerLock.ExitReadLock();
219 : }
220 : }
221 : }
222 :
223 : public ICollection<string> QueueNames
224 : {
225 : get
226 : {
227 : QueueTrackerLock.EnterReadLock();
228 : try
229 : {
230 : return QueueTracker.Keys;
231 : }
232 : finally
233 : {
234 : QueueTrackerLock.ExitReadLock();
235 : }
236 : }
237 : }
238 : }
239 : }
|