Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Configuration;
11 : using System.Threading.Tasks;
12 : using cdmdotnet.Logging;
13 : using Cqrs.Authentication;
14 : using Cqrs.Configuration;
15 : using Microsoft.ServiceBus;
16 : using Microsoft.ServiceBus.Messaging;
17 :
namespace Cqrs.Azure.ServiceBus
{
    /// <summary>
    /// An <see cref="AzureBus{TAuthenticationToken}"/> that uses Azure Service Event Hubs.
    /// </summary>
    /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
    public abstract class AzureEventHub<TAuthenticationToken> : AzureBus<TAuthenticationToken>
    {
        /// <summary>
        /// Gets the public <see cref="EventHubClient"/>. Set by <see cref="InstantiatePublishing"/>.
        /// </summary>
        protected EventHubClient EventHubPublisher { get; private set; }

        /// <summary>
        /// Gets the public <see cref="EventProcessorHost"/>. Set by <see cref="InstantiateReceiving"/>.
        /// </summary>
        protected EventProcessorHost EventHubReceiver { get; private set; }

        /// <summary>
        /// The name of the private event hub. Set by <see cref="CheckPrivateHubExists"/>.
        /// </summary>
        protected string PrivateEventHubName { get; set; }

        /// <summary>
        /// The name of the public event hub. Set by <see cref="CheckPublicHubExists"/>.
        /// </summary>
        protected string PublicEventHubName { get; private set; }

        /// <summary>
        /// The name of the consumer group in the private event hub. Set by <see cref="CheckPrivateHubExists"/>.
        /// </summary>
        protected string PrivateEventHubConsumerGroupName { get; private set; }

        /// <summary>
        /// The name of the consumer group in the public event hub. Set by <see cref="CheckPublicHubExists"/>.
        /// </summary>
        protected string PublicEventHubConsumerGroupName { get; private set; }

        /// <summary>
        /// The configuration key for the event hub connection string as used by <see cref="IConfigurationManager"/>.
        /// </summary>
        protected abstract string EventHubConnectionStringNameConfigurationKey { get; }

        /// <summary>
        /// The configuration key for the event hub storage connection string as used by <see cref="IConfigurationManager"/>.
        /// </summary>
        protected abstract string EventHubStorageConnectionStringNameConfigurationKey { get; }

        /// <summary>
        /// The configuration key for the name of the private event hub as used by <see cref="IConfigurationManager"/>.
        /// </summary>
        protected abstract string PrivateEventHubNameConfigurationKey { get; }

        /// <summary>
        /// The configuration key for the name of the public event hub as used by <see cref="IConfigurationManager"/>.
        /// </summary>
        protected abstract string PublicEventHubNameConfigurationKey { get; }

        /// <summary>
        /// The configuration key for the name of the consumer group name of the private event hub as used by <see cref="IConfigurationManager"/>.
        /// </summary>
        protected abstract string PrivateEventHubConsumerGroupNameConfigurationKey { get; }

        /// <summary>
        /// The configuration key for the name of the consumer group name of the public event hub as used by <see cref="IConfigurationManager"/>.
        /// </summary>
        protected abstract string PublicEventHubConsumerGroupNameConfigurationKey { get; }

        /// <summary>
        /// The default name of the private event hub if no <see cref="IConfigurationManager"/> value is set.
        /// </summary>
        protected abstract string DefaultPrivateEventHubName { get; }

        /// <summary>
        /// The default name of the public event hub if no <see cref="IConfigurationManager"/> value is set.
        /// </summary>
        protected abstract string DefaultPublicEventHubName { get; }

        /// <summary>
        /// The default name of the consumer group in the private event hub if no <see cref="IConfigurationManager"/> value is set.
        /// </summary>
        protected const string DefaultPrivateEventHubConsumerGroupName = "$Default";

        /// <summary>
        /// The default name of the consumer group in the public event hub if no <see cref="IConfigurationManager"/> value is set.
        /// </summary>
        protected const string DefaultPublicEventHubConsumerGroupName = "$Default";

        /// <summary>
        /// The event hub storage connection string. Set by <see cref="SetConnectionStrings"/>.
        /// </summary>
        protected string StorageConnectionString { get; private set; }

        /// <summary>
        /// The <see cref="Action{PartitionContext, EventData}">handler</see> used for <see cref="EventProcessorHost.RegisterEventProcessorFactoryAsync(Microsoft.ServiceBus.Messaging.IEventProcessorFactory)"/> on <see cref="EventHubReceiver"/>.
        /// Stored by <see cref="StoreReceiverMessageHandler"/>.
        /// </summary>
        protected Action<PartitionContext, EventData> ReceiverMessageHandler { get; private set; }

