1use super::{Policy, Retry};
4use futures_core::ready;
5use pin_project_lite::pin_project;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use tower_service::Service;
10
11pin_project! {
12    #[derive(Debug)]
14    pub struct ResponseFuture<P, S, Request>
15    where
16        P: Policy<Request, S::Response, S::Error>,
17        S: Service<Request>,
18    {
19        request: Option<Request>,
20        #[pin]
21        retry: Retry<P, S>,
22        #[pin]
23        state: State<S::Future, P::Future>,
24    }
25}
26
27pin_project! {
28    #[project = StateProj]
29    #[derive(Debug)]
30    enum State<F, P> {
31        Called {
33            #[pin]
34            future: F
35        },
36        Waiting {
38            #[pin]
39            waiting: P
40        },
41        Retrying,
43    }
44}
45
46impl<P, S, Request> ResponseFuture<P, S, Request>
47where
48    P: Policy<Request, S::Response, S::Error>,
49    S: Service<Request>,
50{
51    pub(crate) fn new(
52        request: Option<Request>,
53        retry: Retry<P, S>,
54        future: S::Future,
55    ) -> ResponseFuture<P, S, Request> {
56        ResponseFuture {
57            request,
58            retry,
59            state: State::Called { future },
60        }
61    }
62}
63
64impl<P, S, Request> Future for ResponseFuture<P, S, Request>
65where
66    P: Policy<Request, S::Response, S::Error>,
67    S: Service<Request>,
68{
69    type Output = Result<S::Response, S::Error>;
70
71    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72        let mut this = self.project();
73
74        loop {
75            match this.state.as_mut().project() {
76                StateProj::Called { future } => {
77                    let mut result = ready!(future.poll(cx));
78                    if let Some(req) = &mut this.request {
79                        match this.retry.policy.retry(req, &mut result) {
80                            Some(waiting) => {
81                                this.state.set(State::Waiting { waiting });
82                            }
83                            None => return Poll::Ready(result),
84                        }
85                    } else {
86                        return Poll::Ready(result);
88                    }
89                }
90                StateProj::Waiting { waiting } => {
91                    ready!(waiting.poll(cx));
92
93                    this.state.set(State::Retrying);
94                }
95                StateProj::Retrying => {
96                    ready!(this.retry.as_mut().project().service.poll_ready(cx))?;
108                    let req = this
109                        .request
110                        .take()
111                        .expect("retrying requires cloned request");
112                    *this.request = this.retry.policy.clone_request(&req);
113                    this.state.set(State::Called {
114                        future: this.retry.as_mut().project().service.call(req),
115                    });
116                }
117            }
118        }
119    }
120}