1use std::{
2 future::Future,
3 pin::Pin,
4 task::{self, Poll},
5 time::Duration,
6};
7
8use pin_project_lite::pin_project;
9use tokio::sync::oneshot;
10
11use super::{
12 channel::{AddressSender, Sender},
13 MailboxError, SendError,
14};
15use crate::{clock::Sleep, handler::Message};
16
17pub type Request<A, M> = MsgRequest<AddressSender<A>, M>;
18
19pub type RecipientRequest<M> = MsgRequest<Box<dyn Sender<M>>, M>;
20
21pin_project! {
22 #[must_use = "You must wait on the request otherwise the Message will not be delivered"]
24 pub struct MsgRequest<S, M>
25 where
26 S: Sender<M>,
27 M: Message,
28 M: Send,
29 M::Result: Send
30 {
31 rx: Option<oneshot::Receiver<M::Result>>,
32 info: Option<(S, M)>,
33 #[pin]
34 timeout: Option<Sleep>,
35 }
36}
37
38impl<S, M> MsgRequest<S, M>
39where
40 S: Sender<M>,
41 M: Message + Send,
42 M::Result: Send,
43{
44 pub(crate) fn new(rx: Option<oneshot::Receiver<M::Result>>, info: Option<(S, M)>) -> Self {
45 Self {
46 rx,
47 info,
48 timeout: None,
49 }
50 }
51
52 #[cfg(test)]
53 pub(crate) fn rx_is_some(&self) -> bool {
54 self.rx.is_some()
55 }
56
57 pub fn timeout(mut self, dur: Duration) -> Self {
59 self.timeout = Some(actix_rt::time::sleep(dur));
60 self
61 }
62}
63
64impl<S, M> Future for MsgRequest<S, M>
65where
66 S: Sender<M>,
67 M: Message + Send,
68 M::Result: Send,
69{
70 type Output = Result<M::Result, MailboxError>;
71
72 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
73 let this = self.project();
74
75 if let Some((sender, msg)) = this.info.take() {
76 match sender.send(msg) {
77 Ok(rx) => *this.rx = Some(rx),
78 Err(SendError::Full(msg)) => {
79 *this.info = Some((sender, msg));
80 return Poll::Pending;
81 }
82 Err(SendError::Closed(_)) => return Poll::Ready(Err(MailboxError::Closed)),
83 }
84 }
85
86 match this.rx {
87 Some(rx) => match Pin::new(rx).poll(cx) {
88 Poll::Ready(res) => Poll::Ready(res.map_err(|_| MailboxError::Closed)),
89 Poll::Pending => match this.timeout.as_pin_mut() {
90 Some(timeout) => timeout.poll(cx).map(|_| Err(MailboxError::Timeout)),
91 None => Poll::Pending,
92 },
93 },
94 None => Poll::Ready(Err(MailboxError::Closed)),
95 }
96 }
97}