actix/handler.rs
1use std::{fmt, future::Future, pin::Pin, sync::Arc};
2
3pub use tokio::sync::oneshot::Sender as OneshotSender;
4
5use crate::{
6 actor::{Actor, AsyncContext},
7 address::Addr,
8 fut::{ActorFuture, ActorFutureExt, LocalBoxActorFuture},
9};
10
11/// Describes how to handle messages of a specific type.
12///
13/// Implementing `Handler` is a general way to handle incoming
14/// messages, streams, and futures.
15///
16/// The type `M` is a message which can be handled by the actor.
17#[allow(unused_variables)]
18pub trait Handler<M>
19where
20 Self: Actor,
21 M: Message,
22{
23 /// The type of value that this handler will return.
24 ///
25 /// Check the [`MessageResponse`] trait for some details
26 /// on how a message can be responded to.
27 type Result: MessageResponse<Self, M>;
28
29 /// This method is called for every message received by this actor.
30 fn handle(&mut self, msg: M, ctx: &mut Self::Context) -> Self::Result;
31}
32
33/// Represent message that can be handled by an actor.
34pub trait Message {
35 /// The type of value that this message will resolved with if it is
36 /// successful.
37 type Result: 'static;
38}
39
40/// Allow users to use `Arc<M>` as a message without having to re-impl `Message`
41impl<M> Message for Arc<M>
42where
43 M: Message,
44{
45 type Result = M::Result;
46}
47
48/// Allow users to use `Box<M>` as a message without having to re-impl `Message`
49impl<M> Message for Box<M>
50where
51 M: Message,
52{
53 type Result = M::Result;
54}
55
56/// A helper type that implements the [`MessageResponse`] trait.
57///
58/// # Examples
59/// ```no_run
60/// use actix::prelude::*;
61///
62/// #[derive(Message)]
63/// #[rtype(Response)]
64/// struct Msg;
65///
66/// struct Response;
67///
68/// struct MyActor;
69///
70/// impl Actor for MyActor {
71/// type Context = Context<Self>;
72/// }
73///
74/// impl Handler<Msg> for MyActor {
75/// type Result = MessageResult<Msg>;
76///
77/// fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
78/// MessageResult(Response {})
79/// }
80/// }
81/// ```
82pub struct MessageResult<M: Message>(pub M::Result);
83
84/// A specialized actor future holder for atomic asynchronous message handling.
85///
86/// Intended be used when the future returned will need exclusive access Actor's
87/// internal state or context, e.g., it can yield at critical sessions.
88/// When the actor starts to process this future, it will not pull any other
89/// spawned futures until this one as been completed.
90/// Check [`ActorFuture`] for available methods for accessing Actor's
91/// internal state.
92///
93/// ## Note
94/// The runtime itself is not blocked in the process, only the Actor,
95/// other futures, and therefore, other actors are still allowed to make
96/// progress when this [`AtomicResponse`] is used.
97///
98/// # Examples
99/// On the following example, the response to `Msg` would always be 29
100/// even if there are multiple `Msg` sent to `MyActor`.
101/// ```no_run
102/// # use actix::prelude::*;
103/// # use actix::clock::sleep;
104/// # use std::time::Duration;
105/// #
106/// # #[derive(Message)]
107/// # #[rtype(usize)]
108/// # struct Msg;
109/// #
110/// # struct MyActor(usize);
111/// #
112/// # impl Actor for MyActor {
113/// # type Context = Context<Self>;
114/// # }
115/// #
116/// impl Handler<Msg> for MyActor {
117/// type Result = AtomicResponse<Self, usize>;
118///
119/// fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
120/// AtomicResponse::new(Box::pin(
121/// async {}
122/// .into_actor(self)
123/// .map(|_, this, _| {
124/// this.0 = 30;
125/// })
126/// .then(|_, this, _| {
127/// sleep(Duration::from_secs(3)).into_actor(this)
128/// })
129/// .map(|_, this, _| {
130/// this.0 -= 1;
131/// this.0
132/// }),
133/// ))
134/// }
135/// }
136/// ```
137pub struct AtomicResponse<A, T>(ResponseActFuture<A, T>);
138
139impl<A, T> AtomicResponse<A, T> {
140 pub fn new(fut: ResponseActFuture<A, T>) -> Self {
141 AtomicResponse(fut)
142 }
143}
144
145impl<A, M> MessageResponse<A, M> for AtomicResponse<A, M::Result>
146where
147 A: Actor,
148 M: Message,
149 A::Context: AsyncContext<A>,
150{
151 fn handle(self, ctx: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
152 ctx.wait(self.0.map(|res, _, _| tx.send(res)));
153 }
154}
155
156/// A specialized actor future for asynchronous message handling.
157///
158/// Intended be used when the future returned will, at some point, need to access Actor's internal
159/// state or context in order to finish. Check [`ActorFuture`] for available methods for accessing
160/// Actor's internal state.
161///
162/// # Note
163/// It's important to keep in mind that the provided [`AsyncContext`], does not enforce the poll of
164/// any [`ActorFuture`] to be exclusive. Therefore, if other instances of [`ActorFuture`] are spawned
165/// into this Context **their execution won't necessarily be atomic**. Check [`AtomicResponse`] if you
166/// need exclusive access over the actor.
167///
168/// # Examples
169/// ```no_run
170/// use actix::prelude::*;
171///
172/// #[derive(Message)]
173/// #[rtype(result = "Result<usize, ()>")]
174/// struct Msg;
175///
176/// struct MyActor;
177///
178/// impl Actor for MyActor {
179/// type Context = Context<Self>;
180/// }
181///
182/// impl Handler<Msg> for MyActor {
183/// type Result = ResponseActFuture<Self, Result<usize, ()>>;
184///
185/// fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
186/// Box::pin(
187/// async {
188/// // Some async computation
189/// 42
190/// }
191/// .into_actor(self) // converts future to ActorFuture
192/// .map(|res, _act, _ctx| {
193/// // Do some computation with actor's state or context
194/// Ok(res)
195/// }),
196/// )
197/// }
198/// }
199/// ```
200pub type ResponseActFuture<A, I> = LocalBoxActorFuture<A, I>;
201
202/// A specialized future for asynchronous message handling.
203///
204/// Intended be used for when the future returned doesn't
205/// need to access Actor's internal state or context to progress, either
206/// because it's completely agnostic to it, or because the required data has
207/// already been moved inside the future and it won't need Actor state to continue.
208///
209/// # Examples
210/// ```no_run
211/// use actix::prelude::*;
212///
213/// #[derive(Message)]
214/// #[rtype(result = "Result<(), ()>")]
215/// struct Msg;
216///
217/// struct MyActor;
218///
219/// impl Actor for MyActor {
220/// type Context = Context<Self>;
221/// }
222///
223/// impl Handler<Msg> for MyActor {
224/// type Result = ResponseFuture<Result<(), ()>>;
225///
226/// fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
227/// Box::pin(async move {
228/// // Some async computation
229/// Ok(())
230/// })
231/// }
232/// }
233/// ```
234pub type ResponseFuture<I> = Pin<Box<dyn Future<Output = I>>>;
235
236/// A trait which defines message responses.
237///
238/// We offer implementation for some common language types, if you need
239/// to respond with a new type you can use [`MessageResult`].
240///
241/// If `Actor::Context` implements [`AsyncContext`] it's possible to handle
242/// the message asynchronously.
243/// For asynchronous message handling we offer the following possible response types:
244/// - [`ResponseFuture`] should be used for when the future returned doesn't
245/// need to access Actor's internal state or context to progress, either
246/// because it's completely agnostic to it or because the required data has
247/// already been moved to it and it won't need Actor state to continue.
248/// - [`ResponseActFuture`] should be used when the future returned
249/// will, at some point, need to access Actor's internal state or context
250/// in order to finish.
251/// - [`AtomicResponse`] should be used when the future returned needs exclusive
252/// access to Actor's internal state or context.
253pub trait MessageResponse<A: Actor, M: Message> {
254 fn handle(self, ctx: &mut A::Context, tx: Option<OneshotSender<M::Result>>);
255}
256
257impl<A, M> MessageResponse<A, M> for MessageResult<M>
258where
259 A: Actor,
260 M: Message,
261{
262 fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
263 tx.send(self.0)
264 }
265}
266
267impl<A, M, I, E> MessageResponse<A, M> for Result<I, E>
268where
269 A: Actor,
270 M: Message<Result = Self>,
271 I: 'static,
272 E: 'static,
273{
274 fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
275 tx.send(self)
276 }
277}
278
279impl<A, M, I> MessageResponse<A, M> for Arc<I>
280where
281 A: Actor,
282 M: Message<Result = Self>,
283 I: 'static,
284{
285 fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
286 tx.send(self)
287 }
288}
289
290impl<A, M, I> MessageResponse<A, M> for Option<I>
291where
292 A: Actor,
293 M: Message<Result = Self>,
294 I: 'static,
295{
296 fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
297 tx.send(self)
298 }
299}
300
301impl<A, M, I> MessageResponse<A, M> for Vec<I>
302where
303 A: Actor,
304 M: Message<Result = Self>,
305 I: 'static,
306{
307 fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
308 tx.send(self)
309 }
310}
311
312impl<A, M, B> MessageResponse<A, M> for Addr<B>
313where
314 A: Actor,
315 M: Message<Result = Self>,
316 B: Actor,
317{
318 fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
319 tx.send(self)
320 }
321}
322
323impl<A, M> MessageResponse<A, M> for ResponseActFuture<A, M::Result>
324where
325 A: Actor,
326 M: Message,
327 A::Context: AsyncContext<A>,
328{
329 fn handle(self, ctx: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
330 ctx.spawn(self.map(|res, _, _| tx.send(res)));
331 }
332}
333
334/// [`MessageResponse`] trait impl to enable the use of any `I: 'static` with asynchronous
335/// message handling
336///
337/// # Examples
338/// Usage with `Result<I,E>`:
339/// ```
340/// # pub struct MyActorAsync {}
341/// # impl Actor for MyActorAsync { type Context = actix::Context<Self>; }
342/// # use actix::prelude::*;
343/// # use core::pin::Pin;
344/// #
345/// pub struct MyQuestion{}
346/// impl Message for MyQuestion {
347/// type Result = Result<u8,u8>;
348/// }
349/// impl Handler<MyQuestion> for MyActorAsync {
350/// type Result = ResponseFuture<Result<u8,u8>>;
351/// fn handle(&mut self, question: MyQuestion, _ctx: &mut Context<Self>) -> Self::Result {
352/// Box::pin(async {Ok(5)})
353/// }
354/// }
355/// ```
356/// Usage with `Option<I>`:
357/// ```
358/// # pub struct MyActorAsync {}
359/// # impl Actor for MyActorAsync { type Context = actix::Context<Self>; }
360/// # use actix::prelude::*;
361/// # use core::pin::Pin;
362/// pub struct MyQuestion{}
363/// impl Message for MyQuestion {
364/// type Result = Option<u8>;
365/// }
366/// impl Handler<MyQuestion> for MyActorAsync {
367/// type Result = ResponseFuture<Option<u8>>;
368/// fn handle(&mut self, question: MyQuestion, _ctx: &mut Context<Self>) -> Self::Result {
369/// Box::pin(async {Some(5)})
370/// }
371/// }
372/// ```
373/// Usage with any `I: 'static`:
374/// ```
375/// # pub struct MyActorAsync {}
376/// # impl Actor for MyActorAsync { type Context = actix::Context<Self>; }
377/// # use actix::prelude::*;
378/// # use core::pin::Pin;
379/// pub struct MyQuestion{}
380/// impl Message for MyQuestion {
381/// type Result = u8;
382/// }
383/// impl Handler<MyQuestion> for MyActorAsync {
384/// type Result = ResponseFuture<u8>;
385/// fn handle(&mut self, question: MyQuestion, _ctx: &mut Context<Self>) -> Self::Result {
386/// Box::pin(async {5})
387/// }
388/// }
389/// ```
390impl<A, M> MessageResponse<A, M> for ResponseFuture<M::Result>
391where
392 A: Actor,
393 M: Message,
394{
395 fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
396 actix_rt::spawn(async { tx.send(self.await) });
397 }
398}
399
400enum ResponseTypeItem<I> {
401 Result(I),
402 Fut(Pin<Box<dyn Future<Output = I>>>),
403}
404
405/// Helper type for representing different type of message responses
406pub struct Response<I> {
407 item: ResponseTypeItem<I>,
408}
409
410impl<I> fmt::Debug for Response<I> {
411 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
412 let mut fmt = fmt.debug_struct("Response");
413 match self.item {
414 ResponseTypeItem::Result(_) => fmt.field("item", &"Result(_)".to_string()),
415 ResponseTypeItem::Fut(_) => fmt.field("item", &"Fut(_)".to_string()),
416 }
417 .finish()
418 }
419}
420
421impl<I> Response<I> {
422 /// Creates an asynchronous response.
423 pub fn fut<T>(fut: T) -> Self
424 where
425 T: Future<Output = I> + 'static,
426 {
427 Self {
428 item: ResponseTypeItem::Fut(Box::pin(fut)),
429 }
430 }
431
432 /// Creates a response.
433 pub fn reply(val: I) -> Self {
434 Self {
435 item: ResponseTypeItem::Result(val),
436 }
437 }
438}
439
440impl<A, M> MessageResponse<A, M> for Response<M::Result>
441where
442 A: Actor,
443 M: Message,
444{
445 fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
446 match self.item {
447 ResponseTypeItem::Fut(fut) => {
448 actix_rt::spawn(async { tx.send(fut.await) });
449 }
450 ResponseTypeItem::Result(res) => tx.send(res),
451 }
452 }
453}
454
455enum ActorResponseTypeItem<A, I> {
456 Result(I),
457 Fut(Pin<Box<dyn ActorFuture<A, Output = I>>>),
458}
459
460/// A helper type for representing different types of message responses.
461pub struct ActorResponse<A, I> {
462 item: ActorResponseTypeItem<A, I>,
463}
464
465impl<A, I> fmt::Debug for ActorResponse<A, I> {
466 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
467 let mut fmt = fmt.debug_struct("ActorResponse");
468 match self.item {
469 ActorResponseTypeItem::Result(_) => fmt.field("item", &"Result(_)".to_string()),
470 ActorResponseTypeItem::Fut(_) => fmt.field("item", &"Fut(_)".to_string()),
471 }
472 .finish()
473 }
474}
475
476impl<A: Actor, I> ActorResponse<A, I> {
477 /// Creates a response.
478 pub fn reply(val: I) -> Self {
479 Self {
480 item: ActorResponseTypeItem::Result(val),
481 }
482 }
483
484 /// Creates an asynchronous response.
485 pub fn r#async<T>(fut: T) -> Self
486 where
487 T: ActorFuture<A, Output = I> + 'static,
488 {
489 Self {
490 item: ActorResponseTypeItem::Fut(Box::pin(fut)),
491 }
492 }
493}
494
495impl<A, M> MessageResponse<A, M> for ActorResponse<A, M::Result>
496where
497 A: Actor,
498 M: Message,
499 A::Context: AsyncContext<A>,
500{
501 fn handle(self, ctx: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
502 match self.item {
503 ActorResponseTypeItem::Fut(fut) => {
504 let fut = fut.map(|res, _, _| tx.send(res));
505 ctx.spawn(fut);
506 }
507 ActorResponseTypeItem::Result(res) => tx.send(res),
508 }
509 }
510}
511
512impl<A: Actor, I> From<Pin<Box<dyn ActorFuture<A, Output = I>>>> for ActorResponse<A, I> {
513 fn from(fut: Pin<Box<dyn ActorFuture<A, Output = I>>>) -> Self {
514 Self {
515 item: ActorResponseTypeItem::Fut(fut),
516 }
517 }
518}
519
520macro_rules! SIMPLE_RESULT {
521 ($type:ty) => {
522 impl<A, M> MessageResponse<A, M> for $type
523 where
524 A: Actor,
525 M: Message<Result = $type>,
526 {
527 fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<$type>>) {
528 tx.send(self)
529 }
530 }
531 };
532}
533
534SIMPLE_RESULT!(());
535SIMPLE_RESULT!(u8);
536SIMPLE_RESULT!(u16);
537SIMPLE_RESULT!(u32);
538SIMPLE_RESULT!(u64);
539SIMPLE_RESULT!(usize);
540SIMPLE_RESULT!(i8);
541SIMPLE_RESULT!(i16);
542SIMPLE_RESULT!(i32);
543SIMPLE_RESULT!(i64);
544SIMPLE_RESULT!(isize);
545SIMPLE_RESULT!(f32);
546SIMPLE_RESULT!(f64);
547SIMPLE_RESULT!(String);
548SIMPLE_RESULT!(bool);
549
550// Helper trait for send one shot message from Option<Sender> type.
551// None and error are ignored.
552trait OneshotSend<M> {
553 fn send(self, msg: M);
554}
555
556impl<M> OneshotSend<M> for Option<OneshotSender<M>> {
557 fn send(self, msg: M) {
558 if let Some(tx) = self {
559 let _ = tx.send(msg);
560 }
561 }
562}