actix/fut/stream/
timeout.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5    time::Duration,
6};
7
8use pin_project_lite::pin_project;
9
10use crate::{
11    actor::Actor,
12    clock::{sleep, Instant, Sleep},
13    fut::ActorStream,
14};
15
16pin_project! {
17    /// Stream for the [`timeout`](super::ActorStreamExt::timeout) method.
18    #[derive(Debug)]
19    #[must_use = "streams do nothing unless polled"]
20    pub struct Timeout<S> {
21        #[pin]
22        stream: S,
23        dur: Duration,
24        reset_timeout: bool,
25        #[pin]
26        timeout: Sleep,
27    }
28}
29
30impl<S> Timeout<S> {
31    pub(super) fn new(stream: S, timeout: Duration) -> Self {
32        Self {
33            stream,
34            dur: timeout,
35            reset_timeout: false,
36            timeout: sleep(timeout),
37        }
38    }
39}
40
41impl<S, A> ActorStream<A> for Timeout<S>
42where
43    S: ActorStream<A>,
44    A: Actor,
45{
46    type Item = Result<S::Item, ()>;
47
48    fn poll_next(
49        self: Pin<&mut Self>,
50        act: &mut A,
51        ctx: &mut A::Context,
52        task: &mut Context<'_>,
53    ) -> Poll<Option<Result<S::Item, ()>>> {
54        let mut this = self.project();
55
56        match this.stream.poll_next(act, ctx, task) {
57            Poll::Ready(Some(res)) => {
58                *this.reset_timeout = true;
59                Poll::Ready(Some(Ok(res)))
60            }
61            Poll::Ready(None) => Poll::Ready(None),
62            Poll::Pending => {
63                // only reset timeout when poll_next returns Ready and followed by Pending after.
64                if *this.reset_timeout {
65                    *this.reset_timeout = false;
66                    this.timeout.as_mut().reset(Instant::now() + *this.dur);
67                }
68
69                // check timeout
70                this.timeout.poll(task).map(|_| Some(Err(())))
71            }
72        }
73    }
74}