1use crate::{
4 from_redis_value, types::HashMap, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs, Value,
5};
6
7use std::io::Error;
8
9macro_rules! invalid_type_error {
10 ($v:expr, $det:expr) => {{
11 fail!((
12 $crate::ErrorKind::TypeError,
13 "Response was of incompatible type",
14 format!("{:?} (response was {:?})", $det, $v)
15 ));
16 }};
17}
18
19#[derive(PartialEq, Eq, Clone, Debug, Copy)]
25pub enum StreamMaxlen {
26 Equals(usize),
28 Approx(usize),
30}
31
32impl ToRedisArgs for StreamMaxlen {
33 fn write_redis_args<W>(&self, out: &mut W)
34 where
35 W: ?Sized + RedisWrite,
36 {
37 let (ch, val) = match *self {
38 StreamMaxlen::Equals(v) => ("=", v),
39 StreamMaxlen::Approx(v) => ("~", v),
40 };
41 out.write_arg(b"MAXLEN");
42 out.write_arg(ch.as_bytes());
43 val.write_redis_args(out);
44 }
45}
46
47#[derive(Debug)]
50pub enum StreamTrimmingMode {
51 Exact,
53 Approx,
55}
56
57impl ToRedisArgs for StreamTrimmingMode {
58 fn write_redis_args<W>(&self, out: &mut W)
59 where
60 W: ?Sized + RedisWrite,
61 {
62 match self {
63 Self::Exact => out.write_arg(b"="),
64 Self::Approx => out.write_arg(b"~"),
65 };
66 }
67}
68
69#[derive(Debug)]
73pub enum StreamTrimStrategy {
74 MaxLen(StreamTrimmingMode, usize, Option<usize>),
76 MinId(StreamTrimmingMode, String, Option<usize>),
78}
79
80impl StreamTrimStrategy {
81 pub fn maxlen(trim: StreamTrimmingMode, max_entries: usize) -> Self {
83 Self::MaxLen(trim, max_entries, None)
84 }
85
86 pub fn minid(trim: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
88 Self::MinId(trim, stream_id.into(), None)
89 }
90
91 pub fn limit(self, limit: usize) -> Self {
93 match self {
94 StreamTrimStrategy::MaxLen(m, t, _) => StreamTrimStrategy::MaxLen(m, t, Some(limit)),
95 StreamTrimStrategy::MinId(m, t, _) => StreamTrimStrategy::MinId(m, t, Some(limit)),
96 }
97 }
98}
99
100impl ToRedisArgs for StreamTrimStrategy {
101 fn write_redis_args<W>(&self, out: &mut W)
102 where
103 W: ?Sized + RedisWrite,
104 {
105 let limit = match self {
106 StreamTrimStrategy::MaxLen(m, t, limit) => {
107 out.write_arg(b"MAXLEN");
108 m.write_redis_args(out);
109 t.write_redis_args(out);
110 limit
111 }
112 StreamTrimStrategy::MinId(m, t, limit) => {
113 out.write_arg(b"MINID");
114 m.write_redis_args(out);
115 t.write_redis_args(out);
116 limit
117 }
118 };
119 if let Some(limit) = limit {
120 out.write_arg(b"LIMIT");
121 limit.write_redis_args(out);
122 }
123 }
124}
125
126#[derive(Debug)]
131pub struct StreamTrimOptions {
132 strategy: StreamTrimStrategy,
133}
134
135impl StreamTrimOptions {
136 pub fn maxlen(mode: StreamTrimmingMode, max_entries: usize) -> Self {
138 Self {
139 strategy: StreamTrimStrategy::maxlen(mode, max_entries),
140 }
141 }
142
143 pub fn minid(mode: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
145 Self {
146 strategy: StreamTrimStrategy::minid(mode, stream_id),
147 }
148 }
149
150 pub fn limit(mut self, limit: usize) -> Self {
152 self.strategy = self.strategy.limit(limit);
153 self
154 }
155}
156
157impl ToRedisArgs for StreamTrimOptions {
158 fn write_redis_args<W>(&self, out: &mut W)
159 where
160 W: ?Sized + RedisWrite,
161 {
162 self.strategy.write_redis_args(out);
163 }
164}
165
166#[derive(Default, Debug)]
171pub struct StreamAddOptions {
172 nomkstream: bool,
173 trim: Option<StreamTrimStrategy>,
174}
175
176impl StreamAddOptions {
177 pub fn nomkstream(mut self) -> Self {
179 self.nomkstream = true;
180 self
181 }
182
183 pub fn trim(mut self, trim: StreamTrimStrategy) -> Self {
185 self.trim = Some(trim);
186 self
187 }
188}
189
190impl ToRedisArgs for StreamAddOptions {
191 fn write_redis_args<W>(&self, out: &mut W)
192 where
193 W: ?Sized + RedisWrite,
194 {
195 if self.nomkstream {
196 out.write_arg(b"NOMKSTREAM");
197 }
198 if let Some(strategy) = self.trim.as_ref() {
199 strategy.write_redis_args(out);
200 }
201 }
202}
203
204#[derive(Default, Debug)]
209pub struct StreamAutoClaimOptions {
210 count: Option<usize>,
211 justid: bool,
212}
213
214impl StreamAutoClaimOptions {
215 pub fn count(mut self, n: usize) -> Self {
217 self.count = Some(n);
218 self
219 }
220
221 pub fn with_justid(mut self) -> Self {
224 self.justid = true;
225 self
226 }
227}
228
229impl ToRedisArgs for StreamAutoClaimOptions {
230 fn write_redis_args<W>(&self, out: &mut W)
231 where
232 W: ?Sized + RedisWrite,
233 {
234 if let Some(ref count) = self.count {
235 out.write_arg(b"COUNT");
236 out.write_arg(format!("{count}").as_bytes());
237 }
238 if self.justid {
239 out.write_arg(b"JUSTID");
240 }
241 }
242}
243
244#[derive(Default, Debug)]
249pub struct StreamClaimOptions {
250 idle: Option<usize>,
252 time: Option<usize>,
254 retry: Option<usize>,
256 force: bool,
258 justid: bool,
261 lastid: Option<String>,
263}
264
265impl StreamClaimOptions {
266 pub fn idle(mut self, ms: usize) -> Self {
268 self.idle = Some(ms);
269 self
270 }
271
272 pub fn time(mut self, ms_time: usize) -> Self {
274 self.time = Some(ms_time);
275 self
276 }
277
278 pub fn retry(mut self, count: usize) -> Self {
280 self.retry = Some(count);
281 self
282 }
283
284 pub fn with_force(mut self) -> Self {
286 self.force = true;
287 self
288 }
289
290 pub fn with_justid(mut self) -> Self {
293 self.justid = true;
294 self
295 }
296
297 pub fn with_lastid(mut self, lastid: impl Into<String>) -> Self {
299 self.lastid = Some(lastid.into());
300 self
301 }
302}
303
304impl ToRedisArgs for StreamClaimOptions {
305 fn write_redis_args<W>(&self, out: &mut W)
306 where
307 W: ?Sized + RedisWrite,
308 {
309 if let Some(ref ms) = self.idle {
310 out.write_arg(b"IDLE");
311 out.write_arg(format!("{ms}").as_bytes());
312 }
313 if let Some(ref ms_time) = self.time {
314 out.write_arg(b"TIME");
315 out.write_arg(format!("{ms_time}").as_bytes());
316 }
317 if let Some(ref count) = self.retry {
318 out.write_arg(b"RETRYCOUNT");
319 out.write_arg(format!("{count}").as_bytes());
320 }
321 if self.force {
322 out.write_arg(b"FORCE");
323 }
324 if self.justid {
325 out.write_arg(b"JUSTID");
326 }
327 if let Some(ref lastid) = self.lastid {
328 out.write_arg(b"LASTID");
329 lastid.write_redis_args(out);
330 }
331 }
332}
333
334type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
338#[derive(Default, Debug)]
343pub struct StreamReadOptions {
344 block: Option<usize>,
346 count: Option<usize>,
348 noack: Option<bool>,
350 group: SRGroup,
353}
354
355impl StreamReadOptions {
356 pub fn read_only(&self) -> bool {
359 self.group.is_none()
360 }
361
362 pub fn noack(mut self) -> Self {
366 self.noack = Some(true);
367 self
368 }
369
370 pub fn block(mut self, ms: usize) -> Self {
372 self.block = Some(ms);
373 self
374 }
375
376 pub fn count(mut self, n: usize) -> Self {
378 self.count = Some(n);
379 self
380 }
381
382 pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
384 mut self,
385 group_name: GN,
386 consumer_name: CN,
387 ) -> Self {
388 self.group = Some((
389 ToRedisArgs::to_redis_args(&group_name),
390 ToRedisArgs::to_redis_args(&consumer_name),
391 ));
392 self
393 }
394}
395
396impl ToRedisArgs for StreamReadOptions {
397 fn write_redis_args<W>(&self, out: &mut W)
398 where
399 W: ?Sized + RedisWrite,
400 {
401 if let Some(ref group) = self.group {
402 out.write_arg(b"GROUP");
403 for i in &group.0 {
404 out.write_arg(i);
405 }
406 for i in &group.1 {
407 out.write_arg(i);
408 }
409 }
410
411 if let Some(ref ms) = self.block {
412 out.write_arg(b"BLOCK");
413 out.write_arg(format!("{ms}").as_bytes());
414 }
415
416 if let Some(ref n) = self.count {
417 out.write_arg(b"COUNT");
418 out.write_arg(format!("{n}").as_bytes());
419 }
420
421 if self.group.is_some() {
422 if self.noack == Some(true) {
424 out.write_arg(b"NOACK");
425 }
426 }
427 }
428}
429
430#[derive(Default, Debug, Clone)]
435pub struct StreamAutoClaimReply {
436 pub next_stream_id: String,
438 pub claimed: Vec<StreamId>,
440 pub deleted_ids: Vec<String>,
442}
443
444#[derive(Default, Debug, Clone)]
450pub struct StreamReadReply {
451 pub keys: Vec<StreamKey>,
453}
454
455#[derive(Default, Debug, Clone)]
467pub struct StreamRangeReply {
468 pub ids: Vec<StreamId>,
470}
471
472#[derive(Default, Debug, Clone)]
479pub struct StreamClaimReply {
480 pub ids: Vec<StreamId>,
482}
483
484#[derive(Debug, Clone, Default)]
492pub enum StreamPendingReply {
493 #[default]
495 Empty,
496 Data(StreamPendingData),
498}
499
500impl StreamPendingReply {
501 pub fn count(&self) -> usize {
503 match self {
504 StreamPendingReply::Empty => 0,
505 StreamPendingReply::Data(x) => x.count,
506 }
507 }
508}
509
510#[derive(Default, Debug, Clone)]
514pub struct StreamPendingData {
515 pub count: usize,
517 pub start_id: String,
519 pub end_id: String,
521 pub consumers: Vec<StreamInfoConsumer>,
525}
526
527#[derive(Default, Debug, Clone)]
537pub struct StreamPendingCountReply {
538 pub ids: Vec<StreamPendingId>,
542}
543
544#[derive(Default, Debug, Clone)]
553pub struct StreamInfoStreamReply {
554 pub last_generated_id: String,
557 pub radix_tree_keys: usize,
560 pub groups: usize,
562 pub length: usize,
564 pub first_entry: StreamId,
566 pub last_entry: StreamId,
568}
569
570#[derive(Default, Debug, Clone)]
576pub struct StreamInfoConsumersReply {
577 pub consumers: Vec<StreamInfoConsumer>,
579}
580
581#[derive(Default, Debug, Clone)]
589pub struct StreamInfoGroupsReply {
590 pub groups: Vec<StreamInfoGroup>,
592}
593
594#[derive(Default, Debug, Clone)]
599pub struct StreamInfoConsumer {
600 pub name: String,
602 pub pending: usize,
604 pub idle: usize,
606}
607
608#[derive(Default, Debug, Clone)]
613pub struct StreamInfoGroup {
614 pub name: String,
616 pub consumers: usize,
618 pub pending: usize,
620 pub last_delivered_id: String,
622 pub entries_read: Option<usize>,
625 pub lag: Option<usize>,
628}
629
630#[derive(Default, Debug, Clone)]
634pub struct StreamPendingId {
635 pub id: String,
637 pub consumer: String,
641 pub last_delivered_ms: usize,
644 pub times_delivered: usize,
646}
647
648#[derive(Default, Debug, Clone)]
650pub struct StreamKey {
651 pub key: String,
653 pub ids: Vec<StreamId>,
655}
656
657#[derive(Default, Debug, Clone)]
659pub struct StreamId {
660 pub id: String,
662 pub map: HashMap<String, Value>,
664}
665
666impl StreamId {
667 fn from_array_value(v: &Value) -> RedisResult<Self> {
669 let mut stream_id = StreamId::default();
670 if let Value::Array(ref values) = *v {
671 if let Some(v) = values.first() {
672 stream_id.id = from_redis_value(v)?;
673 }
674 if let Some(v) = values.get(1) {
675 stream_id.map = from_redis_value(v)?;
676 }
677 }
678
679 Ok(stream_id)
680 }
681
682 pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
685 match self.map.get(key) {
686 Some(x) => from_redis_value(x).ok(),
687 None => None,
688 }
689 }
690
691 pub fn contains_key(&self, key: &str) -> bool {
693 self.map.contains_key(key)
694 }
695
696 pub fn len(&self) -> usize {
698 self.map.len()
699 }
700
701 pub fn is_empty(&self) -> bool {
703 self.len() == 0
704 }
705}
706
707type SACRows = Vec<HashMap<String, HashMap<String, Value>>>;
708
709impl FromRedisValue for StreamAutoClaimReply {
710 fn from_redis_value(v: &Value) -> RedisResult<Self> {
711 match *v {
712 Value::Array(ref items) => {
713 if let 2..=3 = items.len() {
714 let deleted_ids = if let Some(o) = items.get(2) {
715 from_redis_value(o)?
716 } else {
717 Vec::new()
718 };
719
720 let claimed: Vec<StreamId> = match &items[1] {
721 Value::Array(x)
723 if matches!(x.first(), None | Some(Value::BulkString(_))) =>
724 {
725 let ids: Vec<String> = from_redis_value(&items[1])?;
726
727 ids.into_iter()
728 .map(|id| StreamId {
729 id,
730 ..Default::default()
731 })
732 .collect()
733 }
734 Value::Array(x) if matches!(x.first(), Some(Value::Array(_))) => {
736 let rows: SACRows = from_redis_value(&items[1])?;
737
738 rows.into_iter()
739 .flat_map(|id_row| {
740 id_row.into_iter().map(|(id, map)| StreamId { id, map })
741 })
742 .collect()
743 }
744 _ => invalid_type_error!("Incorrect type", &items[1]),
745 };
746
747 Ok(Self {
748 next_stream_id: from_redis_value(&items[0])?,
749 claimed,
750 deleted_ids,
751 })
752 } else {
753 invalid_type_error!("Wrong number of entries in array response", v)
754 }
755 }
756 _ => invalid_type_error!("Not a array response", v),
757 }
758 }
759}
760
761type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
762impl FromRedisValue for StreamReadReply {
763 fn from_redis_value(v: &Value) -> RedisResult<Self> {
764 let rows: SRRows = from_redis_value(v)?;
765 let keys = rows
766 .into_iter()
767 .flat_map(|row| {
768 row.into_iter().map(|(key, entry)| {
769 let ids = entry
770 .into_iter()
771 .flat_map(|id_row| id_row.into_iter().map(|(id, map)| StreamId { id, map }))
772 .collect();
773 StreamKey { key, ids }
774 })
775 })
776 .collect();
777 Ok(StreamReadReply { keys })
778 }
779}
780
781impl FromRedisValue for StreamRangeReply {
782 fn from_redis_value(v: &Value) -> RedisResult<Self> {
783 let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
784 let ids: Vec<StreamId> = rows
785 .into_iter()
786 .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
787 .collect();
788 Ok(StreamRangeReply { ids })
789 }
790}
791
792impl FromRedisValue for StreamClaimReply {
793 fn from_redis_value(v: &Value) -> RedisResult<Self> {
794 let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
795 let ids: Vec<StreamId> = rows
796 .into_iter()
797 .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
798 .collect();
799 Ok(StreamClaimReply { ids })
800 }
801}
802
803type SPRInner = (
804 usize,
805 Option<String>,
806 Option<String>,
807 Vec<Option<(String, String)>>,
808);
809impl FromRedisValue for StreamPendingReply {
810 fn from_redis_value(v: &Value) -> RedisResult<Self> {
811 let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
812
813 if count == 0 {
814 Ok(StreamPendingReply::Empty)
815 } else {
816 let mut result = StreamPendingData::default();
817
818 let start_id = start
819 .ok_or_else(|| Error::other("IllegalState: Non-zero pending expects start id"))?;
820
821 let end_id =
822 end.ok_or_else(|| Error::other("IllegalState: Non-zero pending expects end id"))?;
823
824 result.count = count;
825 result.start_id = start_id;
826 result.end_id = end_id;
827
828 result.consumers = consumer_data
829 .into_iter()
830 .flatten()
831 .map(|(name, pending)| StreamInfoConsumer {
832 name,
833 pending: pending.parse().unwrap_or_default(),
834 ..Default::default()
835 })
836 .collect();
837
838 Ok(StreamPendingReply::Data(result))
839 }
840 }
841}
842
843impl FromRedisValue for StreamPendingCountReply {
844 fn from_redis_value(v: &Value) -> RedisResult<Self> {
845 let mut reply = StreamPendingCountReply::default();
846 match v {
847 Value::Array(outer_tuple) => {
848 for outer in outer_tuple {
849 match outer {
850 Value::Array(inner_tuple) => match &inner_tuple[..] {
851 [Value::BulkString(id_bytes), Value::BulkString(consumer_bytes), Value::Int(last_delivered_ms_u64), Value::Int(times_delivered_u64)] =>
852 {
853 let id = String::from_utf8(id_bytes.to_vec())?;
854 let consumer = String::from_utf8(consumer_bytes.to_vec())?;
855 let last_delivered_ms = *last_delivered_ms_u64 as usize;
856 let times_delivered = *times_delivered_u64 as usize;
857 reply.ids.push(StreamPendingId {
858 id,
859 consumer,
860 last_delivered_ms,
861 times_delivered,
862 });
863 }
864 _ => fail!((
865 crate::types::ErrorKind::TypeError,
866 "Cannot parse redis data (3)"
867 )),
868 },
869 _ => fail!((
870 crate::types::ErrorKind::TypeError,
871 "Cannot parse redis data (2)"
872 )),
873 }
874 }
875 }
876 _ => fail!((
877 crate::types::ErrorKind::TypeError,
878 "Cannot parse redis data (1)"
879 )),
880 };
881 Ok(reply)
882 }
883}
884
885impl FromRedisValue for StreamInfoStreamReply {
886 fn from_redis_value(v: &Value) -> RedisResult<Self> {
887 let map: HashMap<String, Value> = from_redis_value(v)?;
888 let mut reply = StreamInfoStreamReply::default();
889 if let Some(v) = &map.get("last-generated-id") {
890 reply.last_generated_id = from_redis_value(v)?;
891 }
892 if let Some(v) = &map.get("radix-tree-nodes") {
893 reply.radix_tree_keys = from_redis_value(v)?;
894 }
895 if let Some(v) = &map.get("groups") {
896 reply.groups = from_redis_value(v)?;
897 }
898 if let Some(v) = &map.get("length") {
899 reply.length = from_redis_value(v)?;
900 }
901 if let Some(v) = &map.get("first-entry") {
902 reply.first_entry = StreamId::from_array_value(v)?;
903 }
904 if let Some(v) = &map.get("last-entry") {
905 reply.last_entry = StreamId::from_array_value(v)?;
906 }
907 Ok(reply)
908 }
909}
910
911impl FromRedisValue for StreamInfoConsumersReply {
912 fn from_redis_value(v: &Value) -> RedisResult<Self> {
913 let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
914 let mut reply = StreamInfoConsumersReply::default();
915 for map in consumers {
916 let mut c = StreamInfoConsumer::default();
917 if let Some(v) = &map.get("name") {
918 c.name = from_redis_value(v)?;
919 }
920 if let Some(v) = &map.get("pending") {
921 c.pending = from_redis_value(v)?;
922 }
923 if let Some(v) = &map.get("idle") {
924 c.idle = from_redis_value(v)?;
925 }
926 reply.consumers.push(c);
927 }
928
929 Ok(reply)
930 }
931}
932
933impl FromRedisValue for StreamInfoGroupsReply {
934 fn from_redis_value(v: &Value) -> RedisResult<Self> {
935 let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
936 let mut reply = StreamInfoGroupsReply::default();
937 for map in groups {
938 let mut g = StreamInfoGroup::default();
939 if let Some(v) = &map.get("name") {
940 g.name = from_redis_value(v)?;
941 }
942 if let Some(v) = &map.get("pending") {
943 g.pending = from_redis_value(v)?;
944 }
945 if let Some(v) = &map.get("consumers") {
946 g.consumers = from_redis_value(v)?;
947 }
948 if let Some(v) = &map.get("last-delivered-id") {
949 g.last_delivered_id = from_redis_value(v)?;
950 }
951 if let Some(v) = &map.get("entries-read") {
952 g.entries_read = if let Value::Nil = v {
953 None
954 } else {
955 Some(from_redis_value(v)?)
956 };
957 }
958 if let Some(v) = &map.get("lag") {
959 g.lag = if let Value::Nil = v {
960 None
961 } else {
962 Some(from_redis_value(v)?)
963 };
964 }
965 reply.groups.push(g);
966 }
967 Ok(reply)
968 }
969}
970
971#[cfg(test)]
972mod tests {
973 use super::*;
974
975 fn assert_command_eq(object: impl ToRedisArgs, expected: &[u8]) {
976 let mut out: Vec<Vec<u8>> = Vec::new();
977
978 object.write_redis_args(&mut out);
979
980 let mut cmd: Vec<u8> = Vec::new();
981
982 out.iter_mut().for_each(|item| {
983 cmd.append(item);
984 cmd.push(b' ');
985 });
986
987 cmd.pop();
988
989 assert_eq!(cmd, expected);
990 }
991
992 mod stream_auto_claim_reply {
993 use super::*;
994 use crate::Value;
995
996 #[test]
997 fn short_response() {
998 let value = Value::Array(vec![Value::BulkString("1713465536578-0".into())]);
999
1000 let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1001
1002 assert!(reply.is_err());
1003 }
1004
1005 #[test]
1006 fn parses_none_claimed_response() {
1007 let value = Value::Array(vec![
1008 Value::BulkString("0-0".into()),
1009 Value::Array(vec![]),
1010 Value::Array(vec![]),
1011 ]);
1012
1013 let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1014
1015 assert!(reply.is_ok());
1016
1017 let reply = reply.unwrap();
1018
1019 assert_eq!(reply.next_stream_id.as_str(), "0-0");
1020 assert_eq!(reply.claimed.len(), 0);
1021 assert_eq!(reply.deleted_ids.len(), 0);
1022 }
1023
1024 #[test]
1025 fn parses_response() {
1026 let value = Value::Array(vec![
1027 Value::BulkString("1713465536578-0".into()),
1028 Value::Array(vec![
1029 Value::Array(vec![
1030 Value::BulkString("1713465533411-0".into()),
1031 Value::Array(vec![
1033 Value::BulkString("name".into()),
1034 Value::BulkString("test".into()),
1035 Value::BulkString("other".into()),
1036 Value::BulkString("whaterver".into()),
1037 ]),
1038 ]),
1039 Value::Array(vec![
1040 Value::BulkString("1713465536069-0".into()),
1041 Value::Array(vec![
1042 Value::BulkString("name".into()),
1043 Value::BulkString("another test".into()),
1044 Value::BulkString("other".into()),
1045 Value::BulkString("something".into()),
1046 ]),
1047 ]),
1048 ]),
1049 Value::Array(vec![Value::BulkString("123456789-0".into())]),
1050 ]);
1051
1052 let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1053
1054 assert!(reply.is_ok());
1055
1056 let reply = reply.unwrap();
1057
1058 assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1059 assert_eq!(reply.claimed.len(), 2);
1060 assert_eq!(reply.claimed[0].id.as_str(), "1713465533411-0");
1061 assert!(
1062 matches!(reply.claimed[0].map.get("name"), Some(Value::BulkString(v)) if v == "test".as_bytes())
1063 );
1064 assert_eq!(reply.claimed[1].id.as_str(), "1713465536069-0");
1065 assert_eq!(reply.deleted_ids.len(), 1);
1066 assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1067 }
1068
1069 #[test]
1070 fn parses_v6_response() {
1071 let value = Value::Array(vec![
1072 Value::BulkString("1713465536578-0".into()),
1073 Value::Array(vec![
1074 Value::Array(vec![
1075 Value::BulkString("1713465533411-0".into()),
1076 Value::Array(vec![
1077 Value::BulkString("name".into()),
1078 Value::BulkString("test".into()),
1079 Value::BulkString("other".into()),
1080 Value::BulkString("whaterver".into()),
1081 ]),
1082 ]),
1083 Value::Array(vec![
1084 Value::BulkString("1713465536069-0".into()),
1085 Value::Array(vec![
1086 Value::BulkString("name".into()),
1087 Value::BulkString("another test".into()),
1088 Value::BulkString("other".into()),
1089 Value::BulkString("something".into()),
1090 ]),
1091 ]),
1092 ]),
1093 ]);
1095
1096 let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1097
1098 assert!(reply.is_ok());
1099
1100 let reply = reply.unwrap();
1101
1102 assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1103 assert_eq!(reply.claimed.len(), 2);
1104 let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1105 assert!(ids.contains(&"1713465533411-0"));
1106 assert!(ids.contains(&"1713465536069-0"));
1107 assert_eq!(reply.deleted_ids.len(), 0);
1108 }
1109
1110 #[test]
1111 fn parses_justid_response() {
1112 let value = Value::Array(vec![
1113 Value::BulkString("1713465536578-0".into()),
1114 Value::Array(vec![
1115 Value::BulkString("1713465533411-0".into()),
1116 Value::BulkString("1713465536069-0".into()),
1117 ]),
1118 Value::Array(vec![Value::BulkString("123456789-0".into())]),
1119 ]);
1120
1121 let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1122
1123 assert!(reply.is_ok());
1124
1125 let reply = reply.unwrap();
1126
1127 assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1128 assert_eq!(reply.claimed.len(), 2);
1129 let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1130 assert!(ids.contains(&"1713465533411-0"));
1131 assert!(ids.contains(&"1713465536069-0"));
1132 assert_eq!(reply.deleted_ids.len(), 1);
1133 assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1134 }
1135
1136 #[test]
1137 fn parses_v6_justid_response() {
1138 let value = Value::Array(vec![
1139 Value::BulkString("1713465536578-0".into()),
1140 Value::Array(vec![
1141 Value::BulkString("1713465533411-0".into()),
1142 Value::BulkString("1713465536069-0".into()),
1143 ]),
1144 ]);
1146
1147 let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1148
1149 assert!(reply.is_ok());
1150
1151 let reply = reply.unwrap();
1152
1153 assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1154 assert_eq!(reply.claimed.len(), 2);
1155 let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1156 assert!(ids.contains(&"1713465533411-0"));
1157 assert!(ids.contains(&"1713465536069-0"));
1158 assert_eq!(reply.deleted_ids.len(), 0);
1159 }
1160 }
1161
1162 mod stream_trim_options {
1163 use super::*;
1164
1165 #[test]
1166 fn maxlen_trim() {
1167 let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10);
1168
1169 assert_command_eq(options, b"MAXLEN ~ 10");
1170 }
1171
1172 #[test]
1173 fn maxlen_exact_trim() {
1174 let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10);
1175
1176 assert_command_eq(options, b"MAXLEN = 10");
1177 }
1178
1179 #[test]
1180 fn maxlen_trim_limit() {
1181 let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10).limit(5);
1182
1183 assert_command_eq(options, b"MAXLEN ~ 10 LIMIT 5");
1184 }
1185 #[test]
1186 fn minid_trim_limit() {
1187 let options = StreamTrimOptions::minid(StreamTrimmingMode::Exact, "123456-7").limit(5);
1188
1189 assert_command_eq(options, b"MINID = 123456-7 LIMIT 5");
1190 }
1191 }
1192
1193 mod stream_add_options {
1194 use super::*;
1195
1196 #[test]
1197 fn the_default() {
1198 let options = StreamAddOptions::default();
1199
1200 assert_command_eq(options, b"");
1201 }
1202
1203 #[test]
1204 fn with_maxlen_trim() {
1205 let options = StreamAddOptions::default()
1206 .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1207
1208 assert_command_eq(options, b"MAXLEN = 10");
1209 }
1210
1211 #[test]
1212 fn with_nomkstream() {
1213 let options = StreamAddOptions::default().nomkstream();
1214
1215 assert_command_eq(options, b"NOMKSTREAM");
1216 }
1217
1218 #[test]
1219 fn with_nomkstream_and_maxlen_trim() {
1220 let options = StreamAddOptions::default()
1221 .nomkstream()
1222 .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1223
1224 assert_command_eq(options, b"NOMKSTREAM MAXLEN = 10");
1225 }
1226 }
1227}