Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Linq;
13 : using cdmdotnet.Logging;
14 : using Cqrs.Authentication;
15 : using Cqrs.Bus;
16 : using Cqrs.Configuration;
17 : using Cqrs.Events;
18 : using Microsoft.ServiceBus.Messaging;
19 : using Cqrs.Messages;
20 :
21 : namespace Cqrs.Azure.ServiceBus
22 : {
/// <summary>
/// An <see cref="IEventPublisher{TAuthenticationToken}"/> that wraps each event in a <see cref="BrokeredMessage"/>
/// and sends it through the public and/or private service bus publishers, recording a telemetry dependency
/// for every send.
/// </summary>
/// <typeparam name="TAuthenticationToken">The authentication token type shared by the events, serialiser and helpers.</typeparam>
public class AzureEventBusPublisher<TAuthenticationToken> : AzureEventBus<TAuthenticationToken>, IEventPublisher<TAuthenticationToken>
{
    /// <summary>
    /// Instantiates a new publisher, forwarding all dependencies to the base bus and creating the
    /// telemetry helper from the
    /// "Cqrs.Azure.EventBus.Publisher.UseApplicationInsightTelemetryHelper" configuration key.
    /// </summary>
    public AzureEventBusPublisher(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper, IBusHelper busHelper)
        : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, busHelper, true)
    {
        TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventBus.Publisher.UseApplicationInsightTelemetryHelper", correlationIdHelper);
    }

    #region Implementation of IEventPublisher<TAuthenticationToken>

    /// <summary>
    /// Publishes a single <paramref name="event"/>.
    /// </summary>
    /// <remarks>
    /// The bus is chosen from the attributes on <typeparamref name="TEvent"/>:
    /// no attribute sends via the public publisher (tracked as "Default Bus");
    /// <see cref="PublicEventAttribute"/> sends via the public publisher;
    /// <see cref="PrivateEventAttribute"/> sends via the private publisher.
    /// An event carrying both attributes is sent to both, and an additional overall telemetry entry is recorded.
    /// Nothing is sent when <c>AzureBusHelper.PrepareAndValidateEvent</c> returns false.
    /// Any exception raised while sending is logged and rethrown.
    /// </remarks>
    /// <typeparam name="TEvent">The static type of the event; bus attributes are read from this type.</typeparam>
    /// <param name="event">The event to publish.</param>
    public virtual void Publish<TEvent>(TEvent @event)
        where TEvent : IEvent<TAuthenticationToken>
    {
        DateTimeOffset startedAt = DateTimeOffset.UtcNow;
        Stopwatch mainStopWatch = Stopwatch.StartNew();
        string responseCode = null;
        bool mainWasSuccessfull = false;
        bool telemeterOverall = false;

        IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
        string telemetryName = string.Format("{0}/{1}", @event.GetType().FullName, @event.Id);
        var telemeteredEvent = @event as ITelemeteredMessage;
        if (telemeteredEvent != null)
            telemetryName = telemeteredEvent.TelemetryName;
        telemetryName = string.Format("Event/{0}", telemetryName);

        try
        {
            if (!AzureBusHelper.PrepareAndValidateEvent(@event, "Azure-ServiceBus"))
                return;

            var privateEventAttribute = Attribute.GetCustomAttribute(typeof(TEvent), typeof(PrivateEventAttribute)) as PrivateEventAttribute;
            // Must look up PublicEventAttribute here; looking up PrivateEventAttribute and casting it
            // to PublicEventAttribute would leave this null and the public branch would never run.
            var publicEventAttribute = Attribute.GetCustomAttribute(typeof(TEvent), typeof(PublicEventAttribute)) as PublicEventAttribute;

            // We only add telemetry for overall operations if two occurred
            telemeterOverall = publicEventAttribute != null && privateEventAttribute != null;

            bool wasSuccessfull;
            Stopwatch stopWatch = Stopwatch.StartNew();

            // Backwards compatibility and simplicity: un-attributed events go to the public publisher.
            if (publicEventAttribute == null && privateEventAttribute == null)
            {
                stopWatch.Restart();
                responseCode = "200";
                wasSuccessfull = false;
                try
                {
                    PublicServiceBusPublisher.Send(BuildBrokeredMessage(@event));
                    wasSuccessfull = true;
                }
                catch (QuotaExceededException exception)
                {
                    responseCode = "429";
                    Logger.LogError("The size of the event being sent was too large or the topic has reached it's limit.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
                    throw;
                }
                catch (Exception exception)
                {
                    responseCode = "500";
                    Logger.LogError("An issue occurred while trying to publish an event.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
                    throw;
                }
                finally
                {
                    TelemetryHelper.TrackDependency("Azure/Servicebus/EventBus", "Event", telemetryName, "Default Bus", startedAt, stopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
                }
                Logger.LogDebug(string.Format("An event was published on the public bus with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
            }

            if (publicEventAttribute != null)
            {
                stopWatch.Restart();
                responseCode = "200";
                wasSuccessfull = false;
                try
                {
                    PublicServiceBusPublisher.Send(BuildBrokeredMessage(@event));
                    wasSuccessfull = true;
                }
                catch (QuotaExceededException exception)
                {
                    responseCode = "429";
                    Logger.LogError("The size of the event being sent was too large.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
                    throw;
                }
                catch (Exception exception)
                {
                    responseCode = "500";
                    Logger.LogError("An issue occurred while trying to publish an event.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
                    throw;
                }
                finally
                {
                    TelemetryHelper.TrackDependency("Azure/Servicebus/EventBus", "Event", telemetryName, "Public Bus", startedAt, stopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
                }
                Logger.LogDebug(string.Format("An event was published on the public bus with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
            }

            if (privateEventAttribute != null)
            {
                stopWatch.Restart();
                responseCode = "200";
                wasSuccessfull = false;
                try
                {
                    // A fresh message is built per bus so no instance is ever sent twice.
                    PrivateServiceBusPublisher.Send(BuildBrokeredMessage(@event));
                    wasSuccessfull = true;
                }
                catch (QuotaExceededException exception)
                {
                    responseCode = "429";
                    Logger.LogError("The size of the event being sent was too large.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
                    throw;
                }
                catch (Exception exception)
                {
                    responseCode = "500";
                    Logger.LogError("An issue occurred while trying to publish an event.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
                    throw;
                }
                finally
                {
                    TelemetryHelper.TrackDependency("Azure/Servicebus/EventBus", "Event", telemetryName, "Private Bus", startedAt, stopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
                }

                Logger.LogDebug(string.Format("An event was published on the private bus with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
            }

            mainWasSuccessfull = true;
        }
        finally
        {
            mainStopWatch.Stop();
            if (telemeterOverall)
                TelemetryHelper.TrackDependency("Azure/Servicebus/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, mainWasSuccessfull, telemetryProperties);
        }
    }

    /// <summary>
    /// Publishes all the provided <paramref name="events"/> as (at most) one batch per bus.
    /// </summary>
    /// <remarks>
    /// Events that fail <c>AzureBusHelper.PrepareAndValidateEvent</c> are skipped.
    /// Events whose <typeparamref name="TEvent"/> has no bus attribute, or has <see cref="PublicEventAttribute"/>,
    /// are batched for the public publisher; those with <see cref="PrivateEventAttribute"/> are batched for the
    /// private publisher. A per-bus telemetry entry and an overall telemetry entry are always recorded.
    /// Any exception raised while sending is logged and rethrown; if the public batch fails the private batch is not attempted.
    /// </remarks>
    /// <typeparam name="TEvent">The static type of the events; bus attributes are read from this type.</typeparam>
    /// <param name="events">The events to publish. Enumerated once up front.</param>
    public virtual void Publish<TEvent>(IEnumerable<TEvent> events)
        where TEvent : IEvent<TAuthenticationToken>
    {
        IList<TEvent> sourceEvents = events.ToList();

        DateTimeOffset startedAt = DateTimeOffset.UtcNow;
        Stopwatch mainStopWatch = Stopwatch.StartNew();
        string responseCode = null;
        bool mainWasSuccessfull = false;

        IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/Servicebus" } };
        string telemetryName = "Events";

        // Collect one telemetry name per event and join once, rather than re-building the string per event.
        IList<string> subTelemetryNames = new List<string>(sourceEvents.Count);
        foreach (TEvent @event in sourceEvents)
        {
            string subTelemetryName = string.Format("{0}/{1}", @event.GetType().FullName, @event.Id);
            var telemeteredEvent = @event as ITelemeteredMessage;
            if (telemeteredEvent != null)
                subTelemetryName = telemeteredEvent.TelemetryName;
            subTelemetryNames.Add(subTelemetryName);
        }
        telemetryProperties.Add("Events", string.Join(",", subTelemetryNames));

        try
        {
            // The attributes are read from the static TEvent type, so they are the same for every event in the batch.
            var privateEventAttribute = Attribute.GetCustomAttribute(typeof(TEvent), typeof(PrivateEventAttribute)) as PrivateEventAttribute;
            // Must look up PublicEventAttribute here; looking up PrivateEventAttribute and casting it
            // to PublicEventAttribute would leave this null.
            var publicEventAttribute = Attribute.GetCustomAttribute(typeof(TEvent), typeof(PublicEventAttribute)) as PublicEventAttribute;

            IList<string> sourceEventMessages = new List<string>();
            IList<BrokeredMessage> privateBrokeredMessages = new List<BrokeredMessage>(sourceEvents.Count);
            IList<BrokeredMessage> publicBrokeredMessages = new List<BrokeredMessage>(sourceEvents.Count);
            foreach (TEvent @event in sourceEvents)
            {
                if (!AzureBusHelper.PrepareAndValidateEvent(@event, "Azure-ServiceBus"))
                    continue;

                if
                (
                    // Backwards compatibility and simplicity
                    (publicEventAttribute == null && privateEventAttribute == null)
                    ||
                    publicEventAttribute != null
                )
                {
                    publicBrokeredMessages.Add(BuildBrokeredMessage(@event));
                    sourceEventMessages.Add(string.Format("An event was published on the public bus with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
                }
                if (privateEventAttribute != null)
                {
                    // Build a separate message for the private bus: an event carrying both attributes must not
                    // share one BrokeredMessage instance across two sends.
                    // NOTE(review): assumes a BrokeredMessage cannot be re-sent once sent - confirm against the SDK docs.
                    privateBrokeredMessages.Add(BuildBrokeredMessage(@event));
                    sourceEventMessages.Add(string.Format("An event was published on the private bus with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
                }
            }

            bool wasSuccessfull;
            Stopwatch stopWatch = Stopwatch.StartNew();

            // Public bus batch
            stopWatch.Restart();
            responseCode = "200";
            wasSuccessfull = false;
            try
            {
                // Nothing to send when every event was private or failed validation.
                if (publicBrokeredMessages.Count > 0)
                    PublicServiceBusPublisher.SendBatch(publicBrokeredMessages);
                wasSuccessfull = true;
            }
            catch (QuotaExceededException exception)
            {
                responseCode = "429";
                Logger.LogError("The size of the event being sent was too large.", exception: exception, metaData: new Dictionary<string, object> { { "Events", publicBrokeredMessages } });
                throw;
            }
            catch (Exception exception)
            {
                responseCode = "500";
                Logger.LogError("An issue occurred while trying to publish an event.", exception: exception, metaData: new Dictionary<string, object> { { "Events", publicBrokeredMessages } });
                throw;
            }
            finally
            {
                TelemetryHelper.TrackDependency("Azure/Servicebus/EventBus", "Event", telemetryName, "Public Bus", startedAt, stopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
            }

            // Private bus batch
            stopWatch.Restart();
            responseCode = "200";
            wasSuccessfull = false;
            try
            {
                // Nothing to send when no event was marked private or all failed validation.
                if (privateBrokeredMessages.Count > 0)
                    PrivateServiceBusPublisher.SendBatch(privateBrokeredMessages);
                wasSuccessfull = true;
            }
            catch (QuotaExceededException exception)
            {
                responseCode = "429";
                Logger.LogError("The size of the event being sent was too large.", exception: exception, metaData: new Dictionary<string, object> { { "Events", privateBrokeredMessages } });
                throw;
            }
            catch (Exception exception)
            {
                responseCode = "500";
                Logger.LogError("An issue occurred while trying to publish an event.", exception: exception, metaData: new Dictionary<string, object> { { "Events", privateBrokeredMessages } });
                throw;
            }
            finally
            {
                TelemetryHelper.TrackDependency("Azure/Servicebus/EventBus", "Event", telemetryName, "Private Bus", startedAt, stopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
            }

            foreach (string message in sourceEventMessages)
                Logger.LogInfo(message);

            mainWasSuccessfull = true;
        }
        finally
        {
            mainStopWatch.Stop();
            TelemetryHelper.TrackDependency("Azure/Servicebus/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, mainWasSuccessfull, telemetryProperties);
        }
    }

    #endregion

    /// <summary>
    /// Serialises the <paramref name="event"/> into a new <see cref="BrokeredMessage"/>, stamps it with the
    /// current correlation id (formatted "N") and adds a "Type" property holding the event's runtime type name.
    /// </summary>
    private BrokeredMessage BuildBrokeredMessage<TEvent>(TEvent @event)
        where TEvent : IEvent<TAuthenticationToken>
    {
        var brokeredMessage = new BrokeredMessage(MessageSerialiser.SerialiseEvent(@event))
        {
            CorrelationId = CorrelationIdHelper.GetCorrelationId().ToString("N")
        };
        brokeredMessage.Properties.Add("Type", @event.GetType().FullName);
        return brokeredMessage;
    }
}
303 : }
|