Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Diagnostics;
12 : using System.Linq;
13 : using System.Text;
14 : using Cqrs.Authentication;
15 : using Cqrs.Configuration;
16 : using Cqrs.Events;
17 : using cdmdotnet.Logging;
18 : using Cqrs.Messages;
19 : using Microsoft.ServiceBus.Messaging;
20 :
21 : namespace Cqrs.Azure.ServiceBus
22 : {
/// <summary>
/// An <see cref="IEventPublisher{TAuthenticationToken}"/> that serialises events and sends them
/// through the inherited <c>EventHubPublisher</c> as <see cref="Microsoft.ServiceBus.Messaging.EventData"/>.
/// </summary>
/// <typeparam name="TAuthenticationToken">The type of the authentication token carried by the events.</typeparam>
public class AzureEventBusPublisher<TAuthenticationToken> : AzureEventHubBus<TAuthenticationToken>, IEventPublisher<TAuthenticationToken>
{
    /// <summary>
    /// Instantiates a new <see cref="AzureEventBusPublisher{TAuthenticationToken}"/> and creates the
    /// telemetry helper from the
    /// "Cqrs.Azure.EventHub.EventBus.Publisher.UseApplicationInsightTelemetryHelper" configuration key.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the trailing <c>true</c> passed to the base constructor presumably marks this
    /// instance as a publisher - confirm against <c>AzureEventHubBus</c>.
    /// </remarks>
    public AzureEventBusPublisher(IConfigurationManager configurationManager, IMessageSerialiser<TAuthenticationToken> messageSerialiser, IAuthenticationTokenHelper<TAuthenticationToken> authenticationTokenHelper, ICorrelationIdHelper correlationIdHelper, ILogger logger, IAzureBusHelper<TAuthenticationToken> azureBusHelper)
        : base(configurationManager, messageSerialiser, authenticationTokenHelper, correlationIdHelper, logger, azureBusHelper, true)
    {
        TelemetryHelper = configurationManager.CreateTelemetryHelper("Cqrs.Azure.EventHub.EventBus.Publisher.UseApplicationInsightTelemetryHelper", correlationIdHelper);
    }

    #region Implementation of IEventPublisher<TAuthenticationToken>

    /// <summary>
    /// Publishes the provided <paramref name="event"/> on the event hub and records a telemetry
    /// dependency describing the attempt.
    /// </summary>
    /// <typeparam name="TEvent">The type of the event being published.</typeparam>
    /// <param name="event">The event to publish.</param>
    /// <remarks>
    /// Any exception raised while building or sending the message is logged and then re-thrown.
    /// </remarks>
    public virtual void Publish<TEvent>(TEvent @event)
        where TEvent : IEvent<TAuthenticationToken>
    {
        DateTimeOffset startedAt = DateTimeOffset.UtcNow;
        Stopwatch mainStopWatch = Stopwatch.StartNew();
        string responseCode = "200";
        bool wasSuccessfull = false;

        IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };

        // Events that supply their own telemetry name override the default "type/id" name.
        string telemetryName = string.Format("{0}/{1}", @event.GetType().FullName, @event.Id);
        var telemeteredEvent = @event as ITelemeteredMessage;
        if (telemeteredEvent != null)
            telemetryName = telemeteredEvent.TelemetryName;
        telemetryName = string.Format("Event/{0}", telemetryName);

        try
        {
            // When the helper rejects the event nothing is sent; the finally block below still
            // records the telemetry entry (flagged as not successful).
            if (!AzureBusHelper.PrepareAndValidateEvent(@event, "Azure-EventHub"))
                return;

            try
            {
                var eventData = new Microsoft.ServiceBus.Messaging.EventData(Encoding.UTF8.GetBytes(MessageSerialiser.SerialiseEvent(@event)));
                eventData.Properties.Add("Type", @event.GetType().FullName);

                EventHubPublisher.Send(eventData);
                wasSuccessfull = true;
            }
            catch (QuotaExceededException exception)
            {
                responseCode = "429";
                Logger.LogError("The size of the event being sent was too large.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
                throw;
            }
            catch (Exception exception)
            {
                responseCode = "500";
                Logger.LogError("An issue occurred while trying to publish an event.", exception: exception, metaData: new Dictionary<string, object> { { "Event", @event } });
                throw;
            }
        }
        finally
        {
            // Stop timing before reporting, matching the batch overload.
            mainStopWatch.Stop();
            TelemetryHelper.TrackDependency("Azure/EventHub/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
        }
        Logger.LogInfo(string.Format("An event was published with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
    }

    /// <summary>
    /// Publishes the provided <paramref name="events"/> on the event hub as a single batch and
    /// records one telemetry dependency describing the attempt.
    /// </summary>
    /// <typeparam name="TEvent">The type of the events being published.</typeparam>
    /// <param name="events">The events to publish; the sequence is enumerated once up front.</param>
    /// <remarks>
    /// Events rejected by <c>AzureBusHelper.PrepareAndValidateEvent</c> are left out of the batch.
    /// Any exception raised while sending the batch is logged and then re-thrown.
    /// </remarks>
    public virtual void Publish<TEvent>(IEnumerable<TEvent> events)
        where TEvent : IEvent<TAuthenticationToken>
    {
        // Materialise once so the sequence is not enumerated multiple times.
        IList<TEvent> sourceEvents = events.ToList();

        DateTimeOffset startedAt = DateTimeOffset.UtcNow;
        Stopwatch mainStopWatch = Stopwatch.StartNew();
        string responseCode = "200";
        bool wasSuccessfull = false;

        IDictionary<string, string> telemetryProperties = new Dictionary<string, string> { { "Type", "Azure/EventHub" } };
        string telemetryName = "Events";

        // Collect one telemetry name per event and expose them as a comma separated property.
        var subTelemetryNames = new List<string>(sourceEvents.Count);
        foreach (TEvent @event in sourceEvents)
        {
            string subTelemetryName = string.Format("{0}/{1}", @event.GetType().FullName, @event.Id);
            var telemeteredEvent = @event as ITelemeteredMessage;
            if (telemeteredEvent != null)
                subTelemetryName = telemeteredEvent.TelemetryName;
            subTelemetryNames.Add(subTelemetryName);
        }
        telemetryProperties.Add("Events", string.Join(",", subTelemetryNames));

        try
        {
            IList<string> sourceEventMessages = new List<string>();
            IList<Microsoft.ServiceBus.Messaging.EventData> brokeredMessages = new List<Microsoft.ServiceBus.Messaging.EventData>(sourceEvents.Count);
            foreach (TEvent @event in sourceEvents)
            {
                if (!AzureBusHelper.PrepareAndValidateEvent(@event, "Azure-EventHub"))
                    continue;

                var brokeredMessage = new Microsoft.ServiceBus.Messaging.EventData(Encoding.UTF8.GetBytes(MessageSerialiser.SerialiseEvent(@event)));
                brokeredMessage.Properties.Add("Type", @event.GetType().FullName);

                brokeredMessages.Add(brokeredMessage);
                // Deferred so the info entries are only written once the batch has been sent.
                sourceEventMessages.Add(string.Format("An event was published with the id '{0}' was of type {1}.", @event.Id, @event.GetType().FullName));
            }

            try
            {
                EventHubPublisher.SendBatch(brokeredMessages);
                wasSuccessfull = true;
            }
            catch (QuotaExceededException exception)
            {
                responseCode = "429";
                Logger.LogError("The size of the event being sent was too large.", exception: exception, metaData: new Dictionary<string, object> { { "Events", sourceEvents } });
                throw;
            }
            catch (Exception exception)
            {
                responseCode = "500";
                Logger.LogError("An issue occurred while trying to publish a event.", exception: exception, metaData: new Dictionary<string, object> { { "Events", sourceEvents } });
                throw;
            }

            foreach (string message in sourceEventMessages)
                Logger.LogInfo(message);
        }
        finally
        {
            mainStopWatch.Stop();
            TelemetryHelper.TrackDependency("Azure/EventHub/EventBus", "Event", telemetryName, null, startedAt, mainStopWatch.Elapsed, responseCode, wasSuccessfull, telemetryProperties);
        }
    }

    #endregion
}
154 : }
|