1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{ready, Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use http_body::Body as HttpBody;
9use http_body_util::combinators::BoxBody;
10use pin_project_lite::pin_project;
11#[cfg(feature = "stream")]
12use tokio::fs::File;
13use tokio::time::Sleep;
14#[cfg(feature = "stream")]
15use tokio_util::io::ReaderStream;
16
17pub struct Body {
19 inner: Inner,
20}
21
22enum Inner {
23 Reusable(Bytes),
24 Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>),
25}
26
27pin_project! {
28 pub(crate) struct TotalTimeoutBody<B> {
33 #[pin]
34 inner: B,
35 timeout: Pin<Box<Sleep>>,
36 }
37}
38
39pin_project! {
40 pub(crate) struct ReadTimeoutBody<B> {
41 #[pin]
42 inner: B,
43 #[pin]
44 sleep: Option<Sleep>,
45 timeout: Duration,
46 }
47}
48
49#[cfg(any(feature = "stream", feature = "multipart",))]
51pub(crate) struct DataStream<B>(pub(crate) B);
52
53impl Body {
54 pub fn as_bytes(&self) -> Option<&[u8]> {
58 match &self.inner {
59 Inner::Reusable(bytes) => Some(bytes.as_ref()),
60 Inner::Streaming(..) => None,
61 }
62 }
63
64 #[cfg(feature = "stream")]
88 #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
89 pub fn wrap_stream<S>(stream: S) -> Body
90 where
91 S: futures_core::stream::TryStream + Send + 'static,
92 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
93 Bytes: From<S::Ok>,
94 {
95 Body::stream(stream)
96 }
97
98 #[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))]
99 pub(crate) fn stream<S>(stream: S) -> Body
100 where
101 S: futures_core::stream::TryStream + Send + 'static,
102 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
103 Bytes: From<S::Ok>,
104 {
105 use futures_util::TryStreamExt;
106 use http_body::Frame;
107 use http_body_util::StreamBody;
108
109 let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
110 stream
111 .map_ok(|d| Frame::data(Bytes::from(d)))
112 .map_err(Into::into),
113 )));
114 Body {
115 inner: Inner::Streaming(body),
116 }
117 }
118
119 pub(crate) fn empty() -> Body {
120 Body::reusable(Bytes::new())
121 }
122
123 pub(crate) fn reusable(chunk: Bytes) -> Body {
124 Body {
125 inner: Inner::Reusable(chunk),
126 }
127 }
128
129 pub fn wrap<B>(inner: B) -> Body
143 where
144 B: HttpBody + Send + Sync + 'static,
145 B::Data: Into<Bytes>,
146 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
147 {
148 use http_body_util::BodyExt;
149
150 let boxed = IntoBytesBody { inner }.map_err(Into::into).boxed();
151
152 Body {
153 inner: Inner::Streaming(boxed),
154 }
155 }
156
157 pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
158 let reuse = match self.inner {
159 Inner::Reusable(ref chunk) => Some(chunk.clone()),
160 Inner::Streaming { .. } => None,
161 };
162
163 (reuse, self)
164 }
165
166 pub(crate) fn try_clone(&self) -> Option<Body> {
167 match self.inner {
168 Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
169 Inner::Streaming { .. } => None,
170 }
171 }
172
173 #[cfg(feature = "multipart")]
174 pub(crate) fn into_stream(self) -> DataStream<Body> {
175 DataStream(self)
176 }
177
178 #[cfg(feature = "multipart")]
179 pub(crate) fn content_length(&self) -> Option<u64> {
180 match self.inner {
181 Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
182 Inner::Streaming(ref body) => body.size_hint().exact(),
183 }
184 }
185}
186
187impl Default for Body {
188 #[inline]
189 fn default() -> Body {
190 Body::empty()
191 }
192}
193
194impl From<Bytes> for Body {
208 #[inline]
209 fn from(bytes: Bytes) -> Body {
210 Body::reusable(bytes)
211 }
212}
213
214impl From<Vec<u8>> for Body {
215 #[inline]
216 fn from(vec: Vec<u8>) -> Body {
217 Body::reusable(vec.into())
218 }
219}
220
221impl From<&'static [u8]> for Body {
222 #[inline]
223 fn from(s: &'static [u8]) -> Body {
224 Body::reusable(Bytes::from_static(s))
225 }
226}
227
228impl From<String> for Body {
229 #[inline]
230 fn from(s: String) -> Body {
231 Body::reusable(s.into())
232 }
233}
234
235impl From<&'static str> for Body {
236 #[inline]
237 fn from(s: &'static str) -> Body {
238 s.as_bytes().into()
239 }
240}
241
242#[cfg(feature = "stream")]
243#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
244impl From<File> for Body {
245 #[inline]
246 fn from(file: File) -> Body {
247 Body::wrap_stream(ReaderStream::new(file))
248 }
249}
250
251impl fmt::Debug for Body {
252 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
253 f.debug_struct("Body").finish()
254 }
255}
256
257impl HttpBody for Body {
258 type Data = Bytes;
259 type Error = crate::Error;
260
261 fn poll_frame(
262 mut self: Pin<&mut Self>,
263 cx: &mut Context,
264 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
265 match self.inner {
266 Inner::Reusable(ref mut bytes) => {
267 let out = bytes.split_off(0);
268 if out.is_empty() {
269 Poll::Ready(None)
270 } else {
271 Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
272 }
273 }
274 Inner::Streaming(ref mut body) => Poll::Ready(
275 ready!(Pin::new(body).poll_frame(cx))
276 .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
277 ),
278 }
279 }
280
281 fn size_hint(&self) -> http_body::SizeHint {
282 match self.inner {
283 Inner::Reusable(ref bytes) => http_body::SizeHint::with_exact(bytes.len() as u64),
284 Inner::Streaming(ref body) => body.size_hint(),
285 }
286 }
287
288 fn is_end_stream(&self) -> bool {
289 match self.inner {
290 Inner::Reusable(ref bytes) => bytes.is_empty(),
291 Inner::Streaming(ref body) => body.is_end_stream(),
292 }
293 }
294}
295
296pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> {
299 TotalTimeoutBody {
300 inner: body,
301 timeout,
302 }
303}
304
305pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> {
306 ReadTimeoutBody {
307 inner: body,
308 sleep: None,
309 timeout,
310 }
311}
312
313impl<B> hyper::body::Body for TotalTimeoutBody<B>
314where
315 B: hyper::body::Body,
316 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
317{
318 type Data = B::Data;
319 type Error = crate::Error;
320
321 fn poll_frame(
322 self: Pin<&mut Self>,
323 cx: &mut Context,
324 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
325 let this = self.project();
326 if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
327 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
328 }
329 Poll::Ready(
330 ready!(this.inner.poll_frame(cx))
331 .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
332 )
333 }
334
335 #[inline]
336 fn size_hint(&self) -> http_body::SizeHint {
337 self.inner.size_hint()
338 }
339
340 #[inline]
341 fn is_end_stream(&self) -> bool {
342 self.inner.is_end_stream()
343 }
344}
345
346impl<B> hyper::body::Body for ReadTimeoutBody<B>
347where
348 B: hyper::body::Body,
349 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
350{
351 type Data = B::Data;
352 type Error = crate::Error;
353
354 fn poll_frame(
355 self: Pin<&mut Self>,
356 cx: &mut Context,
357 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
358 let mut this = self.project();
359
360 let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() {
362 some
363 } else {
364 this.sleep.set(Some(tokio::time::sleep(*this.timeout)));
365 this.sleep.as_mut().as_pin_mut().unwrap()
366 };
367
368 if let Poll::Ready(()) = sleep_pinned.poll(cx) {
370 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
371 }
372
373 let item = ready!(this.inner.poll_frame(cx))
374 .map(|opt_chunk| opt_chunk.map_err(crate::error::body));
375 this.sleep.set(None);
377 Poll::Ready(item)
378 }
379
380 #[inline]
381 fn size_hint(&self) -> http_body::SizeHint {
382 self.inner.size_hint()
383 }
384
385 #[inline]
386 fn is_end_stream(&self) -> bool {
387 self.inner.is_end_stream()
388 }
389}
390
391pub(crate) type ResponseBody =
392 http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
393
394pub(crate) fn boxed<B>(body: B) -> ResponseBody
395where
396 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
397 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
398{
399 use http_body_util::BodyExt;
400
401 body.map_err(box_err).boxed()
402}
403
404pub(crate) fn response<B>(
405 body: B,
406 deadline: Option<Pin<Box<Sleep>>>,
407 read_timeout: Option<Duration>,
408) -> ResponseBody
409where
410 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
411 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
412{
413 use http_body_util::BodyExt;
414
415 match (deadline, read_timeout) {
416 (Some(total), Some(read)) => {
417 let body = with_read_timeout(body, read).map_err(box_err);
418 total_timeout(body, total).map_err(box_err).boxed()
419 }
420 (Some(total), None) => total_timeout(body, total).map_err(box_err).boxed(),
421 (None, Some(read)) => with_read_timeout(body, read).map_err(box_err).boxed(),
422 (None, None) => body.map_err(box_err).boxed(),
423 }
424}
425
426fn box_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
427where
428 E: Into<Box<dyn std::error::Error + Send + Sync>>,
429{
430 err.into()
431}
432
433#[cfg(any(feature = "stream", feature = "multipart",))]
436impl<B> futures_core::Stream for DataStream<B>
437where
438 B: HttpBody<Data = Bytes> + Unpin,
439{
440 type Item = Result<Bytes, B::Error>;
441
442 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
443 loop {
444 return match ready!(Pin::new(&mut self.0).poll_frame(cx)) {
445 Some(Ok(frame)) => {
446 if let Ok(buf) = frame.into_data() {
448 Poll::Ready(Some(Ok(buf)))
449 } else {
450 continue;
451 }
452 }
453 Some(Err(err)) => Poll::Ready(Some(Err(err))),
454 None => Poll::Ready(None),
455 };
456 }
457 }
458}
459
460pin_project! {
463 struct IntoBytesBody<B> {
464 #[pin]
465 inner: B,
466 }
467}
468
469impl<B> hyper::body::Body for IntoBytesBody<B>
472where
473 B: hyper::body::Body,
474 B::Data: Into<Bytes>,
475{
476 type Data = Bytes;
477 type Error = B::Error;
478
479 fn poll_frame(
480 self: Pin<&mut Self>,
481 cx: &mut Context,
482 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
483 match ready!(self.project().inner.poll_frame(cx)) {
484 Some(Ok(f)) => Poll::Ready(Some(Ok(f.map_data(Into::into)))),
485 Some(Err(e)) => Poll::Ready(Some(Err(e))),
486 None => Poll::Ready(None),
487 }
488 }
489
490 #[inline]
491 fn size_hint(&self) -> http_body::SizeHint {
492 self.inner.size_hint()
493 }
494
495 #[inline]
496 fn is_end_stream(&self) -> bool {
497 self.inner.is_end_stream()
498 }
499}
500
501#[cfg(test)]
502mod tests {
503 use http_body::Body as _;
504
505 use super::Body;
506
507 #[test]
508 fn test_as_bytes() {
509 let test_data = b"Test body";
510 let body = Body::from(&test_data[..]);
511 assert_eq!(body.as_bytes(), Some(&test_data[..]));
512 }
513
514 #[test]
515 fn body_exact_length() {
516 let empty_body = Body::empty();
517 assert!(empty_body.is_end_stream());
518 assert_eq!(empty_body.size_hint().exact(), Some(0));
519
520 let bytes_body = Body::reusable("abc".into());
521 assert!(!bytes_body.is_end_stream());
522 assert_eq!(bytes_body.size_hint().exact(), Some(3));
523
524 let stream_body = Body::wrap(empty_body);
526 assert!(stream_body.is_end_stream());
527 assert_eq!(stream_body.size_hint().exact(), Some(0));
528 }
529}