actix/fut/stream/
timeout.rs1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5 time::Duration,
6};
7
8use pin_project_lite::pin_project;
9
10use crate::{
11 actor::Actor,
12 clock::{sleep, Instant, Sleep},
13 fut::ActorStream,
14};
15
16pin_project! {
17 #[derive(Debug)]
19 #[must_use = "streams do nothing unless polled"]
20 pub struct Timeout<S> {
21 #[pin]
22 stream: S,
23 dur: Duration,
24 reset_timeout: bool,
25 #[pin]
26 timeout: Sleep,
27 }
28}
29
30impl<S> Timeout<S> {
31 pub(super) fn new(stream: S, timeout: Duration) -> Self {
32 Self {
33 stream,
34 dur: timeout,
35 reset_timeout: false,
36 timeout: sleep(timeout),
37 }
38 }
39}
40
41impl<S, A> ActorStream<A> for Timeout<S>
42where
43 S: ActorStream<A>,
44 A: Actor,
45{
46 type Item = Result<S::Item, ()>;
47
48 fn poll_next(
49 self: Pin<&mut Self>,
50 act: &mut A,
51 ctx: &mut A::Context,
52 task: &mut Context<'_>,
53 ) -> Poll<Option<Result<S::Item, ()>>> {
54 let mut this = self.project();
55
56 match this.stream.poll_next(act, ctx, task) {
57 Poll::Ready(Some(res)) => {
58 *this.reset_timeout = true;
59 Poll::Ready(Some(Ok(res)))
60 }
61 Poll::Ready(None) => Poll::Ready(None),
62 Poll::Pending => {
63 if *this.reset_timeout {
65 *this.reset_timeout = false;
66 this.timeout.as_mut().reset(Instant::now() + *this.dur);
67 }
68
69 this.timeout.poll(task).map(|_| Some(Err(())))
71 }
72 }
73 }
74}