Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Concurrent;
11 : using System.Collections.Generic;
12 : using System.Threading;
13 : using cdmdotnet.Logging;
14 : using Cqrs.Authentication;
15 : using Cqrs.Bus;
16 : using Cqrs.Configuration;
17 : using Cqrs.Events;
18 : using Microsoft.ServiceBus.Messaging;
19 : using SpinWait = Cqrs.Infrastructure.SpinWait;
20 :
21 : namespace Cqrs.Azure.ServiceBus
22 : {
/// <summary>
/// An event bus receiver that does not handle an incoming <see cref="BrokeredMessage"/> inline. Instead each
/// deserialised event is placed on an in-memory queue named after the event type (suffixed with the value of the
/// event's "Rsn" property when it has one) and a dedicated worker thread per queue passes the events to
/// <c>ReceiveEvent(IEvent)</c> one at a time.
/// </summary>
/// <typeparam name="TAuthenticationToken">The type of the authentication token carried by the events.</typeparam>
public class AzureQueuedEventBusReceiver<TAuthenticationToken> : AzureEventBusReceiver<TAuthenticationToken>
{
    /// <summary>
    /// Guards "start a worker and register its queue" across every instance. <see cref="QueueTracker"/> is static
    /// while <see cref="QueueTrackerLock"/> is per instance, so the instance lock alone cannot stop two receivers
    /// from starting two workers for the same queue.
    /// </summary>
    private static readonly object WorkerStartGate = new object();

    /// <summary>
    /// The in-memory queues, keyed by queue name. Shared by all instances of this closed generic type.
    /// </summary>
    protected static ConcurrentDictionary<string, ConcurrentQueue<IEvent<TAuthenticationToken>>> QueueTracker { get; private set; }

    /// <summary>
    /// Per-instance lock taken (write) while a queue is being created and (read) by <see cref="QueueCount"/> and
    /// <see cref="QueueNames"/>.
    /// </summary>
    protected ReaderWriterLockSlim QueueTrackerLock { get; private set; }

    /// <summary>
    /// Creates the shared <see cref="QueueTracker"/> exactly once. Doing this in the instance constructor would
    /// replace the dictionary every time a receiver is constructed, detaching running workers from their queues
    /// and dropping any events still buffered in them.
    /// </summary>
    static AzureQueuedEventBusReceiver()
    {
        QueueTracker = new ConcurrentDictionary<string, ConcurrentQueue<IEvent<TAuthenticationToken>>>();
    }

    /// <summary>
    /// Instantiates a new receiver, forwarding every dependency to the base class and creating this instance's
    /// <see cref="QueueTrackerLock"/>.
    /// </summary>
    public AzureQueuedEventBusReceiver(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper)
        : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, busHelper)
    {
        QueueTrackerLock = new ReaderWriterLockSlim();
    }

    /// <summary>
    /// Deserialises the event carried by <paramref name="message"/>, makes sure a queue and worker exist for it,
    /// enqueues the event and then completes the message. Any failure is logged and the message is abandoned.
    /// </summary>
    /// <param name="message">The message received from the bus.</param>
    protected override void ReceiveEvent(BrokeredMessage message)
    {
        try
        {
            Logger.LogDebug(string.Format("An event message arrived with the id '{0}'.", message.MessageId));
            string messageBody = message.GetBody<string>();
            IEvent<TAuthenticationToken> @event = MessageSerialiser.DeserialiseEvent(messageBody);

            CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
            Logger.LogInfo(string.Format("An event message arrived with the id '{0}' was of type {1}.", message.MessageId, @event.GetType().FullName));

            string targetQueueName = ResolveTargetQueueName(@event, message.MessageId);

            CreateQueueAndAttachListenerIfNotExist(targetQueueName);
            // Throws when no queue (and therefore no worker) exists, which routes to the Abandon() below.
            EnqueueEvent(targetQueueName, @event);

            // remove the original message from the incoming queue
            message.Complete();

            Logger.LogDebug(string.Format("An event message arrived and was processed with the id '{0}'.", message.MessageId));
        }
        catch (Exception exception)
        {
            // Indicates a problem, unlock message in queue
            Logger.LogError(string.Format("An event message arrived with the id '{0}' but failed to be process.", message.MessageId), exception: exception);
            message.Abandon();
        }
    }

    /// <summary>
    /// Builds the name of the in-memory queue for <paramref name="event"/>: the full type name, followed by
    /// ".{Rsn}" when the event exposes a readable public property called "Rsn".
    /// </summary>
    /// <param name="event">The event to route.</param>
    /// <param name="messageId">The id of the originating message; only used in log output.</param>
    /// <returns>The queue name. Falls back to the full type name when no Rsn value can be obtained.</returns>
    private string ResolveTargetQueueName(IEvent<TAuthenticationToken> @event, string messageId)
    {
        Type eventType = @event.GetType();
        string typeOnlyQueueName = eventType.FullName;

        try
        {
            var rsnProperty = eventType.GetProperty("Rsn");
            if (rsnProperty != null)
            {
                object rsn = rsnProperty.GetValue(@event, null);
                return string.Format("{0}.{1}", typeOnlyQueueName, rsn);
            }
        }
        catch (Exception exception)
        {
            // The property exists but could not be resolved or read. Stay best-effort and use the type name,
            // but say what actually happened instead of claiming the property is missing.
            Logger.LogDebug(string.Format("An event message arrived with the id '{0}' was of type {1} but its Rsn property could not be read ({2}).", messageId, eventType, exception.Message));
            return typeOnlyQueueName;
        }

        // Do nothing if there is no rsn. Just use @event type name
        Logger.LogDebug(string.Format("An event message arrived with the id '{0}' was of type {1} but with no Rsn property.", messageId, eventType));
        return typeOnlyQueueName;
    }

    /// <summary>
    /// Adds <paramref name="event"/> to the already registered queue named <paramref name="targetQueueName"/>.
    /// </summary>
    /// <param name="targetQueueName">The name of the queue to add to.</param>
    /// <param name="event">The event to buffer.</param>
    /// <exception cref="InvalidOperationException">
    /// No queue with that name is registered. Queues are only registered once their worker has started, so a
    /// missing queue means nothing would ever process the event; failing lets the caller abandon the message.
    /// </exception>
    private void EnqueueEvent(string targetQueueName, IEvent<TAuthenticationToken> @event)
    {
        ConcurrentQueue<IEvent<TAuthenticationToken>> queue;
        if (!QueueTracker.TryGetValue(targetQueueName, out queue))
        {
            throw new InvalidOperationException(string.Format("No queue named '{0}' is registered, so the event cannot be queued for processing.", targetQueueName));
        }

        queue.Enqueue(@event);
    }

    /// <summary>
    /// Ensures a queue named <paramref name="queueName"/> is registered in <see cref="QueueTracker"/> and that a
    /// worker thread running <see cref="DequeuAndProcessEvent"/> has been started for it. Failures are logged and
    /// not rethrown; in that case no queue is registered.
    /// </summary>
    /// <param name="queueName">The name of the queue; also used as the worker thread's name.</param>
    protected void CreateQueueAndAttachListenerIfNotExist(string queueName)
    {
        // Cheap check so the common case (queue already exists) takes no lock.
        if (QueueTracker.ContainsKey(queueName))
        {
            return;
        }

        QueueTrackerLock.EnterWriteLock();
        try
        {
            lock (WorkerStartGate)
            {
                // Re-check now that no other instance can be creating the same queue.
                if (QueueTracker.ContainsKey(queueName))
                {
                    return;
                }

                // NOTE(review): the worker is a foreground thread whose loop never ends, so it keeps the
                // process alive; confirm whether that is intended before changing IsBackground.
                var worker = new Thread(() => DequeuAndProcessEvent(queueName)) { Name = queueName };

                // Start the worker BEFORE registering the queue: if Start() throws, nothing is registered and
                // no event can be parked in a queue that nobody drains. The worker tolerates the queue not
                // being registered yet.
                worker.Start();
                QueueTracker.TryAdd(queueName, new ConcurrentQueue<IEvent<TAuthenticationToken>>());
            }
        }
        catch (Exception exception)
        {
            Logger.LogError(string.Format("Processing a request to start a thread for the queue '{0}' failed.", queueName), exception: exception);
        }
        finally
        {
            QueueTrackerLock.ExitWriteLock();
        }
    }

    /// <summary>
    /// The body of a queue's worker thread: repeatedly looks up the queue named <paramref name="queueName"/>,
    /// drains it, and pauses. The condition passed to the spin-wait always returns <c>false</c>, so this is not
    /// expected to return.
    /// </summary>
    /// <param name="queueName">The name of the queue to service.</param>
    protected void DequeuAndProcessEvent(string queueName)
    {
        SpinWait.SpinUntil
        (
            () =>
            {
                try
                {
                    ConcurrentQueue<IEvent<TAuthenticationToken>> queue;
                    if (QueueTracker.TryGetValue(queueName, out queue))
                    {
                        DrainQueue(queueName, queue);
                    }
                    else
                    {
                        Logger.LogDebug(string.Format("Trying to find the queue '{0}' failed.", queueName));
                    }
                    Thread.Sleep(100);
                }
                catch (Exception exception)
                {
                    Logger.LogError(string.Format("Dequeuing and processing a request for the queue '{0}' failed.", queueName), exception: exception);
                }

                // Always return false to keep this spinning.
                return false;
            },
            sleepInMilliseconds: 1000
        );
    }

    /// <summary>
    /// Processes queued events until the queue is empty or an event fails. A failed event is put back on the
    /// queue and draining stops, so the caller's pause acts as a back-off instead of retrying in a tight loop.
    /// </summary>
    /// <param name="queueName">The name of the queue; only used in log output.</param>
    /// <param name="queue">The queue to drain.</param>
    private void DrainQueue(string queueName, ConcurrentQueue<IEvent<TAuthenticationToken>> queue)
    {
        while (!queue.IsEmpty)
        {
            IEvent<TAuthenticationToken> @event;
            if (!queue.TryDequeue(out @event))
            {
                Logger.LogDebug(string.Format("Trying to dequeue a event from the queue '{0}' failed.", queueName));
                continue;
            }

            if (!TryProcessQueuedEvent(queueName, @event))
            {
                // Keep the event for another attempt. It goes to the back of the queue, so events queued
                // after it will be handled first.
                queue.Enqueue(@event);
                return;
            }
        }
    }

    /// <summary>
    /// Applies the event's correlation id and authentication token to the current context (failures there are
    /// logged and ignored) and then hands the event to <c>ReceiveEvent(IEvent)</c>.
    /// </summary>
    /// <param name="queueName">The name of the queue the event came from; only used in log output.</param>
    /// <param name="event">The event to process.</param>
    /// <returns><c>true</c> when <c>ReceiveEvent</c> completed; <c>false</c> when it threw (already logged).</returns>
    private bool TryProcessQueuedEvent(string queueName, IEvent<TAuthenticationToken> @event)
    {
        try
        {
            CorrelationIdHelper.SetCorrelationId(@event.CorrelationId);
        }
        catch (Exception exception)
        {
            Logger.LogError(string.Format("Trying to set the CorrelationId from the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
        }

        try
        {
            AuthenticationTokenHelper.SetAuthenticationToken(@event.AuthenticationToken);
        }
        catch (Exception exception)
        {
            Logger.LogError(string.Format("Trying to set the AuthenticationToken from the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
        }

        try
        {
            ReceiveEvent(@event);
            return true;
        }
        catch (Exception exception)
        {
            Logger.LogError(string.Format("Processing the event type {1} for a request for the queue '{0}' failed.", queueName, @event.GetType()), exception: exception);
            return false;
        }
    }

    /// <summary>
    /// The number of queues currently registered in <see cref="QueueTracker"/>.
    /// </summary>
    public int QueueCount
    {
        get
        {
            QueueTrackerLock.EnterReadLock();
            try
            {
                return QueueTracker.Count;
            }
            finally
            {
                QueueTrackerLock.ExitReadLock();
            }
        }
    }

    /// <summary>
    /// The names of the queues currently registered in <see cref="QueueTracker"/>.
    /// </summary>
    public ICollection<string> QueueNames
    {
        get
        {
            QueueTrackerLock.EnterReadLock();
            try
            {
                return QueueTracker.Keys;
            }
            finally
            {
                QueueTrackerLock.ExitReadLock();
            }
        }
    }
}
206 : }
|