actix/fut/stream/
finish.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_core::ready;
7use pin_project_lite::pin_project;
8
9use crate::{
10    actor::Actor,
11    fut::{ActorFuture, ActorStream},
12};
13
14pin_project! {
15    /// Future for the [`finish`](super::ActorStreamExt::finish) method.
16    #[derive(Debug)]
17    #[must_use = "streams do nothing unless polled"]
18    pub struct Finish<S> {
19        #[pin]
20        pub(crate) stream: S
21    }
22}
23
24impl<S> Finish<S> {
25    pub fn new(stream: S) -> Finish<S> {
26        Finish { stream }
27    }
28}
29
30impl<S, A> ActorFuture<A> for Finish<S>
31where
32    S: ActorStream<A>,
33    A: Actor,
34{
35    type Output = ();
36
37    fn poll(
38        mut self: Pin<&mut Self>,
39        act: &mut A,
40        ctx: &mut A::Context,
41        task: &mut Context<'_>,
42    ) -> Poll<()> {
43        let mut this = self.as_mut().project();
44        while ready!(this.stream.as_mut().poll_next(act, ctx, task)).is_some() {}
45        Poll::Ready(())
46    }
47}