actix/utils.rs
1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5 time::Duration,
6};
7
8use futures_core::ready;
9use pin_project_lite::pin_project;
10use tokio::{
11 sync::oneshot,
12 time::{sleep_until, Instant},
13};
14
15use crate::{
16 actor::Actor,
17 clock::{sleep, Sleep},
18 fut::{ActorFuture, ActorStream},
19};
20
21#[deprecated(
22 since = "0.11.0",
23 note = "Please use tokio::sync::oneshot::Sender instead."
24)]
25pub struct Condition<T>
26where
27 T: Clone,
28{
29 waiters: Vec<oneshot::Sender<T>>,
30}
31
32#[allow(deprecated)]
33impl<T> Condition<T>
34where
35 T: Clone,
36{
37 pub fn wait(&mut self) -> oneshot::Receiver<T> {
38 let (tx, rx) = oneshot::channel();
39 self.waiters.push(tx);
40 rx
41 }
42
43 pub fn set(self, result: T) {
44 for waiter in self.waiters {
45 let _ = waiter.send(result.clone());
46 }
47 }
48}
49
50#[allow(deprecated)]
51impl<T> Default for Condition<T>
52where
53 T: Clone,
54{
55 fn default() -> Self {
56 Condition {
57 waiters: Vec::new(),
58 }
59 }
60}
61
62pin_project! {
63 /// An `ActorFuture` that runs a function in the actor's context after a specified amount of time.
64 ///
65 /// Unless you specifically need access to the future, use [`Context::run_later`] instead.
66 ///
67 /// [`Context::run_later`]: ../prelude/trait.AsyncContext.html#method.run_later
68 ///
69 /// ```
70 /// # use std::io;
71 /// use std::time::Duration;
72 /// use actix::prelude::*;
73 /// use actix::utils::TimerFunc;
74 ///
75 /// struct MyActor;
76 ///
77 /// impl MyActor {
78 /// fn stop(&mut self, context: &mut Context<Self>) {
79 /// System::current().stop();
80 /// }
81 /// }
82 ///
83 /// impl Actor for MyActor {
84 /// type Context = Context<Self>;
85 ///
86 /// fn started(&mut self, context: &mut Context<Self>) {
87 /// // spawn a delayed future into our context
88 /// TimerFunc::new(Duration::from_millis(100), Self::stop)
89 /// .spawn(context);
90 /// }
91 /// }
92 /// # fn main() {
93 /// # let mut sys = System::new();
94 /// # let addr = sys.block_on(async { MyActor.start() });
95 /// # sys.run();
96 /// # }
97 #[must_use = "future do nothing unless polled"]
98 #[allow(clippy::type_complexity)]
99 pub struct TimerFunc<A: Actor> {
100 f: Option<Box<dyn FnOnce(&mut A, &mut A::Context)>>,
101 #[pin]
102 timeout: Sleep,
103 }
104}
105
106impl<A: Actor> TimerFunc<A> {
107 /// Creates a new `TimerFunc` with the given duration.
108 pub fn new<F>(timeout: Duration, f: F) -> TimerFunc<A>
109 where
110 F: FnOnce(&mut A, &mut A::Context) + 'static,
111 {
112 TimerFunc {
113 f: Some(Box::new(f)),
114 timeout: sleep(timeout),
115 }
116 }
117}
118
119impl<A> ActorFuture<A> for TimerFunc<A>
120where
121 A: Actor,
122{
123 type Output = ();
124
125 fn poll(
126 self: Pin<&mut Self>,
127 act: &mut A,
128 ctx: &mut A::Context,
129 task: &mut Context<'_>,
130 ) -> Poll<Self::Output> {
131 let this = self.project();
132 ready!(this.timeout.poll(task));
133 let f = this.f.take().expect("TimerFunc polled after finish");
134 f(act, ctx);
135 Poll::Ready(())
136 }
137}
138
139pin_project! {
140 /// An `ActorStream` that periodically runs a function in the actor's context.
141 ///
142 /// Unless you specifically need access to the future, use [`Context::run_interval`] instead.
143 ///
144 /// [`Context::run_interval`]: ../prelude/trait.AsyncContext.html#method.run_interval
145 ///
146 /// ```
147 /// # use std::io;
148 /// use std::time::Duration;
149 /// use actix::prelude::*;
150 /// use actix::utils::IntervalFunc;
151 ///
152 /// struct MyActor;
153 ///
154 /// impl MyActor {
155 /// fn tick(&mut self, context: &mut Context<Self>) {
156 /// println!("tick");
157 /// }
158 /// }
159 ///
160 /// impl Actor for MyActor {
161 /// type Context = Context<Self>;
162 ///
163 /// fn started(&mut self, context: &mut Context<Self>) {
164 /// // spawn an interval stream into our context
165 /// IntervalFunc::new(Duration::from_millis(100), Self::tick)
166 /// .finish()
167 /// .spawn(context);
168 /// # context.run_later(Duration::from_millis(200), |_, _| System::current().stop());
169 /// }
170 /// }
171 /// # fn main() {
172 /// # let mut sys = System::new();
173 /// # let addr = sys.block_on(async { MyActor.start() });
174 /// # sys.run();
175 /// # }
176 /// ```
177 #[must_use = "future do nothing unless polled"]
178 #[allow(clippy::type_complexity)]
179 pub struct IntervalFunc<A: Actor> {
180 f: Box<dyn FnMut(&mut A, &mut A::Context)>,
181 interval: Duration,
182 #[pin]
183 timer: Sleep,
184 }
185}
186
187impl<A: Actor> IntervalFunc<A> {
188 /// Constructs an `IntervalFunc` using the given `interval` duration and `task` function.
189 pub fn new<F>(interval: Duration, task: F) -> IntervalFunc<A>
190 where
191 F: FnMut(&mut A, &mut A::Context) + 'static,
192 {
193 Self {
194 f: Box::new(task),
195 interval,
196 timer: sleep(interval),
197 }
198 }
199
200 /// Constructs an `IntervalFunc` using the given `start` time, `interval` duration, and `task`
201 /// function.
202 pub fn new_at<F>(start: Instant, interval: Duration, task: F) -> IntervalFunc<A>
203 where
204 F: FnMut(&mut A, &mut A::Context) + 'static,
205 {
206 Self {
207 f: Box::new(task),
208 interval,
209 timer: sleep_until(start),
210 }
211 }
212}
213
214impl<A: Actor> ActorStream<A> for IntervalFunc<A> {
215 type Item = ();
216
217 fn poll_next(
218 self: Pin<&mut Self>,
219 act: &mut A,
220 ctx: &mut A::Context,
221 task: &mut Context<'_>,
222 ) -> Poll<Option<Self::Item>> {
223 let mut this = self.project();
224 loop {
225 ready!(this.timer.as_mut().poll(task));
226 let now = this.timer.deadline();
227 this.timer.as_mut().reset(now + *this.interval);
228 (this.f)(act, ctx);
229 }
230 }
231}