Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using Cqrs.Bus;
11 : using EventStore.ClientAPI;
12 :
13 : namespace Cqrs.EventStore.Bus
14 : {
/// <summary>
/// Indicates the position in store where the stream has been read up to.
/// The position is persisted as events in the <see cref="EventsProcessedStreamName"/> stream;
/// the most recently appended event holds the current position.
/// </summary>
public class EventStoreBasedLastEventProcessedStore : IStoreLastEventProcessed
{
    /// <summary>
    /// The name of the event stream used to store the position/location information.
    /// </summary>
    public const string EventsProcessedStreamName = @"EventsProcessed";

    /// <summary>
    /// The name of the event type we use in the event stream to store the position/location information.
    /// </summary>
    public const string EventType = @"ProcessedEvent";

    /// <summary>
    /// The <see cref="IEventStoreConnection"/> used to read and write streams in the Greg Young Event Store.
    /// </summary>
    protected IEventStoreConnection EventStoreConnection { get; private set; }

    /// <summary>
    /// Instantiates a new instance of <see cref="EventStoreBasedLastEventProcessedStore"/>.
    /// </summary>
    /// <param name="eventStoreConnection">The <see cref="IEventStoreConnection"/> used to read and write the position stream.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="eventStoreConnection"/> is <c>null</c>.</exception>
    public EventStoreBasedLastEventProcessedStore(IEventStoreConnection eventStoreConnection)
    {
        if (eventStoreConnection == null)
        {
            throw new ArgumentNullException("eventStoreConnection");
        }

        EventStoreConnection = eventStoreConnection;
    }

    /// <summary>
    /// The location within the store where the stream has been read up to.
    /// </summary>
    /// <remarks>
    /// Getting reads a single event backwards from the end of <see cref="EventsProcessedStreamName"/> and returns its
    /// data converted to a string, or <see cref="string.Empty"/> when the read returns no events.
    /// Setting appends a new <see cref="EventType"/> event carrying the value to the same stream.
    /// Both accessors block the calling thread until the underlying asynchronous operation has completed.
    /// </remarks>
    public string EventLocation
    {
        get
        {
            // Only the newest event matters, so read one event starting from the end of the stream.
            StreamEventsSlice slice = EventStoreConnection.ReadStreamEventsBackwardAsync(EventsProcessedStreamName, StreamPosition.End, 1, false).Result;
            if (slice.Events.Length > 0)
            {
                return EventStoreUtilities.ByteArrayToString(slice.Events[0].Event.Data);
            }

            return string.Empty;
        }
        set
        {
            var eventData = new EventData(Guid.NewGuid(), EventType, false, EventStoreUtilities.StringToByteArray(value), null);

            // Wait() rather than RunSynchronously(): the task handed back by AppendToStreamAsync is not an
            // unstarted, delegate-backed task, and Task.RunSynchronously throws InvalidOperationException for
            // such tasks. Wait() blocks until the append has finished and reports failures as an
            // AggregateException, the same way the getter's .Result does.
            EventStoreConnection.AppendToStreamAsync(EventsProcessedStreamName, ExpectedVersion.Any, eventData).Wait();
        }
    }
}
71 : }
|