Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="Chinchilla Software Limited">
4 : // // Copyright Chinchilla Software Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using Akka.Actor;
13 : using Cqrs.Commands;
14 : using Cqrs.Events;
15 :
16 : namespace Cqrs.Akka.Commands
17 : {
/// <summary>
/// A <see cref="IAkkaCommandPublisher{TAuthenticationToken}"/> that ensures concurrency regardless of what it passes the command onto as it is a <see cref="ReceiveActor"/>.
/// </summary>
/// <remarks>
/// Within this class the <c>Publish</c> overloads forward commands to the <see cref="CommandReceiver"/>,
/// while every <c>PublishAndWait</c> overload ultimately throws a <see cref="NotImplementedException"/>
/// because this type acts as a proxy.
/// </remarks>
/// <typeparam name="TAuthenticationToken">The <see cref="Type"/> of the authentication token.</typeparam>
/// <typeparam name="TTarget">The <see cref="Type"/> of the object that is targeted that needs concurrency.</typeparam>
public class ConcurrentAkkaCommandPublisher<TAuthenticationToken, TTarget>
    : ReceiveActor
    , IConcurrentAkkaCommandPublisher<TAuthenticationToken, TTarget>
{
    /// <summary>
    /// Gets the <see cref="IActorRef"/> supplied at construction.
    /// </summary>
    protected IActorRef ActorReference { get; private set; }

    /// <summary>
    /// Gets the <see cref="ICommandReceiver{TAuthenticationToken}"/> that published commands are handed to.
    /// </summary>
    protected ICommandReceiver<TAuthenticationToken> CommandReceiver { get; private set; }

    /// <summary>
    /// Instantiates a new instance of <see cref="ConcurrentAkkaCommandPublisher{TAuthenticationToken,TTarget}"/>.
    /// </summary>
    /// <param name="actorReference">The <see cref="IActorRef"/> exposed via <see cref="ActorReference"/>.</param>
    /// <param name="commandReceiver">The <see cref="ICommandReceiver{TAuthenticationToken}"/> that commands are published on.</param>
    public ConcurrentAkkaCommandPublisher(IActorRef actorReference, ICommandReceiver<TAuthenticationToken> commandReceiver)
    {
        ActorReference = actorReference;
        CommandReceiver = commandReceiver;
    }

    #region Implementation of ICommandPublisher<TAuthenticationToken>

    /// <summary>
    /// Publishes the provided <paramref name="command"/> on the <see cref="CommandReceiver"/>.
    /// </summary>
    /// <remarks>
    /// This is for when a command originated outside Akka and now needs to be pushed into Akka.
    /// </remarks>
    /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
    public void Publish<TCommand>(TCommand command)
        where TCommand : ICommand<TAuthenticationToken>
    {
        // This will trigger the Akka cycle back publishing... It looks weird, but trust it
        // This is for when a command originated outside Akka and now needs to be pushed into Akka
        CommandReceiver.ReceiveCommand(command);
    }

    /// <summary>
    /// Publishes the provided <paramref name="commands"/> on the <see cref="CommandReceiver"/>, one at a time and in enumeration order.
    /// </summary>
    /// <remarks>
    /// This is for when a command originated outside Akka and now needs to be pushed into Akka.
    /// </remarks>
    /// <param name="commands">The <typeparamref name="TCommand"/> instances to publish.</param>
    /// <exception cref="ArgumentNullException"><paramref name="commands"/> is null.</exception>
    public void Publish<TCommand>(IEnumerable<TCommand> commands)
        where TCommand : ICommand<TAuthenticationToken>
    {
        // Fail with a descriptive exception rather than a bare NullReferenceException from the foreach.
        if (commands == null)
        {
            throw new ArgumentNullException("commands");
        }

        // This will trigger the Akka cycle back publishing... It looks weird, but trust it
        // This is for when a command originated outside Akka and now needs to be pushed into Akka
        foreach (TCommand command in commands)
        {
            CommandReceiver.ReceiveCommand(command);
        }
    }

    /// <summary>
    /// Publishes the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/>
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
    /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
    /// <exception cref="NotImplementedException">Thrown by the overload this call delegates to, unless a subclass overrides this method.</exception>
    public virtual TEvent PublishAndWait<TCommand, TEvent>(TCommand command, IEventReceiver<TAuthenticationToken> eventReceiver = null)
        where TCommand : ICommand<TAuthenticationToken>
    {
        // -1 means "wait indefinitely" (the value of System.Threading.Timeout.Infinite).
        return PublishAndWait<TCommand, TEvent>(command, -1, eventReceiver);
    }

    /// <summary>
    /// Publishes the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
    /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see cref="F:System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
    /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
    /// <exception cref="NotImplementedException">Always thrown by the overload this call delegates to.</exception>
    public TEvent PublishAndWait<TCommand, TEvent>(TCommand command, int millisecondsTimeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
        where TCommand : ICommand<TAuthenticationToken>
    {
        // OfType + SingleOrDefault yields default(TEvent) when nothing matches, which also works when
        // TEvent is a value type (casting a null result to a value type would throw instead).
        return PublishAndWait<TCommand, TEvent>(command, events => events.OfType<TEvent>().SingleOrDefault(), millisecondsTimeout, eventReceiver);
    }

    /// <summary>
    /// Publishes the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
    /// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
    /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is less than -1 millisecond or greater than <see cref="int.MaxValue"/> milliseconds.</exception>
    /// <exception cref="NotImplementedException">Always thrown by the overload this call delegates to once <paramref name="timeout"/> has been validated.</exception>
    public TEvent PublishAndWait<TCommand, TEvent>(TCommand command, TimeSpan timeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
        where TCommand : ICommand<TAuthenticationToken>
    {
        return PublishAndWait<TCommand, TEvent>(command, ToMillisecondsTimeout(timeout), eventReceiver);
    }

    /// <summary>
    /// Publishes the provided <paramref name="command"></paramref> and waits until the specified condition is satisfied an event of <typeparamref name="TEvent"/>
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
    /// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
    /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
    /// <exception cref="NotImplementedException">Always thrown by the overload this call delegates to.</exception>
    public TEvent PublishAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, IEventReceiver<TAuthenticationToken> eventReceiver = null)
        where TCommand : ICommand<TAuthenticationToken>
    {
        // -1 means "wait indefinitely" (the value of System.Threading.Timeout.Infinite).
        return PublishAndWait(command, condition, -1, eventReceiver);
    }

    /// <summary>
    /// Publishes the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
    /// </summary>
    /// <remarks>
    /// Every other <c>PublishAndWait</c> overload in this class funnels into this one, which is intentionally not implemented.
    /// </remarks>
    /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
    /// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
    /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see cref="F:System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
    /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public TEvent PublishAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, int millisecondsTimeout,
        IEventReceiver<TAuthenticationToken> eventReceiver = null) where TCommand : ICommand<TAuthenticationToken>
    {
        throw new NotImplementedException("This is a proxy so this wouldn't happen here.");
    }

    /// <summary>
    /// Publishes the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
    /// </summary>
    /// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
    /// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
    /// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
    /// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is less than -1 millisecond or greater than <see cref="int.MaxValue"/> milliseconds.</exception>
    /// <exception cref="NotImplementedException">Always thrown by the overload this call delegates to once <paramref name="timeout"/> has been validated.</exception>
    public TEvent PublishAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, TimeSpan timeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
        where TCommand : ICommand<TAuthenticationToken>
    {
        return PublishAndWait(command, condition, ToMillisecondsTimeout(timeout), eventReceiver);
    }

    #endregion

    /// <summary>
    /// Validates <paramref name="timeout"/> and converts it to a whole number of milliseconds.
    /// </summary>
    /// <param name="timeout">The <see cref="TimeSpan"/> to convert; -1 millisecond means wait indefinitely.</param>
    /// <returns>The truncated total milliseconds of <paramref name="timeout"/>.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is less than -1 millisecond or greater than <see cref="int.MaxValue"/> milliseconds.</exception>
    private static int ToMillisecondsTimeout(TimeSpan timeout)
    {
        // Truncate once; the range check guarantees the narrowing cast below cannot overflow.
        long totalMilliseconds = (long)timeout.TotalMilliseconds;
        if (totalMilliseconds < -1L || totalMilliseconds > int.MaxValue)
        {
            throw new ArgumentOutOfRangeException("timeout", timeout, "The timeout must represent a value between -1 and Int32.MaxValue milliseconds, inclusive.");
        }

        return (int)totalMilliseconds;
    }
}
158 : }
|