Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Threading.Tasks;
12 : using Chinchilla.Logging;
13 : #if NET452
14 : using Microsoft.ServiceBus.Messaging;
15 : #endif
16 : #if NETCOREAPP3_0
17 : using Microsoft.Azure.EventHubs;
18 : using Microsoft.Azure.EventHubs.Processor;
19 : #endif
20 :
21 : namespace Cqrs.Azure.ServiceBus
22 : {
23 : /// <summary>
24 : /// A default implementation of <see cref="IEventProcessor"/> suitable for most situations and conditions.
25 : /// </summary>
26 : internal class DefaultEventProcessor : IEventProcessor
27 : {
28 : protected ILogger Logger { get; private set; }
29 :
30 : protected Action<PartitionContext, EventData> ReceiverMessageHandler { get; private set; }
31 :
32 : /// <summary>
33 : /// Initializes a new instance of the <see cref="DefaultEventProcessor"/> class.
34 : /// </summary>
35 1 : public DefaultEventProcessor(ILogger logger, Action<PartitionContext, EventData> receiverMessageHandler)
36 : {
37 : Logger = logger;
38 : ReceiverMessageHandler = receiverMessageHandler;
39 : }
40 :
41 : #region Implementation of IEventProcessor
42 :
43 : /// <summary>
44 : /// Initializes the Event Hub processor instance. This method is called before any event data is passed to this processor instance.
45 : /// </summary>
46 : /// <param name="context">Ownership information for the partition on which this processor instance works. Any attempt to call <see cref="M:Microsoft.ServiceBus.Messaging.PartitionContext.CheckpointAsync"/> will fail during the Open operation.</param>
47 : /// <returns>
48 : /// The task that indicates that the Open operation is complete.
49 : /// </returns>
50 1 : public virtual Task OpenAsync(PartitionContext context)
51 : {
52 : Logger.LogInfo("Open Async");
53 : return Task.FromResult<object>(null);
54 : }
55 :
56 : /// <summary>
57 : /// Asynchronously processes the specified context and messages. This method is called when there are new messages in the Event Hubs stream. Make sure to checkpoint only when you are finished processing all the events in each batch.
58 : /// </summary>
59 : /// <param name="context">Ownership information for the partition on which this processor instance works.</param>
60 : /// <param name="messages">A batch of Event Hubs events.</param>
61 : /// <returns>
62 : /// The task that indicates that <see cref="M:Microsoft.ServiceBus.Messaging.IEventProcessor.ProcessEventsAsync(Microsoft.ServiceBus.Messaging.PartitionContext,System.Collections.Generic.IEnumerable{Microsoft.ServiceBus.Messaging.EventData})"/> is complete.
63 : /// </returns>
64 1 : public virtual Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
65 : {
66 : Task results = new TaskFactory().StartNew(() =>
67 : {
68 : foreach (EventData eventData in messages)
69 : ReceiverMessageHandler(context, eventData);
70 : });
71 :
72 : return results;
73 : }
74 :
75 : /// <summary>
76 : /// Called when the ownership of partition moves to a different node for load-balancing purpose, or when the host is shutting down. Called in response to <see cref="M:Microsoft.ServiceBus.Messaging.EventHubConsumerGroup.UnregisterProcessorAsync(Microsoft.ServiceBus.Messaging.Lease,Microsoft.ServiceBus.Messaging.CloseReason)"/>.
77 : /// </summary>
78 : /// <param name="context">Partition ownership information for the partition on which this processor instance works. You can call <see cref="M:Microsoft.ServiceBus.Messaging.PartitionContext.CheckpointAsync"/> to checkpoint progress in the processing of messages from Event Hub streams.</param>
79 : /// <param name="reason">The reason for calling <see cref="M:Microsoft.ServiceBus.Messaging.IEventProcessor.CloseAsync(Microsoft.ServiceBus.Messaging.PartitionContext,Microsoft.ServiceBus.Messaging.CloseReason)"/>.</param>
80 : /// <returns>
81 : /// A task indicating that the Close operation is complete.
82 : /// </returns>
83 1 : public virtual Task CloseAsync(PartitionContext context, CloseReason reason)
84 : {
85 : Logger.LogInfo("Close Async");
86 : return Task.FromResult<object>(null);
87 : }
88 :
89 0 : public Task ProcessErrorAsync(PartitionContext context, Exception error)
90 : {
91 : Logger.LogInfo("Process Error Async");
92 : return Task.FromResult<object>(null);
93 : }
94 :
95 : #endregion
96 : }
97 : }
|