Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Linq;
13 : using Cqrs.Authentication;
14 : using Cqrs.Configuration;
15 : using Cqrs.Events;
16 : using cdmdotnet.Logging;
17 : using Cqrs.Bus;
18 : using Cqrs.Messages;
19 : using Microsoft.ServiceBus.Messaging;
20 :
namespace Cqrs.Azure.ServiceBus
{
    /// <summary>
    /// An <see cref="IEventPublisher{TAuthenticationToken}"/> that serialises events and sends them through the
    /// <c>EventHubPublisher</c> made available by <see cref="AzureEventHubBus{TAuthenticationToken}"/>,
    /// recording a telemetry dependency entry for every publish attempt.
    /// </summary>
    /// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
    public class AzureEventBusPublisher<TAuthenticationToken>
        : AzureEventHubBus<TAuthenticationToken>
        , IEventPublisher<TAuthenticationToken>
    {
        /// <summary>
        /// Instantiates a new instance of <see cref="AzureEventBusPublisher{TAuthenticationToken}"/>.
        /// </summary>
        /// <param name="configurationManager">Forwarded to the base class and used to create the telemetry helper.</param>
        /// <param name="messageSerialiser">Forwarded to the base class.</param>
        /// <param name="authenticationTokenHelper">Forwarded to the base class.</param>
        /// <param name="correlationIdHelper">Forwarded to the base class and used to create the telemetry helper.</param>
        /// <param name="logger">Forwarded to the base class.</param>
        /// <param name="hashAlgorithmFactory">Forwarded to the base class.</param>
        /// <param name="azureBusHelper">Forwarded to the base class.</param>
        public AzureEventBusPublisher(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IHashAlgorithmFactory hashAlgorithmFactory, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
            // NOTE(review): the trailing 'true' is presumably the base class' "is a publisher" flag - confirm against AzureEventHubBus.
            : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, hashAlgorithmFactory, azureBusHelper, true)
        {
            TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventHub.EventBus.Publisher.UseApplicationInsightTelemetryHelper", correlationIdHelper);
        }

        #region Implementation of IEventPublisher<TAuthenticationToken>

        /// <summary>
        /// Publishes the provided <paramref name="event"/> on the event bus.
        /// </summary>
        /// <typeparam name="TEvent">The <see cref="Type"/> of the event being published.</typeparam>
        /// <param name="event">The event to publish. A null value is logged and ignored.</param>
        /// <exception cref="QuotaExceededException">Logged and rethrown when sending the event raises it.</exception>
        /// <exception cref="Exception">Any other failure raised while creating or sending the message is logged and rethrown.</exception>
        public virtual void Publish<TEvent>(TEvent @event)
            where TEvent : IEvent<TAuthenticationToken>
        {
            if (@event == null)
            {
                Logger.LogDebug("No event to publish.");
                return;
            }
            DateTimeOffset startedAt = DateTimeOffset.UtcNow;
            Stopwatch mainStopWatch = Stopwatch.StartNew();
            string responseCode = "200";
            bool wasSuccessfull = false;

            IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
            string telemetryName = string.Format("Event/{0}", ResolveEventTelemetryName(@event));

            try
            {
                // NOTE(review): an event rejected here is tracked with wasSuccessfull == false but responseCode "200",
                // whereas the batch overload reports skipped events as successful - confirm which is intended.
                if (!AzureBusHelper.PrepareAndValidateEvent(@event, "Azure-EventHub"))
                    return;

                try
                {
                    var brokeredMessage = CreateBrokeredMessage(MessageSerialiser.SerialiseEvent, @event.GetType(), @event);

                    EventHubPublisher.Send(brokeredMessage);
                    wasSuccessfull = true;
                }
                catch (QuotaExceededException exception)
                {
                    responseCode = "429";
                    Logger.LogError("The size of the event being sent was too large.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
                    throw;
                }
                catch (Exception exception)
                {
                    responseCode = "500";
                    Logger.LogError("An issue occurred while trying to publish an event.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
                    throw;
                }
            }
            finally
            {
                // Stop timing before reporting, exactly as the batch overload does.
                mainStopWatch.Stop();
                TelemetryHelper.TrackDependency("Azure/EventHub/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
            }
            Logger.LogInfo(string.Format("An event was published with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
        }

        /// <summary>
        /// Publishes the provided <paramref name="events"/> on the event bus.
        /// </summary>
        /// <typeparam name="TEvent">The <see cref="Type"/> of the events being published.</typeparam>
        /// <param name="events">The events to publish. A null or empty collection is logged and ignored.</param>
        /// <exception cref="QuotaExceededException">Logged and rethrown when sending the batch raises it.</exception>
        /// <exception cref="Exception">Any other failure raised while sending the batch is logged and rethrown.</exception>
        public virtual void Publish<TEvent>(IEnumerable<TEvent> events)
            where TEvent : IEvent<TAuthenticationToken>
        {
            if (events == null)
            {
                Logger.LogDebug("No events to publish.");
                return;
            }
            // Materialise once; the sequence is walked several times below.
            IList<TEvent> sourceEvents = events.ToList();
            if (sourceEvents.Count == 0)
            {
                Logger.LogDebug("An empty collection of events to publish.");
                return;
            }

            DateTimeOffset startedAt = DateTimeOffset.UtcNow;
            Stopwatch mainStopWatch = Stopwatch.StartNew();
            string responseCode = "200";
            bool wasSuccessfull = false;

            IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
            string telemetryName = "Events";
            // Comma separated telemetry names of every event in the batch.
            string telemetryNames = string.Join(",", sourceEvents.Select(sourceEvent => ResolveEventTelemetryName(sourceEvent)));
            telemetryProperties.Add("Events", telemetryNames);

            try
            {
                IList<string> sourceEventMessages = new List<string>();
                IList<Microsoft.ServiceBus.Messaging.EventData> brokeredMessages = new List<Microsoft.ServiceBus.Messaging.EventData>(sourceEvents.Count);
                foreach (TEvent @event in sourceEvents)
                {
                    // Events that fail preparation/validation are left out of the batch.
                    if (!AzureBusHelper.PrepareAndValidateEvent(@event, "Azure-EventHub"))
                        continue;

                    var brokeredMessage = CreateBrokeredMessage(MessageSerialiser.SerialiseEvent, @event.GetType(), @event);

                    brokeredMessages.Add(brokeredMessage);
                    // Same message the single-event overload logs; only written once the batch has been sent.
                    sourceEventMessages.Add(string.Format("An event was published with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
                }

                try
                {
                    if (brokeredMessages.Any())
                        EventHubPublisher.SendBatch(brokeredMessages);
                    else
                        Logger.LogDebug("An empty collection of events to publish post validation.");
                    wasSuccessfull = true;
                }
                catch (QuotaExceededException exception)
                {
                    responseCode = "429";
                    Logger.LogError("The size of the event being sent was too large.", exception: exception, metaData: new Dictionary<string, object> { { "Events", sourceEvents } });
                    throw;
                }
                catch (Exception exception)
                {
                    responseCode = "500";
                    Logger.LogError("An issue occurred while trying to publish a event.", exception: exception, metaData: new Dictionary<string, object> { { "Events", sourceEvents } });
                    throw;
                }

                foreach (string message in sourceEventMessages)
                    Logger.LogInfo(message);
            }
            finally
            {
                mainStopWatch.Stop();
                TelemetryHelper.TrackDependency("Azure/EventHub/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
            }
        }

        #endregion

        /// <summary>
        /// Gets the telemetry name for <paramref name="event"/>: the <see cref="ITelemeteredMessage.TelemetryName"/>
        /// when the event implements <see cref="ITelemeteredMessage"/>, otherwise "{type full name}/{identity}/{id}".
        /// </summary>
        /// <typeparam name="TEvent">The <see cref="Type"/> of the event.</typeparam>
        /// <param name="event">The event to name; must not be null.</param>
        private static string ResolveEventTelemetryName<TEvent>(TEvent @event)
            where TEvent : IEvent<TAuthenticationToken>
        {
            // The default name is always computed first, preserving the original evaluation order.
            string telemetryName = string.Format("{0}/{1}/{2}", @event.GetType().FullName, @event.GetIdentity(), @event.Id);
            var telemeteredEvent = @event as ITelemeteredMessage;
            if (telemeteredEvent != null)
                telemetryName = telemeteredEvent.TelemetryName;
            return telemetryName;
        }
    }
}
|