Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Configuration;
12 : using System.Diagnostics;
13 : using System.IO;
14 : using System.Linq;
15 : using System.Reflection;
16 : using System.Security.Cryptography;
17 : using System.Threading.Tasks;
18 : using cdmdotnet.Logging;
19 : using Cqrs.Authentication;
20 : using Cqrs.Configuration;
21 : using Microsoft.ServiceBus;
22 : using Microsoft.ServiceBus.Messaging;
23 : using System.Text;
24 : using Cqrs.Bus;
25 :
26 : namespace Cqrs.Azure.ServiceBus
27 : {
28 : /// <summary>
29 : /// An <see cref="AzureBus{TAuthenticationToken}"/> that uses Azure Service Event Hubs.
30 : /// </summary>
31 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
32 : public abstract class AzureEventHub<TAuthenticationToken>
33 : : AzureBus<TAuthenticationToken>
34 1 : {
/// <summary>
/// The <see cref="EventHubClient"/> used for publishing. It can only be assigned from within this class.
/// </summary>
protected EventHubClient EventHubPublisher { get; private set; }

/// <summary>
/// The <see cref="EventProcessorHost"/> used for receiving. It can only be assigned from within this class.
/// </summary>
protected EventProcessorHost EventHubReceiver { get; private set; }

/// <summary>
/// The resolved name of the private event hub.
/// Unlike the other resolved names, this one may also be set by derived classes.
/// </summary>
protected string PrivateEventHubName { get; set; }

/// <summary>
/// The resolved name of the public event hub.
/// </summary>
protected string PublicEventHubName { get; private set; }

/// <summary>
/// The resolved name of the consumer group within the private event hub.
/// </summary>
protected string PrivateEventHubConsumerGroupName { get; private set; }

/// <summary>
/// The resolved name of the consumer group within the public event hub.
/// </summary>
protected string PublicEventHubConsumerGroupName { get; private set; }
64 :
/// <summary>
/// The <see cref="IConfigurationManager"/> key whose value is the name of the event hub connection string.
/// </summary>
protected abstract string EventHubConnectionStringNameConfigurationKey { get; }

/// <summary>
/// The <see cref="IConfigurationManager"/> key whose value is the name of the event hub storage connection string.
/// </summary>
protected abstract string EventHubStorageConnectionStringNameConfigurationKey { get; }

/// <summary>
/// The <see cref="IConfigurationManager"/> key of the signing token used when signing messages.
/// </summary>
protected abstract string SigningTokenConfigurationKey { get; }

/// <summary>
/// The <see cref="IConfigurationManager"/> key of the private event hub name.
/// </summary>
protected abstract string PrivateEventHubNameConfigurationKey { get; }

/// <summary>
/// The <see cref="IConfigurationManager"/> key of the public event hub name.
/// </summary>
protected abstract string PublicEventHubNameConfigurationKey { get; }

/// <summary>
/// The <see cref="IConfigurationManager"/> key of the consumer group name for the private event hub.
/// </summary>
protected abstract string PrivateEventHubConsumerGroupNameConfigurationKey { get; }

/// <summary>
/// The <see cref="IConfigurationManager"/> key of the consumer group name for the public event hub.
/// </summary>
protected abstract string PublicEventHubConsumerGroupNameConfigurationKey { get; }
99 :
/// <summary>
/// The private event hub name to fall back to when <see cref="IConfigurationManager"/> provides no value.
/// </summary>
protected abstract string DefaultPrivateEventHubName { get; }

/// <summary>
/// The public event hub name to fall back to when <see cref="IConfigurationManager"/> provides no value.
/// </summary>
protected abstract string DefaultPublicEventHubName { get; }

/// <summary>
/// The private event hub consumer group name to fall back to when <see cref="IConfigurationManager"/> provides no value.
/// </summary>
protected const string DefaultPrivateEventHubConsumerGroupName = "$Default";

/// <summary>
/// The public event hub consumer group name to fall back to when <see cref="IConfigurationManager"/> provides no value.
/// </summary>
protected const string DefaultPublicEventHubConsumerGroupName = "$Default";
119 :
/// <summary>
/// The connection string of the storage account used alongside the event hub.
/// </summary>
protected string StorageConnectionString { get; private set; }

/// <summary>
/// The stored <see cref="Action{PartitionContext, EventData}">handler</see> that is registered on <see cref="EventHubReceiver"/>
/// via <see cref="EventProcessorHost.RegisterEventProcessorFactoryAsync(Microsoft.ServiceBus.Messaging.IEventProcessorFactory)"/>.
/// </summary>
protected Action<PartitionContext, EventData> ReceiverMessageHandler { get; private set; }

/// <summary>
/// The stored <see cref="EventProcessorOptions" /> that are registered on <see cref="EventHubReceiver"/>
/// via <see cref="EventProcessorHost.RegisterEventProcessorFactoryAsync(Microsoft.ServiceBus.Messaging.IEventProcessorFactory)"/>.
/// </summary>
protected EventProcessorOptions ReceiverMessageHandlerOptions { get; private set; }

/// <summary>
/// Gets or sets the <see cref="ITelemetryHelper"/>.
/// </summary>
protected ITelemetryHelper TelemetryHelper { get; set; }

/// <summary>
/// The <see cref="IHashAlgorithmFactory"/> that creates the hash algorithm used to sign messages.
/// </summary>
protected IHashAlgorithmFactory Signer { get; private set; }

/// <summary>
/// Namespace prefixes to skip when automatically determining the calling method that sent a message.
/// </summary>
protected IList<string> ExclusionNamespaces { get; private set; }
149 :
/// <summary>
/// Instantiates a new instance of <see cref="AzureEventHub{TAuthenticationToken}"/>,
/// passing everything except <paramref name="hashAlgorithmFactory"/> on to the base class.
/// </summary>
/// <param name="configurationManager">Passed to the base class.</param>
/// <param name="messageSerialiser">Passed to the base class.</param>
/// <param name="authenticationTokenHelper">Passed to the base class.</param>
/// <param name="correlationIdHelper">Passed to the base class.</param>
/// <param name="logger">Passed to the base class.</param>
/// <param name="hashAlgorithmFactory">Stored as <see cref="Signer"/>.</param>
/// <param name="isAPublisher">Passed to the base class.</param>
protected AzureEventHub(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IHashAlgorithmFactory hashAlgorithmFactory, bool isAPublisher)
	: base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, isAPublisher)
{
	Signer = hashAlgorithmFactory;

	// Namespaces that are never reported as the source method of a message.
	var excludedNamespaces = new SynchronizedCollection<string>();
	excludedNamespaces.Add("Cqrs");
	excludedNamespaces.Add("System");
	ExclusionNamespaces = excludedNamespaces;

	// Telemetry is a no-op until a derived class assigns a real helper.
	TelemetryHelper = new NullTelemetryHelper();
}
160 :
161 : #region Overrides of AzureBus<TAuthenticationToken>
162 :
/// <summary>
/// Gets the connection string for the bus. The setting stored under
/// <see cref="EventHubConnectionStringNameConfigurationKey"/> in <see cref="AzureBus{TAuthenticationToken}.ConfigurationManager"/>
/// is the name of the connection string, which is then looked up in the application's connection strings.
/// </summary>
/// <returns>The non-blank connection string.</returns>
/// <exception cref="ConfigurationErrorsException">
/// The appSetting is blank, no connection string with that name exists, or the connection string is blank.
/// </exception>
protected override string GetConnectionString()
{
	string connectionStringName = ConfigurationManager.GetSetting(EventHubConnectionStringNameConfigurationKey);

	// The connection strings indexer returns null for an unknown name, so guard before dereferencing
	// to report a configuration error rather than a NullReferenceException.
	System.Configuration.ConnectionStringSettings connectionStringSettings = string.IsNullOrWhiteSpace(connectionStringName)
		? null
		: System.Configuration.ConfigurationManager.ConnectionStrings[connectionStringName];
	string connectionString = connectionStringSettings == null ? null : connectionStringSettings.ConnectionString;

	if (string.IsNullOrWhiteSpace(connectionString))
		throw new ConfigurationErrorsException(string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and a matching connection string with the name that matches the value of the appSetting value '{0}'.", EventHubConnectionStringNameConfigurationKey));
	return connectionString;
}
173 :
/// <summary>
/// Calls <see cref="AzureBus{TAuthenticationToken}.SetConnectionStrings"/>
/// and then resolves <see cref="StorageConnectionString"/>. The setting stored under
/// <see cref="EventHubStorageConnectionStringNameConfigurationKey"/> is the name of the connection string to look up.
/// </summary>
/// <exception cref="ConfigurationErrorsException">
/// The appSetting is blank, no connection string with that name exists, or the connection string is blank.
/// </exception>
protected override void SetConnectionStrings()
{
	base.SetConnectionStrings();

	string connectionStringName = ConfigurationManager.GetSetting(EventHubStorageConnectionStringNameConfigurationKey);

	// The connection strings indexer returns null for an unknown name, so guard before dereferencing
	// to report a configuration error rather than a NullReferenceException.
	System.Configuration.ConnectionStringSettings connectionStringSettings = string.IsNullOrWhiteSpace(connectionStringName)
		? null
		: System.Configuration.ConfigurationManager.ConnectionStrings[connectionStringName];
	StorageConnectionString = connectionStringSettings == null ? null : connectionStringSettings.ConnectionString;

	if (string.IsNullOrWhiteSpace(StorageConnectionString))
		throw new ConfigurationErrorsException(string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and a matching connection string with the name that matches the value of the appSetting value '{0}'.", EventHubStorageConnectionStringNameConfigurationKey));

	// NOTE(review): this writes the full storage connection string to the debug log; it presumably
	// contains the account key. Left unchanged here, but consider redacting it.
	Logger.LogDebug(string.Format("Storage connection string settings set to {0}.", StorageConnectionString));
}
186 :
187 : #endregion
188 :
/// <summary>
/// Sets up publishing on this bus: ensures the private and public hubs exist via
/// <see cref="CheckPrivateHubExists"/> and <see cref="CheckPublicHubExists"/>,
/// creates <see cref="EventHubPublisher"/> for the public hub,
/// then calls <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/>.
/// </summary>
protected override void InstantiatePublishing()
{
	NamespaceManager manager = NamespaceManager.CreateFromConnectionString(ConnectionString);

	CheckPrivateHubExists(manager);
	CheckPublicHubExists(manager);

	// PublicEventHubName has been resolved by CheckPublicHubExists above.
	EventHubClient publisher = EventHubClient.CreateFromConnectionString(ConnectionString, PublicEventHubName);
	EventHubPublisher = publisher;

	StartSettingsChecking();
}
203 :
/// <summary>
/// Sets up receiving on this bus: ensures the private and public hubs exist via
/// <see cref="CheckPrivateHubExists"/> and <see cref="CheckPublicHubExists"/>,
/// creates <see cref="EventHubReceiver"/> for the public hub and consumer group,
/// then calls <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/> unless a publisher already exists.
/// </summary>
protected override void InstantiateReceiving()
{
	NamespaceManager manager = NamespaceManager.CreateFromConnectionString(ConnectionString);

	CheckPrivateHubExists(manager);
	CheckPublicHubExists(manager);

	EventHubReceiver = new EventProcessorHost(PublicEventHubName, PublicEventHubConsumerGroupName, ConnectionString, StorageConnectionString);

	// When this instance is also a publisher, InstantiatePublishing starts the settings check,
	// so it is only started here for receive-only instances.
	if (EventHubPublisher == null)
		StartSettingsChecking();
}
225 :
/// <summary>
/// Resolves <see cref="PrivateEventHubName"/> and <see cref="PrivateEventHubConsumerGroupName"/>
/// from configuration (using the defaults when the setting is null)
/// and passes them to <see cref="CheckHubExists"/>.
/// </summary>
/// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
protected virtual void CheckPrivateHubExists(NamespaceManager namespaceManager)
{
	string hubName = ConfigurationManager.GetSetting(PrivateEventHubNameConfigurationKey) ?? DefaultPrivateEventHubName;
	PrivateEventHubName = hubName;

	string consumerGroupName = ConfigurationManager.GetSetting(PrivateEventHubConsumerGroupNameConfigurationKey) ?? DefaultPrivateEventHubConsumerGroupName;
	PrivateEventHubConsumerGroupName = consumerGroupName;

	CheckHubExists(namespaceManager, hubName, consumerGroupName);
}
234 :
/// <summary>
/// Resolves <see cref="PublicEventHubName"/> and <see cref="PublicEventHubConsumerGroupName"/>
/// from configuration (using the defaults when the setting is null)
/// and passes them to <see cref="CheckHubExists"/>.
/// </summary>
/// <param name="namespaceManager">The <see cref="NamespaceManager"/>.</param>
protected virtual void CheckPublicHubExists(NamespaceManager namespaceManager)
{
	string hubName = ConfigurationManager.GetSetting(PublicEventHubNameConfigurationKey) ?? DefaultPublicEventHubName;
	PublicEventHubName = hubName;

	string consumerGroupName = ConfigurationManager.GetSetting(PublicEventHubConsumerGroupNameConfigurationKey) ?? DefaultPublicEventHubConsumerGroupName;
	PublicEventHubConsumerGroupName = consumerGroupName;

	CheckHubExists(namespaceManager, hubName, consumerGroupName);
}
243 :
/// <summary>
/// Ensures an event hub named <paramref name="hubName"/> exists, creating it if necessary, and then
/// ensures a consumer group named <paramref name="consumerGroupNames"/> exists on that hub.
/// </summary>
/// <param name="namespaceManager">The <see cref="NamespaceManager"/> used to query and create the entities.</param>
/// <param name="hubName">The name of the event hub.</param>
/// <param name="consumerGroupNames">The name of the consumer group on the event hub.</param>
protected virtual void CheckHubExists(NamespaceManager namespaceManager, string hubName, string consumerGroupNames)
{
	// Configure the event hub settings
	var eventHubDescription = new EventHubDescription(hubName)
	{
		// NOTE(review): the service presumably enforces an upper bound on retention; confirm that
		// long.MaxValue is accepted (or clamped) when the hub is actually created.
		MessageRetentionInDays = long.MaxValue,
	};

	// Create the event hub if it does not exist already
	namespaceManager.CreateEventHubIfNotExists(eventHubDescription);

	// Consumer groups are event hub entities; the topic Subscription APIs do not apply to an event hub path.
	namespaceManager.CreateConsumerGroupIfNotExists(eventHubDescription.Path, consumerGroupNames);
}
265 :
/// <summary>
/// Checks <see cref="AzureBus{TAuthenticationToken}.ValidateSettingsHaveChanged"/>
/// and whether the currently configured storage connection string differs from <see cref="StorageConnectionString"/>.
/// </summary>
/// <returns>
/// <c>true</c> if the base class reports a change or the storage connection string has changed; otherwise <c>false</c>.
/// </returns>
protected override bool ValidateSettingsHaveChanged()
{
	if (base.ValidateSettingsHaveChanged())
		return true;

	// The appSetting only holds the NAME of the connection string; resolve it to the actual value
	// before comparing, otherwise the comparison is name-vs-value and reports a change every time.
	string connectionStringName = ConfigurationManager.GetSetting(EventHubStorageConnectionStringNameConfigurationKey);
	System.Configuration.ConnectionStringSettings connectionStringSettings = string.IsNullOrWhiteSpace(connectionStringName)
		? null
		: System.Configuration.ConfigurationManager.ConnectionStrings[connectionStringName];
	string currentStorageConnectionString = connectionStringSettings == null ? null : connectionStringSettings.ConnectionString;

	return StorageConnectionString != currentStorageConnectionString;
}
277 :
/// <summary>
/// Closes <see cref="EventHubPublisher"/> and unregisters <see cref="EventHubReceiver"/> (whichever exist),
/// then calls <see cref="InstantiateReceiving"/> and <see cref="InstantiatePublishing"/> to rebuild them.
/// A stored <see cref="ReceiverMessageHandler"/> is re-registered on the new receiver.
/// </summary>
protected override void TriggerSettingsChecking()
{
	// Let's wrap up using this event hub and start the switch
	if (EventHubPublisher != null)
	{
		EventHubPublisher.Close();
		Logger.LogDebug("Publishing event hub closed.");
	}
	// Let's wrap up using this event hub and start the switch
	if (EventHubReceiver != null)
	{
		// Block until the processor is unregistered so the old receiver has stopped before a new one is created.
		EventHubReceiver.UnregisterEventProcessorAsync().Wait();
		Logger.LogDebug("Receiving event hub closed.");
	}
	// Restart configuration, we order this intentionally with the receiver first as if this triggers the cancellation we know this isn't a publisher as well
	if (EventHubReceiver != null)
	{
		Logger.LogDebug("Recursively calling into InstantiateReceiving.");
		InstantiateReceiving();

		// This will be the case of a connection setting change re-connection.
		// Only the handler is required: the options are optional when registering, and
		// ApplyReceiverMessageHandler falls back to the default options when none were stored.
		if (ReceiverMessageHandler != null)
		{
			// Callback to handle received messages
			Logger.LogDebug("Re-registering onMessage handler.");
			ApplyReceiverMessageHandler();
		}
		else
			Logger.LogWarning("No onMessage handler was found to re-bind.");
	}
	// Restart configuration, we order this intentionally with the publisher second as if this triggers the cancellation there's nothing else to process here
	if (EventHubPublisher != null)
	{
		Logger.LogDebug("Recursively calling into InstantiatePublishing.");
		InstantiatePublishing();
	}
}
321 :
/// <summary>
/// Stores the provided <paramref name="receiverMessageHandler"/> and <paramref name="receiverMessageHandlerOptions"/>
/// via <see cref="StoreReceiverMessageHandler"/> and then applies them via <see cref="ApplyReceiverMessageHandler"/>.
/// </summary>
/// <param name="receiverMessageHandler">The handler to register.</param>
/// <param name="receiverMessageHandlerOptions">The options to register the handler with; may be <c>null</c>.</param>
protected virtual void RegisterReceiverMessageHandler(Action<PartitionContext, EventData> receiverMessageHandler, EventProcessorOptions receiverMessageHandlerOptions = null)
{
	// Remember the handler first so it can be re-applied later without being passed in again.
	this.StoreReceiverMessageHandler(receiverMessageHandler, receiverMessageHandlerOptions);
	this.ApplyReceiverMessageHandler();
}
331 :
/// <summary>
/// Remembers the provided <paramref name="receiverMessageHandler"/> and <paramref name="receiverMessageHandlerOptions"/>
/// in <see cref="ReceiverMessageHandler"/> and <see cref="ReceiverMessageHandlerOptions"/> without registering them.
/// </summary>
/// <param name="receiverMessageHandler">The handler to store.</param>
/// <param name="receiverMessageHandlerOptions">The options to store; may be <c>null</c>.</param>
protected virtual void StoreReceiverMessageHandler(Action<PartitionContext, EventData> receiverMessageHandler, EventProcessorOptions receiverMessageHandlerOptions = null)
{
	ReceiverMessageHandlerOptions = receiverMessageHandlerOptions;
	ReceiverMessageHandler = receiverMessageHandler;
}
340 :
/// <summary>
/// Registers the stored <see cref="ReceiverMessageHandler"/> on the <see cref="EventHubReceiver"/>,
/// using <see cref="ReceiverMessageHandlerOptions"/> or <see cref="EventProcessorOptions.DefaultOptions"/> when none are stored.
/// </summary>
protected override void ApplyReceiverMessageHandler()
{
	var eventProcessor = new DefaultEventProcessor(Logger, ReceiverMessageHandler);
	var eventProcessorFactory = new DefaultEventProcessorFactory<DefaultEventProcessor>(eventProcessor);
	EventProcessorOptions options = ReceiverMessageHandlerOptions ?? EventProcessorOptions.DefaultOptions;

	// The returned task is intentionally not awaited here; registration completes in the background.
	EventHubReceiver.RegisterEventProcessorFactoryAsync(eventProcessorFactory, options);
}
355 :
/// <summary>
/// Create <see cref="EventData"/> from the serialised <paramref name="message"/> with additional properties
/// ("CorrelationId", "Type", "Source", "Signature" and, when it can be determined, "Source-Method")
/// to aid routing and tracing.
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of the message being sent.</typeparam>
/// <param name="serialiserFunction">The function that serialises <paramref name="message"/> into the message body.</param>
/// <param name="messageType">The <see cref="Type"/> whose full name is recorded and used to look up a type-specific signing token.</param>
/// <param name="message">The message to serialise and send.</param>
/// <returns>The populated <see cref="EventData"/>.</returns>
protected virtual EventData CreateBrokeredMessage<TMessage>(Func<TMessage, string> serialiserFunction, Type messageType, TMessage message)
{
	string messageBody = serialiserFunction(message);
	var brokeredMessage = new EventData(Encoding.UTF8.GetBytes(messageBody));

	brokeredMessage.Properties.Add("CorrelationId", CorrelationIdHelper.GetCorrelationId().ToString("N"));
	brokeredMessage.Properties.Add("Type", messageType.FullName);
	brokeredMessage.Properties.Add("Source", string.Format("{0}/{1}/{2}/{3}", Logger.LoggerSettings.ModuleName, Logger.LoggerSettings.Instance, Logger.LoggerSettings.Environment, Logger.LoggerSettings.EnvironmentInstance));

	// see https://github.com/Chinchilla-Software-Com/CQRS/wiki/Inter-process-function-security
	// Token lookup order: message-type specific setting, then the bus-wide setting, then an empty Guid.
	string configurationKey = string.Format("{0}.SigningToken", messageType.FullName);
	string signingToken;
	if (!ConfigurationManager.TryGetSetting(configurationKey, out signingToken) || string.IsNullOrWhiteSpace(signingToken))
		if (!ConfigurationManager.TryGetSetting(SigningTokenConfigurationKey, out signingToken) || string.IsNullOrWhiteSpace(signingToken))
			signingToken = Guid.Empty.ToString("N");

	// The fallback above guarantees a non-blank token, so a signature is always added.
	// NOTE(review): string.Concat (not string.Format) means the literal "{0}{1}" is part of the signed text.
	// It is kept as-is because the signature presumably has to match what receivers compute; confirm before changing.
	// Disposing assumes Signer.Create() returns a fresh instance per call; verify against the factory.
	using (HashAlgorithm signer = Signer.Create())
	using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", signingToken, messageBody))))
		brokeredMessage.Properties.Add("Signature", Convert.ToBase64String(signer.ComputeHash(hashStream)));

	// Best effort only: record the first calling method outside the excluded namespaces.
	// Failing to work this out must never stop the message from being sent.
	try
	{
		StackFrame[] stackFrames = new StackTrace().GetFrames();
		if (stackFrames != null)
		{
			foreach (StackFrame frame in stackFrames)
			{
				MethodBase method = frame.GetMethod();
				// Some frames have no method or no reflected type; skip them rather than abandoning the whole walk.
				if (method == null || method.ReflectedType == null)
					continue;

				string typeName = method.ReflectedType.FullName;
				if (typeName == null)
					continue;

				try
				{
					// Namespaces are identifiers, so compare ordinally rather than with the current culture.
					if (ExclusionNamespaces.All(@namespace => !typeName.StartsWith(@namespace, StringComparison.Ordinal)))
					{
						brokeredMessage.Properties.Add("Source-Method", string.Format("{0}.{1}", typeName, method.Name));
						break;
					}
				}
				catch
				{
					// Just move on to the next frame
				}
			}
		}
	}
	catch
	{
		// Just move on, the source method is optional
	}

	return brokeredMessage;
}
413 :
/// <summary>
/// Builds the telemetry properties for the provided <paramref name="message"/>: "Type" is set to
/// <paramref name="baseCommunicationType"/>, and the message's "Type", "Source", "Source-Method" and
/// "CorrelationId" properties are copied across (when present) under their telemetry names.
/// </summary>
/// <param name="message">The <see cref="EventData"/> to read properties from.</param>
/// <param name="baseCommunicationType">The value recorded as the telemetry "Type".</param>
/// <returns>The telemetry properties.</returns>
protected virtual IDictionary<string, string> ExtractTelemetryProperties(EventData message, string baseCommunicationType)
{
	IDictionary<string, string> telemetryProperties = new Dictionary<string, string>();
	telemetryProperties.Add("Type", baseCommunicationType);

	// Message property name => telemetry property name, in the order they are copied.
	var mappings = new[]
	{
		new KeyValuePair<string, string>("Type", "MessageType"),
		new KeyValuePair<string, string>("Source", "MessageSource"),
		new KeyValuePair<string, string>("Source-Method", "MessageSourceMethod"),
		new KeyValuePair<string, string>("CorrelationId", "CorrelationId")
	};

	foreach (KeyValuePair<string, string> mapping in mappings)
	{
		object propertyValue;
		if (!message.Properties.TryGetValue(mapping.Key, out propertyValue))
			continue;
		// Never overwrite a telemetry property that is already present.
		if (telemetryProperties.ContainsKey(mapping.Value))
			continue;

		telemetryProperties.Add(mapping.Value, propertyValue.ToString());
	}

	return telemetryProperties;
}
432 :
/// <summary>
/// Reads the "Signature" property from the provided <paramref name="eventData"/>.
/// </summary>
/// <param name="eventData">The <see cref="EventData"/> to read the signature from.</param>
/// <returns>The signature as a string, or <c>null</c> when <paramref name="eventData"/> has no "Signature" property.</returns>
protected virtual string ExtractSignature(EventData eventData)
{
	object signature;
	bool hasSignature = eventData.Properties.TryGetValue("Signature", out signature);
	return hasSignature ? signature.ToString() : null;
}
443 : }
444 : }
|