actix/fut/stream/
finish.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_core::ready;
7use pin_project_lite::pin_project;
8
9use crate::{
10 actor::Actor,
11 fut::{ActorFuture, ActorStream},
12};
13
14pin_project! {
15 #[derive(Debug)]
17 #[must_use = "streams do nothing unless polled"]
18 pub struct Finish<S> {
19 #[pin]
20 pub(crate) stream: S
21 }
22}
23
24impl<S> Finish<S> {
25 pub fn new(stream: S) -> Finish<S> {
26 Finish { stream }
27 }
28}
29
30impl<S, A> ActorFuture<A> for Finish<S>
31where
32 S: ActorStream<A>,
33 A: Actor,
34{
35 type Output = ();
36
37 fn poll(
38 mut self: Pin<&mut Self>,
39 act: &mut A,
40 ctx: &mut A::Context,
41 task: &mut Context<'_>,
42 ) -> Poll<()> {
43 let mut this = self.as_mut().project();
44 while ready!(this.stream.as_mut().poll_next(act, ctx, task)).is_some() {}
45 Poll::Ready(())
46 }
47}