Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Globalization;
13 : using System.Linq;
14 : using cdmdotnet.Logging;
15 : using Cqrs.Authentication;
16 : using Cqrs.Bus;
17 : using Cqrs.Configuration;
18 : using Cqrs.Events;
19 : using Microsoft.ServiceBus.Messaging;
20 : using Cqrs.Messages;
21 :
22 : namespace Cqrs.Azure.ServiceBus
23 : {
/// <summary>
/// An <see cref="IEventPublisher{TAuthenticationToken}"/> that serialises events into <see cref="BrokeredMessage"/> instances
/// and sends them through the public and/or private service bus publishers.
/// </summary>
/// <remarks>
/// Routing is decided by the attributes found on the event type:
/// events marked with <see cref="PublicEventAttribute"/> are sent on the public bus,
/// events marked with <see cref="PrivateEventAttribute"/> are sent on the private bus,
/// events marked with both are sent on both, and events marked with neither are sent on the public bus.
/// </remarks>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
// The ",nq" suffix here just asks the expression evaluator to remove the quotes when displaying the final value (nq = no quotes).
[DebuggerDisplay("{DebuggerDisplay,nq}")]
public class AzureEventBusPublisher<TAuthenticationToken>
    : AzureEventBus<TAuthenticationToken>
    , IEventPublisher<TAuthenticationToken>
{
    /// <summary>
    /// Instantiates a new instance of <see cref="AzureEventBusPublisher{TAuthenticationToken}"/>.
    /// </summary>
    /// <param name="configurationManager">Passed to the base class and used to create the telemetry helper.</param>
    /// <param name="messageSerialiser">Passed to the base class; used by this class to serialise events.</param>
    /// <param name="authenticationTokenHelper">Passed to the base class.</param>
    /// <param name="correlationIdHelper">Passed to the base class and to the telemetry helper; used by this class to stamp the correlation id on outgoing messages.</param>
    /// <param name="logger">Passed to the base class; used by this class for error/debug/info logging.</param>
    /// <param name="azureBusHelper">Passed to the base class; used by this class to prepare and validate events.</param>
    /// <param name="busHelper">Passed to the base class.</param>
    public AzureEventBusPublisher(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper)
        // NOTE(review): the trailing 'true' presumably flags this instance as a publisher to the base class - confirm against AzureEventBus.
        : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, busHelper, true)
    {
        TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventBus.Publisher.UseApplicationInsightTelemetryHelper", correlationIdHelper);
    }

    /// <summary>
    /// The debugger variable window value.
    /// </summary>
    internal string DebuggerDisplay
    {
        get
        {
            string connectionString = string.Format("ConnectionString : {0}", MessageBusConnectionStringConfigurationKey);
            try
            {
                connectionString = string.Concat(connectionString, "=", GetConnectionString());
            }
            catch
            {
                // Deliberately best-effort: this value only feeds the debugger display,
                // so a failure to resolve the connection string must never surface as an exception.
            }
            return string.Format(CultureInfo.InvariantCulture, "{0}, PrivateTopicName : {1}, PrivateTopicSubscriptionName : {2}, PublicTopicName : {3}, PublicTopicSubscriptionName : {4}",
                connectionString, PrivateTopicName, PrivateTopicSubscriptionName, PublicTopicName, PublicTopicSubscriptionName);
        }
    }

    #region Implementation of IEventPublisher<TAuthenticationToken>

    /// <summary>
    /// Publishes the provided <paramref name="event"/> on the event bus.
    /// </summary>
    /// <remarks>
    /// Nothing is sent when <c>AzureBusHelper.PrepareAndValidateEvent</c> returns false.
    /// Exceptions raised while sending are logged, recorded in telemetry and then re-thrown.
    /// </remarks>
    /// <typeparam name="TEvent">The <see cref="Type"/> of the event; its attributes decide which bus(es) are used.</typeparam>
    /// <param name="event">The event to publish.</param>
    public virtual void Publish<TEvent>(TEvent @event)
        where TEvent : IEvent<TAuthenticationToken>
    {
        DateTimeOffset startedAt = DateTimeOffset.UtcNow;
        Stopwatch mainStopWatch = Stopwatch.StartNew();
        string responseCode = null;
        bool mainWasSuccessful = false;
        bool telemeterOverall = false;

        IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
        string telemetryName = string.Format("{0}/{1}", @event.GetType().FullName, @event.Id);
        var telemeteredEvent = @event as ITelemeteredMessage;
        if (telemeteredEvent != null)
            telemetryName = telemeteredEvent.TelemetryName;
        telemetryName = string.Format("Event/{0}", telemetryName);

        try
        {
            if (!AzureBusHelper.PrepareAndValidateEvent(@event, "Azure-ServiceBus"))
                return;

            // NOTE(review): attributes are read from the compile-time type TEvent, so attributes that only exist
            // on a more derived runtime type are not seen here - confirm this is intended.
            bool isPrivateEvent = Attribute.GetCustomAttribute(typeof(TEvent), typeof(PrivateEventAttribute)) != null;
            bool isPublicEvent = Attribute.GetCustomAttribute(typeof(TEvent), typeof(PublicEventAttribute)) != null;

            // We only add telemetry for overall operations if two occurred
            telemeterOverall = isPublicEvent && isPrivateEvent;

            if (!isPublicEvent && !isPrivateEvent)
            {
                // Backwards compatibility and simplicity: unmarked events go to the public bus.
                DispatchEvent(@event, message => PublicServiceBusPublisher.Send(message), "Default Bus", "public", telemetryName, startedAt, telemetryProperties, ref responseCode);
            }
            else
            {
                if (isPublicEvent)
                    DispatchEvent(@event, message => PublicServiceBusPublisher.Send(message), "Public Bus", "public", telemetryName, startedAt, telemetryProperties, ref responseCode);
                if (isPrivateEvent)
                    DispatchEvent(@event, message => PrivateServiceBusPublisher.Send(message), "Private Bus", "private", telemetryName, startedAt, telemetryProperties, ref responseCode);
            }

            mainWasSuccessful = true;
        }
        finally
        {
            mainStopWatch.Stop();
            if (telemeterOverall)
                TelemetryHelper.TrackDependency("Azure/Servicebus/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, mainWasSuccessful, telemetryProperties);
        }
    }

    /// <summary>
    /// Publishes the provided <paramref name="events"/> on the event bus.
    /// </summary>
    /// <remarks>
    /// Events for which <c>AzureBusHelper.PrepareAndValidateEvent</c> returns false are skipped.
    /// The public batch is sent before the private batch; if the public send throws, the private batch is not attempted.
    /// Exceptions raised while sending are logged, recorded in telemetry and then re-thrown.
    /// </remarks>
    /// <typeparam name="TEvent">The <see cref="Type"/> of the events; its attributes decide which bus(es) are used.</typeparam>
    /// <param name="events">The events to publish.</param>
    public virtual void Publish<TEvent>(IEnumerable<TEvent> events)
        where TEvent : IEvent<TAuthenticationToken>
    {
        // Materialise once; the sequence is walked twice below.
        IList<TEvent> sourceEvents = events.ToList();

        DateTimeOffset startedAt = DateTimeOffset.UtcNow;
        Stopwatch mainStopWatch = Stopwatch.StartNew();
        string responseCode = null;
        bool mainWasSuccessful = false;

        IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
        string telemetryName = "Events";
        // Comma separated list of the per-event telemetry names.
        string telemetryNames = string.Join(",", sourceEvents.Select(sourceEvent =>
        {
            var telemeteredEvent = sourceEvent as ITelemeteredMessage;
            return telemeteredEvent != null
                ? telemeteredEvent.TelemetryName
                : string.Format("{0}/{1}", sourceEvent.GetType().FullName, sourceEvent.Id);
        }));
        telemetryProperties.Add("Events", telemetryNames);

        try
        {
            IList<string> sourceEventMessages = new List<string>();
            IList<BrokeredMessage> privateBrokeredMessages = new List<BrokeredMessage>(sourceEvents.Count);
            IList<BrokeredMessage> publicBrokeredMessages = new List<BrokeredMessage>(sourceEvents.Count);

            // The attributes belong to TEvent, not to the individual instances, so they are resolved once.
            bool isPrivateEvent = Attribute.GetCustomAttribute(typeof(TEvent), typeof(PrivateEventAttribute)) != null;
            bool isPublicEvent = Attribute.GetCustomAttribute(typeof(TEvent), typeof(PublicEventAttribute)) != null;
            // Backwards compatibility and simplicity: unmarked events go to the public bus.
            bool sendOnPublicBus = isPublicEvent || !isPrivateEvent;

            foreach (TEvent @event in sourceEvents)
            {
                if (!AzureBusHelper.PrepareAndValidateEvent(@event, "Azure-ServiceBus"))
                    continue;

                // Each bus gets its own message instance: a BrokeredMessage is consumed by a send
                // and must not be handed to a second publisher.
                if (sendOnPublicBus)
                {
                    publicBrokeredMessages.Add(BuildEventMessage(@event));
                    sourceEventMessages.Add(string.Format("An event was published on the public bus with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
                }
                if (isPrivateEvent)
                {
                    privateBrokeredMessages.Add(BuildEventMessage(@event));
                    sourceEventMessages.Add(string.Format("An event was published on the private bus with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
                }
            }

            DispatchEventBatch(publicBrokeredMessages, messages => PublicServiceBusPublisher.SendBatch(messages), "Public Bus", telemetryName, startedAt, telemetryProperties, ref responseCode);
            DispatchEventBatch(privateBrokeredMessages, messages => PrivateServiceBusPublisher.SendBatch(messages), "Private Bus", telemetryName, startedAt, telemetryProperties, ref responseCode);

            foreach (string message in sourceEventMessages)
                Logger.LogInfo(message);

            mainWasSuccessful = true;
        }
        finally
        {
            mainStopWatch.Stop();
            TelemetryHelper.TrackDependency("Azure/Servicebus/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, mainWasSuccessful, telemetryProperties);
        }
    }

    #endregion

    /// <summary>
    /// Serialises the provided <paramref name="event"/> into a new <see cref="BrokeredMessage"/>
    /// carrying the current correlation id and a "Type" property holding the full name of the event's runtime type.
    /// </summary>
    private BrokeredMessage BuildEventMessage<TEvent>(TEvent @event)
        where TEvent : IEvent<TAuthenticationToken>
    {
        var brokeredMessage = new BrokeredMessage(MessageSerialiser.SerialiseEvent(@event))
        {
            CorrelationId = CorrelationIdHelper.GetCorrelationId().ToString("N")
        };
        brokeredMessage.Properties.Add("Type", @event.GetType().FullName);
        return brokeredMessage;
    }

    /// <summary>
    /// Sends a single <paramref name="event"/> using <paramref name="send"/>, logging failures (which are re-thrown)
    /// and always recording a dependency telemetry entry named after <paramref name="dependencyBusName"/>.
    /// </summary>
    /// <param name="event">The event to send.</param>
    /// <param name="send">Performs the actual send; invoked lazily so the publisher is only touched when needed.</param>
    /// <param name="dependencyBusName">The bus name recorded in telemetry, e.g. "Public Bus".</param>
    /// <param name="logBusName">The bus name used in the debug log entry, e.g. "public".</param>
    /// <param name="telemetryName">The telemetry name of the event.</param>
    /// <param name="startedAt">The start time recorded in telemetry.</param>
    /// <param name="telemetryProperties">Additional telemetry properties.</param>
    /// <param name="responseCode">Set to "200", "429" or "500"; passed by reference so the caller still sees the value when an exception propagates.</param>
    private void DispatchEvent<TEvent>(TEvent @event, Action<BrokeredMessage> send, string dependencyBusName, string logBusName, string telemetryName, DateTimeOffset startedAt, IDictionary<string, string> telemetryProperties, ref string responseCode)
        where TEvent : IEvent<TAuthenticationToken>
    {
        Stopwatch stopWatch = Stopwatch.StartNew();
        responseCode = "200";
        bool wasSuccessful = false;
        try
        {
            send(BuildEventMessage(@event));
            wasSuccessful = true;
        }
        catch (QuotaExceededException exception)
        {
            responseCode = "429";
            Logger.LogError("The size of the event being sent was too large or the topic has reached it's limit.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
            throw;
        }
        catch (Exception exception)
        {
            responseCode = "500";
            Logger.LogError("An issue occurred while trying to publish an event.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
            throw;
        }
        finally
        {
            TelemetryHelper.TrackDependency("Azure/Servicebus/EventBus", "Event", telemetryName, dependencyBusName, startedAt, stopWatch.Elapsed, responseCode, wasSuccessful, telemetryProperties);
        }

        Logger.LogDebug(string.Format("An event was published on the {0} bus with the id '{1}' was of type {2}.", logBusName, @event.Id, @event.GetType().FullName));
    }

    /// <summary>
    /// Sends a batch of <paramref name="brokeredMessages"/> using <paramref name="sendBatch"/>, logging failures (which are re-thrown)
    /// and always recording a dependency telemetry entry named after <paramref name="dependencyBusName"/>.
    /// </summary>
    /// <param name="brokeredMessages">The messages to send; when empty, <paramref name="sendBatch"/> is not invoked.</param>
    /// <param name="sendBatch">Performs the actual send; invoked lazily so the publisher is only touched when needed.</param>
    /// <param name="dependencyBusName">The bus name recorded in telemetry, e.g. "Public Bus".</param>
    /// <param name="telemetryName">The telemetry name of the batch.</param>
    /// <param name="startedAt">The start time recorded in telemetry.</param>
    /// <param name="telemetryProperties">Additional telemetry properties.</param>
    /// <param name="responseCode">Set to "200", "429" or "500"; passed by reference so the caller still sees the value when an exception propagates.</param>
    private void DispatchEventBatch(IList<BrokeredMessage> brokeredMessages, Action<IList<BrokeredMessage>> sendBatch, string dependencyBusName, string telemetryName, DateTimeOffset startedAt, IDictionary<string, string> telemetryProperties, ref string responseCode)
    {
        Stopwatch stopWatch = Stopwatch.StartNew();
        responseCode = "200";
        bool wasSuccessful = false;
        try
        {
            // An empty batch has nothing to deliver; skipping the call avoids a pointless round trip
            // and avoids relying on how the publisher treats an empty collection.
            if (brokeredMessages.Count > 0)
                sendBatch(brokeredMessages);
            wasSuccessful = true;
        }
        catch (QuotaExceededException exception)
        {
            responseCode = "429";
            Logger.LogError("The size of the event being sent was too large.", exception: exception, metaData: new Dictionary<string, object> { { "Events", brokeredMessages } });
            throw;
        }
        catch (Exception exception)
        {
            responseCode = "500";
            Logger.LogError("An issue occurred while trying to publish an event.", exception: exception, metaData: new Dictionary<string, object> { { "Events", brokeredMessages } });
            throw;
        }
        finally
        {
            TelemetryHelper.TrackDependency("Azure/Servicebus/EventBus", "Event", telemetryName, dependencyBusName, startedAt, stopWatch.Elapsed, responseCode, wasSuccessful, telemetryProperties);
        }
    }
}
339 : }
|