actix/fut/stream/
map.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_core::ready;
7use pin_project_lite::pin_project;
8
9use crate::{actor::Actor, fut::ActorStream};
10
11pin_project! {
12    /// Stream for the [`map`](super::ActorStreamExt::map) method.
13    #[derive(Debug)]
14    #[must_use = "streams do nothing unless polled"]
15    pub struct Map<S, F> {
16        #[pin]
17        stream: S,
18        f: F,
19    }
20}
21
22pub(super) fn new<S, A, F, U>(stream: S, f: F) -> Map<S, F>
23where
24    S: ActorStream<A>,
25    A: Actor,
26    F: FnMut(S::Item, &mut A, &mut A::Context) -> U,
27{
28    Map { stream, f }
29}
30
31impl<S, A, F, U> ActorStream<A> for Map<S, F>
32where
33    S: ActorStream<A>,
34    A: Actor,
35    F: FnMut(S::Item, &mut A, &mut A::Context) -> U,
36{
37    type Item = U;
38
39    fn poll_next(
40        self: Pin<&mut Self>,
41        act: &mut A,
42        ctx: &mut A::Context,
43        task: &mut Context<'_>,
44    ) -> Poll<Option<Self::Item>> {
45        let mut this = self.project();
46        let res = ready!(this.stream.as_mut().poll_next(act, ctx, task));
47        Poll::Ready(res.map(|x| (this.f)(x, act, ctx)))
48    }
49}