Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Configuration;
12 : using System.Diagnostics;
13 : using System.IO;
14 : using System.Linq;
15 : using System.Reflection;
16 : using System.Security.Cryptography;
17 : using System.Threading.Tasks;
18 : using Chinchilla.Logging;
19 : using Cqrs.Authentication;
20 : using Cqrs.Configuration;
21 : #if NET452
22 : using Microsoft.ServiceBus;
23 : using Microsoft.ServiceBus.Messaging;
24 : using Manager = Microsoft.ServiceBus.NamespaceManager;
25 : #endif
26 : #if NETCOREAPP3_0
27 : using Microsoft.Azure.EventHubs;
28 : using Microsoft.Azure.EventHubs.Processor;
29 : using Manager = Microsoft.Azure.ServiceBus.Management.ManagementClient;
30 : #endif
31 : using System.Text;
32 : using Cqrs.Bus;
33 :
34 : namespace Cqrs.Azure.ServiceBus
35 : {
36 : /// <summary>
37 : /// An <see cref="AzureBus{TAuthenticationToken}"/> that uses Azure Service Event Hubs.
38 : /// </summary>
39 : /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
40 : public abstract class AzureEventHub<TAuthenticationToken>
41 : : AzureBus<TAuthenticationToken>
42 1 : {
43 : /// <summary>
44 : /// Gets the public <see cref="EventHubClient"/>.
45 : /// </summary>
46 : protected EventHubClient EventHubPublisher { get; private set; }
47 :
48 : /// <summary>
49 : /// Gets the public <see cref="EventProcessorHost"/>.
50 : /// </summary>
51 : protected EventProcessorHost EventHubReceiver { get; private set; }
52 :
53 : /// <summary>
54 : /// The name of the private event hub.
55 : /// </summary>
56 : protected string PrivateEventHubName { get; set; }
57 :
58 : /// <summary>
59 : /// The name of the public event hub.
60 : /// </summary>
61 : protected string PublicEventHubName { get; private set; }
62 :
63 : /// <summary>
64 : /// The name of the consumer group in the private event hub.
65 : /// </summary>
66 : protected string PrivateEventHubConsumerGroupName { get; private set; }
67 :
68 : /// <summary>
69 : /// The name of the consumer group in the public event hub.
70 : /// </summary>
71 : protected string PublicEventHubConsumerGroupName { get; private set; }
72 :
73 : /// <summary>
74 : /// The configuration key for the event hub connection string as used by <see cref="IConfigurationManager"/>.
75 : /// </summary>
76 : protected abstract string EventHubConnectionStringNameConfigurationKey { get; }
77 :
78 : /// <summary>
79 : /// The configuration key for the event hub storage connection string as used by <see cref="IConfigurationManager"/>.
80 : /// </summary>
81 : protected abstract string EventHubStorageConnectionStringNameConfigurationKey { get; }
82 :
83 : /// <summary>
84 : /// The configuration key for the signing token as used by <see cref="IConfigurationManager"/>.
85 : /// </summary>
86 : protected abstract string SigningTokenConfigurationKey { get; }
87 :
88 : /// <summary>
89 : /// The configuration key for the name of the private event hub as used by <see cref="IConfigurationManager"/>.
90 : /// </summary>
91 : protected abstract string PrivateEventHubNameConfigurationKey { get; }
92 :
93 : /// <summary>
94 : /// The configuration key for the name of the public event hub as used by <see cref="IConfigurationManager"/>.
95 : /// </summary>
96 : protected abstract string PublicEventHubNameConfigurationKey { get; }
97 :
98 : /// <summary>
99 : /// The configuration key for the consumer group name of the private event hub as used by <see cref="IConfigurationManager"/>.
100 : /// </summary>
101 : protected abstract string PrivateEventHubConsumerGroupNameConfigurationKey { get; }
102 :
103 : /// <summary>
104 : /// The configuration key for the consumer group name of the public event hub as used by <see cref="IConfigurationManager"/>.
105 : /// </summary>
106 : protected abstract string PublicEventHubConsumerGroupNameConfigurationKey { get; }
107 :
108 : /// <summary>
109 : /// The default name of the private event hub if no <see cref="IConfigurationManager"/> value is set.
110 : /// </summary>
111 : protected abstract string DefaultPrivateEventHubName { get; }
112 :
113 : /// <summary>
114 : /// The default name of the public event hub if no <see cref="IConfigurationManager"/> value is set.
115 : /// </summary>
116 : protected abstract string DefaultPublicEventHubName { get; }
117 :
118 : /// <summary>
119 : /// The default name of the consumer group in the private event hub if no <see cref="IConfigurationManager"/> value is set.
120 : /// </summary>
121 : protected const string DefaultPrivateEventHubConsumerGroupName = "$Default";
122 :
123 : /// <summary>
124 : /// The default name of the consumer group in the public event hub if no <see cref="IConfigurationManager"/> value is set.
125 : /// </summary>
126 : protected const string DefaultPublicEventHubConsumerGroupName = "$Default";
127 :
128 : /// <summary>
129 : /// The event hub storage connection string.
130 : /// </summary>
131 : protected string StorageConnectionString { get; private set; }
132 :
133 : /// <summary>
134 : /// The <see cref="Action{PartitionContext, EventData}">handler</see> used for <see cref="EventProcessorHost.RegisterEventProcessorFactoryAsync(IEventProcessorFactory)"/> on <see cref="EventHubReceiver"/>.
135 : /// </summary>
136 : protected Action<PartitionContext, EventData> ReceiverMessageHandler { get; private set; }
137 :
138 : /// <summary>
139 : /// The <see cref="EventProcessorOptions" /> used for <see cref="EventProcessorHost.RegisterEventProcessorFactoryAsync(IEventProcessorFactory)"/> on <see cref="EventHubReceiver"/>.
140 : /// </summary>
141 : protected EventProcessorOptions ReceiverMessageHandlerOptions { get; private set; }
142 :
143 : /// <summary>
144 : /// Gets the <see cref="ITelemetryHelper"/>.
145 : /// </summary>
146 : protected ITelemetryHelper TelemetryHelper { get; set; }
147 :
148 : /// <summary>
149 : /// The <see cref="IHashAlgorithmFactory"/> to use to sign messages.
150 : /// </summary>
151 : protected IHashAlgorithmFactory Signer { get; private set; }
152 :
153 : /// <summary>
154 : /// A list of namespaces to exclude when trying to automatically determine the container.
155 : /// </summary>
156 : protected IList<string> ExclusionNamespaces { get; private set; }
157 :
/// <summary>
/// Instantiates a new instance of <see cref="AzureEventHub{TAuthenticationToken}"/>.
/// The shared dependencies are handed to the base class; the <paramref name="hashAlgorithmFactory"/> is kept as the <see cref="Signer"/>,
/// <see cref="TelemetryHelper"/> defaults to a <see cref="NullTelemetryHelper"/> and
/// <see cref="ExclusionNamespaces"/> starts with "Cqrs" and "System".
/// </summary>
protected AzureEventHub(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IHashAlgorithmFactory hashAlgorithmFactory, bool isAPublisher)
    : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, isAPublisher)
{
    Signer = hashAlgorithmFactory;

    // Namespaces skipped when looking for the calling method of a published message
    var defaultExclusions = new SynchronizedCollection<string>();
    defaultExclusions.Add("Cqrs");
    defaultExclusions.Add("System");
    ExclusionNamespaces = defaultExclusions;

    TelemetryHelper = new NullTelemetryHelper();
}
168 :
169 : #region Overrides of AzureBus<TAuthenticationToken>
170 :
/// <summary>
/// Gets the connection string for the bus. The name of the connection string is read from
/// <see cref="AzureBus{TAuthenticationToken}.ConfigurationManager"/> using <see cref="EventHubConnectionStringNameConfigurationKey"/>
/// and then resolved against the application's configured connection strings.
/// </summary>
/// <returns>The non-blank connection string.</returns>
/// <exception cref="ConfigurationErrorsException">
/// Thrown when no connection string entry with the configured name exists, or the entry is blank.
/// </exception>
protected override string GetConnectionString()
{
    string connectionStringName = ConfigurationManager.GetSetting(EventHubConnectionStringNameConfigurationKey);

    // The ConnectionStrings indexer returns null when no entry with that name exists, so the entry must be
    // checked before reading .ConnectionString; otherwise a NullReferenceException hides the configuration error below.
    System.Configuration.ConnectionStringSettings settings = string.IsNullOrWhiteSpace(connectionStringName)
        ? null
        : System.Configuration.ConfigurationManager.ConnectionStrings[connectionStringName];
    string connectionString = settings == null ? null : settings.ConnectionString;

    if (string.IsNullOrWhiteSpace(connectionString))
        throw new ConfigurationErrorsException(string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and a matching connection string with the name that matches the value of the appSetting value '{0}'.", EventHubConnectionStringNameConfigurationKey));
    return connectionString;
}
181 :
/// <summary>
/// Calls <see cref="AzureBus{TAuthenticationToken}.SetConnectionStrings"/>
/// and then sets the required <see cref="StorageConnectionString"/>, whose name is read using
/// <see cref="EventHubStorageConnectionStringNameConfigurationKey"/>.
/// </summary>
/// <exception cref="ConfigurationErrorsException">
/// Thrown when no storage connection string entry with the configured name exists, or the entry is blank.
/// </exception>
protected override void SetConnectionStrings()
{
    base.SetConnectionStrings();

    string connectionStringName = ConfigurationManager.GetSetting(EventHubStorageConnectionStringNameConfigurationKey);

    // The ConnectionStrings indexer returns null when no entry with that name exists, so the entry must be
    // checked before reading .ConnectionString; otherwise a NullReferenceException hides the configuration error below.
    System.Configuration.ConnectionStringSettings settings = string.IsNullOrWhiteSpace(connectionStringName)
        ? null
        : System.Configuration.ConfigurationManager.ConnectionStrings[connectionStringName];
    string storageConnectionString = settings == null ? null : settings.ConnectionString;

    if (string.IsNullOrWhiteSpace(storageConnectionString))
        throw new ConfigurationErrorsException(string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and a matching connection string with the name that matches the value of the appSetting value '{0}'.", EventHubStorageConnectionStringNameConfigurationKey));

    StorageConnectionString = storageConnectionString;
    // NOTE(review): this writes the full storage connection string (which normally contains an account key) to the debug log.
    // Left as-is to keep existing log output; consider redacting.
    Logger.LogDebug(string.Format("Storage connection string settings set to {0}.", StorageConnectionString));
}
194 :
195 : #endregion
196 :
/// <summary>
/// Prepares this bus for publishing: runs <see cref="CheckPrivateHubExists"/> and <see cref="CheckPublicHubExists"/>,
/// creates the <see cref="EventHubPublisher"/> for <see cref="PublicEventHubName"/>
/// and finally calls <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/>.
/// </summary>
protected override void InstantiatePublishing()
{
#if NET452
    Manager namespaceManager = Manager.CreateFromConnectionString(ConnectionString);
#elif NETCOREAPP3_0
    var namespaceManager = new Manager(ConnectionString);
#endif
    // These calls must come first: PublicEventHubName is read below when creating the publisher
    CheckPrivateHubExists(namespaceManager);
    CheckPublicHubExists(namespaceManager);

#if NET452
    EventHubPublisher = EventHubClient.CreateFromConnectionString(ConnectionString, PublicEventHubName);
#elif NETCOREAPP3_0
    // Target the public hub by setting it as the entity path of the connection string
    var publisherConnection = new EventHubsConnectionStringBuilder(ConnectionString);
    publisherConnection.EntityPath = PublicEventHubName;
    EventHubPublisher = EventHubClient.Create(publisherConnection);
#endif
    StartSettingsChecking();
}
225 :
/// <summary>
/// Prepares this bus for receiving: runs <see cref="CheckPrivateHubExists"/> and <see cref="CheckPublicHubExists"/>,
/// creates the <see cref="EventHubReceiver"/> for the public event hub and consumer group,
/// then calls <see cref="AzureBus{TAuthenticationToken}.StartSettingsChecking"/> unless an <see cref="EventHubPublisher"/> is already present.
/// </summary>
protected override void InstantiateReceiving()
{
#if NET452
    Manager namespaceManager = Manager.CreateFromConnectionString(ConnectionString);
#elif NETCOREAPP3_0
    var namespaceManager = new Manager(ConnectionString);
#endif

    // These calls must come first: the public hub and consumer group names are read below
    CheckPrivateHubExists(namespaceManager);
    CheckPublicHubExists(namespaceManager);

    EventHubReceiver = new EventProcessorHost(PublicEventHubName, PublicEventHubConsumerGroupName, ConnectionString, StorageConnectionString, "Cqrs");

    // If this is also a publisher, settings checking is started by the publishing side, so only start it here when there is no publisher
    if (EventHubPublisher == null)
        StartSettingsChecking();
}
252 :
/// <summary>
/// Resolves <see cref="PrivateEventHubName"/> and <see cref="PrivateEventHubConsumerGroupName"/> from configuration
/// (falling back to <see cref="DefaultPrivateEventHubName"/> and <see cref="DefaultPrivateEventHubConsumerGroupName"/> when a setting is null)
/// and passes them to <see cref="CheckHubExists"/>.
/// </summary>
/// <param name="manager">The <see cref="Manager"/>.</param>
protected virtual void CheckPrivateHubExists(Manager manager)
{
    string hubName = ConfigurationManager.GetSetting(PrivateEventHubNameConfigurationKey) ?? DefaultPrivateEventHubName;
    PrivateEventHubName = hubName;

    string consumerGroupName = ConfigurationManager.GetSetting(PrivateEventHubConsumerGroupNameConfigurationKey) ?? DefaultPrivateEventHubConsumerGroupName;
    PrivateEventHubConsumerGroupName = consumerGroupName;

    CheckHubExists(manager, hubName, consumerGroupName);
}
261 :
/// <summary>
/// Resolves <see cref="PublicEventHubName"/> and <see cref="PublicEventHubConsumerGroupName"/> from configuration
/// (falling back to <see cref="DefaultPublicEventHubName"/> and <see cref="DefaultPublicEventHubConsumerGroupName"/> when a setting is null)
/// and passes them to <see cref="CheckHubExists"/>.
/// </summary>
/// <param name="manager">The <see cref="Manager"/>.</param>
protected virtual void CheckPublicHubExists(Manager manager)
{
    string hubName = ConfigurationManager.GetSetting(PublicEventHubNameConfigurationKey) ?? DefaultPublicEventHubName;
    PublicEventHubName = hubName;

    string consumerGroupName = ConfigurationManager.GetSetting(PublicEventHubConsumerGroupNameConfigurationKey) ?? DefaultPublicEventHubConsumerGroupName;
    PublicEventHubConsumerGroupName = consumerGroupName;

    CheckHubExists(manager, hubName, consumerGroupName);
}
270 :
/// <summary>
/// On NET452, creates the event hub named <paramref name="hubName"/> if it does not exist and then
/// creates a subscription named <paramref name="consumerGroupNames"/> on it if one does not exist.
/// On NETCOREAPP3_0 nothing is checked or created; a warning is logged instead.
/// </summary>
/// <param name="manager">The <see cref="Manager"/> used to query and create the objects.</param>
/// <param name="hubName">The name of the event hub.</param>
/// <param name="consumerGroupNames">The name of the consumer group.</param>
protected virtual void CheckHubExists(Manager manager, string hubName, string consumerGroupNames)
{
#if NET452
    // Describe the hub we expect to find
    // NOTE(review): long.MaxValue as a retention period looks unusually large — confirm the service accepts it.
    var hubDescription = new EventHubDescription(hubName);
    hubDescription.MessageRetentionInDays = long.MaxValue;

    // Create the hub if it does not exist already
    manager.CreateEventHubIfNotExists(hubDescription);

    // NOTE(review): this uses the subscription API for what the parameter calls a consumer group — confirm this is intended.
    var subscription = new SubscriptionDescription(hubDescription.Path, consumerGroupNames);
    bool subscriptionExists = manager.SubscriptionExists(hubDescription.Path, consumerGroupNames);
    if (!subscriptionExists)
        manager.CreateSubscription(subscription);
#endif
#if NETCOREAPP3_0
    // The create-if-missing logic of the NET452 branch has not been ported to this target;
    // the only action taken is to tell the operator what has to exist.
    string warning = $"Checking EventHubs and subscriptions is not currently implemented until the Azure libraries provide management facilities. You will need to check these objects exist manually: EventHub {hubName}, Subscription/Consumer Group {consumerGroupNames}";
    Logger.LogWarning(warning, "AzureEventHub");
#endif
}
311 :
/// <summary>
/// Checks <see cref="AzureBus{TAuthenticationToken}.ValidateSettingsHaveChanged"/>
/// and whether the currently configured storage connection string differs from <see cref="StorageConnectionString"/>.
/// </summary>
/// <returns>
/// <c>true</c> if the base check reports a change, or if the storage connection string currently found in configuration
/// (null when the entry is missing) is not equal to <see cref="StorageConnectionString"/>; otherwise <c>false</c>.
/// </returns>
protected override bool ValidateSettingsHaveChanged()
{
    if (base.ValidateSettingsHaveChanged())
        return true;

    // The app setting holds the NAME of the connection string, not its value. Resolve the value the same way
    // SetConnectionStrings does, so that like is compared with like; comparing against the name reports a change every time.
    string connectionStringName = ConfigurationManager.GetSetting(EventHubStorageConnectionStringNameConfigurationKey);
    System.Configuration.ConnectionStringSettings settings = string.IsNullOrWhiteSpace(connectionStringName)
        ? null
        : System.Configuration.ConfigurationManager.ConnectionStrings[connectionStringName];
    string currentStorageConnectionString = settings == null ? null : settings.ConnectionString;

    return StorageConnectionString != currentStorageConnectionString;
}
323 :
/// <summary>
/// Closes the <see cref="EventHubPublisher"/> and unregisters the <see cref="EventHubReceiver"/> (whichever are set),
/// then calls <see cref="InstantiateReceiving"/> (re-applying any stored <see cref="ReceiverMessageHandler"/>)
/// and <see cref="InstantiatePublishing"/> for the sides that were in use.
/// </summary>
protected override void TriggerSettingsChecking()
{
    // Let's wrap up using this event hub and start the switch
    if (EventHubPublisher != null)
    {
        EventHubPublisher.Close();
        Logger.LogDebug("Publishing event hub closed.");
    }
    // Let's wrap up using this event hub and start the switch
    if (EventHubReceiver != null)
    {
        // Block until the processor has been unregistered before a replacement receiver is created
        EventHubReceiver.UnregisterEventProcessorAsync().Wait();
        Logger.LogDebug("Receiving event hub closed.");
    }
    // Restart configuration, we order this intentionally with the receiver first as if this triggers the cancellation we know this isn't a publisher as well
    if (EventHubReceiver != null)
    {
        Logger.LogDebug("Recursively calling into InstantiateReceiving.");
        InstantiateReceiving();

        // This will be the case of a connection setting change re-connection.
        // Only the handler is required: the options are optional when registering and
        // ApplyReceiverMessageHandler substitutes the default options when none were stored.
        if (ReceiverMessageHandler != null)
        {
            // Callback to handle received messages
            Logger.LogDebug("Re-registering onMessage handler.");
            ApplyReceiverMessageHandler();
        }
        else
            Logger.LogWarning("No onMessage handler was found to re-bind.");
    }
    // Restart configuration, we order this intentionally with the publisher second as if this triggers the cancellation there's nothing else to process here
    if (EventHubPublisher != null)
    {
        Logger.LogDebug("Recursively calling into InstantiatePublishing.");
        InstantiatePublishing();
    }
}
367 :
/// <summary>
/// Remembers the provided <paramref name="receiverMessageHandler"/> and <paramref name="receiverMessageHandlerOptions"/>
/// via <see cref="StoreReceiverMessageHandler"/> and then applies them via <see cref="ApplyReceiverMessageHandler"/>.
/// </summary>
/// <param name="receiverMessageHandler">The handler to store and apply.</param>
/// <param name="receiverMessageHandlerOptions">The options to store with the handler; may be omitted.</param>
protected virtual void RegisterReceiverMessageHandler(Action<PartitionContext, EventData> receiverMessageHandler, EventProcessorOptions receiverMessageHandlerOptions = null)
{
    // Store first: applying reads the stored values
    StoreReceiverMessageHandler(receiverMessageHandler, receiverMessageHandlerOptions);
    ApplyReceiverMessageHandler();
}
377 :
/// <summary>
/// Keeps the provided <paramref name="receiverMessageHandler"/> in <see cref="ReceiverMessageHandler"/>
/// and the provided <paramref name="receiverMessageHandlerOptions"/> in <see cref="ReceiverMessageHandlerOptions"/>.
/// </summary>
/// <param name="receiverMessageHandler">The handler to store.</param>
/// <param name="receiverMessageHandlerOptions">The options to store; may be omitted, in which case null is stored.</param>
protected virtual void StoreReceiverMessageHandler(Action<PartitionContext, EventData> receiverMessageHandler, EventProcessorOptions receiverMessageHandlerOptions = null)
{
    ReceiverMessageHandlerOptions = receiverMessageHandlerOptions;
    ReceiverMessageHandler = receiverMessageHandler;
}
386 :
/// <summary>
/// Registers a <see cref="DefaultEventProcessor"/> built around the stored <see cref="ReceiverMessageHandler"/> on the <see cref="EventHubReceiver"/>,
/// using the stored <see cref="ReceiverMessageHandlerOptions"/> or <see cref="EventProcessorOptions.DefaultOptions"/> when none are stored.
/// </summary>
protected override void ApplyReceiverMessageHandler()
{
    var processor = new DefaultEventProcessor(Logger, ReceiverMessageHandler);
    var processorFactory = new DefaultEventProcessorFactory<DefaultEventProcessor>(processor);
    EventProcessorOptions options = ReceiverMessageHandlerOptions ?? EventProcessorOptions.DefaultOptions;

    // NOTE(review): the returned task is neither awaited nor observed, so a failed registration goes unnoticed here.
    EventHubReceiver.RegisterEventProcessorFactoryAsync(processorFactory, options);
}
401 :
/// <summary>
/// Create <see cref="EventData"/> with additional properties to aid routing and tracing.
/// The body is the UTF-8 encoded result of <paramref name="serialiserFunction"/>; the properties
/// "CorrelationId", "Type", "Source" and "Signature" are always added and "Source-Method" is added
/// when a calling method outside the <see cref="ExclusionNamespaces"/> can be found on the stack.
/// </summary>
/// <typeparam name="TMessage">The <see cref="Type"/> of the message being sent.</typeparam>
/// <param name="serialiserFunction">Converts the <paramref name="message"/> into the text used as the message body.</param>
/// <param name="messageType">The <see cref="Type"/> whose full name is written to the "Type" property and used to look up a type specific signing token.</param>
/// <param name="message">The message to send.</param>
/// <returns>The populated <see cref="EventData"/>.</returns>
protected virtual EventData CreateBrokeredMessage<TMessage>(Func<TMessage, string> serialiserFunction, Type messageType, TMessage message)
{
    string messageBody = serialiserFunction(message);
    var brokeredMessage = new EventData(Encoding.UTF8.GetBytes(messageBody));

    brokeredMessage.Properties.Add("CorrelationId", CorrelationIdHelper.GetCorrelationId().ToString("N"));
    brokeredMessage.Properties.Add("Type", messageType.FullName);
    brokeredMessage.Properties.Add("Source", string.Format("{0}/{1}/{2}/{3}", Logger.LoggerSettings.ModuleName, Logger.LoggerSettings.Instance, Logger.LoggerSettings.Environment, Logger.LoggerSettings.EnvironmentInstance));

    // see https://github.com/Chinchilla-Software-Com/CQRS/wiki/Inter-process-function-security
    // Token lookup order: message type specific setting, then the general signing token setting, then an empty Guid.
    string configurationKey = string.Format("{0}.SigningToken", messageType.FullName);
    string signingToken;
    if (!ConfigurationManager.TryGetSetting(configurationKey, out signingToken) || string.IsNullOrWhiteSpace(signingToken))
    {
        if (!ConfigurationManager.TryGetSetting(SigningTokenConfigurationKey, out signingToken) || string.IsNullOrWhiteSpace(signingToken))
            signingToken = Guid.Empty.ToString("N");
    }

    // NOTE(review): string.Concat does not expand "{0}{1}", so the hashed text literally starts with "{0}{1}".
    // This is kept on purpose: changing it changes every signature, and whatever verifies the signature
    // presumably builds the same text — confirm against the verifying code before altering.
    // The hash algorithm and stream are disposed once the signature has been computed.
    using (HashAlgorithm signer = Signer.Create())
    using (var hashStream = new MemoryStream(Encoding.UTF8.GetBytes(string.Concat("{0}{1}", signingToken, messageBody))))
    {
        brokeredMessage.Properties.Add("Signature", Convert.ToBase64String(signer.ComputeHash(hashStream)));
    }

    // Best effort only: record the first calling method that is not inside an excluded namespace.
    // Any failure here must never prevent the message from being created.
    try
    {
        StackFrame[] stackFrames = new StackTrace().GetFrames();
        if (stackFrames != null)
        {
            foreach (StackFrame frame in stackFrames)
            {
                MethodBase method = frame.GetMethod();
                // A frame without a method or declaring type tells us nothing; skip it rather than abandoning the whole walk
                if (method == null || method.ReflectedType == null)
                    continue;

                string typeName = method.ReflectedType.FullName;
                if (typeName == null)
                    continue;

                try
                {
                    // Namespaces are identifiers, so compare them ordinally
                    if (ExclusionNamespaces.All(@namespace => !typeName.StartsWith(@namespace, StringComparison.Ordinal)))
                    {
                        brokeredMessage.Properties.Add("Source-Method", string.Format("{0}.{1}", typeName, method.Name));
                        break;
                    }
                }
                catch
                {
                    // Just move on to the next frame
                }
            }
        }
    }
    catch
    {
        // Just move on without a Source-Method
    }

    return brokeredMessage;
}
459 :
/// <summary>
/// Builds the telemetry properties for the provided <paramref name="message"/>.
/// The result always contains "Type" (set to <paramref name="baseCommunicationType"/>); the message properties
/// "Type", "Source", "Source-Method" and "CorrelationId" are copied across as
/// "MessageType", "MessageSource", "MessageSourceMethod" and "CorrelationId" when present.
/// </summary>
/// <param name="message">The <see cref="EventData"/> to read properties from.</param>
/// <param name="baseCommunicationType">The value stored under the "Type" telemetry property.</param>
/// <returns>The collected telemetry properties.</returns>
protected virtual IDictionary<string, string> ExtractTelemetryProperties(EventData message, string baseCommunicationType)
{
    IDictionary<string, string> telemetryProperties = new Dictionary<string, string>();
    telemetryProperties.Add("Type", baseCommunicationType);

    // Each entry is { message property name, telemetry property name }, processed in this order
    var propertyMap = new[]
    {
        new[] { "Type", "MessageType" },
        new[] { "Source", "MessageSource" },
        new[] { "Source-Method", "MessageSourceMethod" },
        new[] { "CorrelationId", "CorrelationId" }
    };

    foreach (string[] mapping in propertyMap)
    {
        object propertyValue;
        if (!message.Properties.TryGetValue(mapping[0], out propertyValue))
            continue;
        // Never overwrite a telemetry property that is already present
        if (telemetryProperties.ContainsKey(mapping[1]))
            continue;
        telemetryProperties.Add(mapping[1], propertyValue.ToString());
    }

    return telemetryProperties;
}
478 :
/// <summary>
/// Reads the "Signature" property from the provided <paramref name="eventData"/>.
/// </summary>
/// <param name="eventData">The <see cref="EventData"/> to read the signature from.</param>
/// <returns>The signature as a string, or <c>null</c> when the <paramref name="eventData"/> has no "Signature" property.</returns>
protected virtual string ExtractSignature(EventData eventData)
{
    object signature;
    bool hasSignature = eventData.Properties.TryGetValue("Signature", out signature);
    return hasSignature ? signature.ToString() : null;
}
489 : }
490 : }
|