actix_http/encoding/
encoder.rs

1//! Stream encoders.
2
3use std::{
4    error::Error as StdError,
5    future::Future,
6    io::{self, Write as _},
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use actix_rt::task::{spawn_blocking, JoinHandle};
12use bytes::Bytes;
13use derive_more::Display;
14#[cfg(feature = "compress-gzip")]
15use flate2::write::{GzEncoder, ZlibEncoder};
16use futures_core::ready;
17use pin_project_lite::pin_project;
18use tracing::trace;
19#[cfg(feature = "compress-zstd")]
20use zstd::stream::write::Encoder as ZstdEncoder;
21
22use super::Writer;
23use crate::{
24    body::{self, BodySize, MessageBody},
25    header::{self, ContentEncoding, HeaderValue, CONTENT_ENCODING},
26    ResponseHead, StatusCode,
27};
28
29const MAX_CHUNK_SIZE_ENCODE_IN_PLACE: usize = 1024;
30
31pin_project! {
32    pub struct Encoder<B> {
33        #[pin]
34        body: EncoderBody<B>,
35        encoder: Option<ContentEncoder>,
36        fut: Option<JoinHandle<Result<ContentEncoder, io::Error>>>,
37        eof: bool,
38    }
39}
40
41impl<B: MessageBody> Encoder<B> {
42    fn none() -> Self {
43        Encoder {
44            body: EncoderBody::None {
45                body: body::None::new(),
46            },
47            encoder: None,
48            fut: None,
49            eof: true,
50        }
51    }
52
53    fn empty() -> Self {
54        Encoder {
55            body: EncoderBody::Full { body: Bytes::new() },
56            encoder: None,
57            fut: None,
58            eof: true,
59        }
60    }
61
62    pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, body: B) -> Self {
63        // no need to compress empty bodies
64        match body.size() {
65            BodySize::None => return Self::none(),
66            BodySize::Sized(0) => return Self::empty(),
67            _ => {}
68        }
69
70        let should_encode = !(head.headers().contains_key(&CONTENT_ENCODING)
71            || head.status == StatusCode::SWITCHING_PROTOCOLS
72            || head.status == StatusCode::NO_CONTENT
73            || encoding == ContentEncoding::Identity);
74
75        let body = match body.try_into_bytes() {
76            Ok(body) => EncoderBody::Full { body },
77            Err(body) => EncoderBody::Stream { body },
78        };
79
80        if should_encode {
81            // wrap body only if encoder is feature-enabled
82            if let Some(enc) = ContentEncoder::select(encoding) {
83                update_head(encoding, head);
84
85                return Encoder {
86                    body,
87                    encoder: Some(enc),
88                    fut: None,
89                    eof: false,
90                };
91            }
92        }
93
94        Encoder {
95            body,
96            encoder: None,
97            fut: None,
98            eof: false,
99        }
100    }
101}
102
103pin_project! {
104    #[project = EncoderBodyProj]
105    enum EncoderBody<B> {
106        None { body: body::None },
107        Full { body: Bytes },
108        Stream { #[pin] body: B },
109    }
110}
111
112impl<B> MessageBody for EncoderBody<B>
113where
114    B: MessageBody,
115{
116    type Error = EncoderError;
117
118    #[inline]
119    fn size(&self) -> BodySize {
120        match self {
121            EncoderBody::None { body } => body.size(),
122            EncoderBody::Full { body } => body.size(),
123            EncoderBody::Stream { body } => body.size(),
124        }
125    }
126
127    fn poll_next(
128        self: Pin<&mut Self>,
129        cx: &mut Context<'_>,
130    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
131        match self.project() {
132            EncoderBodyProj::None { body } => {
133                Pin::new(body).poll_next(cx).map_err(|err| match err {})
134            }
135            EncoderBodyProj::Full { body } => {
136                Pin::new(body).poll_next(cx).map_err(|err| match err {})
137            }
138            EncoderBodyProj::Stream { body } => body
139                .poll_next(cx)
140                .map_err(|err| EncoderError::Body(err.into())),
141        }
142    }
143
144    #[inline]
145    fn try_into_bytes(self) -> Result<Bytes, Self>
146    where
147        Self: Sized,
148    {
149        match self {
150            EncoderBody::None { body } => Ok(body.try_into_bytes().unwrap()),
151            EncoderBody::Full { body } => Ok(body.try_into_bytes().unwrap()),
152            _ => Err(self),
153        }
154    }
155}
156
157impl<B> MessageBody for Encoder<B>
158where
159    B: MessageBody,
160{
161    type Error = EncoderError;
162
163    #[inline]
164    fn size(&self) -> BodySize {
165        if self.encoder.is_some() {
166            BodySize::Stream
167        } else {
168            self.body.size()
169        }
170    }
171
172    fn poll_next(
173        self: Pin<&mut Self>,
174        cx: &mut Context<'_>,
175    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
176        let mut this = self.project();
177
178        loop {
179            if *this.eof {
180                return Poll::Ready(None);
181            }
182
183            if let Some(ref mut fut) = this.fut {
184                let mut encoder = ready!(Pin::new(fut).poll(cx))
185                    .map_err(|_| {
186                        EncoderError::Io(io::Error::other(
187                            "Blocking task was cancelled unexpectedly",
188                        ))
189                    })?
190                    .map_err(EncoderError::Io)?;
191
192                let chunk = encoder.take();
193                *this.encoder = Some(encoder);
194                this.fut.take();
195
196                if !chunk.is_empty() {
197                    return Poll::Ready(Some(Ok(chunk)));
198                }
199            }
200
201            let result = ready!(this.body.as_mut().poll_next(cx));
202
203            match result {
204                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
205
206                Some(Ok(chunk)) => {
207                    if let Some(mut encoder) = this.encoder.take() {
208                        if chunk.len() < MAX_CHUNK_SIZE_ENCODE_IN_PLACE {
209                            encoder.write(&chunk).map_err(EncoderError::Io)?;
210                            let chunk = encoder.take();
211                            *this.encoder = Some(encoder);
212
213                            if !chunk.is_empty() {
214                                return Poll::Ready(Some(Ok(chunk)));
215                            }
216                        } else {
217                            *this.fut = Some(spawn_blocking(move || {
218                                encoder.write(&chunk)?;
219                                Ok(encoder)
220                            }));
221                        }
222                    } else {
223                        return Poll::Ready(Some(Ok(chunk)));
224                    }
225                }
226
227                None => {
228                    if let Some(encoder) = this.encoder.take() {
229                        let chunk = encoder.finish().map_err(EncoderError::Io)?;
230
231                        if chunk.is_empty() {
232                            return Poll::Ready(None);
233                        } else {
234                            *this.eof = true;
235                            return Poll::Ready(Some(Ok(chunk)));
236                        }
237                    } else {
238                        return Poll::Ready(None);
239                    }
240                }
241            }
242        }
243    }
244
245    #[inline]
246    fn try_into_bytes(mut self) -> Result<Bytes, Self>
247    where
248        Self: Sized,
249    {
250        if self.encoder.is_some() {
251            Err(self)
252        } else {
253            match self.body.try_into_bytes() {
254                Ok(body) => Ok(body),
255                Err(body) => {
256                    self.body = body;
257                    Err(self)
258                }
259            }
260        }
261    }
262}
263
264fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) {
265    head.headers_mut()
266        .insert(header::CONTENT_ENCODING, encoding.to_header_value());
267    head.headers_mut()
268        .append(header::VARY, HeaderValue::from_static("accept-encoding"));
269
270    head.no_chunking(false);
271}
272
273enum ContentEncoder {
274    #[cfg(feature = "compress-gzip")]
275    Deflate(ZlibEncoder<Writer>),
276
277    #[cfg(feature = "compress-gzip")]
278    Gzip(GzEncoder<Writer>),
279
280    #[cfg(feature = "compress-brotli")]
281    Brotli(Box<brotli::CompressorWriter<Writer>>),
282
283    // Wwe need explicit 'static lifetime here because ZstdEncoder needs a lifetime argument and we
284    // use `spawn_blocking` in `Encoder::poll_next` that requires `FnOnce() -> R + Send + 'static`.
285    #[cfg(feature = "compress-zstd")]
286    Zstd(ZstdEncoder<'static, Writer>),
287}
288
289impl ContentEncoder {
290    fn select(encoding: ContentEncoding) -> Option<Self> {
291        match encoding {
292            #[cfg(feature = "compress-gzip")]
293            ContentEncoding::Deflate => Some(ContentEncoder::Deflate(ZlibEncoder::new(
294                Writer::new(),
295                flate2::Compression::fast(),
296            ))),
297
298            #[cfg(feature = "compress-gzip")]
299            ContentEncoding::Gzip => Some(ContentEncoder::Gzip(GzEncoder::new(
300                Writer::new(),
301                flate2::Compression::fast(),
302            ))),
303
304            #[cfg(feature = "compress-brotli")]
305            ContentEncoding::Brotli => Some(ContentEncoder::Brotli(new_brotli_compressor())),
306
307            #[cfg(feature = "compress-zstd")]
308            ContentEncoding::Zstd => {
309                let encoder = ZstdEncoder::new(Writer::new(), 3).ok()?;
310                Some(ContentEncoder::Zstd(encoder))
311            }
312
313            _ => None,
314        }
315    }
316
317    #[inline]
318    pub(crate) fn take(&mut self) -> Bytes {
319        match *self {
320            #[cfg(feature = "compress-brotli")]
321            ContentEncoder::Brotli(ref mut encoder) => encoder.get_mut().take(),
322
323            #[cfg(feature = "compress-gzip")]
324            ContentEncoder::Deflate(ref mut encoder) => encoder.get_mut().take(),
325
326            #[cfg(feature = "compress-gzip")]
327            ContentEncoder::Gzip(ref mut encoder) => encoder.get_mut().take(),
328
329            #[cfg(feature = "compress-zstd")]
330            ContentEncoder::Zstd(ref mut encoder) => encoder.get_mut().take(),
331        }
332    }
333
334    fn finish(self) -> Result<Bytes, io::Error> {
335        match self {
336            #[cfg(feature = "compress-brotli")]
337            ContentEncoder::Brotli(mut encoder) => match encoder.flush() {
338                Ok(()) => Ok(encoder.into_inner().buf.freeze()),
339                Err(err) => Err(err),
340            },
341
342            #[cfg(feature = "compress-gzip")]
343            ContentEncoder::Gzip(encoder) => match encoder.finish() {
344                Ok(writer) => Ok(writer.buf.freeze()),
345                Err(err) => Err(err),
346            },
347
348            #[cfg(feature = "compress-gzip")]
349            ContentEncoder::Deflate(encoder) => match encoder.finish() {
350                Ok(writer) => Ok(writer.buf.freeze()),
351                Err(err) => Err(err),
352            },
353
354            #[cfg(feature = "compress-zstd")]
355            ContentEncoder::Zstd(encoder) => match encoder.finish() {
356                Ok(writer) => Ok(writer.buf.freeze()),
357                Err(err) => Err(err),
358            },
359        }
360    }
361
362    fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
363        match *self {
364            #[cfg(feature = "compress-brotli")]
365            ContentEncoder::Brotli(ref mut encoder) => match encoder.write_all(data) {
366                Ok(_) => Ok(()),
367                Err(err) => {
368                    trace!("Error decoding br encoding: {}", err);
369                    Err(err)
370                }
371            },
372
373            #[cfg(feature = "compress-gzip")]
374            ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) {
375                Ok(_) => Ok(()),
376                Err(err) => {
377                    trace!("Error decoding gzip encoding: {}", err);
378                    Err(err)
379                }
380            },
381
382            #[cfg(feature = "compress-gzip")]
383            ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) {
384                Ok(_) => Ok(()),
385                Err(err) => {
386                    trace!("Error decoding deflate encoding: {}", err);
387                    Err(err)
388                }
389            },
390
391            #[cfg(feature = "compress-zstd")]
392            ContentEncoder::Zstd(ref mut encoder) => match encoder.write_all(data) {
393                Ok(_) => Ok(()),
394                Err(err) => {
395                    trace!("Error decoding ztsd encoding: {}", err);
396                    Err(err)
397                }
398            },
399        }
400    }
401}
402
403#[cfg(feature = "compress-brotli")]
404fn new_brotli_compressor() -> Box<brotli::CompressorWriter<Writer>> {
405    Box::new(brotli::CompressorWriter::new(
406        Writer::new(),
407        32 * 1024, // 32 KiB buffer
408        3,         // BROTLI_PARAM_QUALITY
409        22,        // BROTLI_PARAM_LGWIN
410    ))
411}
412
413#[derive(Debug, Display)]
414#[non_exhaustive]
415pub enum EncoderError {
416    /// Wrapped body stream error.
417    #[display("body")]
418    Body(Box<dyn StdError>),
419
420    /// Generic I/O error.
421    #[display("io")]
422    Io(io::Error),
423}
424
425impl StdError for EncoderError {
426    fn source(&self) -> Option<&(dyn StdError + 'static)> {
427        match self {
428            EncoderError::Body(err) => Some(&**err),
429            EncoderError::Io(err) => Some(err),
430        }
431    }
432}
433
434impl From<EncoderError> for crate::Error {
435    fn from(err: EncoderError) -> Self {
436        crate::Error::new_encoder().with_cause(err)
437    }
438}