Line data Source code
1 : #region Copyright
2 : // // -----------------------------------------------------------------------
3 : // // <copyright company="cdmdotnet Limited">
4 : // // Copyright cdmdotnet Limited. All rights reserved.
5 : // // </copyright>
6 : // // -----------------------------------------------------------------------
7 : #endregion
8 :
9 : using System;
10 : using System.Collections.Generic;
11 : using System.Linq;
12 : using Akka.Actor;
13 : using Cqrs.Commands;
14 : using Cqrs.Events;
15 :
16 : namespace Cqrs.Akka.Commands
17 : {
/// <summary>
/// A <see cref="ReceiveActor"/> that implements <see cref="IConcurrentAkkaCommandSender{TAuthenticationToken, TTarget}"/>
/// by handing every command it is given to the configured <see cref="ICommandReceiver{TAuthenticationToken}"/>.
/// </summary>
/// <remarks>
/// The SendAndWait overloads are not supported by this class: every one of them ends up in the overload that takes
/// a condition and a millisecond timeout, which throws <see cref="NotImplementedException"/>.
/// </remarks>
/// <typeparam name="TAuthenticationToken">The type argument used for the commands, events and receivers this sender works with.</typeparam>
/// <typeparam name="TTarget">Not used by this class itself; it is only passed on to <see cref="IConcurrentAkkaCommandSender{TAuthenticationToken, TTarget}"/>.</typeparam>
public class ConcurrentAkkaCommandSender<TAuthenticationToken, TTarget>
	: ReceiveActor
	, IConcurrentAkkaCommandSender<TAuthenticationToken, TTarget>
{
	/// <summary>
	/// The message used when a <see cref="TimeSpan"/> timeout cannot be expressed as a millisecond <see cref="int"/> timeout.
	/// </summary>
	private const string TimeoutOutOfRangeMessage = "The timeout must represent a value between -1 and Int32.MaxValue milliseconds, inclusive.";

	/// <summary>
	/// The <see cref="IActorRef"/> supplied to the constructor. This class stores it but does not use it itself.
	/// </summary>
	protected IActorRef ActorReference { get; private set; }

	/// <summary>
	/// The <see cref="ICommandReceiver{TAuthenticationToken}"/> that every published command is handed to.
	/// </summary>
	protected ICommandReceiver<TAuthenticationToken> CommandReceiver { get; private set; }

	/// <summary>
	/// Instantiates a new instance of <see cref="ConcurrentAkkaCommandSender{TAuthenticationToken, TTarget}"/>.
	/// </summary>
	/// <param name="actorReference">Stored in <see cref="ActorReference"/>.</param>
	/// <param name="commandReceiver">Stored in <see cref="CommandReceiver"/>; receives every command passed to the Publish and Send methods.</param>
	public ConcurrentAkkaCommandSender(IActorRef actorReference, ICommandReceiver<TAuthenticationToken> commandReceiver)
	{
		ActorReference = actorReference;
		CommandReceiver = commandReceiver;
	}

	#region Implementation of ICommandSender<TAuthenticationToken>

	/// <summary>
	/// Hands the provided <paramref name="command"/> to <see cref="CommandReceiver"/>.
	/// </summary>
	/// <typeparam name="TCommand">The type of the command being published.</typeparam>
	/// <param name="command">The <typeparamref name="TCommand"/> to publish.</param>
	public void Publish<TCommand>(TCommand command)
		where TCommand : ICommand<TAuthenticationToken>
	{
		// This will trigger the Akka cycle back publishing... It looks weird, but trust it
		// This is for when a command originated outside Akka and now needs to be pushed into Akka
		CommandReceiver.ReceiveCommand(command);
	}

	/// <summary>
	/// Identical to <see cref="Publish{TCommand}(TCommand)"/>.
	/// </summary>
	/// <typeparam name="TCommand">The type of the command being sent.</typeparam>
	/// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
	public void Send<TCommand>(TCommand command)
		where TCommand : ICommand<TAuthenticationToken>
	{
		Publish(command);
	}

	/// <summary>
	/// Hands each of the provided <paramref name="commands"/>, in enumeration order, to <see cref="CommandReceiver"/>.
	/// </summary>
	/// <typeparam name="TCommand">The type of the commands being published.</typeparam>
	/// <param name="commands">The commands to publish.</param>
	public void Publish<TCommand>(IEnumerable<TCommand> commands)
		where TCommand : ICommand<TAuthenticationToken>
	{
		// This will trigger the Akka cycle back publishing... It looks weird, but trust it
		// This is for when a command originated outside Akka and now needs to be pushed into Akka
		foreach (TCommand command in commands)
		{
			CommandReceiver.ReceiveCommand(command);
		}
	}

	/// <summary>
	/// Identical to <see cref="Publish{TCommand}(IEnumerable{TCommand})"/>.
	/// </summary>
	/// <typeparam name="TCommand">The type of the commands being sent.</typeparam>
	/// <param name="commands">The commands to send.</param>
	public void Send<TCommand>(IEnumerable<TCommand> commands)
		where TCommand : ICommand<TAuthenticationToken>
	{
		Publish(commands);
	}

	/// <summary>
	/// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/>
	/// </summary>
	/// <typeparam name="TCommand">The type of the command being sent.</typeparam>
	/// <typeparam name="TEvent">The type of the event to wait for.</typeparam>
	/// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
	/// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
	/// <exception cref="NotImplementedException">Always thrown by this implementation, as it delegates to the unsupported condition based overload.</exception>
	public virtual TEvent SendAndWait<TCommand, TEvent>(TCommand command, IEventReceiver<TAuthenticationToken> eventReceiver = null)
		where TCommand : ICommand<TAuthenticationToken>
	{
		// -1 means wait indefinitely.
		return SendAndWait<TCommand, TEvent>(command, -1, eventReceiver);
	}

	/// <summary>
	/// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
	/// </summary>
	/// <typeparam name="TCommand">The type of the command being sent.</typeparam>
	/// <typeparam name="TEvent">The type of the event to wait for.</typeparam>
	/// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
	/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see cref="F:System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
	/// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
	/// <exception cref="NotImplementedException">Always thrown, as this delegates to the unsupported condition based overload.</exception>
	public TEvent SendAndWait<TCommand, TEvent>(TCommand command, int millisecondsTimeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
		where TCommand : ICommand<TAuthenticationToken>
	{
		// OfType filters and casts in one step and yields default(TEvent) when nothing matches,
		// rather than casting a null result to TEvent (which fails for value types).
		return SendAndWait<TCommand, TEvent>(command, events => events.OfType<TEvent>().SingleOrDefault(), millisecondsTimeout, eventReceiver);
	}

	/// <summary>
	/// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
	/// </summary>
	/// <typeparam name="TCommand">The type of the command being sent.</typeparam>
	/// <typeparam name="TEvent">The type of the event to wait for.</typeparam>
	/// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
	/// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
	/// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
	/// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is less than -1 milliseconds or greater than <see cref="int.MaxValue"/> milliseconds.</exception>
	/// <exception cref="NotImplementedException">Thrown for every valid <paramref name="timeout"/>, as this delegates to the unsupported condition based overload.</exception>
	public TEvent SendAndWait<TCommand, TEvent>(TCommand command, TimeSpan timeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
		where TCommand : ICommand<TAuthenticationToken>
	{
		return SendAndWait<TCommand, TEvent>(command, ToMillisecondsTimeout(timeout), eventReceiver);
	}

	/// <summary>
	/// Sends the provided <paramref name="command"></paramref> and waits until the specified <paramref name="condition"/> returns an event of <typeparamref name="TEvent"/>
	/// </summary>
	/// <typeparam name="TCommand">The type of the command being sent.</typeparam>
	/// <typeparam name="TEvent">The type of the event to wait for.</typeparam>
	/// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
	/// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
	/// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
	/// <exception cref="NotImplementedException">Always thrown, as this delegates to the unsupported overload that takes a millisecond timeout.</exception>
	public TEvent SendAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, IEventReceiver<TAuthenticationToken> eventReceiver = null)
		where TCommand : ICommand<TAuthenticationToken>
	{
		// -1 means wait indefinitely.
		return SendAndWait(command, condition, -1, eventReceiver);
	}

	/// <summary>
	/// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
	/// Not supported by this class.
	/// </summary>
	/// <typeparam name="TCommand">The type of the command being sent.</typeparam>
	/// <typeparam name="TEvent">The type of the event to wait for.</typeparam>
	/// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
	/// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
	/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see cref="F:System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
	/// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
	/// <exception cref="NotImplementedException">Always thrown; no arguments are inspected and nothing is sent.</exception>
	public TEvent SendAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, int millisecondsTimeout,
		IEventReceiver<TAuthenticationToken> eventReceiver = null) where TCommand : ICommand<TAuthenticationToken>
	{
		throw new NotImplementedException("This is a proxy so this wouldn't happen here.");
	}

	/// <summary>
	/// Sends the provided <paramref name="command"></paramref> and waits for an event of <typeparamref name="TEvent"/> or exits if the specified timeout is expired.
	/// </summary>
	/// <typeparam name="TCommand">The type of the command being sent.</typeparam>
	/// <typeparam name="TEvent">The type of the event to wait for.</typeparam>
	/// <param name="command">The <typeparamref name="TCommand"/> to send.</param>
	/// <param name="condition">A delegate to be executed over and over until it returns the <typeparamref name="TEvent"/> that is desired, return null to keep trying.</param>
	/// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
	/// <param name="eventReceiver">If provided, is the <see cref="IEventReceiver{TAuthenticationToken}" /> that the event is expected to be returned on.</param>
	/// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is less than -1 milliseconds or greater than <see cref="int.MaxValue"/> milliseconds.</exception>
	/// <exception cref="NotImplementedException">Thrown for every valid <paramref name="timeout"/>, as this delegates to the unsupported overload that takes a millisecond timeout.</exception>
	public TEvent SendAndWait<TCommand, TEvent>(TCommand command, Func<IEnumerable<IEvent<TAuthenticationToken>>, TEvent> condition, TimeSpan timeout, IEventReceiver<TAuthenticationToken> eventReceiver = null)
		where TCommand : ICommand<TAuthenticationToken>
	{
		return SendAndWait(command, condition, ToMillisecondsTimeout(timeout), eventReceiver);
	}

	#endregion

	/// <summary>
	/// Converts <paramref name="timeout"/> to whole milliseconds, truncating any fractional millisecond.
	/// </summary>
	/// <param name="timeout">The <see cref="TimeSpan"/> to convert.</param>
	/// <returns>The timeout in milliseconds, between -1 and <see cref="int.MaxValue"/> inclusive.</returns>
	/// <exception cref="ArgumentOutOfRangeException">The truncated value is less than -1 or greater than <see cref="int.MaxValue"/>.</exception>
	private static int ToMillisecondsTimeout(TimeSpan timeout)
	{
		long totalMilliseconds = (long)timeout.TotalMilliseconds;
		if (totalMilliseconds < -1L || totalMilliseconds > int.MaxValue)
		{
			// The parameter name matches the "timeout" parameter of the public overloads that call this helper.
			throw new ArgumentOutOfRangeException("timeout", timeout, TimeoutOutOfRangeMessage);
		}
		return (int)totalMilliseconds;
	}
}
144 : }
|