1use std::{
2 io::{self, Read},
3 str,
4};
5
6use crate::errors::{ParsingError, RedisError, Repr, ServerError, ServerErrorKind};
7use crate::types::{PushKind, RedisResult, Value, VerbatimFormat};
8
9use combine::{
10 any,
11 error::StreamError,
12 opaque,
13 parser::{
14 byte::{crlf, take_until_bytes},
15 combinator::{any_send_sync_partial_state, AnySendSyncPartialState},
16 range::{recognize, take},
17 },
18 stream::{
19 decoder::{self, Decoder},
20 PointerOffset, RangeStream, StreamErrorFor,
21 },
22 unexpected_any, ParseError, Parser as _,
23};
24
25const MAX_RECURSE_DEPTH: usize = 100;
26
27fn err_parser(line: &str) -> ServerError {
28 let mut pieces = line.splitn(2, ' ');
29 let kind = match pieces.next().unwrap() {
30 "ERR" => ServerErrorKind::ResponseError,
31 "EXECABORT" => ServerErrorKind::ExecAbort,
32 "LOADING" => ServerErrorKind::BusyLoading,
33 "NOSCRIPT" => ServerErrorKind::NoScript,
34 "MOVED" => ServerErrorKind::Moved,
35 "ASK" => ServerErrorKind::Ask,
36 "TRYAGAIN" => ServerErrorKind::TryAgain,
37 "CLUSTERDOWN" => ServerErrorKind::ClusterDown,
38 "CROSSSLOT" => ServerErrorKind::CrossSlot,
39 "MASTERDOWN" => ServerErrorKind::MasterDown,
40 "READONLY" => ServerErrorKind::ReadOnly,
41 "NOTBUSY" => ServerErrorKind::NotBusy,
42 "NOSUB" => ServerErrorKind::NoSub,
43 "NOPERM" => ServerErrorKind::NoPerm,
44 code => {
45 return ServerError(Repr::Extension {
46 code: code.into(),
47 detail: pieces.next().map(|str| str.into()),
48 })
49 }
50 };
51 let detail = pieces.next().map(|str| str.into());
52 ServerError(Repr::Known { kind, detail })
53}
54
55pub fn get_push_kind(kind: String) -> PushKind {
56 match kind.as_str() {
57 "invalidate" => PushKind::Invalidate,
58 "message" => PushKind::Message,
59 "pmessage" => PushKind::PMessage,
60 "smessage" => PushKind::SMessage,
61 "unsubscribe" => PushKind::Unsubscribe,
62 "punsubscribe" => PushKind::PUnsubscribe,
63 "sunsubscribe" => PushKind::SUnsubscribe,
64 "subscribe" => PushKind::Subscribe,
65 "psubscribe" => PushKind::PSubscribe,
66 "ssubscribe" => PushKind::SSubscribe,
67 _ => PushKind::Other(kind),
68 }
69}
70
71fn value<'a, I>(
72 count: Option<usize>,
73) -> impl combine::Parser<I, Output = Value, PartialState = AnySendSyncPartialState>
74where
75 I: RangeStream<Token = u8, Range = &'a [u8]>,
76 I::Error: combine::ParseError<u8, &'a [u8], I::Position>,
77{
78 let count = count.unwrap_or(1);
79
80 opaque!(any_send_sync_partial_state(
81 any()
82 .then_partial(move |&mut b| {
83 if count > MAX_RECURSE_DEPTH {
84 combine::unexpected_any("Maximum recursion depth exceeded").left()
85 } else {
86 combine::value(b).right()
87 }
88 })
89 .then_partial(move |&mut b| {
90 let line = || {
91 recognize(take_until_bytes(&b"\r\n"[..]).with(take(2).map(|_| ()))).and_then(
92 |line: &[u8]| {
93 str::from_utf8(&line[..line.len() - 2])
94 .map_err(StreamErrorFor::<I>::other)
95 },
96 )
97 };
98
99 let simple_string = || {
100 line().map(|line| {
101 if line == "OK" {
102 Value::Okay
103 } else {
104 Value::SimpleString(line.into())
105 }
106 })
107 };
108
109 let int = || {
110 line().and_then(|line| {
111 line.trim().parse::<i64>().map_err(|_| {
112 StreamErrorFor::<I>::message_static_message(
113 "Expected integer, got garbage",
114 )
115 })
116 })
117 };
118
119 let bulk_string = || {
120 int().then_partial(move |size| {
121 if *size < 0 {
122 combine::produce(|| Value::Nil).left()
123 } else {
124 take(*size as usize)
125 .map(|bs: &[u8]| Value::BulkString(bs.to_vec()))
126 .skip(crlf())
127 .right()
128 }
129 })
130 };
131 let blob = || {
132 int().then_partial(move |size| {
133 take(*size as usize)
134 .map(|bs: &[u8]| String::from_utf8_lossy(bs).to_string())
135 .skip(crlf())
136 })
137 };
138
139 let array = || {
140 int().then_partial(move |&mut length| {
141 if length < 0 {
142 combine::produce(|| Value::Nil).left()
143 } else {
144 let length = length as usize;
145 combine::count_min_max(length, length, value(Some(count + 1)))
146 .map(Value::Array)
147 .right()
148 }
149 })
150 };
151
152 let error = || line().map(err_parser);
153 let map = || {
154 int().then_partial(move |&mut kv_length| {
155 match (kv_length as usize).checked_mul(2) {
156 Some(length) => {
157 combine::count_min_max(length, length, value(Some(count + 1)))
158 .map(move |result: Vec<Value>| {
159 let mut it = result.into_iter();
160 let mut x = vec![];
161 for _ in 0..kv_length {
162 if let (Some(k), Some(v)) = (it.next(), it.next()) {
163 x.push((k, v))
164 }
165 }
166 Value::Map(x)
167 })
168 .left()
169 }
170 None => {
171 unexpected_any("Attribute key-value length is too large").right()
172 }
173 }
174 })
175 };
176 let attribute = || {
177 int().then_partial(move |&mut kv_length| {
178 match (kv_length as usize).checked_mul(2) {
179 Some(length) => {
180 let length = length + 1;
182 combine::count_min_max(length, length, value(Some(count + 1)))
183 .map(move |result: Vec<Value>| {
184 let mut it = result.into_iter();
185 let mut attributes = vec![];
186 for _ in 0..kv_length {
187 if let (Some(k), Some(v)) = (it.next(), it.next()) {
188 attributes.push((k, v))
189 }
190 }
191 Value::Attribute {
192 data: Box::new(it.next().unwrap()),
193 attributes,
194 }
195 })
196 .left()
197 }
198 None => {
199 unexpected_any("Attribute key-value length is too large").right()
200 }
201 }
202 })
203 };
204 let set = || {
205 int().then_partial(move |&mut length| {
206 if length < 0 {
207 combine::produce(|| Value::Nil).left()
208 } else {
209 let length = length as usize;
210 combine::count_min_max(length, length, value(Some(count + 1)))
211 .map(Value::Set)
212 .right()
213 }
214 })
215 };
216 let push = || {
217 int().then_partial(move |&mut length| {
218 if length <= 0 {
219 combine::produce(|| Value::Push {
220 kind: PushKind::Other("".to_string()),
221 data: vec![],
222 })
223 .left()
224 } else {
225 let length = length as usize;
226 combine::count_min_max(length, length, value(Some(count + 1)))
227 .and_then(|result: Vec<Value>| {
228 let mut it = result.into_iter();
229 let first = it.next().unwrap_or(Value::Nil);
230 if let Value::BulkString(kind) = first {
231 let push_kind = String::from_utf8(kind)
232 .map_err(StreamErrorFor::<I>::other)?;
233 Ok(Value::Push {
234 kind: get_push_kind(push_kind),
235 data: it.collect(),
236 })
237 } else if let Value::SimpleString(kind) = first {
238 Ok(Value::Push {
239 kind: get_push_kind(kind),
240 data: it.collect(),
241 })
242 } else {
243 Err(StreamErrorFor::<I>::message_static_message(
244 "parse error when decoding push",
245 ))
246 }
247 })
248 .right()
249 }
250 })
251 };
252 let null = || line().map(|_| Value::Nil);
253 let double = || {
254 line().and_then(|line| {
255 line.trim()
256 .parse::<f64>()
257 .map_err(StreamErrorFor::<I>::other)
258 })
259 };
260 let boolean = || {
261 line().and_then(|line: &str| match line {
262 "t" => Ok(true),
263 "f" => Ok(false),
264 _ => Err(StreamErrorFor::<I>::message_static_message(
265 "Expected boolean, got garbage",
266 )),
267 })
268 };
269 let blob_error = || blob().map(|line| err_parser(&line));
270 let verbatim = || {
271 blob().and_then(|line| {
272 if let Some((format, text)) = line.split_once(':') {
273 let format = match format {
274 "txt" => VerbatimFormat::Text,
275 "mkd" => VerbatimFormat::Markdown,
276 x => VerbatimFormat::Unknown(x.to_string()),
277 };
278 Ok(Value::VerbatimString {
279 format,
280 text: text.to_string(),
281 })
282 } else {
283 Err(StreamErrorFor::<I>::message_static_message(
284 "parse error when decoding verbatim string",
285 ))
286 }
287 })
288 };
289 let big_number = || {
290 line().and_then(|line| {
291 #[cfg(not(feature = "num-bigint"))]
292 return Ok::<_, StreamErrorFor<I>>(Value::BigNumber(
293 line.as_bytes().to_vec(),
294 ));
295 #[cfg(feature = "num-bigint")]
296 num_bigint::BigInt::parse_bytes(line.as_bytes(), 10)
297 .ok_or_else(|| {
298 StreamErrorFor::<I>::message_static_message(
299 "Expected bigint, got garbage",
300 )
301 })
302 .map(Value::BigNumber)
303 })
304 };
305 combine::dispatch!(b;
306 b'+' => simple_string(),
307 b':' => int().map(Value::Int),
308 b'$' => bulk_string(),
309 b'*' => array(),
310 b'%' => map(),
311 b'|' => attribute(),
312 b'~' => set(),
313 b'-' => error().map(Value::ServerError),
314 b'_' => null(),
315 b',' => double().map(Value::Double),
316 b'#' => boolean().map(Value::Boolean),
317 b'!' => blob_error().map(Value::ServerError),
318 b'=' => verbatim(),
319 b'(' => big_number(),
320 b'>' => push(),
321 b => combine::unexpected_any(combine::error::Token(b))
322 )
323 })
324 ))
325}
326
327macro_rules! to_redis_err {
329 ($err: expr, $decoder: expr) => {
330 match $err {
331 decoder::Error::Io { error, .. } => error.into(),
332 decoder::Error::Parse(err) => {
333 if err.is_unexpected_end_of_input() {
334 RedisError::from(io::Error::from(io::ErrorKind::UnexpectedEof))
335 } else {
336 let err = err
337 .map_range(|range| format!("{range:?}"))
338 .map_position(|pos| pos.translate_position($decoder.buffer()))
339 .to_string();
340 RedisError::from(ParsingError::from(err))
341 }
342 }
343 }
344 };
345}
346
347#[cfg(feature = "aio")]
348mod aio_support {
349 use super::*;
350
351 use bytes::{Buf, BytesMut};
352 use tokio::io::AsyncRead;
353 use tokio_util::codec::{Decoder, Encoder};
354
355 #[derive(Default)]
356 pub struct ValueCodec {
357 state: AnySendSyncPartialState,
358 }
359
360 impl ValueCodec {
361 fn decode_stream(&mut self, bytes: &mut BytesMut, eof: bool) -> RedisResult<Option<Value>> {
362 let (opt, removed_len) = {
363 let buffer = &bytes[..];
364 let mut stream =
365 combine::easy::Stream(combine::stream::MaybePartialStream(buffer, !eof));
366 match combine::stream::decode_tokio(value(None), &mut stream, &mut self.state) {
367 Ok(x) => x,
368 Err(err) => {
369 let err = err
370 .map_position(|pos| pos.translate_position(buffer))
371 .map_range(|range| format!("{range:?}"))
372 .to_string();
373 return Err(RedisError::from(ParsingError::from(err)));
374 }
375 }
376 };
377
378 bytes.advance(removed_len);
379 match opt {
380 Some(result) => Ok(Some(result)),
381 None => Ok(None),
382 }
383 }
384 }
385
386 impl Encoder<Vec<u8>> for ValueCodec {
387 type Error = RedisError;
388 fn encode(&mut self, item: Vec<u8>, dst: &mut BytesMut) -> Result<(), Self::Error> {
389 dst.extend_from_slice(item.as_ref());
390 Ok(())
391 }
392 }
393
394 impl Decoder for ValueCodec {
395 type Item = Value;
396 type Error = RedisError;
397
398 fn decode(&mut self, bytes: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
399 self.decode_stream(bytes, false)
400 }
401
402 fn decode_eof(&mut self, bytes: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
403 self.decode_stream(bytes, true)
404 }
405 }
406
407 pub async fn parse_redis_value_async<R>(
409 decoder: &mut combine::stream::Decoder<AnySendSyncPartialState, PointerOffset<[u8]>>,
410 read: &mut R,
411 ) -> RedisResult<Value>
412 where
413 R: AsyncRead + std::marker::Unpin,
414 {
415 let result = combine::decode_tokio!(*decoder, *read, value(None), |input, _| {
416 combine::stream::easy::Stream::from(input)
417 });
418 match result {
419 Err(err) => Err(to_redis_err!(err, decoder)),
420 Ok(result) => Ok(result),
421 }
422 }
423}
424
425#[cfg(feature = "aio")]
426#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
427pub use self::aio_support::*;
428
429pub struct Parser {
431 decoder: Decoder<AnySendSyncPartialState, PointerOffset<[u8]>>,
432}
433
434impl Default for Parser {
435 fn default() -> Self {
436 Parser::new()
437 }
438}
439
440impl Parser {
445 pub fn new() -> Parser {
450 Parser {
451 decoder: Decoder::new(),
452 }
453 }
454
455 pub fn parse_value<T: Read>(&mut self, mut reader: T) -> RedisResult<Value> {
459 let mut decoder = &mut self.decoder;
460 let result = combine::decode!(decoder, reader, value(None), |input, _| {
461 combine::stream::easy::Stream::from(input)
462 });
463 match result {
464 Err(err) => Err(to_redis_err!(err, decoder)),
465 Ok(result) => Ok(result),
466 }
467 }
468}
469
470pub fn parse_redis_value(bytes: &[u8]) -> RedisResult<Value> {
475 let mut parser = Parser::new();
476 parser.parse_value(bytes)
477}
478
479#[cfg(test)]
480mod tests {
481 use super::*;
482 use crate::errors::ErrorKind;
483
484 #[cfg(feature = "aio")]
485 #[test]
486 fn decode_eof_returns_none_at_eof() {
487 use tokio_util::codec::Decoder;
488 let mut codec = ValueCodec::default();
489
490 let mut bytes = bytes::BytesMut::from(&b"+GET 123\r\n"[..]);
491 assert_eq!(
492 codec.decode_eof(&mut bytes),
493 Ok(Some(parse_redis_value(b"+GET 123\r\n").unwrap()))
494 );
495 assert_eq!(codec.decode_eof(&mut bytes), Ok(None));
496 assert_eq!(codec.decode_eof(&mut bytes), Ok(None));
497 }
498
499 #[cfg(feature = "aio")]
500 #[test]
501 fn decode_eof_returns_error_inside_array_and_can_parse_more_inputs() {
502 use tokio_util::codec::Decoder;
503 let mut codec = ValueCodec::default();
504
505 let mut bytes =
506 bytes::BytesMut::from(b"*3\r\n+OK\r\n-LOADING server is loading\r\n+OK\r\n".as_slice());
507 let result = codec.decode_eof(&mut bytes).unwrap().unwrap();
508
509 assert_eq!(
510 result,
511 Value::Array(vec![
512 Value::Okay,
513 Value::ServerError(ServerError(Repr::Known {
514 kind: ServerErrorKind::BusyLoading,
515 detail: Some(arcstr::literal!("server is loading"))
516 })),
517 Value::Okay
518 ])
519 );
520
521 let mut bytes = bytes::BytesMut::from(b"+OK\r\n".as_slice());
522 let result = codec.decode_eof(&mut bytes).unwrap().unwrap();
523
524 assert_eq!(result, Value::Okay);
525 }
526
527 #[test]
528 fn parse_nested_error_and_handle_more_inputs() {
529 let bytes = b"*3\r\n+OK\r\n-LOADING server is loading\r\n+OK\r\n";
533 let result = parse_redis_value(bytes);
534
535 assert_eq!(
536 result.unwrap(),
537 Value::Array(vec![
538 Value::Okay,
539 Value::ServerError(ServerError(Repr::Known {
540 kind: ServerErrorKind::BusyLoading,
541 detail: Some(arcstr::literal!("server is loading"))
542 })),
543 Value::Okay
544 ])
545 );
546
547 let result = parse_redis_value(b"+OK\r\n").unwrap();
548
549 assert_eq!(result, Value::Okay);
550 }
551
552 #[test]
553 fn decode_resp3_double() {
554 let val = parse_redis_value(b",1.23\r\n").unwrap();
555 assert_eq!(val, Value::Double(1.23));
556 let val = parse_redis_value(b",nan\r\n").unwrap();
557 if let Value::Double(val) = val {
558 assert!(val.is_sign_positive());
559 assert!(val.is_nan());
560 } else {
561 panic!("expected double");
562 }
563 let val = parse_redis_value(b",-nan\r\n").unwrap();
565 if let Value::Double(val) = val {
566 assert!(val.is_sign_negative());
567 assert!(val.is_nan());
568 } else {
569 panic!("expected double");
570 }
571 let val = parse_redis_value(b",2.67923e+8\r\n").unwrap();
573 assert_eq!(val, Value::Double(267923000.0));
574 let val = parse_redis_value(b",2.67923E+8\r\n").unwrap();
575 assert_eq!(val, Value::Double(267923000.0));
576 let val = parse_redis_value(b",-2.67923E+8\r\n").unwrap();
577 assert_eq!(val, Value::Double(-267923000.0));
578 let val = parse_redis_value(b",2.1E-2\r\n").unwrap();
579 assert_eq!(val, Value::Double(0.021));
580
581 let val = parse_redis_value(b",-inf\r\n").unwrap();
582 assert_eq!(val, Value::Double(-f64::INFINITY));
583 let val = parse_redis_value(b",inf\r\n").unwrap();
584 assert_eq!(val, Value::Double(f64::INFINITY));
585 }
586
587 #[test]
588 fn decode_resp3_map() {
589 let val = parse_redis_value(b"%2\r\n+first\r\n:1\r\n+second\r\n:2\r\n").unwrap();
590 let mut v = val.as_map_iter().unwrap();
591 assert_eq!(
592 (&Value::SimpleString("first".to_string()), &Value::Int(1)),
593 v.next().unwrap()
594 );
595 assert_eq!(
596 (&Value::SimpleString("second".to_string()), &Value::Int(2)),
597 v.next().unwrap()
598 );
599 }
600
601 #[test]
602 fn decode_resp3_boolean() {
603 let val = parse_redis_value(b"#t\r\n").unwrap();
604 assert_eq!(val, Value::Boolean(true));
605 let val = parse_redis_value(b"#f\r\n").unwrap();
606 assert_eq!(val, Value::Boolean(false));
607 let val = parse_redis_value(b"#x\r\n");
608 assert!(val.is_err());
609 let val = parse_redis_value(b"#\r\n");
610 assert!(val.is_err());
611 }
612
613 #[test]
614 fn decode_resp3_blob_error() {
615 let val = parse_redis_value(b"!21\r\nSYNTAX invalid syntax\r\n");
616 assert_eq!(
617 val.unwrap(),
618 Value::ServerError(ServerError(Repr::Extension {
619 code: arcstr::literal!("SYNTAX"),
620 detail: Some(arcstr::literal!("invalid syntax"))
621 }))
622 )
623 }
624
625 #[test]
626 fn decode_resp3_big_number() {
627 let val = parse_redis_value(b"(3492890328409238509324850943850943825024385\r\n").unwrap();
628 #[cfg(feature = "num-bigint")]
629 let expected = Value::BigNumber(
630 num_bigint::BigInt::parse_bytes(b"3492890328409238509324850943850943825024385", 10)
631 .unwrap(),
632 );
633 #[cfg(not(feature = "num-bigint"))]
634 let expected = Value::BigNumber(b"3492890328409238509324850943850943825024385".to_vec());
635 assert_eq!(val, expected);
636 }
637
638 #[test]
639 fn decode_resp3_set() {
640 let val = parse_redis_value(b"~5\r\n+orange\r\n+apple\r\n#t\r\n:100\r\n:999\r\n").unwrap();
641 let v = val.as_sequence().unwrap();
642 assert_eq!(Value::SimpleString("orange".to_string()), v[0]);
643 assert_eq!(Value::SimpleString("apple".to_string()), v[1]);
644 assert_eq!(Value::Boolean(true), v[2]);
645 assert_eq!(Value::Int(100), v[3]);
646 assert_eq!(Value::Int(999), v[4]);
647 }
648
649 #[test]
650 fn decode_resp3_push() {
651 let val = parse_redis_value(b">3\r\n+message\r\n+somechannel\r\n+this is the message\r\n")
652 .unwrap();
653 if let Value::Push { ref kind, ref data } = val {
654 assert_eq!(&PushKind::Message, kind);
655 assert_eq!(Value::SimpleString("somechannel".to_string()), data[0]);
656 assert_eq!(
657 Value::SimpleString("this is the message".to_string()),
658 data[1]
659 );
660 } else {
661 panic!("Expected Value::Push")
662 }
663 }
664
665 #[test]
666 fn test_max_recursion_depth_set_and_array() {
667 for test_byte in ["*", "~"] {
668 let initial = format!("{test_byte}1\r\n").as_bytes().to_vec();
669 let end = format!("{test_byte}0\r\n").as_bytes().to_vec();
670
671 let mut ba = initial.repeat(MAX_RECURSE_DEPTH - 1).to_vec();
672 ba.extend(end.clone());
673 match parse_redis_value(&ba) {
674 Ok(Value::Array(a)) => assert_eq!(a.len(), 1),
675 Ok(Value::Set(s)) => assert_eq!(s.len(), 1),
676 _ => panic!("Expected valid array or set"),
677 }
678
679 let mut ba = initial.repeat(MAX_RECURSE_DEPTH).to_vec();
680 ba.extend(end);
681 match parse_redis_value(&ba) {
682 Ok(_) => panic!("Expected ParseError"),
683 Err(e) => assert!(matches!(e.kind(), ErrorKind::Parse)),
684 }
685 }
686 }
687
688 #[test]
689 fn test_max_recursion_depth_map() {
690 let initial = b"%1\r\n+a\r\n";
691 let end = b"%0\r\n";
692
693 let mut ba = initial.repeat(MAX_RECURSE_DEPTH - 1).to_vec();
694 ba.extend(*end);
695 match parse_redis_value(&ba) {
696 Ok(Value::Map(m)) => assert_eq!(m.len(), 1),
697 Ok(Value::Set(s)) => assert_eq!(s.len(), 1),
698 _ => panic!("Expected valid array or set"),
699 }
700
701 let mut ba = initial.repeat(MAX_RECURSE_DEPTH).to_vec();
702 ba.extend(end);
703 match parse_redis_value(&ba) {
704 Ok(_) => panic!("Expected ParseError"),
705 Err(e) => assert!(matches!(e.kind(), ErrorKind::Parse)),
706 }
707 }
708}