1use std::{
4 error::Error as StdError,
5 future::Future,
6 io::{self, Write as _},
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use actix_rt::task::{spawn_blocking, JoinHandle};
12use bytes::Bytes;
13use derive_more::Display;
14#[cfg(feature = "compress-gzip")]
15use flate2::write::{GzEncoder, ZlibEncoder};
16use futures_core::ready;
17use pin_project_lite::pin_project;
18use tracing::trace;
19#[cfg(feature = "compress-zstd")]
20use zstd::stream::write::Encoder as ZstdEncoder;
21
22use super::Writer;
23use crate::{
24 body::{self, BodySize, MessageBody},
25 header::{self, ContentEncoding, HeaderValue, CONTENT_ENCODING},
26 ResponseHead, StatusCode,
27};
28
29const MAX_CHUNK_SIZE_ENCODE_IN_PLACE: usize = 1024;
30
31pin_project! {
32 pub struct Encoder<B> {
33 #[pin]
34 body: EncoderBody<B>,
35 encoder: Option<ContentEncoder>,
36 fut: Option<JoinHandle<Result<ContentEncoder, io::Error>>>,
37 eof: bool,
38 }
39}
40
41impl<B: MessageBody> Encoder<B> {
42 fn none() -> Self {
43 Encoder {
44 body: EncoderBody::None {
45 body: body::None::new(),
46 },
47 encoder: None,
48 fut: None,
49 eof: true,
50 }
51 }
52
53 fn empty() -> Self {
54 Encoder {
55 body: EncoderBody::Full { body: Bytes::new() },
56 encoder: None,
57 fut: None,
58 eof: true,
59 }
60 }
61
62 pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, body: B) -> Self {
63 match body.size() {
65 BodySize::None => return Self::none(),
66 BodySize::Sized(0) => return Self::empty(),
67 _ => {}
68 }
69
70 let should_encode = !(head.headers().contains_key(&CONTENT_ENCODING)
71 || head.status == StatusCode::SWITCHING_PROTOCOLS
72 || head.status == StatusCode::NO_CONTENT
73 || encoding == ContentEncoding::Identity);
74
75 let body = match body.try_into_bytes() {
76 Ok(body) => EncoderBody::Full { body },
77 Err(body) => EncoderBody::Stream { body },
78 };
79
80 if should_encode {
81 if let Some(enc) = ContentEncoder::select(encoding) {
83 update_head(encoding, head);
84
85 return Encoder {
86 body,
87 encoder: Some(enc),
88 fut: None,
89 eof: false,
90 };
91 }
92 }
93
94 Encoder {
95 body,
96 encoder: None,
97 fut: None,
98 eof: false,
99 }
100 }
101}
102
103pin_project! {
104 #[project = EncoderBodyProj]
105 enum EncoderBody<B> {
106 None { body: body::None },
107 Full { body: Bytes },
108 Stream { #[pin] body: B },
109 }
110}
111
112impl<B> MessageBody for EncoderBody<B>
113where
114 B: MessageBody,
115{
116 type Error = EncoderError;
117
118 #[inline]
119 fn size(&self) -> BodySize {
120 match self {
121 EncoderBody::None { body } => body.size(),
122 EncoderBody::Full { body } => body.size(),
123 EncoderBody::Stream { body } => body.size(),
124 }
125 }
126
127 fn poll_next(
128 self: Pin<&mut Self>,
129 cx: &mut Context<'_>,
130 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
131 match self.project() {
132 EncoderBodyProj::None { body } => {
133 Pin::new(body).poll_next(cx).map_err(|err| match err {})
134 }
135 EncoderBodyProj::Full { body } => {
136 Pin::new(body).poll_next(cx).map_err(|err| match err {})
137 }
138 EncoderBodyProj::Stream { body } => body
139 .poll_next(cx)
140 .map_err(|err| EncoderError::Body(err.into())),
141 }
142 }
143
144 #[inline]
145 fn try_into_bytes(self) -> Result<Bytes, Self>
146 where
147 Self: Sized,
148 {
149 match self {
150 EncoderBody::None { body } => Ok(body.try_into_bytes().unwrap()),
151 EncoderBody::Full { body } => Ok(body.try_into_bytes().unwrap()),
152 _ => Err(self),
153 }
154 }
155}
156
157impl<B> MessageBody for Encoder<B>
158where
159 B: MessageBody,
160{
161 type Error = EncoderError;
162
163 #[inline]
164 fn size(&self) -> BodySize {
165 if self.encoder.is_some() {
166 BodySize::Stream
167 } else {
168 self.body.size()
169 }
170 }
171
172 fn poll_next(
173 self: Pin<&mut Self>,
174 cx: &mut Context<'_>,
175 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
176 let mut this = self.project();
177
178 loop {
179 if *this.eof {
180 return Poll::Ready(None);
181 }
182
183 if let Some(ref mut fut) = this.fut {
184 let mut encoder = ready!(Pin::new(fut).poll(cx))
185 .map_err(|_| {
186 EncoderError::Io(io::Error::other(
187 "Blocking task was cancelled unexpectedly",
188 ))
189 })?
190 .map_err(EncoderError::Io)?;
191
192 let chunk = encoder.take();
193 *this.encoder = Some(encoder);
194 this.fut.take();
195
196 if !chunk.is_empty() {
197 return Poll::Ready(Some(Ok(chunk)));
198 }
199 }
200
201 let result = ready!(this.body.as_mut().poll_next(cx));
202
203 match result {
204 Some(Err(err)) => return Poll::Ready(Some(Err(err))),
205
206 Some(Ok(chunk)) => {
207 if let Some(mut encoder) = this.encoder.take() {
208 if chunk.len() < MAX_CHUNK_SIZE_ENCODE_IN_PLACE {
209 encoder.write(&chunk).map_err(EncoderError::Io)?;
210 let chunk = encoder.take();
211 *this.encoder = Some(encoder);
212
213 if !chunk.is_empty() {
214 return Poll::Ready(Some(Ok(chunk)));
215 }
216 } else {
217 *this.fut = Some(spawn_blocking(move || {
218 encoder.write(&chunk)?;
219 Ok(encoder)
220 }));
221 }
222 } else {
223 return Poll::Ready(Some(Ok(chunk)));
224 }
225 }
226
227 None => {
228 if let Some(encoder) = this.encoder.take() {
229 let chunk = encoder.finish().map_err(EncoderError::Io)?;
230
231 if chunk.is_empty() {
232 return Poll::Ready(None);
233 } else {
234 *this.eof = true;
235 return Poll::Ready(Some(Ok(chunk)));
236 }
237 } else {
238 return Poll::Ready(None);
239 }
240 }
241 }
242 }
243 }
244
245 #[inline]
246 fn try_into_bytes(mut self) -> Result<Bytes, Self>
247 where
248 Self: Sized,
249 {
250 if self.encoder.is_some() {
251 Err(self)
252 } else {
253 match self.body.try_into_bytes() {
254 Ok(body) => Ok(body),
255 Err(body) => {
256 self.body = body;
257 Err(self)
258 }
259 }
260 }
261 }
262}
263
264fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) {
265 head.headers_mut()
266 .insert(header::CONTENT_ENCODING, encoding.to_header_value());
267 head.headers_mut()
268 .append(header::VARY, HeaderValue::from_static("accept-encoding"));
269
270 head.no_chunking(false);
271}
272
273enum ContentEncoder {
274 #[cfg(feature = "compress-gzip")]
275 Deflate(ZlibEncoder<Writer>),
276
277 #[cfg(feature = "compress-gzip")]
278 Gzip(GzEncoder<Writer>),
279
280 #[cfg(feature = "compress-brotli")]
281 Brotli(Box<brotli::CompressorWriter<Writer>>),
282
283 #[cfg(feature = "compress-zstd")]
286 Zstd(ZstdEncoder<'static, Writer>),
287}
288
289impl ContentEncoder {
290 fn select(encoding: ContentEncoding) -> Option<Self> {
291 match encoding {
292 #[cfg(feature = "compress-gzip")]
293 ContentEncoding::Deflate => Some(ContentEncoder::Deflate(ZlibEncoder::new(
294 Writer::new(),
295 flate2::Compression::fast(),
296 ))),
297
298 #[cfg(feature = "compress-gzip")]
299 ContentEncoding::Gzip => Some(ContentEncoder::Gzip(GzEncoder::new(
300 Writer::new(),
301 flate2::Compression::fast(),
302 ))),
303
304 #[cfg(feature = "compress-brotli")]
305 ContentEncoding::Brotli => Some(ContentEncoder::Brotli(new_brotli_compressor())),
306
307 #[cfg(feature = "compress-zstd")]
308 ContentEncoding::Zstd => {
309 let encoder = ZstdEncoder::new(Writer::new(), 3).ok()?;
310 Some(ContentEncoder::Zstd(encoder))
311 }
312
313 _ => None,
314 }
315 }
316
317 #[inline]
318 pub(crate) fn take(&mut self) -> Bytes {
319 match *self {
320 #[cfg(feature = "compress-brotli")]
321 ContentEncoder::Brotli(ref mut encoder) => encoder.get_mut().take(),
322
323 #[cfg(feature = "compress-gzip")]
324 ContentEncoder::Deflate(ref mut encoder) => encoder.get_mut().take(),
325
326 #[cfg(feature = "compress-gzip")]
327 ContentEncoder::Gzip(ref mut encoder) => encoder.get_mut().take(),
328
329 #[cfg(feature = "compress-zstd")]
330 ContentEncoder::Zstd(ref mut encoder) => encoder.get_mut().take(),
331 }
332 }
333
334 fn finish(self) -> Result<Bytes, io::Error> {
335 match self {
336 #[cfg(feature = "compress-brotli")]
337 ContentEncoder::Brotli(mut encoder) => match encoder.flush() {
338 Ok(()) => Ok(encoder.into_inner().buf.freeze()),
339 Err(err) => Err(err),
340 },
341
342 #[cfg(feature = "compress-gzip")]
343 ContentEncoder::Gzip(encoder) => match encoder.finish() {
344 Ok(writer) => Ok(writer.buf.freeze()),
345 Err(err) => Err(err),
346 },
347
348 #[cfg(feature = "compress-gzip")]
349 ContentEncoder::Deflate(encoder) => match encoder.finish() {
350 Ok(writer) => Ok(writer.buf.freeze()),
351 Err(err) => Err(err),
352 },
353
354 #[cfg(feature = "compress-zstd")]
355 ContentEncoder::Zstd(encoder) => match encoder.finish() {
356 Ok(writer) => Ok(writer.buf.freeze()),
357 Err(err) => Err(err),
358 },
359 }
360 }
361
362 fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
363 match *self {
364 #[cfg(feature = "compress-brotli")]
365 ContentEncoder::Brotli(ref mut encoder) => match encoder.write_all(data) {
366 Ok(_) => Ok(()),
367 Err(err) => {
368 trace!("Error decoding br encoding: {}", err);
369 Err(err)
370 }
371 },
372
373 #[cfg(feature = "compress-gzip")]
374 ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) {
375 Ok(_) => Ok(()),
376 Err(err) => {
377 trace!("Error decoding gzip encoding: {}", err);
378 Err(err)
379 }
380 },
381
382 #[cfg(feature = "compress-gzip")]
383 ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) {
384 Ok(_) => Ok(()),
385 Err(err) => {
386 trace!("Error decoding deflate encoding: {}", err);
387 Err(err)
388 }
389 },
390
391 #[cfg(feature = "compress-zstd")]
392 ContentEncoder::Zstd(ref mut encoder) => match encoder.write_all(data) {
393 Ok(_) => Ok(()),
394 Err(err) => {
395 trace!("Error decoding ztsd encoding: {}", err);
396 Err(err)
397 }
398 },
399 }
400 }
401}
402
403#[cfg(feature = "compress-brotli")]
404fn new_brotli_compressor() -> Box<brotli::CompressorWriter<Writer>> {
405 Box::new(brotli::CompressorWriter::new(
406 Writer::new(),
407 32 * 1024, 3, 22, ))
411}
412
413#[derive(Debug, Display)]
414#[non_exhaustive]
415pub enum EncoderError {
416 #[display("body")]
418 Body(Box<dyn StdError>),
419
420 #[display("io")]
422 Io(io::Error),
423}
424
425impl StdError for EncoderError {
426 fn source(&self) -> Option<&(dyn StdError + 'static)> {
427 match self {
428 EncoderError::Body(err) => Some(&**err),
429 EncoderError::Io(err) => Some(err),
430 }
431 }
432}
433
434impl From<EncoderError> for crate::Error {
435 fn from(err: EncoderError) -> Self {
436 crate::Error::new_encoder().with_cause(err)
437 }
438}