Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Linq;
13 : using Chinchilla.Logging;
14 : using Cqrs.Authentication;
15 : using Cqrs.Bus;
16 : using Cqrs.Configuration;
17 : using Cqrs.Events;
18 : using Cqrs.Messages;
19 : #if NET452
20 : using Microsoft.ServiceBus.Messaging;
21 : using EventData = Microsoft.ServiceBus.Messaging.EventData;
22 : #endif
23 : #if NETCOREAPP3_0
24 : using Microsoft.Azure.EventHubs;
25 : using Microsoft.Azure.EventHubs.Processor;
26 : using EventData = Microsoft.Azure.EventHubs.EventData;
27 : #endif
28 :
namespace Cqrs.Azure.ServiceBus
{
	/// <summary>
	/// An <see cref="IEventPublisher{TAuthenticationToken}"/> that converts events into <see cref="EventData"/> messages
	/// and sends them through the Azure Event Hub publisher exposed by <see cref="AzureEventHubBus{TAuthenticationToken}"/>.
	/// </summary>
	/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
	public class AzureEventBusPublisher<TAuthenticationToken>
		: AzureEventHubBus<TAuthenticationToken>
		, IEventPublisher<TAuthenticationToken>
	{
		/// <summary>
		/// Instantiates a new instance of <see cref="AzureEventBusPublisher{TAuthenticationToken}"/>.
		/// </summary>
		/// <param name="configurationManager">Passed to the base class and used to create the telemetry helper.</param>
		/// <param name="messageSerialiser">Passed to the base class; its <c>SerialiseEvent</c> method is used when building messages.</param>
		/// <param name="authenticationTokenHelper">Passed to the base class.</param>
		/// <param name="correlationIdHelper">Passed to the base class and to the telemetry helper factory.</param>
		/// <param name="logger">Passed to the base class.</param>
		/// <param name="hashAlgorithmFactory">Passed to the base class.</param>
		/// <param name="azureBusHelper">Passed to the base class; used to prepare and validate events before sending.</param>
		public AzureEventBusPublisher(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IHashAlgorithmFactory hashAlgorithmFactory, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
			// NOTE(review): the trailing 'true' presumably marks this bus as a publisher - confirm against the base constructor.
			: base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, hashAlgorithmFactory, azureBusHelper, true)
		{
			TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventHub.EventBus.Publisher.UseApplicationInsightTelemetryHelper", correlationIdHelper);
		}

		#region Implementation of IEventPublisher<TAuthenticationToken>

		/// <summary>
		/// Publishes the provided <paramref name="event"/> on the event bus.
		/// </summary>
		/// <typeparam name="TEvent">The <see cref="Type"/> of the event being published.</typeparam>
		/// <param name="event">The event to publish. A null value is logged at debug level and ignored.</param>
		/// <remarks>
		/// A dependency telemetry entry is always tracked, whether sending succeeds, fails or is skipped by validation.
		/// Exceptions raised while sending are logged and rethrown: a <see cref="QuotaExceededException"/> is reported
		/// with response code "429", any other exception with "500".
		/// </remarks>
		public virtual void Publish<TEvent>(TEvent @event)
			where TEvent : IEvent<TAuthenticationToken>
		{
			if (@event == null)
			{
				Logger.LogDebug("No event to publish.");
				return;
			}
			DateTimeOffset startedAt = DateTimeOffset.UtcNow;
			Stopwatch mainStopWatch = Stopwatch.StartNew();
			string responseCode = "200";
			bool wasSuccessfull = false;

			IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
			// Default telemetry name is type/identity/id unless the event supplies its own.
			string telemetryName = string.Format("{0}/{1}/{2}", @event.GetType().FullName, @event.GetIdentity(), @event.Id);
			var telemeteredEvent = @event as ITelemeteredMessage;
			if (telemeteredEvent != null)
				telemetryName = telemeteredEvent.TelemetryName;
			telemetryName = string.Format("Event/{0}", telemetryName);

			try
			{
				// NOTE(review): when validation fails the dependency is tracked as unsuccessful with code "200" - confirm this is intended.
				if (!AzureBusHelper.PrepareAndValidateEvent(@event, "Azure-EventHub"))
					return;

				try
				{
					var brokeredMessage = CreateBrokeredMessage(MessageSerialiser.SerialiseEvent, @event.GetType(), @event);

#if NET452
					EventHubPublisher.Send(brokeredMessage);
#endif
#if NETCOREAPP3_0
					// GetAwaiter().GetResult() rethrows the original exception; Wait() would wrap it in an
					// AggregateException and the QuotaExceededException handler below would never run.
					EventHubPublisher.SendAsync(brokeredMessage).GetAwaiter().GetResult();
#endif
					wasSuccessfull = true;
				}
				catch (QuotaExceededException exception)
				{
					responseCode = "429";
					Logger.LogError("The size of the event being sent was too large.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
					throw;
				}
				catch (Exception exception)
				{
					responseCode = "500";
					Logger.LogError("An issue occurred while trying to publish an event.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
					throw;
				}
			}
			finally
			{
				// Stop the timer before reporting, matching the batch overload.
				mainStopWatch.Stop();
				TelemetryHelper.TrackDependency("Azure/EventHub/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
			}
			Logger.LogInfo(string.Format("An event was published with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
		}

		/// <summary>
		/// Publishes the provided <paramref name="events"/> on the event bus.
		/// </summary>
		/// <typeparam name="TEvent">The <see cref="Type"/> of the events being published.</typeparam>
		/// <param name="events">The events to publish. A null or empty collection is logged at debug level and ignored.</param>
		/// <remarks>
		/// Events that fail <c>PrepareAndValidateEvent</c> are left out; the remaining events are sent together in one call.
		/// A dependency telemetry entry is always tracked once the collection is known to be non-empty.
		/// Exceptions raised while sending are logged and rethrown: a <see cref="QuotaExceededException"/> is reported
		/// with response code "429", any other exception with "500".
		/// </remarks>
		public virtual void Publish<TEvent>(IEnumerable<TEvent> events)
			where TEvent : IEvent<TAuthenticationToken>
		{
			if (events == null)
			{
				Logger.LogDebug("No events to publish.");
				return;
			}
			// Materialise once so the sequence is not enumerated multiple times.
			IList<TEvent> sourceEvents = events.ToList();
			if (!sourceEvents.Any())
			{
				Logger.LogDebug("An empty collection of events to publish.");
				return;
			}

			DateTimeOffset startedAt = DateTimeOffset.UtcNow;
			Stopwatch mainStopWatch = Stopwatch.StartNew();
			string responseCode = "200";
			bool wasSuccessfull = false;

			IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
			string telemetryName = "Events";

			// Record a comma separated list of the per-event telemetry names.
			IList<string> subTelemetryNames = new List<string>(sourceEvents.Count);
			foreach (TEvent @event in sourceEvents)
			{
				string subTelemetryName = string.Format("{0}/{1}/{2}", @event.GetType().FullName, @event.GetIdentity(), @event.Id);
				var telemeteredEvent = @event as ITelemeteredMessage;
				if (telemeteredEvent != null)
					subTelemetryName = telemeteredEvent.TelemetryName;
				subTelemetryNames.Add(subTelemetryName);
			}
			telemetryProperties.Add("Events", string.Join(",", subTelemetryNames));

			try
			{
				IList<string> sourceEventMessages = new List<string>();
				IList<EventData> brokeredMessages = new List<EventData>(sourceEvents.Count);
				foreach (TEvent @event in sourceEvents)
				{
					if (!AzureBusHelper.PrepareAndValidateEvent(@event, "Azure-EventHub"))
						continue;

					var brokeredMessage = CreateBrokeredMessage(MessageSerialiser.SerialiseEvent, @event.GetType(), @event);

					brokeredMessages.Add(brokeredMessage);
					// Logged only after the batch has been sent successfully.
					sourceEventMessages.Add(string.Format("An event was sent of type {0}.", @event.GetType().FullName));
				}

				try
				{
					if (brokeredMessages.Any())
					{
#if NET452
						EventHubPublisher.SendBatch(brokeredMessages);
#endif
#if NETCOREAPP3_0
						// GetAwaiter().GetResult() rethrows the original exception; Wait() would wrap it in an
						// AggregateException and the QuotaExceededException handler below would never run.
						EventHubPublisher.SendAsync(brokeredMessages).GetAwaiter().GetResult();
#endif
					}
					else
						Logger.LogDebug("An empty collection of events to publish post validation.");
					wasSuccessfull = true;
				}
				catch (QuotaExceededException exception)
				{
					responseCode = "429";
					Logger.LogError("The size of the event being sent was too large.", exception: exception, metaData: new Dictionary<string, object> { { "Events", sourceEvents } });
					throw;
				}
				catch (Exception exception)
				{
					responseCode = "500";
					Logger.LogError("An issue occurred while trying to publish a event.", exception: exception, metaData: new Dictionary<string, object> { { "Events", sourceEvents } });
					throw;
				}

				foreach (string message in sourceEventMessages)
					Logger.LogInfo(message);
			}
			finally
			{
				mainStopWatch.Stop();
				TelemetryHelper.TrackDependency("Azure/EventHub/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
			}
		}

		#endregion
	}
}
|