actix/fut/stream/mod.rs
1use std::{
2 marker::PhantomData,
3 pin::Pin,
4 task::{Context, Poll},
5 time::Duration,
6};
7
8pub use collect::Collect;
9pub use finish::Finish;
10pub use fold::Fold;
11use futures_core::stream::Stream;
12pub use map::Map;
13use pin_project_lite::pin_project;
14pub use skip_while::SkipWhile;
15pub use take_while::TakeWhile;
16pub use then::Then;
17pub use timeout::Timeout;
18
19use super::future::ActorFuture;
20use crate::actor::Actor;
21
22mod collect;
23mod finish;
24mod fold;
25mod map;
26mod skip_while;
27mod take_while;
28mod then;
29mod timeout;
30
31/// A stream of values, not all of which may have been produced yet.
32///
33/// This is similar to `futures_util::stream::Stream` trait, except it works with `Actor`
34pub trait ActorStream<A: Actor> {
35 /// The type of item this stream will yield on success.
36 type Item;
37
38 fn poll_next(
39 self: Pin<&mut Self>,
40 srv: &mut A,
41 ctx: &mut A::Context,
42 task: &mut Context<'_>,
43 ) -> Poll<Option<Self::Item>>;
44}
45
46pub trait ActorStreamExt<A: Actor>: ActorStream<A> {
47 /// Maps this stream's items to a different type, returning a new stream of
48 /// the resulting type.
49 ///
50 /// The provided closure is executed over all elements of this stream as
51 /// they are made available. It is executed inline with calls to
52 /// [`poll_next`](ActorStream::poll_next).
53 ///
54 /// Note that this function consumes the stream passed into it and returns a
55 /// wrapped version of it, similar to the existing `map` methods in the
56 /// standard library.
57 fn map<F, U>(self, f: F) -> Map<Self, F>
58 where
59 F: FnMut(Self::Item, &mut A, &mut A::Context) -> U,
60 Self: Sized,
61 {
62 map::new(self, f)
63 }
64
65 /// Computes from this stream's items new items of a different type using
66 /// an asynchronous closure.
67 ///
68 /// The provided closure `f` will be called with an `Item` once a value is
69 /// ready, it returns a future which will then be run to completion
70 /// to produce the next value on this stream.
71 ///
72 /// Note that this function consumes the stream passed into it and returns a
73 /// wrapped version of it.
74 fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
75 where
76 F: FnMut(Self::Item, &mut A, &mut A::Context) -> Fut,
77 Fut: ActorFuture<A>,
78 Self: Sized,
79 {
80 then::new(self, f)
81 }
82
83 /// Execute an accumulating asynchronous computation over a stream,
84 /// collecting all the values into one final result.
85 ///
86 /// This combinator will accumulate all values returned by this stream
87 /// according to the closure provided. The initial state is also provided to
88 /// this method and then is returned again by each execution of the closure.
89 /// Once the entire stream has been exhausted the returned future will
90 /// resolve to this value.
91 fn fold<F, Fut>(self, init: Fut::Output, f: F) -> Fold<Self, F, Fut, Fut::Output>
92 where
93 F: FnMut(Fut::Output, Self::Item, &mut A, &mut A::Context) -> Fut,
94 Fut: ActorFuture<A>,
95 Self: Sized,
96 {
97 fold::new(self, f, init)
98 }
99
100 /// Take elements from this stream while the provided asynchronous predicate
101 /// resolves to `true`.
102 ///
103 /// This function, like `Iterator::take_while`, will take elements from the
104 /// stream until the predicate `f` resolves to `false`. Once one element
105 /// returns `false`, it will always return that the stream is done.
106 fn take_while<F, Fut>(self, f: F) -> TakeWhile<Self, Self::Item, F, Fut>
107 where
108 F: FnMut(&Self::Item, &mut A, &mut A::Context) -> Fut,
109 Fut: ActorFuture<A, Output = bool>,
110 Self: Sized,
111 {
112 take_while::new(self, f)
113 }
114
115 /// Skip elements on this stream while the provided asynchronous predicate
116 /// resolves to `true`.
117 ///
118 /// This function, like `Iterator::skip_while`, will skip elements on the
119 /// stream until the predicate `f` resolves to `false`. Once one element
120 /// returns `false`, all future elements will be returned from the underlying
121 /// stream.
122 fn skip_while<F, Fut>(self, f: F) -> SkipWhile<Self, Self::Item, F, Fut>
123 where
124 F: FnMut(&Self::Item, &mut A, &mut A::Context) -> Fut,
125 Fut: ActorFuture<A, Output = bool>,
126 Self: Sized,
127 {
128 skip_while::new(self, f)
129 }
130
131 /// Add timeout to stream.
132 ///
133 /// `Err(())` returned as a timeout error.
134 fn timeout(self, timeout: Duration) -> Timeout<Self>
135 where
136 Self: Sized,
137 {
138 Timeout::new(self, timeout)
139 }
140
141 /// Transforms a stream into a collection, returning a
142 /// future representing the result of that computation.
143 ///
144 /// The returned future will be resolved when the stream terminates.
145 fn collect<C>(self) -> Collect<Self, C>
146 where
147 C: Default + Extend<Self::Item>,
148 Self: Sized,
149 {
150 Collect::new(self)
151 }
152
153 /// Transforms a stream to a future that resolves when stream finishes.
154 fn finish(self) -> Finish<Self>
155 where
156 Self: Sized,
157 {
158 Finish::new(self)
159 }
160}
161
162impl<A, S> ActorStreamExt<A> for S
163where
164 S: ActorStream<A>,
165 A: Actor,
166{
167}
168
169/// Helper trait that allows conversion of normal stream into `ActorStream`
170pub trait WrapStream<A>
171where
172 A: Actor,
173{
174 /// The stream that this type can be converted into.
175 type Stream: ActorStream<A>;
176
177 #[deprecated(since = "0.11.0", note = "Please use WrapStream::into_actor")]
178 #[doc(hidden)]
179 fn actstream(self) -> Self::Stream;
180
181 /// Convert normal stream to a [`ActorStream`]
182 fn into_actor(self, a: &A) -> Self::Stream;
183}
184
185impl<S, A> WrapStream<A> for S
186where
187 S: Stream,
188 A: Actor,
189{
190 type Stream = StreamWrap<S, A>;
191
192 #[doc(hidden)]
193 fn actstream(self) -> Self::Stream {
194 wrap_stream(self)
195 }
196
197 fn into_actor(self, _: &A) -> Self::Stream {
198 wrap_stream(self)
199 }
200}
201
202pin_project! {
203 pub struct StreamWrap<S, A>
204 where
205 S: Stream,
206 A: Actor
207 {
208 #[pin]
209 stream: S,
210 _act: PhantomData<A>
211 }
212}
213
214/// Converts normal stream into `ActorStream`
215pub fn wrap_stream<S, A>(stream: S) -> StreamWrap<S, A>
216where
217 S: Stream,
218 A: Actor,
219{
220 StreamWrap {
221 stream,
222 _act: PhantomData,
223 }
224}
225
226impl<S, A> ActorStream<A> for StreamWrap<S, A>
227where
228 S: Stream,
229 A: Actor,
230{
231 type Item = S::Item;
232
233 fn poll_next(
234 self: Pin<&mut Self>,
235 _: &mut A,
236 _: &mut A::Context,
237 task: &mut Context<'_>,
238 ) -> Poll<Option<Self::Item>> {
239 self.project().stream.poll_next(task)
240 }
241}