actix/address/
message.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{self, Poll},
5    time::Duration,
6};
7
8use pin_project_lite::pin_project;
9use tokio::sync::oneshot;
10
11use super::{
12    channel::{AddressSender, Sender},
13    MailboxError, SendError,
14};
15use crate::{clock::Sleep, handler::Message};
16
17pub type Request<A, M> = MsgRequest<AddressSender<A>, M>;
18
19pub type RecipientRequest<M> = MsgRequest<Box<dyn Sender<M>>, M>;
20
21pin_project! {
22    /// A `Future` which represents an asynchronous message sending process.
23    #[must_use = "You must wait on the request otherwise the Message will not be delivered"]
24    pub struct MsgRequest<S, M>
25    where
26        S: Sender<M>,
27        M: Message,
28        M: Send,
29        M::Result: Send
30    {
31        rx: Option<oneshot::Receiver<M::Result>>,
32        info: Option<(S, M)>,
33        #[pin]
34        timeout: Option<Sleep>,
35    }
36}
37
38impl<S, M> MsgRequest<S, M>
39where
40    S: Sender<M>,
41    M: Message + Send,
42    M::Result: Send,
43{
44    pub(crate) fn new(rx: Option<oneshot::Receiver<M::Result>>, info: Option<(S, M)>) -> Self {
45        Self {
46            rx,
47            info,
48            timeout: None,
49        }
50    }
51
52    #[cfg(test)]
53    pub(crate) fn rx_is_some(&self) -> bool {
54        self.rx.is_some()
55    }
56
57    /// Set message delivery timeout
58    pub fn timeout(mut self, dur: Duration) -> Self {
59        self.timeout = Some(actix_rt::time::sleep(dur));
60        self
61    }
62}
63
64impl<S, M> Future for MsgRequest<S, M>
65where
66    S: Sender<M>,
67    M: Message + Send,
68    M::Result: Send,
69{
70    type Output = Result<M::Result, MailboxError>;
71
72    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
73        let this = self.project();
74
75        if let Some((sender, msg)) = this.info.take() {
76            match sender.send(msg) {
77                Ok(rx) => *this.rx = Some(rx),
78                Err(SendError::Full(msg)) => {
79                    *this.info = Some((sender, msg));
80                    return Poll::Pending;
81                }
82                Err(SendError::Closed(_)) => return Poll::Ready(Err(MailboxError::Closed)),
83            }
84        }
85
86        match this.rx {
87            Some(rx) => match Pin::new(rx).poll(cx) {
88                Poll::Ready(res) => Poll::Ready(res.map_err(|_| MailboxError::Closed)),
89                Poll::Pending => match this.timeout.as_pin_mut() {
90                    Some(timeout) => timeout.poll(cx).map(|_| Err(MailboxError::Timeout)),
91                    None => Poll::Pending,
92                },
93            },
94            None => Poll::Ready(Err(MailboxError::Closed)),
95        }
96    }
97}