redis/
parser.rs

1use std::{
2    io::{self, Read},
3    str,
4};
5
6use crate::errors::{ParsingError, RedisError, Repr, ServerError, ServerErrorKind};
7use crate::types::{PushKind, RedisResult, Value, VerbatimFormat};
8
9use combine::{
10    any,
11    error::StreamError,
12    opaque,
13    parser::{
14        byte::{crlf, take_until_bytes},
15        combinator::{any_send_sync_partial_state, AnySendSyncPartialState},
16        range::{recognize, take},
17    },
18    stream::{
19        decoder::{self, Decoder},
20        PointerOffset, RangeStream, StreamErrorFor,
21    },
22    unexpected_any, ParseError, Parser as _,
23};
24
25const MAX_RECURSE_DEPTH: usize = 100;
26
27fn err_parser(line: &str) -> ServerError {
28    let mut pieces = line.splitn(2, ' ');
29    let kind = match pieces.next().unwrap() {
30        "ERR" => ServerErrorKind::ResponseError,
31        "EXECABORT" => ServerErrorKind::ExecAbort,
32        "LOADING" => ServerErrorKind::BusyLoading,
33        "NOSCRIPT" => ServerErrorKind::NoScript,
34        "MOVED" => ServerErrorKind::Moved,
35        "ASK" => ServerErrorKind::Ask,
36        "TRYAGAIN" => ServerErrorKind::TryAgain,
37        "CLUSTERDOWN" => ServerErrorKind::ClusterDown,
38        "CROSSSLOT" => ServerErrorKind::CrossSlot,
39        "MASTERDOWN" => ServerErrorKind::MasterDown,
40        "READONLY" => ServerErrorKind::ReadOnly,
41        "NOTBUSY" => ServerErrorKind::NotBusy,
42        "NOSUB" => ServerErrorKind::NoSub,
43        "NOPERM" => ServerErrorKind::NoPerm,
44        code => {
45            return ServerError(Repr::Extension {
46                code: code.into(),
47                detail: pieces.next().map(|str| str.into()),
48            })
49        }
50    };
51    let detail = pieces.next().map(|str| str.into());
52    ServerError(Repr::Known { kind, detail })
53}
54
55pub fn get_push_kind(kind: String) -> PushKind {
56    match kind.as_str() {
57        "invalidate" => PushKind::Invalidate,
58        "message" => PushKind::Message,
59        "pmessage" => PushKind::PMessage,
60        "smessage" => PushKind::SMessage,
61        "unsubscribe" => PushKind::Unsubscribe,
62        "punsubscribe" => PushKind::PUnsubscribe,
63        "sunsubscribe" => PushKind::SUnsubscribe,
64        "subscribe" => PushKind::Subscribe,
65        "psubscribe" => PushKind::PSubscribe,
66        "ssubscribe" => PushKind::SSubscribe,
67        _ => PushKind::Other(kind),
68    }
69}
70
71fn value<'a, I>(
72    count: Option<usize>,
73) -> impl combine::Parser<I, Output = Value, PartialState = AnySendSyncPartialState>
74where
75    I: RangeStream<Token = u8, Range = &'a [u8]>,
76    I::Error: combine::ParseError<u8, &'a [u8], I::Position>,
77{
78    let count = count.unwrap_or(1);
79
80    opaque!(any_send_sync_partial_state(
81        any()
82            .then_partial(move |&mut b| {
83                if count > MAX_RECURSE_DEPTH {
84                    combine::unexpected_any("Maximum recursion depth exceeded").left()
85                } else {
86                    combine::value(b).right()
87                }
88            })
89            .then_partial(move |&mut b| {
90                let line = || {
91                    recognize(take_until_bytes(&b"\r\n"[..]).with(take(2).map(|_| ()))).and_then(
92                        |line: &[u8]| {
93                            str::from_utf8(&line[..line.len() - 2])
94                                .map_err(StreamErrorFor::<I>::other)
95                        },
96                    )
97                };
98
99                let simple_string = || {
100                    line().map(|line| {
101                        if line == "OK" {
102                            Value::Okay
103                        } else {
104                            Value::SimpleString(line.into())
105                        }
106                    })
107                };
108
109                let int = || {
110                    line().and_then(|line| {
111                        line.trim().parse::<i64>().map_err(|_| {
112                            StreamErrorFor::<I>::message_static_message(
113                                "Expected integer, got garbage",
114                            )
115                        })
116                    })
117                };
118
119                let bulk_string = || {
120                    int().then_partial(move |size| {
121                        if *size < 0 {
122                            combine::produce(|| Value::Nil).left()
123                        } else {
124                            take(*size as usize)
125                                .map(|bs: &[u8]| Value::BulkString(bs.to_vec()))
126                                .skip(crlf())
127                                .right()
128                        }
129                    })
130                };
131                let blob = || {
132                    int().then_partial(move |size| {
133                        take(*size as usize)
134                            .map(|bs: &[u8]| String::from_utf8_lossy(bs).to_string())
135                            .skip(crlf())
136                    })
137                };
138
139                let array = || {
140                    int().then_partial(move |&mut length| {
141                        if length < 0 {
142                            combine::produce(|| Value::Nil).left()
143                        } else {
144                            let length = length as usize;
145                            combine::count_min_max(length, length, value(Some(count + 1)))
146                                .map(Value::Array)
147                                .right()
148                        }
149                    })
150                };
151
152                let error = || line().map(err_parser);
153                let map = || {
154                    int().then_partial(move |&mut kv_length| {
155                        match (kv_length as usize).checked_mul(2) {
156                            Some(length) => {
157                                combine::count_min_max(length, length, value(Some(count + 1)))
158                                    .map(move |result: Vec<Value>| {
159                                        let mut it = result.into_iter();
160                                        let mut x = vec![];
161                                        for _ in 0..kv_length {
162                                            if let (Some(k), Some(v)) = (it.next(), it.next()) {
163                                                x.push((k, v))
164                                            }
165                                        }
166                                        Value::Map(x)
167                                    })
168                                    .left()
169                            }
170                            None => {
171                                unexpected_any("Attribute key-value length is too large").right()
172                            }
173                        }
174                    })
175                };
176                let attribute = || {
177                    int().then_partial(move |&mut kv_length| {
178                        match (kv_length as usize).checked_mul(2) {
179                            Some(length) => {
180                                // + 1 is for data!
181                                let length = length + 1;
182                                combine::count_min_max(length, length, value(Some(count + 1)))
183                                    .map(move |result: Vec<Value>| {
184                                        let mut it = result.into_iter();
185                                        let mut attributes = vec![];
186                                        for _ in 0..kv_length {
187                                            if let (Some(k), Some(v)) = (it.next(), it.next()) {
188                                                attributes.push((k, v))
189                                            }
190                                        }
191                                        Value::Attribute {
192                                            data: Box::new(it.next().unwrap()),
193                                            attributes,
194                                        }
195                                    })
196                                    .left()
197                            }
198                            None => {
199                                unexpected_any("Attribute key-value length is too large").right()
200                            }
201                        }
202                    })
203                };
204                let set = || {
205                    int().then_partial(move |&mut length| {
206                        if length < 0 {
207                            combine::produce(|| Value::Nil).left()
208                        } else {
209                            let length = length as usize;
210                            combine::count_min_max(length, length, value(Some(count + 1)))
211                                .map(Value::Set)
212                                .right()
213                        }
214                    })
215                };
216                let push = || {
217                    int().then_partial(move |&mut length| {
218                        if length <= 0 {
219                            combine::produce(|| Value::Push {
220                                kind: PushKind::Other("".to_string()),
221                                data: vec![],
222                            })
223                            .left()
224                        } else {
225                            let length = length as usize;
226                            combine::count_min_max(length, length, value(Some(count + 1)))
227                                .and_then(|result: Vec<Value>| {
228                                    let mut it = result.into_iter();
229                                    let first = it.next().unwrap_or(Value::Nil);
230                                    if let Value::BulkString(kind) = first {
231                                        let push_kind = String::from_utf8(kind)
232                                            .map_err(StreamErrorFor::<I>::other)?;
233                                        Ok(Value::Push {
234                                            kind: get_push_kind(push_kind),
235                                            data: it.collect(),
236                                        })
237                                    } else if let Value::SimpleString(kind) = first {
238                                        Ok(Value::Push {
239                                            kind: get_push_kind(kind),
240                                            data: it.collect(),
241                                        })
242                                    } else {
243                                        Err(StreamErrorFor::<I>::message_static_message(
244                                            "parse error when decoding push",
245                                        ))
246                                    }
247                                })
248                                .right()
249                        }
250                    })
251                };
252                let null = || line().map(|_| Value::Nil);
253                let double = || {
254                    line().and_then(|line| {
255                        line.trim()
256                            .parse::<f64>()
257                            .map_err(StreamErrorFor::<I>::other)
258                    })
259                };
260                let boolean = || {
261                    line().and_then(|line: &str| match line {
262                        "t" => Ok(true),
263                        "f" => Ok(false),
264                        _ => Err(StreamErrorFor::<I>::message_static_message(
265                            "Expected boolean, got garbage",
266                        )),
267                    })
268                };
269                let blob_error = || blob().map(|line| err_parser(&line));
270                let verbatim = || {
271                    blob().and_then(|line| {
272                        if let Some((format, text)) = line.split_once(':') {
273                            let format = match format {
274                                "txt" => VerbatimFormat::Text,
275                                "mkd" => VerbatimFormat::Markdown,
276                                x => VerbatimFormat::Unknown(x.to_string()),
277                            };
278                            Ok(Value::VerbatimString {
279                                format,
280                                text: text.to_string(),
281                            })
282                        } else {
283                            Err(StreamErrorFor::<I>::message_static_message(
284                                "parse error when decoding verbatim string",
285                            ))
286                        }
287                    })
288                };
289                let big_number = || {
290                    line().and_then(|line| {
291                        #[cfg(not(feature = "num-bigint"))]
292                        return Ok::<_, StreamErrorFor<I>>(Value::BigNumber(
293                            line.as_bytes().to_vec(),
294                        ));
295                        #[cfg(feature = "num-bigint")]
296                        num_bigint::BigInt::parse_bytes(line.as_bytes(), 10)
297                            .ok_or_else(|| {
298                                StreamErrorFor::<I>::message_static_message(
299                                    "Expected bigint, got garbage",
300                                )
301                            })
302                            .map(Value::BigNumber)
303                    })
304                };
305                combine::dispatch!(b;
306                    b'+' => simple_string(),
307                    b':' => int().map(Value::Int),
308                    b'$' => bulk_string(),
309                    b'*' => array(),
310                    b'%' => map(),
311                    b'|' => attribute(),
312                    b'~' => set(),
313                    b'-' => error().map(Value::ServerError),
314                    b'_' => null(),
315                    b',' => double().map(Value::Double),
316                    b'#' => boolean().map(Value::Boolean),
317                    b'!' => blob_error().map(Value::ServerError),
318                    b'=' => verbatim(),
319                    b'(' => big_number(),
320                    b'>' => push(),
321                    b => combine::unexpected_any(combine::error::Token(b))
322                )
323            })
324    ))
325}
326
327// a macro is needed because of lifetime shenanigans with `decoder`.
328macro_rules! to_redis_err {
329    ($err: expr, $decoder: expr) => {
330        match $err {
331            decoder::Error::Io { error, .. } => error.into(),
332            decoder::Error::Parse(err) => {
333                if err.is_unexpected_end_of_input() {
334                    RedisError::from(io::Error::from(io::ErrorKind::UnexpectedEof))
335                } else {
336                    let err = err
337                        .map_range(|range| format!("{range:?}"))
338                        .map_position(|pos| pos.translate_position($decoder.buffer()))
339                        .to_string();
340                    RedisError::from(ParsingError::from(err))
341                }
342            }
343        }
344    };
345}
346
347#[cfg(feature = "aio")]
348mod aio_support {
349    use super::*;
350
351    use bytes::{Buf, BytesMut};
352    use tokio::io::AsyncRead;
353    use tokio_util::codec::{Decoder, Encoder};
354
355    #[derive(Default)]
356    pub struct ValueCodec {
357        state: AnySendSyncPartialState,
358    }
359
360    impl ValueCodec {
361        fn decode_stream(&mut self, bytes: &mut BytesMut, eof: bool) -> RedisResult<Option<Value>> {
362            let (opt, removed_len) = {
363                let buffer = &bytes[..];
364                let mut stream =
365                    combine::easy::Stream(combine::stream::MaybePartialStream(buffer, !eof));
366                match combine::stream::decode_tokio(value(None), &mut stream, &mut self.state) {
367                    Ok(x) => x,
368                    Err(err) => {
369                        let err = err
370                            .map_position(|pos| pos.translate_position(buffer))
371                            .map_range(|range| format!("{range:?}"))
372                            .to_string();
373                        return Err(RedisError::from(ParsingError::from(err)));
374                    }
375                }
376            };
377
378            bytes.advance(removed_len);
379            match opt {
380                Some(result) => Ok(Some(result)),
381                None => Ok(None),
382            }
383        }
384    }
385
386    impl Encoder<Vec<u8>> for ValueCodec {
387        type Error = RedisError;
388        fn encode(&mut self, item: Vec<u8>, dst: &mut BytesMut) -> Result<(), Self::Error> {
389            dst.extend_from_slice(item.as_ref());
390            Ok(())
391        }
392    }
393
394    impl Decoder for ValueCodec {
395        type Item = Value;
396        type Error = RedisError;
397
398        fn decode(&mut self, bytes: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
399            self.decode_stream(bytes, false)
400        }
401
402        fn decode_eof(&mut self, bytes: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
403            self.decode_stream(bytes, true)
404        }
405    }
406
407    /// Parses a redis value asynchronously.
408    pub async fn parse_redis_value_async<R>(
409        decoder: &mut combine::stream::Decoder<AnySendSyncPartialState, PointerOffset<[u8]>>,
410        read: &mut R,
411    ) -> RedisResult<Value>
412    where
413        R: AsyncRead + std::marker::Unpin,
414    {
415        let result = combine::decode_tokio!(*decoder, *read, value(None), |input, _| {
416            combine::stream::easy::Stream::from(input)
417        });
418        match result {
419            Err(err) => Err(to_redis_err!(err, decoder)),
420            Ok(result) => Ok(result),
421        }
422    }
423}
424
425#[cfg(feature = "aio")]
426#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
427pub use self::aio_support::*;
428
429/// The internal redis response parser.
430pub struct Parser {
431    decoder: Decoder<AnySendSyncPartialState, PointerOffset<[u8]>>,
432}
433
434impl Default for Parser {
435    fn default() -> Self {
436        Parser::new()
437    }
438}
439
440/// The parser can be used to parse redis responses into values.  Generally
441/// you normally do not use this directly as it's already done for you by
442/// the client but in some more complex situations it might be useful to be
443/// able to parse the redis responses.
444impl Parser {
445    /// Creates a new parser that parses the data behind the reader.  More
446    /// than one value can be behind the reader in which case the parser can
447    /// be invoked multiple times.  In other words: the stream does not have
448    /// to be terminated.
449    pub fn new() -> Parser {
450        Parser {
451            decoder: Decoder::new(),
452        }
453    }
454
455    // public api
456
457    /// Parses synchronously into a single value from the reader.
458    pub fn parse_value<T: Read>(&mut self, mut reader: T) -> RedisResult<Value> {
459        let mut decoder = &mut self.decoder;
460        let result = combine::decode!(decoder, reader, value(None), |input, _| {
461            combine::stream::easy::Stream::from(input)
462        });
463        match result {
464            Err(err) => Err(to_redis_err!(err, decoder)),
465            Ok(result) => Ok(result),
466        }
467    }
468}
469
470/// Parses bytes into a redis value.
471///
472/// This is the most straightforward way to parse something into a low
473/// level redis value instead of having to use a whole parser.
474pub fn parse_redis_value(bytes: &[u8]) -> RedisResult<Value> {
475    let mut parser = Parser::new();
476    parser.parse_value(bytes)
477}
478
479#[cfg(test)]
480mod tests {
481    use super::*;
482    use crate::errors::ErrorKind;
483
484    #[cfg(feature = "aio")]
485    #[test]
486    fn decode_eof_returns_none_at_eof() {
487        use tokio_util::codec::Decoder;
488        let mut codec = ValueCodec::default();
489
490        let mut bytes = bytes::BytesMut::from(&b"+GET 123\r\n"[..]);
491        assert_eq!(
492            codec.decode_eof(&mut bytes),
493            Ok(Some(parse_redis_value(b"+GET 123\r\n").unwrap()))
494        );
495        assert_eq!(codec.decode_eof(&mut bytes), Ok(None));
496        assert_eq!(codec.decode_eof(&mut bytes), Ok(None));
497    }
498
499    #[cfg(feature = "aio")]
500    #[test]
501    fn decode_eof_returns_error_inside_array_and_can_parse_more_inputs() {
502        use tokio_util::codec::Decoder;
503        let mut codec = ValueCodec::default();
504
505        let mut bytes =
506            bytes::BytesMut::from(b"*3\r\n+OK\r\n-LOADING server is loading\r\n+OK\r\n".as_slice());
507        let result = codec.decode_eof(&mut bytes).unwrap().unwrap();
508
509        assert_eq!(
510            result,
511            Value::Array(vec![
512                Value::Okay,
513                Value::ServerError(ServerError(Repr::Known {
514                    kind: ServerErrorKind::BusyLoading,
515                    detail: Some(arcstr::literal!("server is loading"))
516                })),
517                Value::Okay
518            ])
519        );
520
521        let mut bytes = bytes::BytesMut::from(b"+OK\r\n".as_slice());
522        let result = codec.decode_eof(&mut bytes).unwrap().unwrap();
523
524        assert_eq!(result, Value::Okay);
525    }
526
527    #[test]
528    fn parse_nested_error_and_handle_more_inputs() {
529        // from https://redis.io/docs/interact/transactions/ -
530        // "EXEC returned two-element bulk string reply where one is an OK code and the other an error reply. It's up to the client library to find a sensible way to provide the error to the user."
531
532        let bytes = b"*3\r\n+OK\r\n-LOADING server is loading\r\n+OK\r\n";
533        let result = parse_redis_value(bytes);
534
535        assert_eq!(
536            result.unwrap(),
537            Value::Array(vec![
538                Value::Okay,
539                Value::ServerError(ServerError(Repr::Known {
540                    kind: ServerErrorKind::BusyLoading,
541                    detail: Some(arcstr::literal!("server is loading"))
542                })),
543                Value::Okay
544            ])
545        );
546
547        let result = parse_redis_value(b"+OK\r\n").unwrap();
548
549        assert_eq!(result, Value::Okay);
550    }
551
552    #[test]
553    fn decode_resp3_double() {
554        let val = parse_redis_value(b",1.23\r\n").unwrap();
555        assert_eq!(val, Value::Double(1.23));
556        let val = parse_redis_value(b",nan\r\n").unwrap();
557        if let Value::Double(val) = val {
558            assert!(val.is_sign_positive());
559            assert!(val.is_nan());
560        } else {
561            panic!("expected double");
562        }
563        // -nan is supported prior to redis 7.2
564        let val = parse_redis_value(b",-nan\r\n").unwrap();
565        if let Value::Double(val) = val {
566            assert!(val.is_sign_negative());
567            assert!(val.is_nan());
568        } else {
569            panic!("expected double");
570        }
571        //Allow doubles in scientific E notation
572        let val = parse_redis_value(b",2.67923e+8\r\n").unwrap();
573        assert_eq!(val, Value::Double(267923000.0));
574        let val = parse_redis_value(b",2.67923E+8\r\n").unwrap();
575        assert_eq!(val, Value::Double(267923000.0));
576        let val = parse_redis_value(b",-2.67923E+8\r\n").unwrap();
577        assert_eq!(val, Value::Double(-267923000.0));
578        let val = parse_redis_value(b",2.1E-2\r\n").unwrap();
579        assert_eq!(val, Value::Double(0.021));
580
581        let val = parse_redis_value(b",-inf\r\n").unwrap();
582        assert_eq!(val, Value::Double(-f64::INFINITY));
583        let val = parse_redis_value(b",inf\r\n").unwrap();
584        assert_eq!(val, Value::Double(f64::INFINITY));
585    }
586
587    #[test]
588    fn decode_resp3_map() {
589        let val = parse_redis_value(b"%2\r\n+first\r\n:1\r\n+second\r\n:2\r\n").unwrap();
590        let mut v = val.as_map_iter().unwrap();
591        assert_eq!(
592            (&Value::SimpleString("first".to_string()), &Value::Int(1)),
593            v.next().unwrap()
594        );
595        assert_eq!(
596            (&Value::SimpleString("second".to_string()), &Value::Int(2)),
597            v.next().unwrap()
598        );
599    }
600
601    #[test]
602    fn decode_resp3_boolean() {
603        let val = parse_redis_value(b"#t\r\n").unwrap();
604        assert_eq!(val, Value::Boolean(true));
605        let val = parse_redis_value(b"#f\r\n").unwrap();
606        assert_eq!(val, Value::Boolean(false));
607        let val = parse_redis_value(b"#x\r\n");
608        assert!(val.is_err());
609        let val = parse_redis_value(b"#\r\n");
610        assert!(val.is_err());
611    }
612
613    #[test]
614    fn decode_resp3_blob_error() {
615        let val = parse_redis_value(b"!21\r\nSYNTAX invalid syntax\r\n");
616        assert_eq!(
617            val.unwrap(),
618            Value::ServerError(ServerError(Repr::Extension {
619                code: arcstr::literal!("SYNTAX"),
620                detail: Some(arcstr::literal!("invalid syntax"))
621            }))
622        )
623    }
624
625    #[test]
626    fn decode_resp3_big_number() {
627        let val = parse_redis_value(b"(3492890328409238509324850943850943825024385\r\n").unwrap();
628        #[cfg(feature = "num-bigint")]
629        let expected = Value::BigNumber(
630            num_bigint::BigInt::parse_bytes(b"3492890328409238509324850943850943825024385", 10)
631                .unwrap(),
632        );
633        #[cfg(not(feature = "num-bigint"))]
634        let expected = Value::BigNumber(b"3492890328409238509324850943850943825024385".to_vec());
635        assert_eq!(val, expected);
636    }
637
638    #[test]
639    fn decode_resp3_set() {
640        let val = parse_redis_value(b"~5\r\n+orange\r\n+apple\r\n#t\r\n:100\r\n:999\r\n").unwrap();
641        let v = val.as_sequence().unwrap();
642        assert_eq!(Value::SimpleString("orange".to_string()), v[0]);
643        assert_eq!(Value::SimpleString("apple".to_string()), v[1]);
644        assert_eq!(Value::Boolean(true), v[2]);
645        assert_eq!(Value::Int(100), v[3]);
646        assert_eq!(Value::Int(999), v[4]);
647    }
648
649    #[test]
650    fn decode_resp3_push() {
651        let val = parse_redis_value(b">3\r\n+message\r\n+somechannel\r\n+this is the message\r\n")
652            .unwrap();
653        if let Value::Push { ref kind, ref data } = val {
654            assert_eq!(&PushKind::Message, kind);
655            assert_eq!(Value::SimpleString("somechannel".to_string()), data[0]);
656            assert_eq!(
657                Value::SimpleString("this is the message".to_string()),
658                data[1]
659            );
660        } else {
661            panic!("Expected Value::Push")
662        }
663    }
664
665    #[test]
666    fn test_max_recursion_depth_set_and_array() {
667        for test_byte in ["*", "~"] {
668            let initial = format!("{test_byte}1\r\n").as_bytes().to_vec();
669            let end = format!("{test_byte}0\r\n").as_bytes().to_vec();
670
671            let mut ba = initial.repeat(MAX_RECURSE_DEPTH - 1).to_vec();
672            ba.extend(end.clone());
673            match parse_redis_value(&ba) {
674                Ok(Value::Array(a)) => assert_eq!(a.len(), 1),
675                Ok(Value::Set(s)) => assert_eq!(s.len(), 1),
676                _ => panic!("Expected valid array or set"),
677            }
678
679            let mut ba = initial.repeat(MAX_RECURSE_DEPTH).to_vec();
680            ba.extend(end);
681            match parse_redis_value(&ba) {
682                Ok(_) => panic!("Expected ParseError"),
683                Err(e) => assert!(matches!(e.kind(), ErrorKind::Parse)),
684            }
685        }
686    }
687
688    #[test]
689    fn test_max_recursion_depth_map() {
690        let initial = b"%1\r\n+a\r\n";
691        let end = b"%0\r\n";
692
693        let mut ba = initial.repeat(MAX_RECURSE_DEPTH - 1).to_vec();
694        ba.extend(*end);
695        match parse_redis_value(&ba) {
696            Ok(Value::Map(m)) => assert_eq!(m.len(), 1),
697            Ok(Value::Set(s)) => assert_eq!(s.len(), 1),
698            _ => panic!("Expected valid array or set"),
699        }
700
701        let mut ba = initial.repeat(MAX_RECURSE_DEPTH).to_vec();
702        ba.extend(end);
703        match parse_redis_value(&ba) {
704            Ok(_) => panic!("Expected ParseError"),
705            Err(e) => assert!(matches!(e.kind(), ErrorKind::Parse)),
706        }
707    }
708}