Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Linq;
13 : using System.Text;
14 : using Cqrs.Authentication;
15 : using Cqrs.Configuration;
16 : using Cqrs.Events;
17 : using cdmdotnet.Logging;
18 : using Cqrs.Messages;
19 : using Microsoft.ServiceBus.Messaging;
20 :
21 : namespace Cqrs.Azure.ServiceBus
22 : {
/// <summary>
/// An <see cref="IEventPublisher{TAuthenticationToken}"/> that serialises events and sends them to an Azure Event Hub.
/// </summary>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
public class AzureEventBusPublisher<TAuthenticationToken>
    : AzureEventHubBus<TAuthenticationToken>
    , IEventPublisher<TAuthenticationToken>
{
    /// <summary>
    /// Instantiates a new instance of <see cref="AzureEventBusPublisher{TAuthenticationToken}"/>
    /// and creates the telemetry helper used to track every publish operation.
    /// </summary>
    public AzureEventBusPublisher(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
        // NOTE(review): the trailing 'true' presumably flags this bus as a publisher - confirm against AzureEventHubBus.
        : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, true)
    {
        TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventHub.EventBus.Publisher.UseApplicationInsightTelemetryHelper", correlationIdHelper);
    }

    #region Implementation of IEventPublisher<TAuthenticationToken>

    /// <summary>
    /// Publishes the provided <paramref name="event"/> on the event bus.
    /// </summary>
    /// <remarks>
    /// If <see cref="IAzureBusHelper{TAuthenticationToken}"/> rejects the event during preparation/validation nothing is sent.
    /// Any exception raised while sending is logged and rethrown. A dependency telemetry entry is always tracked,
    /// with a response code of "200" by default, "429" for a <see cref="QuotaExceededException"/> and "500" for any other failure.
    /// </remarks>
    /// <typeparam name="TEvent">The <see cref="Type"/> of the event being published.</typeparam>
    /// <param name="event">The event to publish.</param>
    public virtual void Publish<TEvent>(TEvent @event)
        where TEvent : IEvent<TAuthenticationToken>
    {
        DateTimeOffset startedAt = DateTimeOffset.UtcNow;
        Stopwatch mainStopWatch = Stopwatch.StartNew();
        string responseCode = "200";
        bool wasSuccessfull = false;

        IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
        string telemetryName = string.Format("Event/{0}", GetTelemetryName(@event));

        try
        {
            if (!AzureBusHelper.PrepareAndValidateEvent(@event, "Azure-EventHub"))
                return;

            try
            {
                EventHubPublisher.Send(CreateEventData(@event));
                wasSuccessfull = true;
            }
            catch (QuotaExceededException exception)
            {
                responseCode = "429";
                Logger.LogError("The size of the event being sent was too large.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
                throw;
            }
            catch (Exception exception)
            {
                responseCode = "500";
                Logger.LogError("An issue occurred while trying to publish an event.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
                throw;
            }
        }
        finally
        {
            // Stop timing before reporting, the same way the batch overload does.
            mainStopWatch.Stop();
            TelemetryHelper.TrackDependency("Azure/EventHub/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
        }
        Logger.LogInfo(string.Format("An event was published with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
    }

    /// <summary>
    /// Publishes the provided <paramref name="events"/> on the event bus.
    /// </summary>
    /// <remarks>
    /// Events rejected by <see cref="IAzureBusHelper{TAuthenticationToken}"/> during preparation/validation are skipped;
    /// the remaining events are sent as a single batch. When no event remains nothing is sent.
    /// Any exception raised while sending is logged and rethrown. A dependency telemetry entry is always tracked,
    /// with a response code of "200" by default, "429" for a <see cref="QuotaExceededException"/> and "500" for any other failure.
    /// </remarks>
    /// <typeparam name="TEvent">The <see cref="Type"/> of the events being published.</typeparam>
    /// <param name="events">The events to publish.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="events"/> is <c>null</c>.</exception>
    public virtual void Publish<TEvent>(IEnumerable<TEvent> events)
        where TEvent : IEvent<TAuthenticationToken>
    {
        // ToList() would throw the same exception type, but reporting "source" as the offending parameter.
        if (events == null)
            throw new ArgumentNullException("events");

        // Materialise once; the sequence is enumerated several times below.
        IList<TEvent> sourceEvents = events.ToList();

        DateTimeOffset startedAt = DateTimeOffset.UtcNow;
        Stopwatch mainStopWatch = Stopwatch.StartNew();
        string responseCode = "200";
        bool wasSuccessfull = false;

        IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
        string telemetryName = "Events";
        // Comma separated list of the telemetry names of every event in the batch.
        telemetryProperties.Add("Events", string.Join(",", sourceEvents.Select(sourceEvent => GetTelemetryName(sourceEvent))));

        try
        {
            IList<string> sourceEventMessages = new List<string>(sourceEvents.Count);
            IList<Microsoft.ServiceBus.Messaging.EventData> brokeredMessages = new List<Microsoft.ServiceBus.Messaging.EventData>(sourceEvents.Count);
            foreach (TEvent @event in sourceEvents)
            {
                if (!AzureBusHelper.PrepareAndValidateEvent(@event, "Azure-EventHub"))
                    continue;

                brokeredMessages.Add(CreateEventData(@event));
                // Logged only once the batch has actually been sent.
                sourceEventMessages.Add(string.Format("An event was published with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
            }

            // Mirror the single-event overload, which returns without sending when there is nothing valid to publish.
            if (brokeredMessages.Count == 0)
                return;

            try
            {
                EventHubPublisher.SendBatch(brokeredMessages);
                wasSuccessfull = true;
            }
            catch (QuotaExceededException exception)
            {
                responseCode = "429";
                Logger.LogError("The size of the event being sent was too large.", exception: exception, metaData: new Dictionary<string, object> { { "Events", sourceEvents } });
                throw;
            }
            catch (Exception exception)
            {
                responseCode = "500";
                Logger.LogError("An issue occurred while trying to publish a event.", exception: exception, metaData: new Dictionary<string, object> { { "Events", sourceEvents } });
                throw;
            }

            foreach (string message in sourceEventMessages)
                Logger.LogInfo(message);
        }
        finally
        {
            mainStopWatch.Stop();
            TelemetryHelper.TrackDependency("Azure/EventHub/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
        }
    }

    #endregion

    /// <summary>
    /// Gets the name used to identify the provided <paramref name="event"/> in telemetry:
    /// the <see cref="ITelemeteredMessage.TelemetryName"/> when the event implements <see cref="ITelemeteredMessage"/>,
    /// otherwise the full name of its <see cref="Type"/> followed by its id.
    /// </summary>
    private static string GetTelemetryName<TEvent>(TEvent @event)
        where TEvent : IEvent<TAuthenticationToken>
    {
        var telemeteredEvent = @event as ITelemeteredMessage;
        if (telemeteredEvent != null)
            return telemeteredEvent.TelemetryName;
        return string.Format("{0}/{1}", @event.GetType().FullName, @event.Id);
    }

    /// <summary>
    /// Serialises the provided <paramref name="event"/> into a UTF-8 encoded
    /// <see cref="Microsoft.ServiceBus.Messaging.EventData"/> and records the full name of its <see cref="Type"/> in the "Type" property.
    /// </summary>
    private Microsoft.ServiceBus.Messaging.EventData CreateEventData<TEvent>(TEvent @event)
        where TEvent : IEvent<TAuthenticationToken>
    {
        var brokeredMessage = new Microsoft.ServiceBus.Messaging.EventData(Encoding.UTF8.GetBytes(MessageSerialiser.SerialiseEvent(@event)));
        brokeredMessage.Properties.Add("Type", @event.GetType().FullName);
        return brokeredMessage;
    }
}
169 : }
|