Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Threading;
13 : using cdmdotnet.Logging;
14 : using Cqrs.Authentication;
15 : using Cqrs.Bus;
16 : using Cqrs.Configuration;
17 : using Cqrs.Events;
18 : using Microsoft.ServiceBus.Messaging;
19 : using SpinWait = Cqrs.Infrastructure.SpinWait;
20 :
21 : namespace Cqrs.Azure.ServiceBus
22 : {
23 : /// <summary>
24 : /// A concurrent implementation of <see cref="AzureEventBusReceiver{TAuthenticationToken}"/> that resides in memory.
25 : /// </summary>
26 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
27 : public class AzureQueuedEventBusReceiver<TAuthenticationToken> : AzureEventBusReceiver<TAuthenticationToken>
28 2 : {
/// <summary>
/// Tracks all queues, keyed by queue name.
/// </summary>
/// <remarks>
/// This property is static, so a single tracker is shared by every instance of
/// <see cref="AzureQueuedEventBusReceiver{TAuthenticationToken}"/> closed over the same <typeparamref name="TAuthenticationToken"/>.
/// </remarks>
protected static ConcurrentDictionary<string, ConcurrentQueue<IEvent<TAuthenticationToken>>> QueueTracker { get; private set; }

/// <summary>
/// Gets the <see cref="ReaderWriterLockSlim"/> this instance takes when adding a queue to, or reading the count/names from, <see cref="QueueTracker"/>.
/// </summary>
protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }

/// <summary>
/// Creates the shared <see cref="QueueTracker"/> once per closed generic type.
/// </summary>
static AzureQueuedEventBusReceiver()
{
    QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<IEvent<TAuthenticationToken>>>();
}

/// <summary>
/// Instantiates a new instance of <see cref="AzureQueuedEventBusReceiver{TAuthenticationToken}"/>.
/// </summary>
public AzureQueuedEventBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper)
    : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, busHelper)
{
    // The static QueueTracker is intentionally NOT re-created here: replacing it would drop every
    // event still waiting in the previous dictionary and leave already-running listener threads
    // looking up queue names in a dictionary that no longer contains them.
    QueueTrackerLock = new ReaderWriterLockSlim();
}
48 :
/// <summary>
/// Receives a <see cref="BrokeredMessage"/> from the event bus, identifies a key and queues it accordingly.
/// </summary>
/// <remarks>
/// The key is the full type name of the deserialised event, suffixed with the value of its <c>Rsn</c> property
/// when the event type exposes one. The message is completed once the event has been placed on the in-memory queue;
/// any failure before that point is logged and the message is abandoned.
/// </remarks>
/// <param name="message">The <see cref="BrokeredMessage"/> whose body holds the serialised event.</param>
protected override void ReceiveEvent(BrokeredMessage message)
{
    try
    {
        Logger.LogDebug(string.Format("An event message arrived with the id '{0}'.", message.MessageId));
        string messageBody = message.GetBody<string>();
        IEvent<TAuthenticationToken> @event = MessageSerialiser.DeserialiseEvent(messageBody);
        Type eventType = @event.GetType();

        CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
        Logger.LogInfo(string.Format("An event message arrived with the id '{0}' was of type {1}.", message.MessageId, eventType.FullName));

        string targetQueueName = eventType.FullName;

        try
        {
            // GetProperty returns null when the type has no public Rsn property; test for that
            // explicitly rather than relying on a NullReferenceException to signal it.
            var rsnProperty = eventType.GetProperty("Rsn");
            if (rsnProperty != null)
            {
                object rsn = rsnProperty.GetValue(@event, null);
                targetQueueName = string.Format("{0}.{1}", targetQueueName, rsn);
            }
            else
            {
                // Do nothing if there is no rsn. Just use @event type name
                Logger.LogDebug(string.Format("An event message arrived with the id '{0}' was of type {1} but with no Rsn property.", message.MessageId, eventType));
            }
        }
        catch (Exception exception)
        {
            // Reading the property failed (e.g. the getter threw or the lookup was ambiguous).
            // Keep the existing fallback of queueing by type name, but record what actually went wrong.
            Logger.LogError(string.Format("An event message arrived with the id '{0}' was of type {1} but reading its Rsn property failed.", message.MessageId, eventType), exception: exception);
        }

        CreateQueueAndAttachListenerIfNotExist(targetQueueName);
        EnqueueEvent(targetQueueName, @event);

        // remove the original message from the incoming queue
        // NOTE(review): the event is already on the in-memory queue here, so if Complete() throws the
        // message is abandoned below and may be delivered (and queued) again - confirm this is acceptable.
        message.Complete();

        Logger.LogDebug(string.Format("An event message arrived and was processed with the id '{0}'.", message.MessageId));
    }
    catch (Exception exception)
    {
        // Indicates a problem, unlock message in queue
        Logger.LogError(string.Format("An event message arrived with the id '{0}' but failed to be process.", message.MessageId), exception: exception);
        message.Abandon();
    }
}
93 :
/// <summary>
/// Adds the provided <paramref name="event"/> to the <see cref="QueueTracker"/> of the queue <paramref name="targetQueueName"/>,
/// creating that queue first if it is not being tracked yet.
/// </summary>
/// <param name="targetQueueName">The name of the queue to add the event to.</param>
/// <param name="event">The event to enqueue.</param>
private void EnqueueEvent(string targetQueueName, IEvent<TAuthenticationToken> @event)
{
    // Use the factory overload so a new queue is only allocated when the key is actually missing,
    // instead of building (and discarding) one on every call.
    ConcurrentQueue<IEvent<TAuthenticationToken>> queue = QueueTracker.GetOrAdd
    (
        targetQueueName,
        name => new ConcurrentQueue<IEvent<TAuthenticationToken>>()
    );
    queue.Enqueue(@event);
}
102 :
/// <summary>
/// Ensures a queue named <paramref name="queueName"/> is tracked. When the queue is missing it is added to
/// <see cref="QueueTracker"/> and a new <see cref="Thread"/>, named after the queue, is started running
/// <see cref="DequeuAndProcessEvent"/> for it.
/// </summary>
/// <param name="queueName">The name of the queue to check and create.</param>
protected void CreateQueueAndAttachListenerIfNotExist(string queueName)
{
    // Cheap check first: nothing to do (and no lock needed) when the queue is already known.
    if (QueueTracker.ContainsKey(queueName))
    {
        return;
    }

    QueueTrackerLock.EnterWriteLock();
    try
    {
        // Check again now that the write lock is held; the finally block still releases the lock on this return.
        if (QueueTracker.ContainsKey(queueName))
        {
            return;
        }

        QueueTracker.TryAdd(queueName, new ConcurrentQueue<IEvent<TAuthenticationToken>>());

        Thread listener = new Thread(() =>
        {
            Thread.CurrentThread.Name = queueName;
            DequeuAndProcessEvent(queueName);
        });
        listener.Start();
    }
    catch (Exception failure)
    {
        Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: failure);
    }
    finally
    {
        QueueTrackerLock.ExitWriteLock();
    }
}
135 :
/// <summary>
/// Repeatedly drains the queue of <paramref name="queueName"/>: each <see cref="IEvent{TAuthenticationToken}"/> taken off the queue
/// has its correlation id and authentication token applied and is then passed to <c>ReceiveEvent</c>.
/// The polling callback always returns false, so this method keeps polling and is intended to run on its own thread.
/// </summary>
/// <remarks>
/// An event whose processing throws is put back on the queue and draining stops for that pass, so the retry
/// happens after the pause between passes rather than immediately in a tight loop.
/// </remarks>
/// <param name="queueName">The name of the queue process.</param>
protected void DequeuAndProcessEvent(string queueName)
{
    SpinWait.SpinUntil
    (
        () =>
        {
            try
            {
                ConcurrentQueue<IEvent<TAuthenticationToken>> queue;
                if (QueueTracker.TryGetValue(queueName, out queue))
                {
                    while (!queue.IsEmpty)
                    {
                        IEvent<TAuthenticationToken> @event;
                        if (!queue.TryDequeue(out @event))
                        {
                            Logger.LogDebug(string.Format("Trying to dequeue a event from the queue '{0}' failed.", queueName));
                            continue;
                        }

                        // Failing to restore the correlation id or token is logged but does not stop processing.
                        try
                        {
                            CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
                        }
                        catch (Exception exception)
                        {
                            Logger.LogError(string.Format("Trying to set the CorrelationId from the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
                        }
                        try
                        {
                            AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
                        }
                        catch (Exception exception)
                        {
                            Logger.LogError(string.Format("Trying to set the AuthenticationToken from the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
                        }
                        try
                        {
                            ReceiveEvent(@event);
                        }
                        catch (Exception exception)
                        {
                            Logger.LogError(string.Format("Processing the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
                            queue.Enqueue(@event);
                            // Stop draining for this pass. Without this the queue is immediately non-empty again
                            // and a persistently failing event would be retried back-to-back with no delay.
                            break;
                        }
                    }
                }
                else
                {
                    Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));
                }

                // Pause after every pass, whether or not the queue was found or had any events.
                Thread.Sleep(100);
            }
            catch (Exception exception)
            {
                Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
            }

            // Always return false to keep this spinning.
            return false;
        },
        // NOTE(review): presumably the pause SpinUntil applies between calls to the callback - confirm against Cqrs.Infrastructure.SpinWait.
        sleepInMilliseconds: 1000
    );
}
202 :
/// <summary>
/// Gets how many queues are currently held in <see cref="QueueTracker"/>, read while holding the read lock of <see cref="QueueTrackerLock"/>.
/// </summary>
public int QueueCount
{
    get
    {
        QueueTrackerLock.EnterReadLock();
        try
        {
            int knownQueues = QueueTracker.Count;
            return knownQueues;
        }
        finally
        {
            // Release the read lock whether or not reading the count succeeded.
            QueueTrackerLock.ExitReadLock();
        }
    }
}
221 :
/// <summary>
/// Gets the keys of <see cref="QueueTracker"/>, i.e. the names of all queues currently known, read while holding the read lock of <see cref="QueueTrackerLock"/>.
/// </summary>
public ICollection<string> QueueNames
{
    get
    {
        QueueTrackerLock.EnterReadLock();
        try
        {
            ICollection<string> knownQueueNames = QueueTracker.Keys;
            return knownQueueNames;
        }
        finally
        {
            // Release the read lock whether or not reading the keys succeeded.
            QueueTrackerLock.ExitReadLock();
        }
    }
}
240 : }
241 : }
|