actix/fut/stream/
collect.rs1use std::{
2 mem,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use futures_core::ready;
8use pin_project_lite::pin_project;
9
10use super::ActorStream;
11use crate::{actor::Actor, fut::future::ActorFuture};
12
13pin_project! {
14 #[derive(Debug)]
16 #[must_use = "futures do nothing unless you `.await` or poll them"]
17 pub struct Collect<S, C> {
18 #[pin]
19 stream: S,
20 collection: C,
21 }
22}
23
24impl<S, C> Collect<S, C>
25where
26 C: Default,
27{
28 pub(super) fn new(stream: S) -> Self {
29 Self {
30 stream,
31 collection: Default::default(),
32 }
33 }
34}
35
36impl<S, A, C> ActorFuture<A> for Collect<S, C>
37where
38 S: ActorStream<A>,
39 A: Actor,
40 C: Default + Extend<S::Item>,
41{
42 type Output = C;
43
44 fn poll(
45 mut self: Pin<&mut Self>,
46 act: &mut A,
47 ctx: &mut A::Context,
48 task: &mut Context<'_>,
49 ) -> Poll<Self::Output> {
50 let mut this = self.as_mut().project();
51 loop {
52 match ready!(this.stream.as_mut().poll_next(act, ctx, task)) {
53 Some(e) => this.collection.extend(Some(e)),
54 None => return Poll::Ready(mem::take(this.collection)),
55 }
56 }
57 }
58}