actix/stream.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures_core::{ready, stream::Stream};
use log::error;
use pin_project_lite::pin_project;
use crate::{
actor::{Actor, ActorContext, ActorState, AsyncContext, SpawnHandle},
fut::ActorFuture,
};
/// Stream handling for Actors.
///
/// This is a helper trait that lets an actor consume a [`Stream`] in much the same way it
/// consumes ordinary actor messages: every item the stream yields is passed to `handle()`.
///
/// `started()` is invoked once, the first time the registered stream is polled, and
/// `finished()` is invoked once the stream is exhausted. By default, `finished()` stops
/// Actor execution.
///
/// # Examples
/// ```
/// use actix::prelude::*;
/// use futures_util::stream::once;
///
/// #[derive(Message)]
/// #[rtype(result = "()")]
/// struct Ping;
///
/// struct MyActor;
///
/// impl StreamHandler<Ping> for MyActor {
///     fn handle(&mut self, item: Ping, ctx: &mut Context<MyActor>) {
///         println!("PING");
///         System::current().stop()
///     }
///
///     fn finished(&mut self, ctx: &mut Self::Context) {
///         println!("finished");
///     }
/// }
///
/// impl Actor for MyActor {
///     type Context = Context<Self>;
///
///     fn started(&mut self, ctx: &mut Context<Self>) {
///         Self::add_stream(once(async { Ping }), ctx);
///     }
/// }
///
/// #[actix::main]
/// async fn main() {
///     MyActor.start();
///     # System::current().stop();
/// }
/// ```
// The default `started` body intentionally ignores `ctx`; the parameter keeps its
// un-prefixed name so that it reads naturally in the generated documentation.
#[allow(unused_variables)]
pub trait StreamHandler<I>
where
    Self: Actor,
{
    /// Invoked with each item produced by the stream.
    fn handle(&mut self, item: I, ctx: &mut Self::Context);

    /// Invoked once, the first time the registered stream is polled and before any item
    /// is handed to `handle()`.
    ///
    /// The default implementation is a no-op.
    fn started(&mut self, ctx: &mut Self::Context) {}

    /// Invoked after the stream has yielded its last item.
    ///
    /// The default implementation stops Actor execution.
    fn finished(&mut self, ctx: &mut Self::Context) {
        ctx.stop();
    }

    /// Registers `stream` with the actor context and returns the handle of the spawned
    /// stream task.
    ///
    /// If the actor is already in the `Stopped` state, the stream is dropped, an error is
    /// logged, and a default [`SpawnHandle`] is returned instead.
    fn add_stream<S>(stream: S, ctx: &mut Self::Context) -> SpawnHandle
    where
        S: Stream + 'static,
        Self: StreamHandler<S::Item>,
        Self::Context: AsyncContext<Self>,
    {
        // Happy path first: any actor that has not stopped yet gets the stream spawned.
        if ctx.state() != ActorState::Stopped {
            return ctx.spawn(ActorStream::new(stream));
        }

        error!("Context::add_stream called for stopped actor.");
        SpawnHandle::default()
    }
}
pin_project! {
/// Adapter that wraps a stream so it can be driven as an `ActorFuture`, forwarding
/// its items to the actor's `StreamHandler` implementation.
pub(crate) struct ActorStream<S> {
// The wrapped stream; structurally pinned because `poll_next` takes `Pin<&mut S>`.
#[pin]
stream: S,
// `false` until the first poll, at which point `StreamHandler::started` is
// dispatched and the flag is flipped so the callback fires only once.
started: bool,
}
}
impl<S> ActorStream<S> {
pub fn new(fut: S) -> Self {
Self {
stream: fut,
started: false,
}
}
}
impl<A, S> ActorFuture<A> for ActorStream<S>
where
    S: Stream,
    A: Actor + StreamHandler<S::Item>,
    A::Context: AsyncContext<A>,
{
    type Output = ();

    /// Drives the wrapped stream, dispatching its items to the actor's
    /// [`StreamHandler`] callbacks.
    ///
    /// * On the very first poll, `StreamHandler::started` is called.
    /// * Every ready item is passed to `StreamHandler::handle`.
    /// * When the stream is exhausted, `StreamHandler::finished` is called and the
    ///   future resolves.
    ///
    /// Returns `Poll::Pending` when the stream itself is pending, when the context
    /// reports that it is waiting, or after a fixed batch of consecutive items (in the
    /// last case the task is woken immediately so polling resumes later).
    fn poll(
        self: Pin<&mut Self>,
        act: &mut A,
        ctx: &mut A::Context,
        task: &mut Context<'_>,
    ) -> Poll<Self::Output> {
        // Upper bound on the number of items handled in a single call to `poll`.
        const MAX_CONSECUTIVE_ITEMS: usize = 16;

        let mut proj = self.project();

        if !*proj.started {
            *proj.started = true;
            // Fully qualified: `Actor` also declares a `started` method.
            <A as StreamHandler<S::Item>>::started(act, ctx);
        }

        for _ in 0..MAX_CONSECUTIVE_ITEMS {
            // `ready!` returns `Poll::Pending` from `poll` if the stream has nothing yet.
            match ready!(proj.stream.as_mut().poll_next(task)) {
                Some(item) => {
                    <A as StreamHandler<S::Item>>::handle(act, item, ctx);

                    // Stop feeding items for now if the context says it is waiting.
                    if ctx.waiting() {
                        return Poll::Pending;
                    }
                }
                None => {
                    <A as StreamHandler<S::Item>>::finished(act, ctx);
                    return Poll::Ready(());
                }
            }
        }

        // The batch limit was reached without the stream running dry. Yield and wake
        // ourselves so that other actor futures are not starved when this stream
        // produces many items in a short period of time.
        task.waker().wake_by_ref();
        Poll::Pending
    }
}