        /// <summary>
        /// The <see cref="EventProcessorOptions" /> used for <see cref="EventProcessorHost.RegisterEventProcessorFactoryAsync(Microsoft.ServiceBus.Messaging.IEventProcessorFactory)"/> on <see cref="EventHubReceiver"/>.
        /// May be <c>null</c>, in which case <see cref="ApplyReceiverMessageHandler"/> falls back to <see cref="EventProcessorOptions.DefaultOptions"/>.
        /// </summary>
        protected EventProcessorOptions ReceiverMessageHandlerOptions { get; private set; }

        /// <summary>
        /// Gets or sets the <see cref="ITelemetryHelper"/>. Initialised to a <see cref="NullTelemetryHelper"/> by the constructor.
        /// </summary>
        protected ITelemetryHelper TelemetryHelper { get; set; }

        /// <summary>
        /// Instantiates a new instance of <see cref="AzureEventHub{TAuthenticationToken}"/>.
        /// All arguments are passed straight through to the <see cref="AzureBus{TAuthenticationToken}"/> constructor.
        /// </summary>
        protected AzureEventHub(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, bool isAPublisher)
            : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, isAPublisher)
        {
            TelemetryHelper = new NullTelemetryHelper();
        }

        #region Overrides of AzureBus<TAuthenticationToken>

        /// <summary>
        /// Gets the connection string for the bus from <see cref="AzureBus{TAuthenticationToken}.ConfigurationManager"/>
        /// </summary>
        /// <returns>The non-blank event hub connection string.</returns>
        /// <exception cref="ConfigurationErrorsException">
        /// The appSetting named by <see cref="EventHubConnectionStringNameConfigurationKey"/>, or the connection string it points at, is missing or blank.
        /// </exception>
        protected override string GetConnectionString()
        {
            return GetRequiredConnectionString(EventHubConnectionStringNameConfigurationKey);
        }

        /// <summary>
        /// Calls <see cref="AzureBus{TAuthenticationToken}.SetConnectionStrings"/>
        /// and then sets the required storage connection string.
        /// </summary>
        /// <exception cref="ConfigurationErrorsException">
        /// The appSetting named by <see cref="EventHubStorageConnectionStringNameConfigurationKey"/>, or the connection string it points at, is missing or blank.
        /// </exception>
        protected override void SetConnectionStrings()
        {
            base.SetConnectionStrings();
            StorageConnectionString = GetRequiredConnectionString(EventHubStorageConnectionStringNameConfigurationKey);
            // Deliberately do not log the connection string itself: it carries the storage account key.
            Logger.LogDebug(string.Format("Storage connection string settings set using the appSetting '{0}'.", EventHubStorageConnectionStringNameConfigurationKey));
        }

        #endregion

        /// <summary>
        /// Instantiate publishing on this bus by
        /// calling <see cref="CheckPrivateHubExists"/> and <see cref="CheckPublicHubExists"/>
        /// then calling <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/>
        /// </summary>
        protected override void InstantiatePublishing()
        {
            NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(ConnectionString);
            CheckPrivateHubExists(namespaceManager);
            CheckPublicHubExists(namespaceManager);

            // Publishing always targets the public event hub.
            EventHubPublisher = EventHubClient.CreateFromConnectionString(ConnectionString, PublicEventHubName);
            StartSettingsChecking();
        }

        /// <summary>
        /// Instantiate receiving on this bus by
        /// calling <see cref="CheckPrivateHubExists"/> and <see cref="CheckPublicHubExists"/>
        /// then creating the <see cref="EventHubReceiver"/> for the public event hub,
        /// then calling <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/>
        /// unless this bus is also a publisher.
        /// </summary>
        protected override void InstantiateReceiving()
        {
            NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(ConnectionString);

            CheckPrivateHubExists(namespaceManager);
            CheckPublicHubExists(namespaceManager);

            EventHubReceiver = new EventProcessorHost(PublicEventHubName, PublicEventHubConsumerGroupName, ConnectionString, StorageConnectionString);

            // If this is also a publisher, then InstantiatePublishing starts the settings check and that will handle this.
            if (EventHubPublisher != null)
            {
                return;
            }

            StartSettingsChecking();
        }

        /// <summary>
        /// Checks if the private hub and consumer group name exists as per <see cref="PrivateEventHubName"/> and <see cref="PrivateEventHubConsumerGroupName"/>.
        /// Both properties are (re)read from configuration here, falling back to the defaults when no value is set.
        /// </summary>
        /// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
        protected virtual void CheckPrivateHubExists(NamespaceManager namespaceManager)
        {
            PrivateEventHubName = ConfigurationManager.GetSetting(PrivateEventHubNameConfigurationKey) ?? DefaultPrivateEventHubName;
            PrivateEventHubConsumerGroupName = ConfigurationManager.GetSetting(PrivateEventHubConsumerGroupNameConfigurationKey) ?? DefaultPrivateEventHubConsumerGroupName;

            CheckHubExists(namespaceManager, PrivateEventHubName, PrivateEventHubConsumerGroupName);
        }

