actix/
utils.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5    time::Duration,
6};
7
8use futures_core::ready;
9use pin_project_lite::pin_project;
10use tokio::{
11    sync::oneshot,
12    time::{sleep_until, Instant},
13};
14
15use crate::{
16    actor::Actor,
17    clock::{sleep, Sleep},
18    fut::{ActorFuture, ActorStream},
19};
20
21#[deprecated(
22    since = "0.11.0",
23    note = "Please use tokio::sync::oneshot::Sender instead."
24)]
25pub struct Condition<T>
26where
27    T: Clone,
28{
29    waiters: Vec<oneshot::Sender<T>>,
30}
31
32#[allow(deprecated)]
33impl<T> Condition<T>
34where
35    T: Clone,
36{
37    pub fn wait(&mut self) -> oneshot::Receiver<T> {
38        let (tx, rx) = oneshot::channel();
39        self.waiters.push(tx);
40        rx
41    }
42
43    pub fn set(self, result: T) {
44        for waiter in self.waiters {
45            let _ = waiter.send(result.clone());
46        }
47    }
48}
49
50#[allow(deprecated)]
51impl<T> Default for Condition<T>
52where
53    T: Clone,
54{
55    fn default() -> Self {
56        Condition {
57            waiters: Vec::new(),
58        }
59    }
60}
61
62pin_project! {
63    /// An `ActorFuture` that runs a function in the actor's context after a specified amount of time.
64    ///
65    /// Unless you specifically need access to the future, use [`Context::run_later`] instead.
66    ///
67    /// [`Context::run_later`]: ../prelude/trait.AsyncContext.html#method.run_later
68    ///
69    /// ```
70    /// # use std::io;
71    /// use std::time::Duration;
72    /// use actix::prelude::*;
73    /// use actix::utils::TimerFunc;
74    ///
75    /// struct MyActor;
76    ///
77    /// impl MyActor {
78    ///     fn stop(&mut self, context: &mut Context<Self>) {
79    ///         System::current().stop();
80    ///     }
81    /// }
82    ///
83    /// impl Actor for MyActor {
84    ///    type Context = Context<Self>;
85    ///
86    ///    fn started(&mut self, context: &mut Context<Self>) {
87    ///        // spawn a delayed future into our context
88    ///        TimerFunc::new(Duration::from_millis(100), Self::stop)
89    ///            .spawn(context);
90    ///    }
91    /// }
92    /// # fn main() {
93    /// #    let mut sys = System::new();
94    /// #    let addr = sys.block_on(async { MyActor.start() });
95    /// #    sys.run();
96    /// # }
97    #[must_use = "future do nothing unless polled"]
98    #[allow(clippy::type_complexity)]
99    pub struct TimerFunc<A: Actor> {
100        f: Option<Box<dyn FnOnce(&mut A, &mut A::Context)>>,
101        #[pin]
102        timeout: Sleep,
103    }
104}
105
106impl<A: Actor> TimerFunc<A> {
107    /// Creates a new `TimerFunc` with the given duration.
108    pub fn new<F>(timeout: Duration, f: F) -> TimerFunc<A>
109    where
110        F: FnOnce(&mut A, &mut A::Context) + 'static,
111    {
112        TimerFunc {
113            f: Some(Box::new(f)),
114            timeout: sleep(timeout),
115        }
116    }
117}
118
119impl<A> ActorFuture<A> for TimerFunc<A>
120where
121    A: Actor,
122{
123    type Output = ();
124
125    fn poll(
126        self: Pin<&mut Self>,
127        act: &mut A,
128        ctx: &mut A::Context,
129        task: &mut Context<'_>,
130    ) -> Poll<Self::Output> {
131        let this = self.project();
132        ready!(this.timeout.poll(task));
133        let f = this.f.take().expect("TimerFunc polled after finish");
134        f(act, ctx);
135        Poll::Ready(())
136    }
137}
138
139pin_project! {
140    /// An `ActorStream` that periodically runs a function in the actor's context.
141    ///
142    /// Unless you specifically need access to the future, use [`Context::run_interval`] instead.
143    ///
144    /// [`Context::run_interval`]: ../prelude/trait.AsyncContext.html#method.run_interval
145    ///
146    /// ```
147    /// # use std::io;
148    /// use std::time::Duration;
149    /// use actix::prelude::*;
150    /// use actix::utils::IntervalFunc;
151    ///
152    /// struct MyActor;
153    ///
154    /// impl MyActor {
155    ///     fn tick(&mut self, context: &mut Context<Self>) {
156    ///         println!("tick");
157    ///     }
158    /// }
159    ///
160    /// impl Actor for MyActor {
161    ///    type Context = Context<Self>;
162    ///
163    ///    fn started(&mut self, context: &mut Context<Self>) {
164    ///        // spawn an interval stream into our context
165    ///        IntervalFunc::new(Duration::from_millis(100), Self::tick)
166    ///            .finish()
167    ///            .spawn(context);
168    /// #      context.run_later(Duration::from_millis(200), |_, _| System::current().stop());
169    ///    }
170    /// }
171    /// # fn main() {
172    /// #    let mut sys = System::new();
173    /// #    let addr = sys.block_on(async { MyActor.start() });
174    /// #    sys.run();
175    /// # }
176    /// ```
177    #[must_use = "future do nothing unless polled"]
178    #[allow(clippy::type_complexity)]
179    pub struct IntervalFunc<A: Actor> {
180        f: Box<dyn FnMut(&mut A, &mut A::Context)>,
181        interval: Duration,
182        #[pin]
183        timer: Sleep,
184    }
185}
186
187impl<A: Actor> IntervalFunc<A> {
188    /// Constructs an `IntervalFunc` using the given `interval` duration and `task` function.
189    pub fn new<F>(interval: Duration, task: F) -> IntervalFunc<A>
190    where
191        F: FnMut(&mut A, &mut A::Context) + 'static,
192    {
193        Self {
194            f: Box::new(task),
195            interval,
196            timer: sleep(interval),
197        }
198    }
199
200    /// Constructs an `IntervalFunc` using the given `start` time, `interval` duration, and `task`
201    /// function.
202    pub fn new_at<F>(start: Instant, interval: Duration, task: F) -> IntervalFunc<A>
203    where
204        F: FnMut(&mut A, &mut A::Context) + 'static,
205    {
206        Self {
207            f: Box::new(task),
208            interval,
209            timer: sleep_until(start),
210        }
211    }
212}
213
214impl<A: Actor> ActorStream<A> for IntervalFunc<A> {
215    type Item = ();
216
217    fn poll_next(
218        self: Pin<&mut Self>,
219        act: &mut A,
220        ctx: &mut A::Context,
221        task: &mut Context<'_>,
222    ) -> Poll<Option<Self::Item>> {
223        let mut this = self.project();
224        loop {
225            ready!(this.timer.as_mut().poll(task));
226            let now = this.timer.deadline();
227            this.timer.as_mut().reset(now + *this.interval);
228            (this.f)(act, ctx);
229        }
230    }
231}