actix/actor.rs
1use std::time::Duration;
2
3use actix_rt::ArbiterHandle;
4use futures_core::stream::Stream;
5use log::error;
6
7use crate::{
8 address::{channel, Addr},
9 context::Context,
10 context_items::{ActorDelayedMessageItem, ActorMessageItem, ActorMessageStreamItem},
11 fut::{ActorFuture, ActorStreamExt},
12 handler::{Handler, Message},
13 mailbox::DEFAULT_CAPACITY,
14 stream::StreamHandler,
15 utils::{IntervalFunc, TimerFunc},
16};
17
18/// Actors are objects which encapsulate state and behavior.
19///
20/// Actors run within a specific execution context
21/// [`Context<A>`](struct.Context.html). The context object is available
22/// only during execution. Each actor has a separate execution
23/// context. The execution context also controls the lifecycle of an
24/// actor.
25///
26/// Actors communicate exclusively by exchanging messages. The sender
27/// actor can wait for a response. Actors are not referenced directly,
28/// but by address [`Addr`](struct.Addr.html) To be able to handle a
29/// specific message actor has to provide a
30/// [`Handler<M>`](trait.Handler.html) implementation for this
31/// message. All messages are statically typed. A message can be
32/// handled in asynchronous fashion. An actor can spawn other actors
33/// or add futures or streams to the execution context. The actor
34/// trait provides several methods that allow controlling the actor
35/// lifecycle.
36///
37/// # Actor lifecycle
38///
39/// ## Started
40///
41/// An actor starts in the `Started` state, during this state the
42/// `started` method gets called.
43///
44/// ## Running
45///
46/// After an actor's `started` method got called, the actor
47/// transitions to the `Running` state. An actor can stay in the
48/// `running` state for an indefinite amount of time.
49///
50/// ## Stopping
51///
52/// The actor's execution state changes to `stopping` in the following
53/// situations:
54///
55/// * `Context::stop` gets called by actor itself
56/// * all addresses to the actor get dropped
57/// * no evented objects are registered in its context.
58///
59/// An actor can return from the `stopping` state to the `running`
60/// state by creating a new address or adding an evented object, like
61/// a future or stream, in its `Actor::stopping` method.
62///
63/// If an actor changed to a `stopping` state because
64/// `Context::stop()` got called, the context then immediately stops
65/// processing incoming messages and calls the `Actor::stopping()`
66/// method. If an actor does not return back to a `running` state,
67/// all unprocessed messages get dropped.
68///
69/// ## Stopped
70///
71/// If an actor does not modify execution context while in stopping
72/// state, the actor state changes to `Stopped`. This state is
73/// considered final and at this point the actor gets dropped.
74#[allow(unused_variables)]
75pub trait Actor: Sized + Unpin + 'static {
76 /// Actor execution context type
77 type Context: ActorContext;
78
79 /// Called when an actor gets polled the first time.
80 fn started(&mut self, ctx: &mut Self::Context) {}
81
82 /// Called after an actor is in `Actor::Stopping` state.
83 ///
84 /// There can be several reasons for stopping:
85 ///
86 /// - `Context::stop` gets called by the actor itself.
87 /// - All addresses to the current actor get dropped and no more
88 /// evented objects are left in the context.
89 ///
90 /// An actor can return from the stopping state to the running
91 /// state by returning `Running::Continue`.
92 fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
93 Running::Stop
94 }
95
96 /// Called after an actor is stopped.
97 ///
98 /// This method can be used to perform any needed cleanup work or
99 /// to spawn more actors. This is the final state, after this
100 /// method got called, the actor will be dropped.
101 fn stopped(&mut self, ctx: &mut Self::Context) {}
102
103 /// Start a new asynchronous actor, returning its address.
104 ///
105 /// # Examples
106 ///
107 /// ```
108 /// use actix::prelude::*;
109 ///
110 /// struct MyActor;
111 /// impl Actor for MyActor {
112 /// type Context = Context<Self>;
113 /// }
114 ///
115 /// #[actix::main]
116 /// async fn main() {
117 /// // start actor and get its address
118 /// let addr = MyActor.start();
119 /// # System::current().stop();
120 /// }
121 /// ```
122 fn start(self) -> Addr<Self>
123 where
124 Self: Actor<Context = Context<Self>>,
125 {
126 Context::new().run(self)
127 }
128
129 /// Construct and start a new asynchronous actor, returning its
130 /// address.
131 ///
132 /// This is constructs a new actor using the `Default` trait, and
133 /// invokes its `start` method.
134 fn start_default() -> Addr<Self>
135 where
136 Self: Actor<Context = Context<Self>> + Default,
137 {
138 Self::default().start()
139 }
140
141 /// Start new actor in arbiter's thread.
142 fn start_in_arbiter<F>(wrk: &ArbiterHandle, f: F) -> Addr<Self>
143 where
144 Self: Actor<Context = Context<Self>>,
145 F: FnOnce(&mut Context<Self>) -> Self + Send + 'static,
146 {
147 let (tx, rx) = channel::channel(DEFAULT_CAPACITY);
148
149 // create actor
150 wrk.spawn_fn(move || {
151 let mut ctx = Context::with_receiver(rx);
152 let act = f(&mut ctx);
153 let fut = ctx.into_future(act);
154
155 actix_rt::spawn(fut);
156 });
157
158 Addr::new(tx)
159 }
160
161 /// Start a new asynchronous actor given a `Context`.
162 ///
163 /// Use this method if you need the `Context` object during actor
164 /// initialization.
165 ///
166 /// # Examples
167 ///
168 /// ```
169 /// use actix::prelude::*;
170 ///
171 /// struct MyActor {
172 /// val: usize,
173 /// }
174 /// impl Actor for MyActor {
175 /// type Context = Context<Self>;
176 /// }
177 ///
178 /// #[actix::main]
179 /// async fn main() {
180 /// let addr = MyActor::create(|ctx: &mut Context<MyActor>| MyActor { val: 10 });
181 /// # System::current().stop();
182 /// }
183 /// ```
184 fn create<F>(f: F) -> Addr<Self>
185 where
186 Self: Actor<Context = Context<Self>>,
187 F: FnOnce(&mut Context<Self>) -> Self,
188 {
189 let mut ctx = Context::new();
190 let act = f(&mut ctx);
191 ctx.run(act)
192 }
193}
194
195#[allow(unused_variables)]
196/// Actors with the ability to restart after failure.
197///
198/// Supervised actors can be managed by a
199/// [`Supervisor`](struct.Supervisor.html). As an additional lifecycle
200/// event, the `restarting` method can be implemented.
201///
202/// If a supervised actor fails, its supervisor creates new execution
203/// context and restarts the actor, invoking its `restarting` method.
204/// After a call to this method, the actor's execution state changes
205/// to `Started` and the regular lifecycle process starts.
206///
207/// The `restarting` method gets called with the newly constructed
208/// `Context` object.
209pub trait Supervised: Actor {
210 /// Called when the supervisor restarts a failed actor.
211 fn restarting(&mut self, ctx: &mut <Self as Actor>::Context) {}
212}
213
214/// Actor execution state
215#[derive(PartialEq, Debug, Copy, Clone)]
216pub enum ActorState {
217 /// Actor is started.
218 Started,
219 /// Actor is running.
220 Running,
221 /// Actor is stopping.
222 Stopping,
223 /// Actor is stopped.
224 Stopped,
225}
226
227#[derive(Clone, Copy, Debug, PartialEq)]
228pub enum Running {
229 Stop,
230 Continue,
231}
232
233impl ActorState {
234 /// Indicates whether the actor is alive.
235 pub fn alive(self) -> bool {
236 self == ActorState::Started || self == ActorState::Running
237 }
238 /// Indicates whether the actor is stopped or stopping.
239 pub fn stopping(self) -> bool {
240 self == ActorState::Stopping || self == ActorState::Stopped
241 }
242}
243
244/// Actor execution context.
245///
246/// Each actor runs within a specific execution context. The actor's
247/// associated type `Actor::Context` defines the context to use for
248/// the actor, and must implement the `ActorContext` trait.
249///
250/// The execution context defines the type of execution, and the
251/// actor's communication channels (message handling).
252pub trait ActorContext: Sized {
253 /// Immediately stop processing incoming messages and switch to a
254 /// `stopping` state. This only affects actors that are currently
255 /// `running`. Future attempts to queue messages will fail.
256 fn stop(&mut self);
257
258 /// Terminate actor execution unconditionally. This sets the actor
259 /// into the `stopped` state. This causes future attempts to queue
260 /// messages to fail.
261 fn terminate(&mut self);
262
263 /// Retrieve the current Actor execution state.
264 fn state(&self) -> ActorState;
265}
266
267/// Asynchronous execution context.
268pub trait AsyncContext<A>: ActorContext
269where
270 A: Actor<Context = Self>,
271{
272 /// Returns the address of the context.
273 fn address(&self) -> Addr<A>;
274
275 /// Spawns a future into the context.
276 ///
277 /// Returns a handle of the spawned future, which can be used for
278 /// cancelling its execution.
279 ///
280 /// All futures spawned into an actor's context are cancelled
281 /// during the actor's stopping stage.
282 fn spawn<F>(&mut self, fut: F) -> SpawnHandle
283 where
284 F: ActorFuture<A, Output = ()> + 'static;
285
286 /// Spawns a future into the context, waiting for it to resolve.
287 ///
288 /// This stops processing any incoming events until the future
289 /// resolves.
290 fn wait<F>(&mut self, fut: F)
291 where
292 F: ActorFuture<A, Output = ()> + 'static;
293
294 /// Checks if the context is paused (waiting for future completion or stopping).
295 fn waiting(&self) -> bool;
296
297 /// Cancels a spawned future.
298 ///
299 /// The `handle` is a value returned by the `spawn` method.
300 fn cancel_future(&mut self, handle: SpawnHandle) -> bool;
301
302 /// Registers a stream with the context.
303 ///
304 /// This allows handling a `Stream` in a way similar to normal
305 /// actor messages.
306 ///
307 /// ```
308 /// # use std::io;
309 /// use actix::prelude::*;
310 /// use futures_util::stream::once;
311 ///
312 /// #[derive(Message)]
313 /// #[rtype(result = "()")]
314 /// struct Ping;
315 ///
316 /// struct MyActor;
317 ///
318 /// impl StreamHandler<Ping> for MyActor {
319 ///
320 /// fn handle(&mut self, item: Ping, ctx: &mut Context<MyActor>) {
321 /// println!("PING");
322 /// System::current().stop();
323 /// }
324 ///
325 /// fn finished(&mut self, ctx: &mut Self::Context) {
326 /// println!("finished");
327 /// }
328 /// }
329 ///
330 /// impl Actor for MyActor {
331 /// type Context = Context<Self>;
332 ///
333 /// fn started(&mut self, ctx: &mut Context<Self>) {
334 /// // add stream
335 /// ctx.add_stream(once(async { Ping }));
336 /// }
337 /// }
338 ///
339 /// fn main() {
340 /// let mut sys = System::new();
341 /// let addr = sys.block_on(async { MyActor.start() });
342 /// sys.run();
343 /// }
344 /// ```
345 fn add_stream<S>(&mut self, fut: S) -> SpawnHandle
346 where
347 S: Stream + 'static,
348 A: StreamHandler<S::Item>,
349 {
350 <A as StreamHandler<S::Item>>::add_stream(fut, self)
351 }
352
353 /// Registers a stream with the context, ignoring errors.
354 ///
355 /// This method is similar to `add_stream` but it skips stream
356 /// errors.
357 ///
358 /// ```
359 /// use actix::prelude::*;
360 /// use futures_util::stream::once;
361 ///
362 /// #[derive(Message)]
363 /// #[rtype(result = "()")]
364 /// struct Ping;
365 ///
366 /// struct MyActor;
367 ///
368 /// impl Handler<Ping> for MyActor {
369 /// type Result = ();
370 ///
371 /// fn handle(&mut self, msg: Ping, ctx: &mut Context<MyActor>) {
372 /// println!("PING");
373 /// # System::current().stop();
374 /// }
375 /// }
376 ///
377 /// impl Actor for MyActor {
378 /// type Context = Context<Self>;
379 ///
380 /// fn started(&mut self, ctx: &mut Context<Self>) {
381 /// // add messages stream
382 /// ctx.add_message_stream(once(async { Ping }));
383 /// }
384 /// }
385 ///
386 /// # #[actix::main] async fn main () {
387 /// let addr = MyActor.start();
388 /// # System::current().stop(); }
389 /// ```
390 fn add_message_stream<S>(&mut self, fut: S)
391 where
392 S: Stream + 'static,
393 S::Item: Message,
394 A: Handler<S::Item>,
395 {
396 if self.state() == ActorState::Stopped {
397 error!("Context::add_message_stream called for stopped actor.");
398 } else {
399 self.spawn(ActorMessageStreamItem::new(fut));
400 }
401 }
402
403 /// Sends the message `msg` to self. This bypasses the mailbox capacity, and
404 /// will always queue the message. If the actor is in the `stopped` state, an
405 /// error will be raised.
406 fn notify<M>(&mut self, msg: M)
407 where
408 A: Handler<M>,
409 M: Message + 'static,
410 {
411 if self.state() == ActorState::Stopped {
412 error!("Context::notify called for stopped actor.");
413 } else {
414 self.spawn(ActorMessageItem::new(msg));
415 }
416 }
417
418 /// Sends the message `msg` to self after a specified period of time.
419 ///
420 /// Returns a spawn handle which can be used for cancellation. The
421 /// notification gets cancelled if the context's stop method gets
422 /// called. This bypasses the mailbox capacity, and
423 /// will always queue the message. If the actor is in the `stopped` state, an
424 /// error will be raised.
425 fn notify_later<M>(&mut self, msg: M, after: Duration) -> SpawnHandle
426 where
427 A: Handler<M>,
428 M: Message + 'static,
429 {
430 if self.state() == ActorState::Stopped {
431 error!("Context::notify_later called for stopped actor.");
432 SpawnHandle::default()
433 } else {
434 self.spawn(ActorDelayedMessageItem::new(msg, after))
435 }
436 }
437
438 /// Executes a closure after a specified period of time.
439 ///
440 /// The closure gets passed the same actor and its
441 /// context. Execution gets cancelled if the context's stop method
442 /// gets called.
443 fn run_later<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
444 where
445 F: FnOnce(&mut A, &mut A::Context) + 'static,
446 {
447 self.spawn(TimerFunc::new(dur, f))
448 }
449
450 /// Spawns a job to execute the given closure periodically, at a
451 /// specified fixed interval.
452 fn run_interval<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
453 where
454 F: FnMut(&mut A, &mut A::Context) + 'static,
455 {
456 self.spawn(IntervalFunc::new(dur, f).finish())
457 }
458
459 /// Spawns a periodic `task` function to begin executing at the given `start` time, and with the
460 /// given `interval` duration.
461 fn run_interval_at<F>(
462 &mut self,
463 start: tokio::time::Instant,
464 interval: Duration,
465 task: F,
466 ) -> SpawnHandle
467 where
468 F: FnMut(&mut A, &mut A::Context) + 'static,
469 {
470 self.spawn(IntervalFunc::new_at(start, interval, task).finish())
471 }
472}
473
474/// A handle to a spawned future.
475///
476/// Can be used to cancel the future.
477#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
478pub struct SpawnHandle(usize);
479
480impl SpawnHandle {
481 /// Gets the next handle.
482 pub fn next(self) -> SpawnHandle {
483 SpawnHandle(self.0 + 1)
484 }
485 #[doc(hidden)]
486 pub fn into_usize(self) -> usize {
487 self.0
488 }
489}