reqwest/async_impl/
body.rs

1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{ready, Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use http_body::Body as HttpBody;
9use http_body_util::combinators::BoxBody;
10use pin_project_lite::pin_project;
11#[cfg(feature = "stream")]
12use tokio::fs::File;
13use tokio::time::Sleep;
14#[cfg(feature = "stream")]
15use tokio_util::io::ReaderStream;
16
17/// An asynchronous request body.
18pub struct Body {
19    inner: Inner,
20}
21
22enum Inner {
23    Reusable(Bytes),
24    Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>),
25}
26
27pin_project! {
28    /// A body with a total timeout.
29    ///
30    /// The timeout does not reset upon each chunk, but rather requires the whole
31    /// body be streamed before the deadline is reached.
32    pub(crate) struct TotalTimeoutBody<B> {
33        #[pin]
34        inner: B,
35        timeout: Pin<Box<Sleep>>,
36    }
37}
38
39pin_project! {
40    pub(crate) struct ReadTimeoutBody<B> {
41        #[pin]
42        inner: B,
43        #[pin]
44        sleep: Option<Sleep>,
45        timeout: Duration,
46    }
47}
48
/// Adapts any `impl Body` into an `impl Stream` that yields only its DATA frames.
#[cfg(any(feature = "stream", feature = "multipart"))]
pub(crate) struct DataStream<B>(pub(crate) B);
52
53impl Body {
54    /// Returns a reference to the internal data of the `Body`.
55    ///
56    /// `None` is returned, if the underlying data is a stream.
57    pub fn as_bytes(&self) -> Option<&[u8]> {
58        match &self.inner {
59            Inner::Reusable(bytes) => Some(bytes.as_ref()),
60            Inner::Streaming(..) => None,
61        }
62    }
63
64    /// Wrap a futures `Stream` in a box inside `Body`.
65    ///
66    /// # Example
67    ///
68    /// ```
69    /// # use reqwest::Body;
70    /// # use futures_util;
71    /// # fn main() {
72    /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![
73    ///     Ok("hello"),
74    ///     Ok(" "),
75    ///     Ok("world"),
76    /// ];
77    ///
78    /// let stream = futures_util::stream::iter(chunks);
79    ///
80    /// let body = Body::wrap_stream(stream);
81    /// # }
82    /// ```
83    ///
84    /// # Optional
85    ///
86    /// This requires the `stream` feature to be enabled.
87    #[cfg(feature = "stream")]
88    #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
89    pub fn wrap_stream<S>(stream: S) -> Body
90    where
91        S: futures_core::stream::TryStream + Send + 'static,
92        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
93        Bytes: From<S::Ok>,
94    {
95        Body::stream(stream)
96    }
97
98    #[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))]
99    pub(crate) fn stream<S>(stream: S) -> Body
100    where
101        S: futures_core::stream::TryStream + Send + 'static,
102        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
103        Bytes: From<S::Ok>,
104    {
105        use futures_util::TryStreamExt;
106        use http_body::Frame;
107        use http_body_util::StreamBody;
108
109        let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
110            stream
111                .map_ok(|d| Frame::data(Bytes::from(d)))
112                .map_err(Into::into),
113        )));
114        Body {
115            inner: Inner::Streaming(body),
116        }
117    }
118
119    pub(crate) fn empty() -> Body {
120        Body::reusable(Bytes::new())
121    }
122
123    pub(crate) fn reusable(chunk: Bytes) -> Body {
124        Body {
125            inner: Inner::Reusable(chunk),
126        }
127    }
128
129    /// Wrap a [`HttpBody`] in a box inside `Body`.
130    ///
131    /// # Example
132    ///
133    /// ```
134    /// # use reqwest::Body;
135    /// # use futures_util;
136    /// # fn main() {
137    /// let content = "hello,world!".to_string();
138    ///
139    /// let body = Body::wrap(content);
140    /// # }
141    /// ```
142    pub fn wrap<B>(inner: B) -> Body
143    where
144        B: HttpBody + Send + Sync + 'static,
145        B::Data: Into<Bytes>,
146        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
147    {
148        use http_body_util::BodyExt;
149
150        let boxed = IntoBytesBody { inner }.map_err(Into::into).boxed();
151
152        Body {
153            inner: Inner::Streaming(boxed),
154        }
155    }
156
157    pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
158        let reuse = match self.inner {
159            Inner::Reusable(ref chunk) => Some(chunk.clone()),
160            Inner::Streaming { .. } => None,
161        };
162
163        (reuse, self)
164    }
165
166    pub(crate) fn try_clone(&self) -> Option<Body> {
167        match self.inner {
168            Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
169            Inner::Streaming { .. } => None,
170        }
171    }
172
173    #[cfg(feature = "multipart")]
174    pub(crate) fn into_stream(self) -> DataStream<Body> {
175        DataStream(self)
176    }
177
178    #[cfg(feature = "multipart")]
179    pub(crate) fn content_length(&self) -> Option<u64> {
180        match self.inner {
181            Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
182            Inner::Streaming(ref body) => body.size_hint().exact(),
183        }
184    }
185}
186
187impl Default for Body {
188    #[inline]
189    fn default() -> Body {
190        Body::empty()
191    }
192}
193
194/*
195impl From<hyper::Body> for Body {
196    #[inline]
197    fn from(body: hyper::Body) -> Body {
198        Self {
199            inner: Inner::Streaming {
200                body: Box::pin(WrapHyper(body)),
201            },
202        }
203    }
204}
205*/
206
207impl From<Bytes> for Body {
208    #[inline]
209    fn from(bytes: Bytes) -> Body {
210        Body::reusable(bytes)
211    }
212}
213
214impl From<Vec<u8>> for Body {
215    #[inline]
216    fn from(vec: Vec<u8>) -> Body {
217        Body::reusable(vec.into())
218    }
219}
220
221impl From<&'static [u8]> for Body {
222    #[inline]
223    fn from(s: &'static [u8]) -> Body {
224        Body::reusable(Bytes::from_static(s))
225    }
226}
227
228impl From<String> for Body {
229    #[inline]
230    fn from(s: String) -> Body {
231        Body::reusable(s.into())
232    }
233}
234
235impl From<&'static str> for Body {
236    #[inline]
237    fn from(s: &'static str) -> Body {
238        s.as_bytes().into()
239    }
240}
241
/// Wraps the file in a `ReaderStream` and uses that stream as the body.
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl From<File> for Body {
    #[inline]
    fn from(file: File) -> Body {
        let reader = ReaderStream::new(file);
        Self::wrap_stream(reader)
    }
}
250
251impl fmt::Debug for Body {
252    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
253        f.debug_struct("Body").finish()
254    }
255}
256
257impl HttpBody for Body {
258    type Data = Bytes;
259    type Error = crate::Error;
260
261    fn poll_frame(
262        mut self: Pin<&mut Self>,
263        cx: &mut Context,
264    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
265        match self.inner {
266            Inner::Reusable(ref mut bytes) => {
267                let out = bytes.split_off(0);
268                if out.is_empty() {
269                    Poll::Ready(None)
270                } else {
271                    Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
272                }
273            }
274            Inner::Streaming(ref mut body) => Poll::Ready(
275                ready!(Pin::new(body).poll_frame(cx))
276                    .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
277            ),
278        }
279    }
280
281    fn size_hint(&self) -> http_body::SizeHint {
282        match self.inner {
283            Inner::Reusable(ref bytes) => http_body::SizeHint::with_exact(bytes.len() as u64),
284            Inner::Streaming(ref body) => body.size_hint(),
285        }
286    }
287
288    fn is_end_stream(&self) -> bool {
289        match self.inner {
290            Inner::Reusable(ref bytes) => bytes.is_empty(),
291            Inner::Streaming(ref body) => body.is_end_stream(),
292        }
293    }
294}
295
296// ===== impl TotalTimeoutBody =====
297
298pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> {
299    TotalTimeoutBody {
300        inner: body,
301        timeout,
302    }
303}
304
305pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> {
306    ReadTimeoutBody {
307        inner: body,
308        sleep: None,
309        timeout,
310    }
311}
312
313impl<B> hyper::body::Body for TotalTimeoutBody<B>
314where
315    B: hyper::body::Body,
316    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
317{
318    type Data = B::Data;
319    type Error = crate::Error;
320
321    fn poll_frame(
322        self: Pin<&mut Self>,
323        cx: &mut Context,
324    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
325        let this = self.project();
326        if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
327            return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
328        }
329        Poll::Ready(
330            ready!(this.inner.poll_frame(cx))
331                .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
332        )
333    }
334
335    #[inline]
336    fn size_hint(&self) -> http_body::SizeHint {
337        self.inner.size_hint()
338    }
339
340    #[inline]
341    fn is_end_stream(&self) -> bool {
342        self.inner.is_end_stream()
343    }
344}
345
346impl<B> hyper::body::Body for ReadTimeoutBody<B>
347where
348    B: hyper::body::Body,
349    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
350{
351    type Data = B::Data;
352    type Error = crate::Error;
353
354    fn poll_frame(
355        self: Pin<&mut Self>,
356        cx: &mut Context,
357    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
358        let mut this = self.project();
359
360        // Start the `Sleep` if not active.
361        let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() {
362            some
363        } else {
364            this.sleep.set(Some(tokio::time::sleep(*this.timeout)));
365            this.sleep.as_mut().as_pin_mut().unwrap()
366        };
367
368        // Error if the timeout has expired.
369        if let Poll::Ready(()) = sleep_pinned.poll(cx) {
370            return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
371        }
372
373        let item = ready!(this.inner.poll_frame(cx))
374            .map(|opt_chunk| opt_chunk.map_err(crate::error::body));
375        // a ready frame means timeout is reset
376        this.sleep.set(None);
377        Poll::Ready(item)
378    }
379
380    #[inline]
381    fn size_hint(&self) -> http_body::SizeHint {
382        self.inner.size_hint()
383    }
384
385    #[inline]
386    fn is_end_stream(&self) -> bool {
387        self.inner.is_end_stream()
388    }
389}
390
391pub(crate) type ResponseBody =
392    http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
393
394pub(crate) fn boxed<B>(body: B) -> ResponseBody
395where
396    B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
397    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
398{
399    use http_body_util::BodyExt;
400
401    body.map_err(box_err).boxed()
402}
403
404pub(crate) fn response<B>(
405    body: B,
406    deadline: Option<Pin<Box<Sleep>>>,
407    read_timeout: Option<Duration>,
408) -> ResponseBody
409where
410    B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
411    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
412{
413    use http_body_util::BodyExt;
414
415    match (deadline, read_timeout) {
416        (Some(total), Some(read)) => {
417            let body = with_read_timeout(body, read).map_err(box_err);
418            total_timeout(body, total).map_err(box_err).boxed()
419        }
420        (Some(total), None) => total_timeout(body, total).map_err(box_err).boxed(),
421        (None, Some(read)) => with_read_timeout(body, read).map_err(box_err).boxed(),
422        (None, None) => body.map_err(box_err).boxed(),
423    }
424}
425
/// Converts any compatible error into a boxed `dyn Error + Send + Sync`.
///
/// Being a named function, it can be passed directly to `map_err`.
fn box_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    Into::into(err)
}
432
433// ===== impl DataStream =====
434
#[cfg(any(feature = "stream", feature = "multipart"))]
impl<B> futures_core::Stream for DataStream<B>
where
    B: HttpBody<Data = Bytes> + Unpin,
{
    type Item = Result<Bytes, B::Error>;

    /// Yields the next DATA frame's bytes, passing errors and end-of-body through.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        loop {
            let frame = match ready!(Pin::new(&mut self.0).poll_frame(cx)) {
                Some(Ok(frame)) => frame,
                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
                None => return Poll::Ready(None),
            };

            // Non-data frames are skipped; go around and poll again.
            if let Ok(buf) = frame.into_data() {
                return Poll::Ready(Some(Ok(buf)));
            }
        }
    }
}
459
460// ===== impl IntoBytesBody =====
461
462pin_project! {
463    struct IntoBytesBody<B> {
464        #[pin]
465        inner: B,
466    }
467}
468
469// We can't use `map_frame()` because that loses the hint data (for good reason).
470// But we aren't transforming the data.
471impl<B> hyper::body::Body for IntoBytesBody<B>
472where
473    B: hyper::body::Body,
474    B::Data: Into<Bytes>,
475{
476    type Data = Bytes;
477    type Error = B::Error;
478
479    fn poll_frame(
480        self: Pin<&mut Self>,
481        cx: &mut Context,
482    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
483        match ready!(self.project().inner.poll_frame(cx)) {
484            Some(Ok(f)) => Poll::Ready(Some(Ok(f.map_data(Into::into)))),
485            Some(Err(e)) => Poll::Ready(Some(Err(e))),
486            None => Poll::Ready(None),
487        }
488    }
489
490    #[inline]
491    fn size_hint(&self) -> http_body::SizeHint {
492        self.inner.size_hint()
493    }
494
495    #[inline]
496    fn is_end_stream(&self) -> bool {
497        self.inner.is_end_stream()
498    }
499}
500
#[cfg(test)]
mod tests {
    use http_body::Body as _;

    use super::Body;

    /// A body built from a static slice exposes exactly those bytes.
    #[test]
    fn test_as_bytes() {
        let payload: &'static [u8] = b"Test body";
        let body = Body::from(payload);
        assert_eq!(body.as_bytes(), Some(payload));
    }

    /// Buffered bodies report an exact size, even through `Body::wrap`.
    #[test]
    fn body_exact_length() {
        let empty = Body::empty();
        assert!(empty.is_end_stream());
        assert_eq!(empty.size_hint().exact(), Some(0));

        let three_bytes = Body::reusable("abc".into());
        assert!(!three_bytes.is_end_stream());
        assert_eq!(three_bytes.size_hint().exact(), Some(3));

        // the hint and end-of-stream state still delegate once wrapped
        let wrapped = Body::wrap(empty);
        assert!(wrapped.is_end_stream());
        assert_eq!(wrapped.size_hint().exact(), Some(0));
    }
}