actix/supervisor.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
use std::{
future::Future,
pin::Pin,
task::{self, Poll},
};
use actix_rt::ArbiterHandle;
use pin_project_lite::pin_project;
use crate::{
actor::{Actor, AsyncContext, Supervised},
address::{channel, Addr},
context::Context,
context_impl::ContextFut,
mailbox::DEFAULT_CAPACITY,
};
pin_project! {
/// Actor supervisor
///
/// A Supervisor manages incoming messages for an actor. In case of actor failure,
/// the supervisor creates a new execution context and restarts the actor's lifecycle.
/// A Supervisor does not re-create their actor, it just calls the `restarting()`
/// method.
///
/// Supervisors have the same lifecycle as actors. If all addresses to
/// a supervisor gets dropped and its actor does not execute anything, the supervisor
/// terminates.
///
/// Supervisors can not guarantee that their actors successfully processes incoming
/// messages. If the actor fails during message processing, the message can not be
/// recovered. The sender would receive an `Err(Cancelled)` error in this situation.
///
/// # Examples
///
/// ```
/// # use actix::prelude::*;
/// #[derive(Message)]
/// #[rtype(result = "()")]
/// struct Die;
///
/// struct MyActor;
///
/// impl Actor for MyActor {
/// type Context = Context<Self>;
/// }
///
/// // To use actor with supervisor actor has to implement `Supervised` trait
/// impl actix::Supervised for MyActor {
/// fn restarting(&mut self, ctx: &mut Context<MyActor>) {
/// println!("restarting");
/// }
/// }
///
/// impl Handler<Die> for MyActor {
/// type Result = ();
///
/// fn handle(&mut self, _: Die, ctx: &mut Context<MyActor>) {
/// ctx.stop();
/// # System::current().stop();
/// }
/// }
///
/// fn main() {
/// let mut sys = System::new();
///
/// let addr = sys.block_on(async { actix::Supervisor::start(|_| MyActor) });
/// addr.do_send(Die);
///
/// sys.run();
/// }
/// ```
#[derive(Debug)]
pub struct Supervisor<A>
where
A: Supervised,
A: Actor<Context = Context<A>>
{
#[pin]
fut: ContextFut<A, Context<A>>,
}
}
impl<A> Supervisor<A>
where
A: Supervised + Actor<Context = Context<A>>,
{
/// Start new supervised actor in current tokio runtime.
///
/// Type of returned address depends on variable type. For example to get
/// `Addr<Syn, _>` of newly created actor, use explicitly `Addr<Syn,
/// _>` type as type of a variable.
///
/// ```
/// # use actix::prelude::*;
/// struct MyActor;
///
/// impl Actor for MyActor {
/// type Context = Context<Self>;
/// }
///
/// # impl actix::Supervised for MyActor {}
/// # fn main() {
/// # System::new().block_on(async {
/// // Get `Addr` of a MyActor actor
/// let addr = actix::Supervisor::start(|_| MyActor);
/// # System::current().stop();
/// # });}
/// ```
pub fn start<F>(f: F) -> Addr<A>
where
F: FnOnce(&mut A::Context) -> A + 'static,
A: Actor<Context = Context<A>>,
{
// create actor
let mut ctx = Context::new();
let act = f(&mut ctx);
let addr = ctx.address();
let fut = ctx.into_future(act);
// create supervisor
actix_rt::spawn(Self { fut });
addr
}
/// Start new supervised actor in arbiter's thread.
pub fn start_in_arbiter<F>(sys: &ArbiterHandle, f: F) -> Addr<A>
where
A: Actor<Context = Context<A>>,
F: FnOnce(&mut Context<A>) -> A + Send + 'static,
{
let (tx, rx) = channel::channel(DEFAULT_CAPACITY);
sys.spawn_fn(move || {
let mut ctx = Context::with_receiver(rx);
let act = f(&mut ctx);
let fut = ctx.into_future(act);
actix_rt::spawn(Self { fut });
});
Addr::new(tx)
}
}
#[doc(hidden)]
impl<A> Future for Supervisor<A>
where
A: Supervised + Actor<Context = Context<A>>,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
match this.fut.as_mut().poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(_) => {
// stop if context's address is not connected
if !this.fut.restart() {
return Poll::Ready(());
}
}
}
}
}
}