azure_core/policies/retry_policies/
retry_policy.rs1use crate::{
2 date,
3 error::{Error, ErrorKind, HttpError, ResultExt},
4 headers::{Headers, RETRY_AFTER, RETRY_AFTER_MS, X_MS_RETRY_AFTER_MS},
5 policies::{Policy, PolicyResult, Request},
6 sleep::sleep,
7 Context, StatusCode,
8};
9use async_trait::async_trait;
10use std::{sync::Arc, time::Duration};
11use time::OffsetDateTime;
12use tracing::{debug, trace};
13
14fn try_parse_retry_after_http_date(http_date: &str) -> Option<OffsetDateTime> {
17 crate::date::parse_rfc1123(http_date).ok()
18}
19
20type DateTimeFn = fn() -> OffsetDateTime;
22
23pub(crate) fn get_retry_after(headers: &Headers, datetime_now: DateTimeFn) -> Option<Duration> {
33 [RETRY_AFTER_MS, X_MS_RETRY_AFTER_MS, RETRY_AFTER]
34 .iter()
35 .find_map(|header| {
36 headers.get_str(header).ok().and_then(|v| {
37 if header == &RETRY_AFTER {
38 v.parse::<u64>().ok().map(Duration::from_secs).or_else(|| {
40 try_parse_retry_after_http_date(v).map(|retry_after_datetime| {
41 let now = datetime_now();
42 if retry_after_datetime < now {
43 Duration::from_secs(0)
44 } else {
45 date::diff(retry_after_datetime, now)
46 }
47 })
48 })
49 } else {
50 v.parse::<u64>().ok().map(Duration::from_millis)
52 }
53 })
54 })
55}
56
57#[async_trait]
66pub trait RetryPolicy: std::fmt::Debug + Send + Sync {
67 fn is_expired(&self, duration_since_start: Duration, retry_count: u32) -> bool;
71 fn sleep_duration(&self, retry_count: u32) -> Duration;
73 async fn wait(&self, _error: &Error, retry_count: u32, retry_after: Option<Duration>) {
77 let policy_sleep_duration = self.sleep_duration(retry_count);
78 let sleep_duration = retry_after.map_or(policy_sleep_duration, |retry_after| {
80 std::cmp::max(retry_after, policy_sleep_duration)
81 });
82 sleep(sleep_duration).await;
83 }
84}
85
86const RETRY_STATUSES: &[StatusCode] = &[
90 StatusCode::RequestTimeout,
91 StatusCode::TooManyRequests,
92 StatusCode::InternalServerError,
93 StatusCode::BadGateway,
94 StatusCode::ServiceUnavailable,
95 StatusCode::GatewayTimeout,
96];
97
98#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
99#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
100impl<T> Policy for T
101where
102 T: RetryPolicy,
103{
104 async fn send(
105 &self,
106 ctx: &Context,
107 request: &mut Request,
108 next: &[Arc<dyn Policy>],
109 ) -> PolicyResult {
110 let mut retry_count = 0;
111 let mut start = None;
112
113 loop {
114 if retry_count > 0 {
115 request.body.reset().await.context(
116 ErrorKind::Other,
117 "failed to reset body stream before retrying request",
118 )?;
119 }
120 let result = next[0].send(ctx, request, &next[1..]).await;
121 let start = start.get_or_insert_with(OffsetDateTime::now_utc);
123 let (last_error, retry_after) = match result {
124 Ok(response) if response.status().is_success() => {
125 trace!(
126 "Successful response. Request={:?} response={:?}",
127 request,
128 response
129 );
130 return Ok(response);
131 }
132 Ok(response) => {
133 let status = response.status();
135
136 let retry_after = match status {
140 StatusCode::TooManyRequests | StatusCode::ServiceUnavailable => {
141 get_retry_after(response.headers(), OffsetDateTime::now_utc)
142 }
143 _ => None,
144 };
145
146 let http_error = HttpError::new(response).await;
147
148 let error_kind = ErrorKind::http_response(
149 status,
150 http_error.error_code().map(std::borrow::ToOwned::to_owned),
151 );
152
153 if !RETRY_STATUSES.contains(&status) {
154 debug!(
155 "server returned error status which will not be retried: {}",
156 status
157 );
158 let error = Error::full(
160 error_kind,
161 http_error,
162 format!(
163 "server returned error status which will not be retried: {status}"
164 ),
165 );
166 return Err(error);
167 }
168 debug!(
169 "server returned error status which requires retry: {}",
170 status
171 );
172 (Error::new(error_kind, http_error), retry_after)
173 }
174 Err(error) => {
175 if error.kind() == &ErrorKind::Io {
176 debug!(
177 "io error occurred when making request which will be retried: {}",
178 error
179 );
180 let retry_after = None;
182 (error, retry_after)
183 } else {
184 return Err(
185 error.context("non-io error occurred which will not be retried")
186 );
187 }
188 }
189 };
190
191 let time_since_start = (OffsetDateTime::now_utc() - *start)
192 .try_into()
193 .unwrap_or_default();
194 if self.is_expired(time_since_start, retry_count) {
195 return Err(last_error
196 .context("retry policy expired and the request will no longer be retried"));
197 }
198 retry_count += 1;
199
200 self.wait(&last_error, retry_count, retry_after).await;
201 }
202 }
203}
204
#[cfg(test)]
mod test {
    use super::*;
    use time::macros::datetime;