        /// <summary>
        /// Checks if the public hub and consumer group name exists as per <see cref="PublicEventHubName"/> and <see cref="PublicEventHubConsumerGroupName"/>.
        /// Both properties are (re)read from configuration here, falling back to the defaults when no value is set.
        /// </summary>
        /// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
        protected virtual void CheckPublicHubExists(NamespaceManager namespaceManager)
        {
            PublicEventHubName = ConfigurationManager.GetSetting(PublicEventHubNameConfigurationKey) ?? DefaultPublicEventHubName;
            PublicEventHubConsumerGroupName = ConfigurationManager.GetSetting(PublicEventHubConsumerGroupNameConfigurationKey) ?? DefaultPublicEventHubConsumerGroupName;

            CheckHubExists(namespaceManager, PublicEventHubName, PublicEventHubConsumerGroupName);
        }

        /// <summary>
        /// Checks if an event hub by the provided <paramref name="hubName"/> exists and
        /// checks if a consumer group by the provided <paramref name="consumerGroupNames"/> exists,
        /// creating each one that is missing.
        /// </summary>
        /// <param name="namespaceManager">The <see cref="NamespaceManager"/> used to query and create the entities.</param>
        /// <param name="hubName">The name of the event hub.</param>
        /// <param name="consumerGroupNames">The name of the consumer group.</param>
        protected virtual void CheckHubExists(NamespaceManager namespaceManager, string hubName, string consumerGroupNames)
        {
            // Configure event hub settings
            // NOTE(review): long.MaxValue requests the largest representable retention; confirm the service accepts or clamps this value.
            var eventHubDescription = new EventHubDescription(hubName)
            {
                MessageRetentionInDays = long.MaxValue
            };

            // Create the event hub if it does not exist already
            namespaceManager.CreateEventHubIfNotExists(eventHubDescription);

            // NOTE(review): this uses the topic subscription API to represent an event hub consumer group;
            // confirm whether the consumer group API on NamespaceManager should be used here instead.
            var subscriptionDescription = new SubscriptionDescription(eventHubDescription.Path, consumerGroupNames);

            if (!namespaceManager.SubscriptionExists(eventHubDescription.Path, consumerGroupNames))
            {
                namespaceManager.CreateSubscription(subscriptionDescription);
            }
        }

        /// <summary>
        /// Checks <see cref="AzureBus{TAuthenticationToken}.ValidateSettingsHaveChanged"/>
        /// and whether <see cref="StorageConnectionString"/> differs from the currently configured storage connection string.
        /// </summary>
        /// <returns>
        /// <c>true</c> if the base settings changed, or the currently configured storage connection string
        /// (resolved the same way <see cref="SetConnectionStrings"/> resolves it) differs from <see cref="StorageConnectionString"/>;
        /// a storage connection string that can no longer be resolved counts as changed.
        /// </returns>
        protected override bool ValidateSettingsHaveChanged()
        {
            if (base.ValidateSettingsHaveChanged())
            {
                return true;
            }

            // Compare like with like: the stored value is a resolved connection string, so resolve the configured one too
            // rather than comparing against the appSetting value, which is only the connection string's name.
            string configuredStorageConnectionString = FindConnectionString(EventHubStorageConnectionStringNameConfigurationKey);
            return StorageConnectionString != configuredStorageConnectionString;
        }

        /// <summary>
        /// Triggers settings checking on <see cref="EventHubPublisher"/> and <see cref="EventHubReceiver"/>,
        /// then calls <see cref="InstantiateReceiving"/> and <see cref="InstantiatePublishing"/>.
        /// </summary>
        protected override void TriggerSettingsChecking()
        {
            // Let's wrap up using this event hub and start the switch
            if (EventHubPublisher != null)
            {
                EventHubPublisher.Close();
                Logger.LogDebug("Publishing event hub closed.");
            }
            // Let's wrap up using this event hub and start the switch
            if (EventHubReceiver != null)
            {
                // Block until the processor is unregistered so the receiver is fully stopped before it is recreated below.
                EventHubReceiver.UnregisterEventProcessorAsync().Wait();
                Logger.LogDebug("Receiving event hub closed.");
            }
            // Restart configuration, we order this intentionally with the receiver first as if this triggers the cancellation we know this isn't a publisher as well
            if (EventHubReceiver != null)
            {
                Logger.LogDebug("Recursively calling into InstantiateReceiving.");
                InstantiateReceiving();

                // This will be the case of a connection setting change re-connection.
                // Only the handler is required: options are optional and ApplyReceiverMessageHandler substitutes the defaults when they are null.
                if (ReceiverMessageHandler != null)
                {
                    // Callback to handle received messages
                    Logger.LogDebug("Re-registering onMessage handler.");
                    ApplyReceiverMessageHandler();
                }
                else
                {
                    Logger.LogWarning("No onMessage handler was found to re-bind.");
                }
            }
            // Restart configuration, we order this intentionally with the publisher second as if this triggers the cancellation there's nothing else to process here
            if (EventHubPublisher != null)
            {
                Logger.LogDebug("Recursively calling into InstantiatePublishing.");
                InstantiatePublishing();
            }
        }

