Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Configuration;
11 : using System.Threading.Tasks;
12 : using cdmdotnet.Logging;
13 : using Cqrs.Authentication;
14 : using Cqrs.Configuration;
15 : using Microsoft.ServiceBus;
16 : using Microsoft.ServiceBus.Messaging;
17 :
18 : namespace Cqrs.Azure.ServiceBus
19 : {
20 : public abstract class AzureEventHub<TAuthenticationToken> : AzureBus<TAuthenticationToken>
21 0 : {
22 : protected EventHubClient EventHubPublisher { get; private set; }
23 :
24 : protected EventProcessorHost EventHubReceiver { get; private set; }
25 :
26 : protected string PrivateEventHubName { get; set; }
27 :
28 : protected string PublicEventHubName { get; private set; }
29 :
30 : protected string PrivateEventHubConsumerGroupName { get; private set; }
31 :
32 : protected string PublicEventHubConsumerGroupName { get; private set; }
33 :
34 : protected abstract string EventHubConnectionStringNameConfigurationKey { get; }
35 :
36 : protected abstract string EventHubStorageConnectionStringNameConfigurationKey { get; }
37 :
38 : protected abstract string PrivateEventHubNameConfigurationKey { get; }
39 :
40 : protected abstract string PublicEventHubNameConfigurationKey { get; }
41 :
42 : protected abstract string DefaultPrivateEventHubName { get; }
43 :
44 : protected abstract string DefaultPublicEventHubName { get; }
45 :
46 : protected abstract string PrivateEventHubConsumerGroupNameConfigurationKey { get; }
47 :
48 : protected abstract string PublicEventHubConsumerGroupNameConfigurationKey { get; }
49 :
50 : protected const string DefaultPrivateEventHubConsumerGroupName = "$Default";
51 :
52 : protected const string DefaultPublicEventHubConsumerGroupName = "$Default";
53 :
54 : protected string StorageConnectionString { get; private set; }
55 :
56 : protected Action<PartitionContext, EventData> ReceiverMessageHandler { get; private set; }
57 :
58 : protected EventProcessorOptions ReceiverMessageHandlerOptions { get; private set; }
59 :
60 : protected ITelemetryHelper TelemetryHelper { get; set; }
61 :
62 0 : protected AzureEventHub(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, bool isAPublisher)
63 : : base (configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, isAPublisher)
64 : {
65 : TelemetryHelper = new NullTelemetryHelper();
66 : }
67 :
68 : #region Overrides of AzureBus<TAuthenticationToken>
69 :
70 0 : protected override string GetConnectionString()
71 : {
72 : string connectionString = System.Configuration.ConfigurationManager.ConnectionStrings[ConfigurationManager.GetSetting(EventHubConnectionStringNameConfigurationKey)].ConnectionString;
73 : if (string.IsNullOrWhiteSpace(connectionString))
74 : throw new ConfigurationErrorsException(string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and a matching connection string with the name that matches the value of the appSetting value '{0}'.", EventHubConnectionStringNameConfigurationKey));
75 : return connectionString;
76 : }
77 :
78 0 : protected override void SetConnectionStrings()
79 : {
80 : base.SetConnectionStrings();
81 : StorageConnectionString = System.Configuration.ConfigurationManager.ConnectionStrings[ConfigurationManager.GetSetting(EventHubStorageConnectionStringNameConfigurationKey)].ConnectionString;
82 : if (string.IsNullOrWhiteSpace(StorageConnectionString))
83 : throw new ConfigurationErrorsException(string.Format("Configuration is missing required information. Make sure the appSetting '{0}' is defined and a matching connection string with the name that matches the value of the appSetting value '{0}'.", EventHubStorageConnectionStringNameConfigurationKey));
84 : Logger.LogDebug(string.Format("Storage connection string settings set to {0}.", StorageConnectionString));
85 : }
86 :
87 : #endregion
88 :
89 0 : protected override void InstantiatePublishing()
90 : {
91 : NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(ConnectionString);
92 : CheckPrivateEventHubExists(namespaceManager);
93 : CheckPublicHubExists(namespaceManager);
94 :
95 : EventHubPublisher = EventHubClient.CreateFromConnectionString(ConnectionString, PublicEventHubName);
96 : StartSettingsChecking();
97 : }
98 :
99 0 : protected override void InstantiateReceiving()
100 : {
101 : NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(ConnectionString);
102 :
103 : CheckPrivateEventHubExists(namespaceManager);
104 : CheckPublicHubExists(namespaceManager);
105 :
106 : EventHubReceiver = new EventProcessorHost(PublicEventHubName, PublicEventHubConsumerGroupName, ConnectionString, StorageConnectionString);
107 :
108 : // If this is also a publisher, then it will the check over there and that will handle this
109 : if (EventHubPublisher != null)
110 : return;
111 :
112 : StartSettingsChecking();
113 : }
114 :
115 0 : protected virtual void CheckPrivateEventHubExists(NamespaceManager namespaceManager)
116 : {
117 : CheckHubExists(namespaceManager, PrivateEventHubName = ConfigurationManager.GetSetting(PrivateEventHubNameConfigurationKey) ?? DefaultPrivateEventHubName, PrivateEventHubConsumerGroupName = ConfigurationManager.GetSetting(PrivateEventHubConsumerGroupNameConfigurationKey) ?? DefaultPrivateEventHubConsumerGroupName);
118 : }
119 :
120 0 : protected virtual void CheckPublicHubExists(NamespaceManager namespaceManager)
121 : {
122 : CheckHubExists(namespaceManager, PublicEventHubName = ConfigurationManager.GetSetting(PublicEventHubNameConfigurationKey) ?? DefaultPublicEventHubName, PublicEventHubConsumerGroupName = ConfigurationManager.GetSetting(PublicEventHubConsumerGroupNameConfigurationKey) ?? DefaultPublicEventHubConsumerGroupName);
123 : }
124 :
125 0 : protected virtual void CheckHubExists(NamespaceManager namespaceManager, string eventHubName, string eventSubscriptionNames)
126 : {
127 : // Configure Queue Settings
128 : var eventHubDescription = new EventHubDescription(eventHubName)
129 : {
130 : MessageRetentionInDays = long.MaxValue,
131 :
132 : };
133 :
134 : // Create the topic if it does not exist already
135 : namespaceManager.CreateEventHubIfNotExists(eventHubDescription);
136 :
137 : var subscriptionDescription = new SubscriptionDescription(eventHubDescription.Path, eventSubscriptionNames);
138 :
139 : if (!namespaceManager.SubscriptionExists(eventHubDescription.Path, eventSubscriptionNames))
140 : namespaceManager.CreateSubscription(subscriptionDescription);
141 : }
142 :
143 0 : protected override bool ValidateSettingsHaveChanged()
144 : {
145 : return base.ValidateSettingsHaveChanged()
146 : ||
147 : StorageConnectionString != ConfigurationManager.GetSetting(EventHubStorageConnectionStringNameConfigurationKey);
148 : }
149 :
150 0 : protected override void TriggerSettingsChecking()
151 : {
152 : // Let's wrap up using this event hub and start the switch
153 : if (EventHubPublisher != null)
154 : {
155 : EventHubPublisher.Close();
156 : Logger.LogDebug("Publishing event hub closed.");
157 : }
158 : // Let's wrap up using this event hub and start the switch
159 : if (EventHubReceiver != null)
160 : {
161 : Task work = EventHubReceiver.UnregisterEventProcessorAsync();
162 : work.ConfigureAwait(false);
163 : work.Wait();
164 : Logger.LogDebug("Receiving event hub closed.");
165 : }
166 : // Restart configuration, we order this intentionally with the receiver first as if this triggers the cancellation we know this isn't a publisher as well
167 : if (EventHubReceiver != null)
168 : {
169 : Logger.LogDebug("Recursively calling into InstantiateReceiving.");
170 : InstantiateReceiving();
171 :
172 : // This will be the case of a connection setting change re-connection
173 : if (ReceiverMessageHandler != null && ReceiverMessageHandlerOptions != null)
174 : {
175 : // Callback to handle received messages
176 : Logger.LogDebug("Re-registering onMessage handler.");
177 : ApplyReceiverMessageHandler();
178 : }
179 : else
180 : Logger.LogWarning("No onMessage handler was found to re-bind.");
181 : }
182 : // Restart configuration, we order this intentionally with the publisher second as if this triggers the cancellation there's nothing else to process here
183 : if (EventHubPublisher != null)
184 : {
185 : Logger.LogDebug("Recursively calling into InstantiatePublishing.");
186 : InstantiatePublishing();
187 : }
188 : }
189 :
190 0 : protected virtual void RegisterReceiverMessageHandler(Action<PartitionContext, EventData> receiverMessageHandler, EventProcessorOptions receiverMessageHandlerOptions = null)
191 : {
192 : StoreReceiverMessageHandler(receiverMessageHandler, receiverMessageHandlerOptions);
193 :
194 : ApplyReceiverMessageHandler();
195 : }
196 :
197 0 : protected virtual void StoreReceiverMessageHandler(Action<PartitionContext, EventData> receiverMessageHandler, EventProcessorOptions receiverMessageHandlerOptions = null)
198 : {
199 : ReceiverMessageHandler = receiverMessageHandler;
200 : ReceiverMessageHandlerOptions = receiverMessageHandlerOptions;
201 : }
202 :
203 0 : protected override void ApplyReceiverMessageHandler()
204 : {
205 : EventHubReceiver.RegisterEventProcessorFactoryAsync
206 : (
207 : new DefaultEventProcessorFactory<DefaultEventProcessor>
208 : (
209 : new DefaultEventProcessor(Logger, ReceiverMessageHandler)
210 : ),
211 : ReceiverMessageHandlerOptions ?? EventProcessorOptions.DefaultOptions
212 : );
213 : }
214 : }
215 : }
|