actix_http/encoding/
decoder.rs1use std::{
4 future::Future,
5 io::{self, Write as _},
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use actix_rt::task::{spawn_blocking, JoinHandle};
11use bytes::Bytes;
12#[cfg(feature = "compress-gzip")]
13use flate2::write::{GzDecoder, ZlibDecoder};
14use futures_core::{ready, Stream};
15#[cfg(feature = "compress-zstd")]
16use zstd::stream::write::Decoder as ZstdDecoder;
17
18use crate::{
19 encoding::Writer,
20 error::PayloadError,
21 header::{ContentEncoding, HeaderMap, CONTENT_ENCODING},
22};
23
24const MAX_CHUNK_SIZE_DECODE_IN_PLACE: usize = 2049;
25
26pin_project_lite::pin_project! {
27 pub struct Decoder<S> {
28 decoder: Option<ContentDecoder>,
29 #[pin]
30 stream: S,
31 eof: bool,
32 fut: Option<JoinHandle<Result<(Option<Bytes>, ContentDecoder), io::Error>>>,
33 }
34}
35
36impl<S> Decoder<S>
37where
38 S: Stream<Item = Result<Bytes, PayloadError>>,
39{
40 #[inline]
42 pub fn new(stream: S, encoding: ContentEncoding) -> Decoder<S> {
43 let decoder = match encoding {
44 #[cfg(feature = "compress-brotli")]
45 ContentEncoding::Brotli => Some(ContentDecoder::Brotli(Box::new(
46 brotli::DecompressorWriter::new(Writer::new(), 8_096),
47 ))),
48
49 #[cfg(feature = "compress-gzip")]
50 ContentEncoding::Deflate => Some(ContentDecoder::Deflate(Box::new(ZlibDecoder::new(
51 Writer::new(),
52 )))),
53
54 #[cfg(feature = "compress-gzip")]
55 ContentEncoding::Gzip => Some(ContentDecoder::Gzip(Box::new(GzDecoder::new(
56 Writer::new(),
57 )))),
58
59 #[cfg(feature = "compress-zstd")]
60 ContentEncoding::Zstd => Some(ContentDecoder::Zstd(Box::new(
61 ZstdDecoder::new(Writer::new()).expect(
62 "Failed to create zstd decoder. This is a bug. \
63 Please report it to the actix-web repository.",
64 ),
65 ))),
66 _ => None,
67 };
68
69 Decoder {
70 decoder,
71 stream,
72 fut: None,
73 eof: false,
74 }
75 }
76
77 #[inline]
79 pub fn from_headers(stream: S, headers: &HeaderMap) -> Decoder<S> {
80 let encoding = headers
82 .get(&CONTENT_ENCODING)
83 .and_then(|val| val.to_str().ok())
84 .and_then(|x| x.parse().ok())
85 .unwrap_or(ContentEncoding::Identity);
86
87 Self::new(stream, encoding)
88 }
89}
90
91impl<S> Stream for Decoder<S>
92where
93 S: Stream<Item = Result<Bytes, PayloadError>>,
94{
95 type Item = Result<Bytes, PayloadError>;
96
97 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
98 let mut this = self.project();
99
100 loop {
101 if let Some(ref mut fut) = this.fut {
102 let (chunk, decoder) = ready!(Pin::new(fut).poll(cx)).map_err(|_| {
103 PayloadError::Io(io::Error::other("Blocking task was cancelled unexpectedly"))
104 })??;
105
106 *this.decoder = Some(decoder);
107 this.fut.take();
108
109 if let Some(chunk) = chunk {
110 return Poll::Ready(Some(Ok(chunk)));
111 }
112 }
113
114 if *this.eof {
115 return Poll::Ready(None);
116 }
117
118 match ready!(this.stream.as_mut().poll_next(cx)) {
119 Some(Err(err)) => return Poll::Ready(Some(Err(err))),
120
121 Some(Ok(chunk)) => {
122 if let Some(mut decoder) = this.decoder.take() {
123 if chunk.len() < MAX_CHUNK_SIZE_DECODE_IN_PLACE {
124 let chunk = decoder.feed_data(chunk)?;
125 *this.decoder = Some(decoder);
126
127 if let Some(chunk) = chunk {
128 return Poll::Ready(Some(Ok(chunk)));
129 }
130 } else {
131 *this.fut = Some(spawn_blocking(move || {
132 let chunk = decoder.feed_data(chunk)?;
133 Ok((chunk, decoder))
134 }));
135 }
136
137 continue;
138 } else {
139 return Poll::Ready(Some(Ok(chunk)));
140 }
141 }
142
143 None => {
144 *this.eof = true;
145
146 return if let Some(mut decoder) = this.decoder.take() {
147 match decoder.feed_eof() {
148 Ok(Some(res)) => Poll::Ready(Some(Ok(res))),
149 Ok(None) => Poll::Ready(None),
150 Err(err) => Poll::Ready(Some(Err(err.into()))),
151 }
152 } else {
153 Poll::Ready(None)
154 };
155 }
156 }
157 }
158 }
159}
160
161enum ContentDecoder {
162 #[cfg(feature = "compress-gzip")]
163 Deflate(Box<ZlibDecoder<Writer>>),
164
165 #[cfg(feature = "compress-gzip")]
166 Gzip(Box<GzDecoder<Writer>>),
167
168 #[cfg(feature = "compress-brotli")]
169 Brotli(Box<brotli::DecompressorWriter<Writer>>),
170
171 #[cfg(feature = "compress-zstd")]
174 Zstd(Box<ZstdDecoder<'static, Writer>>),
175}
176
177impl ContentDecoder {
178 fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
179 match self {
180 #[cfg(feature = "compress-brotli")]
181 ContentDecoder::Brotli(ref mut decoder) => match decoder.flush() {
182 Ok(()) => {
183 let b = decoder.get_mut().take();
184
185 if !b.is_empty() {
186 Ok(Some(b))
187 } else {
188 Ok(None)
189 }
190 }
191 Err(err) => Err(err),
192 },
193
194 #[cfg(feature = "compress-gzip")]
195 ContentDecoder::Gzip(ref mut decoder) => match decoder.try_finish() {
196 Ok(_) => {
197 let b = decoder.get_mut().take();
198
199 if !b.is_empty() {
200 Ok(Some(b))
201 } else {
202 Ok(None)
203 }
204 }
205 Err(err) => Err(err),
206 },
207
208 #[cfg(feature = "compress-gzip")]
209 ContentDecoder::Deflate(ref mut decoder) => match decoder.try_finish() {
210 Ok(_) => {
211 let b = decoder.get_mut().take();
212 if !b.is_empty() {
213 Ok(Some(b))
214 } else {
215 Ok(None)
216 }
217 }
218 Err(err) => Err(err),
219 },
220
221 #[cfg(feature = "compress-zstd")]
222 ContentDecoder::Zstd(ref mut decoder) => match decoder.flush() {
223 Ok(_) => {
224 let b = decoder.get_mut().take();
225 if !b.is_empty() {
226 Ok(Some(b))
227 } else {
228 Ok(None)
229 }
230 }
231 Err(err) => Err(err),
232 },
233 }
234 }
235
236 fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
237 match self {
238 #[cfg(feature = "compress-brotli")]
239 ContentDecoder::Brotli(ref mut decoder) => match decoder.write_all(&data) {
240 Ok(_) => {
241 decoder.flush()?;
242 let b = decoder.get_mut().take();
243
244 if !b.is_empty() {
245 Ok(Some(b))
246 } else {
247 Ok(None)
248 }
249 }
250 Err(err) => Err(err),
251 },
252
253 #[cfg(feature = "compress-gzip")]
254 ContentDecoder::Gzip(ref mut decoder) => match decoder.write_all(&data) {
255 Ok(_) => {
256 decoder.flush()?;
257 let b = decoder.get_mut().take();
258
259 if !b.is_empty() {
260 Ok(Some(b))
261 } else {
262 Ok(None)
263 }
264 }
265 Err(err) => Err(err),
266 },
267
268 #[cfg(feature = "compress-gzip")]
269 ContentDecoder::Deflate(ref mut decoder) => match decoder.write_all(&data) {
270 Ok(_) => {
271 decoder.flush()?;
272
273 let b = decoder.get_mut().take();
274 if !b.is_empty() {
275 Ok(Some(b))
276 } else {
277 Ok(None)
278 }
279 }
280 Err(err) => Err(err),
281 },
282
283 #[cfg(feature = "compress-zstd")]
284 ContentDecoder::Zstd(ref mut decoder) => match decoder.write_all(&data) {
285 Ok(_) => {
286 decoder.flush()?;
287
288 let b = decoder.get_mut().take();
289 if !b.is_empty() {
290 Ok(Some(b))
291 } else {
292 Ok(None)
293 }
294 }
295 Err(err) => Err(err),
296 },
297 }
298 }
299}