actix/fut/stream/
collect.rs

1use std::{
2    mem,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use futures_core::ready;
8use pin_project_lite::pin_project;
9
10use super::ActorStream;
11use crate::{actor::Actor, fut::future::ActorFuture};
12
13pin_project! {
14    /// Future for the [`collect`](super::ActorStreamExt::collect) method.
15    #[derive(Debug)]
16    #[must_use = "futures do nothing unless you `.await` or poll them"]
17    pub struct Collect<S, C> {
18        #[pin]
19        stream: S,
20        collection: C,
21    }
22}
23
24impl<S, C> Collect<S, C>
25where
26    C: Default,
27{
28    pub(super) fn new(stream: S) -> Self {
29        Self {
30            stream,
31            collection: Default::default(),
32        }
33    }
34}
35
36impl<S, A, C> ActorFuture<A> for Collect<S, C>
37where
38    S: ActorStream<A>,
39    A: Actor,
40    C: Default + Extend<S::Item>,
41{
42    type Output = C;
43
44    fn poll(
45        mut self: Pin<&mut Self>,
46        act: &mut A,
47        ctx: &mut A::Context,
48        task: &mut Context<'_>,
49    ) -> Poll<Self::Output> {
50        let mut this = self.as_mut().project();
51        loop {
52            match ready!(this.stream.as_mut().poll_next(act, ctx, task)) {
53                Some(e) => this.collection.extend(Some(e)),
54                None => return Poll::Ready(mem::take(this.collection)),
55            }
56        }
57    }
58}