actix/fut/stream/
timeout.rsuse std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use pin_project_lite::pin_project;
use crate::{
actor::Actor,
clock::{sleep, Instant, Sleep},
fut::ActorStream,
};
pin_project! {
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Timeout<S> {
#[pin]
stream: S,
dur: Duration,
reset_timeout: bool,
#[pin]
timeout: Sleep,
}
}
impl<S> Timeout<S> {
pub(super) fn new(stream: S, timeout: Duration) -> Self {
Self {
stream,
dur: timeout,
reset_timeout: false,
timeout: sleep(timeout),
}
}
}
impl<S, A> ActorStream<A> for Timeout<S>
where
S: ActorStream<A>,
A: Actor,
{
type Item = Result<S::Item, ()>;
fn poll_next(
self: Pin<&mut Self>,
act: &mut A,
ctx: &mut A::Context,
task: &mut Context<'_>,
) -> Poll<Option<Result<S::Item, ()>>> {
let mut this = self.project();
match this.stream.poll_next(act, ctx, task) {
Poll::Ready(Some(res)) => {
*this.reset_timeout = true;
Poll::Ready(Some(Ok(res)))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => {
if *this.reset_timeout {
*this.reset_timeout = false;
this.timeout.as_mut().reset(Instant::now() + *this.dur);
}
this.timeout.poll(task).map(|_| Some(Err(())))
}
}
}
}