Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using System.Threading.Tasks;
13 : using Cqrs.Events;
14 : using cdmdotnet.Logging;
15 : using Microsoft.Azure.Documents;
16 : using Microsoft.Azure.Documents.Client;
17 : using Microsoft.Azure.Documents.Linq;
18 :
19 : namespace Cqrs.Azure.DocumentDb.Events
20 : {
21 : public class AzureDocumentDbEventStore<TAuthenticationToken> : EventStore<TAuthenticationToken>
22 0 : {
23 : protected readonly string[] UniqueIndexProperties = {"AggregateId", "Version"};
24 :
25 : protected IAzureDocumentDbEventStoreConnectionStringFactory AzureDocumentDbEventStoreConnectionStringFactory { get; private set; }
26 :
27 : protected IAzureDocumentDbHelper AzureDocumentDbHelper { get; private set; }
28 :
29 0 : public AzureDocumentDbEventStore(IEventBuilder<TAuthenticationToken> eventBuilder, IEventDeserialiser<TAuthenticationToken> eventDeserialiser, ILogger logger, IAzureDocumentDbHelper azureDocumentDbHelper, IAzureDocumentDbEventStoreConnectionStringFactory azureDocumentDbEventStoreConnectionStringFactory)
30 : : base(eventBuilder, eventDeserialiser, logger)
31 : {
32 : AzureDocumentDbHelper = azureDocumentDbHelper;
33 : AzureDocumentDbEventStoreConnectionStringFactory = azureDocumentDbEventStoreConnectionStringFactory;
34 : }
35 :
36 0 : public override IEnumerable<IEvent<TAuthenticationToken>> Get(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
37 : {
38 : return GetAsync(aggregateRootType, aggregateId, useLastEventOnly, fromVersion).Result;
39 : }
40 :
41 0 : public override IEnumerable<EventData> Get(Guid correlationId)
42 : {
43 : return GetAsync(correlationId).Result;
44 : }
45 :
46 0 : protected async Task<IEnumerable<IEvent<TAuthenticationToken>>> GetAsync<T>(Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
47 : {
48 : return Get(typeof(T), aggregateId, useLastEventOnly, fromVersion);
49 : }
50 :
51 0 : protected async Task<IEnumerable<IEvent<TAuthenticationToken>>> GetAsync(Type aggregateRootType, Guid aggregateId, bool useLastEventOnly = false, int fromVersion = -1)
52 : {
53 : using (DocumentClient client = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionClient())
54 : {
55 : Database database = AzureDocumentDbHelper.CreateOrReadDatabase(client, AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionDatabaseName()).Result;
56 : //DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, string.Format("{0}_{1}", AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName(), typeof(T).FullName)).Result;
57 : string collectionName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName();
58 : DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).Result;
59 :
60 : IOrderedQueryable<EventData> query = client.CreateDocumentQuery<EventData>(collection.SelfLink);
61 : string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);
62 :
63 : IEnumerable<EventData> results = query.Where(x => x.AggregateId == streamName && x.Version > fromVersion);
64 :
65 : return AzureDocumentDbHelper.ExecuteFaultTollerantFunction(() =>
66 : results
67 : .ToList()
68 : .OrderByDescending(x => x.EventId)
69 : .Select(EventDeserialiser.Deserialise)
70 : );
71 : }
72 : }
73 :
74 0 : protected async Task<IEnumerable<EventData>> GetAsync(Guid correlationId)
75 : {
76 : using (DocumentClient client = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionClient())
77 : {
78 : Database database = AzureDocumentDbHelper.CreateOrReadDatabase(client, AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionDatabaseName()).Result;
79 : //DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, string.Format("{0}_{1}", AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName(), typeof(T).FullName)).Result;
80 : string collectionName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName();
81 : DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).Result;
82 :
83 : IOrderedQueryable<EventData> query = client.CreateDocumentQuery<EventData>(collection.SelfLink);
84 :
85 : IEnumerable<EventData> results = query.Where(x => x.CorrelationId == correlationId);
86 :
87 : return AzureDocumentDbHelper.ExecuteFaultTollerantFunction(() =>
88 : results
89 : .ToList()
90 : .OrderBy(x => x.Timestamp)
91 : );
92 : }
93 : }
94 :
95 0 : protected override void PersistEvent(EventData eventData)
96 : {
97 : Logger.LogDebug("Persisting aggregate root event", string.Format("{0}\\PersitEvent", GetType().Name));
98 : try
99 : {
100 : using (DocumentClient client = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionClient())
101 : {
102 : Database database = AzureDocumentDbHelper.CreateOrReadDatabase(client, AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionDatabaseName()).Result;
103 : //DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, string.Format("{0}_{1}", AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName(), eventData.EventType)).Result;
104 : //string collectionName = string.Format("{0}::{1}", AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName(), eventData.AggregateId.Substring(0, eventData.AggregateId.IndexOf("/", StringComparison.Ordinal)));
105 : string collectionName = AzureDocumentDbEventStoreConnectionStringFactory.GetEventStoreConnectionCollectionName();
106 : DocumentCollection collection = AzureDocumentDbHelper.CreateOrReadCollection(client, database, collectionName, UniqueIndexProperties).Result;
107 :
108 : Logger.LogDebug("Creating document for event asynchronously", string.Format("{0}\\PersitEvent", GetType().Name));
109 : AzureDocumentDbHelper.ExecuteFaultTollerantFunction
110 : (
111 : () =>
112 : {
113 : Task<ResourceResponse<Document>> work = client.CreateDocumentAsync
114 : (
115 : collection.SelfLink,
116 : eventData,
117 : new RequestOptions {PreTriggerInclude = new[] {"ValidateUniqueConstraints"}}
118 : );
119 : work.ConfigureAwait(false);
120 : work.Wait();
121 : }
122 : );
123 : }
124 : }
125 : finally
126 : {
127 : Logger.LogDebug("Persisting aggregate root event... Done", string.Format("{0}\\PersitEvent", GetType().Name));
128 : }
129 : }
130 : }
131 : }
|