        /// <summary>
        /// Registers the provided <paramref name="receiverMessageHandler"/> with the provided <paramref name="receiverMessageHandlerOptions"/>
        /// by storing both and then applying them to the <see cref="EventHubReceiver"/>.
        /// </summary>
        /// <param name="receiverMessageHandler">The handler to call for received messages.</param>
        /// <param name="receiverMessageHandlerOptions">The optional <see cref="EventProcessorOptions"/>; <c>null</c> means <see cref="EventProcessorOptions.DefaultOptions"/>.</param>
        protected virtual void RegisterReceiverMessageHandler(Action<PartitionContext, EventData> receiverMessageHandler, EventProcessorOptions receiverMessageHandlerOptions = null)
        {
            StoreReceiverMessageHandler(receiverMessageHandler, receiverMessageHandlerOptions);

            ApplyReceiverMessageHandler();
        }

        /// <summary>
        /// Stores the provided <paramref name="receiverMessageHandler"/> and <paramref name="receiverMessageHandlerOptions"/>.
        /// </summary>
        /// <param name="receiverMessageHandler">The handler to store in <see cref="ReceiverMessageHandler"/>.</param>
        /// <param name="receiverMessageHandlerOptions">The options to store in <see cref="ReceiverMessageHandlerOptions"/>; may be <c>null</c>.</param>
        protected virtual void StoreReceiverMessageHandler(Action<PartitionContext, EventData> receiverMessageHandler, EventProcessorOptions receiverMessageHandlerOptions = null)
        {
            ReceiverMessageHandler = receiverMessageHandler;
            ReceiverMessageHandlerOptions = receiverMessageHandlerOptions;
        }

        /// <summary>
        /// Applies the stored ReceiverMessageHandler and ReceiverMessageHandlerOptions to the <see cref="EventHubReceiver"/>.
        /// </summary>
        protected override void ApplyReceiverMessageHandler()
        {
            // NOTE(review): the returned Task is not awaited or observed, so a registration failure will not surface here.
            EventHubReceiver.RegisterEventProcessorFactoryAsync
            (
                new DefaultEventProcessorFactory<DefaultEventProcessor>
                (
                    new DefaultEventProcessor(Logger, ReceiverMessageHandler)
                ),
                ReceiverMessageHandlerOptions ?? EventProcessorOptions.DefaultOptions
            );
        }

        /// <summary>
        /// Resolves a connection string in two steps: reads the connection string name from the appSetting
        /// <paramref name="connectionStringNameConfigurationKey"/>, then looks that name up in the application's connection strings.
        /// </summary>
        /// <param name="connectionStringNameConfigurationKey">The appSetting key whose value is the name of the connection string.</param>
        /// <returns>The connection string, or <c>null</c> if the appSetting is blank or no connection string with that name exists.</returns>
        private string FindConnectionString(string connectionStringNameConfigurationKey)
        {
            string connectionStringName = ConfigurationManager.GetSetting(connectionStringNameConfigurationKey);
            if (string.IsNullOrWhiteSpace(connectionStringName))
            {
                return null;
            }

            // The indexer yields null for an unknown name, so guard before dereferencing.
            ConnectionStringSettings connectionStringSettings = System.Configuration.ConfigurationManager.ConnectionStrings[connectionStringName];
            return connectionStringSettings == null ? null : connectionStringSettings.ConnectionString;
        }

        /// <summary>
        /// Resolves a connection string like <see cref="FindConnectionString"/> but fails when it is missing or blank.
        /// </summary>
        /// <param name="connectionStringNameConfigurationKey">The appSetting key whose value is the name of the connection string.</param>
        /// <returns>The non-blank connection string.</returns>
        /// <exception cref="ConfigurationErrorsException">The appSetting or the connection string it names is missing or blank.</exception>
        private string GetRequiredConnectionString(string connectionStringNameConfigurationKey)
        {
            string connectionString = FindConnectionString(connectionStringNameConfigurationKey);
            if (string.IsNullOrWhiteSpace(connectionString))
            {
                throw new ConfigurationErrorsException(string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and a matching connection string with the name that matches the value of the appSetting value '{0}'.", connectionStringNameConfigurationKey));
            }
            return connectionString;
        }
    }
}
|