    /// Fixed clock so that date-based `Retry-After` values give a
    /// deterministic delay.
    fn datetime_now() -> OffsetDateTime {
        datetime!(2021-01-01 0:00:00 UTC)
    }

    /// Builds a fresh header collection, lets `populate` fill it, and returns
    /// what `get_retry_after` makes of it against the fixed test clock.
    fn retry_after_with(populate: impl FnOnce(&mut Headers)) -> Option<Duration> {
        let mut headers = Headers::new();
        populate(&mut headers);
        get_retry_after(&headers, datetime_now)
    }

    #[test]
    fn test_try_parse_retry_after_http_date() {
        // Well-formed RFC 1123 date.
        let retry_after = try_parse_retry_after_http_date("Fri, 01 Jan 2021 00:00:00 GMT");
        assert_eq!(retry_after, Some(datetime!(2021-01-01 0:00:00 UTC)));

        // Missing the leading weekday.
        let retry_after = try_parse_retry_after_http_date("01 Jan 2021 00:00:00 GMT");
        assert_eq!(retry_after, None);

        // Not a date at all.
        let retry_after = try_parse_retry_after_http_date("invalid");
        assert_eq!(retry_after, None);

        // A bare number is not a date; the caller handles the seconds form.
        let retry_after = try_parse_retry_after_http_date("123");
        assert_eq!(retry_after, None);
    }

    #[test]
    fn test_get_retry_after() {
        // `Retry-After` as a date ten seconds in the future.
        assert_eq!(
            retry_after_with(|h| {
                h.insert(RETRY_AFTER, "Fri, 01 Jan 2021 00:00:10 GMT");
            }),
            Some(Duration::from_secs(10))
        );

        // `Retry-After` as a date in the past clamps to zero.
        assert_eq!(
            retry_after_with(|h| {
                h.insert(RETRY_AFTER, "Thu, 31 Dec 2020 23:59:50 GMT");
            }),
            Some(Duration::from_secs(0))
        );

        // No retry headers at all.
        assert_eq!(retry_after_with(|_| {}), None);

        // Neither a number nor a date.
        assert_eq!(
            retry_after_with(|h| {
                h.insert(RETRY_AFTER, "invalid");
            }),
            None
        );

        // `Retry-After` as a number of seconds.
        assert_eq!(
            retry_after_with(|h| {
                h.insert(RETRY_AFTER, "123");
            }),
            Some(Duration::from_secs(123))
        );

        // The millisecond headers.
        assert_eq!(
            retry_after_with(|h| {
                h.insert(RETRY_AFTER_MS, "123");
            }),
            Some(Duration::from_millis(123))
        );
        assert_eq!(
            retry_after_with(|h| {
                h.insert(X_MS_RETRY_AFTER_MS, "123");
            }),
            Some(Duration::from_millis(123))
        );

        // Either millisecond header takes precedence over `Retry-After`.
        assert_eq!(
            retry_after_with(|h| {
                h.insert(RETRY_AFTER_MS, "123");
                h.insert(RETRY_AFTER, "456");
            }),
            Some(Duration::from_millis(123))
        );
        assert_eq!(
            retry_after_with(|h| {
                h.insert(X_MS_RETRY_AFTER_MS, "123");
                h.insert(RETRY_AFTER, "456");
            }),
            Some(Duration::from_millis(123))
        );

        // `RETRY_AFTER_MS` takes precedence over `X_MS_RETRY_AFTER_MS`.
        assert_eq!(
            retry_after_with(|h| {
                h.insert(X_MS_RETRY_AFTER_MS, "123");
                h.insert(RETRY_AFTER_MS, "456");
            }),
            Some(Duration::from_millis(456))
        );

        // An unparseable higher-priority header falls through to the next one.
        assert_eq!(
            retry_after_with(|h| {
                h.insert(RETRY_AFTER_MS, "invalid");
                h.insert(RETRY_AFTER, "7");
            }),
            Some(Duration::from_secs(7))
        );
    }
}