actix/stream.rs
1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_core::{ready, stream::Stream};
7use log::error;
8use pin_project_lite::pin_project;
9
10use crate::{
11 actor::{Actor, ActorContext, ActorState, AsyncContext, SpawnHandle},
12 fut::ActorFuture,
13};
14
15/// Stream handling for Actors.
16///
17/// This is helper trait that allows handling [`Stream`]s in a similar way to normal actor messages.
18/// When stream resolves its next item, `handle()` is called with that item.
19///
20/// When the stream completes, `finished()` is called. By default, it stops Actor execution.
21///
22/// # Examples
23/// ```
24/// use actix::prelude::*;
25/// use futures_util::stream::once;
26///
27/// #[derive(Message)]
28/// #[rtype(result = "()")]
29/// struct Ping;
30///
31/// struct MyActor;
32///
33/// impl StreamHandler<Ping> for MyActor {
34/// fn handle(&mut self, item: Ping, ctx: &mut Context<MyActor>) {
35/// println!("PING");
36/// System::current().stop()
37/// }
38///
39/// fn finished(&mut self, ctx: &mut Self::Context) {
40/// println!("finished");
41/// }
42/// }
43///
44/// impl Actor for MyActor {
45/// type Context = Context<Self>;
46///
47/// fn started(&mut self, ctx: &mut Context<Self>) {
48/// Self::add_stream(once(async { Ping }), ctx);
49/// }
50/// }
51///
52/// #[actix::main]
53/// async fn main() {
54/// MyActor.start();
55/// # System::current().stop();
56/// }
57/// ```
58#[allow(unused_variables)]
59pub trait StreamHandler<I>
60where
61 Self: Actor,
62{
63 /// Called for every message emitted by the stream.
64 fn handle(&mut self, item: I, ctx: &mut Self::Context);
65
66 /// Called when stream emits first item.
67 ///
68 /// Default implementation does nothing.
69 fn started(&mut self, ctx: &mut Self::Context) {}
70
71 /// Called when stream finishes.
72 ///
73 /// Default implementation stops Actor execution.
74 fn finished(&mut self, ctx: &mut Self::Context) {
75 ctx.stop()
76 }
77
78 /// Register a Stream to the actor context.
79 fn add_stream<S>(stream: S, ctx: &mut Self::Context) -> SpawnHandle
80 where
81 S: Stream + 'static,
82 Self: StreamHandler<S::Item>,
83 Self::Context: AsyncContext<Self>,
84 {
85 if ctx.state() == ActorState::Stopped {
86 error!("Context::add_stream called for stopped actor.");
87 SpawnHandle::default()
88 } else {
89 ctx.spawn(ActorStream::new(stream))
90 }
91 }
92}
93
94pin_project! {
95 pub(crate) struct ActorStream<S> {
96 #[pin]
97 stream: S,
98 started: bool,
99 }
100}
101
102impl<S> ActorStream<S> {
103 pub fn new(fut: S) -> Self {
104 Self {
105 stream: fut,
106 started: false,
107 }
108 }
109}
110
111impl<A, S> ActorFuture<A> for ActorStream<S>
112where
113 S: Stream,
114 A: Actor + StreamHandler<S::Item>,
115 A::Context: AsyncContext<A>,
116{
117 type Output = ();
118
119 fn poll(
120 self: Pin<&mut Self>,
121 act: &mut A,
122 ctx: &mut A::Context,
123 task: &mut Context<'_>,
124 ) -> Poll<Self::Output> {
125 let mut this = self.project();
126
127 if !*this.started {
128 *this.started = true;
129 <A as StreamHandler<S::Item>>::started(act, ctx);
130 }
131
132 let mut polled = 0;
133
134 while let Some(msg) = ready!(this.stream.as_mut().poll_next(task)) {
135 A::handle(act, msg, ctx);
136
137 polled += 1;
138
139 if ctx.waiting() {
140 return Poll::Pending;
141 } else if polled == 16 {
142 // Yield after 16 consecutive polls on this stream and self wake up.
143 // This is to prevent starvation of other actor futures when this stream yield
144 // too many item in short period of time.
145 task.waker().wake_by_ref();
146 return Poll::Pending;
147 }
148 }
149
150 A::finished(act, ctx);
151 Poll::Ready(())
152 }
153}