redis/
streams.rs

1//! Defines types to use with the streams commands.
2
3use crate::{
4    from_redis_value, types::HashMap, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs, Value,
5};
6
7use std::io::Error;
8
9macro_rules! invalid_type_error {
10    ($v:expr, $det:expr) => {{
11        fail!((
12            $crate::ErrorKind::TypeError,
13            "Response was of incompatible type",
14            format!("{:?} (response was {:?})", $det, $v)
15        ));
16    }};
17}
18
19// Stream Maxlen Enum
20
21/// Utility enum for passing `MAXLEN [= or ~] [COUNT]`
22/// arguments into `StreamCommands`.
23/// The enum value represents the count.
24#[derive(PartialEq, Eq, Clone, Debug, Copy)]
25pub enum StreamMaxlen {
26    /// Match an exact count
27    Equals(usize),
28    /// Match an approximate count
29    Approx(usize),
30}
31
32impl ToRedisArgs for StreamMaxlen {
33    fn write_redis_args<W>(&self, out: &mut W)
34    where
35        W: ?Sized + RedisWrite,
36    {
37        let (ch, val) = match *self {
38            StreamMaxlen::Equals(v) => ("=", v),
39            StreamMaxlen::Approx(v) => ("~", v),
40        };
41        out.write_arg(b"MAXLEN");
42        out.write_arg(ch.as_bytes());
43        val.write_redis_args(out);
44    }
45}
46
47/// Utility enum for passing the trim mode`[=|~]`
48/// arguments into `StreamCommands`.
49#[derive(Debug)]
50pub enum StreamTrimmingMode {
51    /// Match an exact count
52    Exact,
53    /// Match an approximate count
54    Approx,
55}
56
57impl ToRedisArgs for StreamTrimmingMode {
58    fn write_redis_args<W>(&self, out: &mut W)
59    where
60        W: ?Sized + RedisWrite,
61    {
62        match self {
63            Self::Exact => out.write_arg(b"="),
64            Self::Approx => out.write_arg(b"~"),
65        };
66    }
67}
68
69/// Utility enum for passing `<MAXLEN|MINID> [=|~] threshold [LIMIT count]`
70/// arguments into `StreamCommands`.
71/// The enum values the trimming mode (=|~), the threshold, and the optional limit
72#[derive(Debug)]
73pub enum StreamTrimStrategy {
74    /// Evicts entries as long as the streams length exceeds threshold.  With an optional limit.
75    MaxLen(StreamTrimmingMode, usize, Option<usize>),
76    /// Evicts entries with IDs lower than threshold, where threshold is a stream ID With an optional limit.
77    MinId(StreamTrimmingMode, String, Option<usize>),
78}
79
80impl StreamTrimStrategy {
81    /// Define a MAXLEN trim strategy with the given maximum number of entries
82    pub fn maxlen(trim: StreamTrimmingMode, max_entries: usize) -> Self {
83        Self::MaxLen(trim, max_entries, None)
84    }
85
86    /// Defines a MINID trim strategy with the given minimum stream ID
87    pub fn minid(trim: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
88        Self::MinId(trim, stream_id.into(), None)
89    }
90
91    /// Set a limit to the number of records to trim in a single operation
92    pub fn limit(self, limit: usize) -> Self {
93        match self {
94            StreamTrimStrategy::MaxLen(m, t, _) => StreamTrimStrategy::MaxLen(m, t, Some(limit)),
95            StreamTrimStrategy::MinId(m, t, _) => StreamTrimStrategy::MinId(m, t, Some(limit)),
96        }
97    }
98}
99
100impl ToRedisArgs for StreamTrimStrategy {
101    fn write_redis_args<W>(&self, out: &mut W)
102    where
103        W: ?Sized + RedisWrite,
104    {
105        let limit = match self {
106            StreamTrimStrategy::MaxLen(m, t, limit) => {
107                out.write_arg(b"MAXLEN");
108                m.write_redis_args(out);
109                t.write_redis_args(out);
110                limit
111            }
112            StreamTrimStrategy::MinId(m, t, limit) => {
113                out.write_arg(b"MINID");
114                m.write_redis_args(out);
115                t.write_redis_args(out);
116                limit
117            }
118        };
119        if let Some(limit) = limit {
120            out.write_arg(b"LIMIT");
121            limit.write_redis_args(out);
122        }
123    }
124}
125
126/// Builder options for [`xtrim_options`] command
127///
128/// [`xtrim_options`]: ../trait.Commands.html#method.xtrim_options
129///
130#[derive(Debug)]
131pub struct StreamTrimOptions {
132    strategy: StreamTrimStrategy,
133}
134
135impl StreamTrimOptions {
136    /// Define a MAXLEN trim strategy with the given maximum number of entries
137    pub fn maxlen(mode: StreamTrimmingMode, max_entries: usize) -> Self {
138        Self {
139            strategy: StreamTrimStrategy::maxlen(mode, max_entries),
140        }
141    }
142
143    /// Defines a MINID trim strategy with the given minimum stream ID
144    pub fn minid(mode: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
145        Self {
146            strategy: StreamTrimStrategy::minid(mode, stream_id),
147        }
148    }
149
150    /// Set a limit to the number of records to trim in a single operation
151    pub fn limit(mut self, limit: usize) -> Self {
152        self.strategy = self.strategy.limit(limit);
153        self
154    }
155}
156
157impl ToRedisArgs for StreamTrimOptions {
158    fn write_redis_args<W>(&self, out: &mut W)
159    where
160        W: ?Sized + RedisWrite,
161    {
162        self.strategy.write_redis_args(out);
163    }
164}
165
166/// Builder options for [`xadd_options`] command
167///
168/// [`xadd_options`]: ../trait.Commands.html#method.xadd_options
169///
170#[derive(Default, Debug)]
171pub struct StreamAddOptions {
172    nomkstream: bool,
173    trim: Option<StreamTrimStrategy>,
174}
175
176impl StreamAddOptions {
177    /// Set the NOMKSTREAM flag on which prevents creating a stream for the XADD operation
178    pub fn nomkstream(mut self) -> Self {
179        self.nomkstream = true;
180        self
181    }
182
183    /// Enable trimming when adding using the given trim strategy
184    pub fn trim(mut self, trim: StreamTrimStrategy) -> Self {
185        self.trim = Some(trim);
186        self
187    }
188}
189
190impl ToRedisArgs for StreamAddOptions {
191    fn write_redis_args<W>(&self, out: &mut W)
192    where
193        W: ?Sized + RedisWrite,
194    {
195        if self.nomkstream {
196            out.write_arg(b"NOMKSTREAM");
197        }
198        if let Some(strategy) = self.trim.as_ref() {
199            strategy.write_redis_args(out);
200        }
201    }
202}
203
204/// Builder options for [`xautoclaim_options`] command.
205///
206/// [`xautoclaim_options`]: ../trait.Commands.html#method.xautoclaim_options
207///
208#[derive(Default, Debug)]
209pub struct StreamAutoClaimOptions {
210    count: Option<usize>,
211    justid: bool,
212}
213
214impl StreamAutoClaimOptions {
215    /// Sets the maximum number of elements to claim per stream.
216    pub fn count(mut self, n: usize) -> Self {
217        self.count = Some(n);
218        self
219    }
220
221    /// Set `JUSTID` cmd arg to true. Be advised: the response
222    /// type changes with this option.
223    pub fn with_justid(mut self) -> Self {
224        self.justid = true;
225        self
226    }
227}
228
229impl ToRedisArgs for StreamAutoClaimOptions {
230    fn write_redis_args<W>(&self, out: &mut W)
231    where
232        W: ?Sized + RedisWrite,
233    {
234        if let Some(ref count) = self.count {
235            out.write_arg(b"COUNT");
236            out.write_arg(format!("{count}").as_bytes());
237        }
238        if self.justid {
239            out.write_arg(b"JUSTID");
240        }
241    }
242}
243
244/// Builder options for [`xclaim_options`] command.
245///
246/// [`xclaim_options`]: ../trait.Commands.html#method.xclaim_options
247///
248#[derive(Default, Debug)]
249pub struct StreamClaimOptions {
250    /// Set `IDLE <milliseconds>` cmd arg.
251    idle: Option<usize>,
252    /// Set `TIME <Unix epoch milliseconds>` cmd arg.
253    time: Option<usize>,
254    /// Set `RETRYCOUNT <count>` cmd arg.
255    retry: Option<usize>,
256    /// Set `FORCE` cmd arg.
257    force: bool,
258    /// Set `JUSTID` cmd arg. Be advised: the response
259    /// type changes with this option.
260    justid: bool,
261    /// Set `LASTID <lastid>` cmd arg.
262    lastid: Option<String>,
263}
264
265impl StreamClaimOptions {
266    /// Set `IDLE <milliseconds>` cmd arg.
267    pub fn idle(mut self, ms: usize) -> Self {
268        self.idle = Some(ms);
269        self
270    }
271
272    /// Set `TIME <Unix epoch milliseconds>` cmd arg.
273    pub fn time(mut self, ms_time: usize) -> Self {
274        self.time = Some(ms_time);
275        self
276    }
277
278    /// Set `RETRYCOUNT <count>` cmd arg.
279    pub fn retry(mut self, count: usize) -> Self {
280        self.retry = Some(count);
281        self
282    }
283
284    /// Set `FORCE` cmd arg to true.
285    pub fn with_force(mut self) -> Self {
286        self.force = true;
287        self
288    }
289
290    /// Set `JUSTID` cmd arg to true. Be advised: the response
291    /// type changes with this option.
292    pub fn with_justid(mut self) -> Self {
293        self.justid = true;
294        self
295    }
296
297    /// Set `LASTID <lastid>` cmd arg.
298    pub fn with_lastid(mut self, lastid: impl Into<String>) -> Self {
299        self.lastid = Some(lastid.into());
300        self
301    }
302}
303
304impl ToRedisArgs for StreamClaimOptions {
305    fn write_redis_args<W>(&self, out: &mut W)
306    where
307        W: ?Sized + RedisWrite,
308    {
309        if let Some(ref ms) = self.idle {
310            out.write_arg(b"IDLE");
311            out.write_arg(format!("{ms}").as_bytes());
312        }
313        if let Some(ref ms_time) = self.time {
314            out.write_arg(b"TIME");
315            out.write_arg(format!("{ms_time}").as_bytes());
316        }
317        if let Some(ref count) = self.retry {
318            out.write_arg(b"RETRYCOUNT");
319            out.write_arg(format!("{count}").as_bytes());
320        }
321        if self.force {
322            out.write_arg(b"FORCE");
323        }
324        if self.justid {
325            out.write_arg(b"JUSTID");
326        }
327        if let Some(ref lastid) = self.lastid {
328            out.write_arg(b"LASTID");
329            lastid.write_redis_args(out);
330        }
331    }
332}
333
334/// Argument to `StreamReadOptions`
335/// Represents the Redis `GROUP <groupname> <consumername>` cmd arg.
336/// This option will toggle the cmd from `XREAD` to `XREADGROUP`
337type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
338/// Builder options for [`xread_options`] command.
339///
340/// [`xread_options`]: ../trait.Commands.html#method.xread_options
341///
342#[derive(Default, Debug)]
343pub struct StreamReadOptions {
344    /// Set the `BLOCK <milliseconds>` cmd arg.
345    block: Option<usize>,
346    /// Set the `COUNT <count>` cmd arg.
347    count: Option<usize>,
348    /// Set the `NOACK` cmd arg.
349    noack: Option<bool>,
350    /// Set the `GROUP <groupname> <consumername>` cmd arg.
351    /// This option will toggle the cmd from XREAD to XREADGROUP.
352    group: SRGroup,
353}
354
355impl StreamReadOptions {
356    /// Indicates whether the command is participating in a group
357    /// and generating ACKs
358    pub fn read_only(&self) -> bool {
359        self.group.is_none()
360    }
361
362    /// Sets the command so that it avoids adding the message
363    /// to the PEL in cases where reliability is not a requirement
364    /// and the occasional message loss is acceptable.
365    pub fn noack(mut self) -> Self {
366        self.noack = Some(true);
367        self
368    }
369
370    /// Sets the block time in milliseconds.
371    pub fn block(mut self, ms: usize) -> Self {
372        self.block = Some(ms);
373        self
374    }
375
376    /// Sets the maximum number of elements to return per stream.
377    pub fn count(mut self, n: usize) -> Self {
378        self.count = Some(n);
379        self
380    }
381
382    /// Sets the name of a consumer group associated to the stream.
383    pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
384        mut self,
385        group_name: GN,
386        consumer_name: CN,
387    ) -> Self {
388        self.group = Some((
389            ToRedisArgs::to_redis_args(&group_name),
390            ToRedisArgs::to_redis_args(&consumer_name),
391        ));
392        self
393    }
394}
395
396impl ToRedisArgs for StreamReadOptions {
397    fn write_redis_args<W>(&self, out: &mut W)
398    where
399        W: ?Sized + RedisWrite,
400    {
401        if let Some(ref group) = self.group {
402            out.write_arg(b"GROUP");
403            for i in &group.0 {
404                out.write_arg(i);
405            }
406            for i in &group.1 {
407                out.write_arg(i);
408            }
409        }
410
411        if let Some(ref ms) = self.block {
412            out.write_arg(b"BLOCK");
413            out.write_arg(format!("{ms}").as_bytes());
414        }
415
416        if let Some(ref n) = self.count {
417            out.write_arg(b"COUNT");
418            out.write_arg(format!("{n}").as_bytes());
419        }
420
421        if self.group.is_some() {
422            // noack is only available w/ xreadgroup
423            if self.noack == Some(true) {
424                out.write_arg(b"NOACK");
425            }
426        }
427    }
428}
429
430/// Reply type used with the [`xautoclaim_options`] command.
431///
432/// [`xautoclaim_options`]: ../trait.Commands.html#method.xautoclaim_options
433///
434#[derive(Default, Debug, Clone)]
435pub struct StreamAutoClaimReply {
436    /// The next stream id to use as the start argument for the next xautoclaim
437    pub next_stream_id: String,
438    /// The entries claimed for the consumer. When JUSTID is enabled the map in each entry is blank
439    pub claimed: Vec<StreamId>,
440    /// The list of stream ids that were removed due to no longer being in the stream
441    pub deleted_ids: Vec<String>,
442}
443
444/// Reply type used with [`xread`] or [`xread_options`] commands.
445///
446/// [`xread`]: ../trait.Commands.html#method.xread
447/// [`xread_options`]: ../trait.Commands.html#method.xread_options
448///
449#[derive(Default, Debug, Clone)]
450pub struct StreamReadReply {
451    /// Complex data structure containing a payload for each key in this array
452    pub keys: Vec<StreamKey>,
453}
454
455/// Reply type used with [`xrange`], [`xrange_count`], [`xrange_all`], [`xrevrange`], [`xrevrange_count`], [`xrevrange_all`] commands.
456///
457/// Represents stream entries matching a given range of `id`'s.
458///
459/// [`xrange`]: ../trait.Commands.html#method.xrange
460/// [`xrange_count`]: ../trait.Commands.html#method.xrange_count
461/// [`xrange_all`]: ../trait.Commands.html#method.xrange_all
462/// [`xrevrange`]: ../trait.Commands.html#method.xrevrange
463/// [`xrevrange_count`]: ../trait.Commands.html#method.xrevrange_count
464/// [`xrevrange_all`]: ../trait.Commands.html#method.xrevrange_all
465///
466#[derive(Default, Debug, Clone)]
467pub struct StreamRangeReply {
468    /// Complex data structure containing a payload for each ID in this array
469    pub ids: Vec<StreamId>,
470}
471
472/// Reply type used with [`xclaim`] command.
473///
474/// Represents that ownership of the specified messages was changed.
475///
476/// [`xclaim`]: ../trait.Commands.html#method.xclaim
477///
478#[derive(Default, Debug, Clone)]
479pub struct StreamClaimReply {
480    /// Complex data structure containing a payload for each ID in this array
481    pub ids: Vec<StreamId>,
482}
483
484/// Reply type used with [`xpending`] command.
485///
486/// Data returned here were fetched from the stream without
487/// having been acknowledged.
488///
489/// [`xpending`]: ../trait.Commands.html#method.xpending
490///
491#[derive(Debug, Clone, Default)]
492pub enum StreamPendingReply {
493    /// The stream is empty.
494    #[default]
495    Empty,
496    /// Data with payload exists in the stream.
497    Data(StreamPendingData),
498}
499
500impl StreamPendingReply {
501    /// Returns how many records are in the reply.
502    pub fn count(&self) -> usize {
503        match self {
504            StreamPendingReply::Empty => 0,
505            StreamPendingReply::Data(x) => x.count,
506        }
507    }
508}
509
510/// Inner reply type when an [`xpending`] command has data.
511///
512/// [`xpending`]: ../trait.Commands.html#method.xpending
513#[derive(Default, Debug, Clone)]
514pub struct StreamPendingData {
515    /// Limit on the number of messages to return per call.
516    pub count: usize,
517    /// ID for the first pending record.
518    pub start_id: String,
519    /// ID for the final pending record.
520    pub end_id: String,
521    /// Every consumer in the consumer group with at
522    /// least one pending message,
523    /// and the number of pending messages it has.
524    pub consumers: Vec<StreamInfoConsumer>,
525}
526
527/// Reply type used with [`xpending_count`] and
528/// [`xpending_consumer_count`] commands.
529///
530/// Data returned here have been fetched from the stream without
531/// any acknowledgement.
532///
533/// [`xpending_count`]: ../trait.Commands.html#method.xpending_count
534/// [`xpending_consumer_count`]: ../trait.Commands.html#method.xpending_consumer_count
535///
536#[derive(Default, Debug, Clone)]
537pub struct StreamPendingCountReply {
538    /// An array of structs containing information about
539    /// message IDs yet to be acknowledged by various consumers,
540    /// time since last ack, and total number of acks by that consumer.
541    pub ids: Vec<StreamPendingId>,
542}
543
544/// Reply type used with [`xinfo_stream`] command, containing
545/// general information about the stream stored at the specified key.
546///
547/// The very first and last IDs in the stream are shown,
548/// in order to give some sense about what is the stream content.
549///
550/// [`xinfo_stream`]: ../trait.Commands.html#method.xinfo_stream
551///
552#[derive(Default, Debug, Clone)]
553pub struct StreamInfoStreamReply {
554    /// The last generated ID that may not be the same as the last
555    /// entry ID in case some entry was deleted.
556    pub last_generated_id: String,
557    /// Details about the radix tree representing the stream mostly
558    /// useful for optimization and debugging tasks.
559    pub radix_tree_keys: usize,
560    /// The number of consumer groups associated with the stream.
561    pub groups: usize,
562    /// Number of elements of the stream.
563    pub length: usize,
564    /// The very first entry in the stream.
565    pub first_entry: StreamId,
566    /// The very last entry in the stream.
567    pub last_entry: StreamId,
568}
569
570/// Reply type used with [`xinfo_consumer`] command, an array of every
571/// consumer in a specific consumer group.
572///
573/// [`xinfo_consumer`]: ../trait.Commands.html#method.xinfo_consumer
574///
575#[derive(Default, Debug, Clone)]
576pub struct StreamInfoConsumersReply {
577    /// An array of every consumer in a specific consumer group.
578    pub consumers: Vec<StreamInfoConsumer>,
579}
580
581/// Reply type used with [`xinfo_groups`] command.
582///
583/// This output represents all the consumer groups associated with
584/// the stream.
585///
586/// [`xinfo_groups`]: ../trait.Commands.html#method.xinfo_groups
587///
588#[derive(Default, Debug, Clone)]
589pub struct StreamInfoGroupsReply {
590    /// All the consumer groups associated with the stream.
591    pub groups: Vec<StreamInfoGroup>,
592}
593
594/// A consumer parsed from [`xinfo_consumers`] command.
595///
596/// [`xinfo_consumers`]: ../trait.Commands.html#method.xinfo_consumers
597///
598#[derive(Default, Debug, Clone)]
599pub struct StreamInfoConsumer {
600    /// Name of the consumer group.
601    pub name: String,
602    /// Number of pending messages for this specific consumer.
603    pub pending: usize,
604    /// This consumer's idle time in milliseconds.
605    pub idle: usize,
606}
607
608/// A group parsed from [`xinfo_groups`] command.
609///
610/// [`xinfo_groups`]: ../trait.Commands.html#method.xinfo_groups
611///
612#[derive(Default, Debug, Clone)]
613pub struct StreamInfoGroup {
614    /// The group name.
615    pub name: String,
616    /// Number of consumers known in the group.
617    pub consumers: usize,
618    /// Number of pending messages (delivered but not yet acknowledged) in the group.
619    pub pending: usize,
620    /// Last ID delivered to this group.
621    pub last_delivered_id: String,
622    /// The logical "read counter" of the last entry delivered to group's consumers
623    /// (or `None` if the server does not provide the value).
624    pub entries_read: Option<usize>,
625    /// The number of entries in the stream that are still waiting to be delivered to the
626    /// group's consumers, or a `None` when that number can't be determined.
627    pub lag: Option<usize>,
628}
629
630/// Represents a pending message parsed from [`xpending`] methods.
631///
632/// [`xpending`]: ../trait.Commands.html#method.xpending
633#[derive(Default, Debug, Clone)]
634pub struct StreamPendingId {
635    /// The ID of the message.
636    pub id: String,
637    /// The name of the consumer that fetched the message and has
638    /// still to acknowledge it. We call it the current owner
639    /// of the message.
640    pub consumer: String,
641    /// The number of milliseconds that elapsed since the
642    /// last time this message was delivered to this consumer.
643    pub last_delivered_ms: usize,
644    /// The number of times this message was delivered.
645    pub times_delivered: usize,
646}
647
648/// Represents a stream `key` and its `id`'s parsed from `xread` methods.
649#[derive(Default, Debug, Clone)]
650pub struct StreamKey {
651    /// The stream `key`.
652    pub key: String,
653    /// The parsed stream `id`'s.
654    pub ids: Vec<StreamId>,
655}
656
657/// Represents a stream `id` and its field/values as a `HashMap`
658#[derive(Default, Debug, Clone)]
659pub struct StreamId {
660    /// The stream `id` (entry ID) of this particular message.
661    pub id: String,
662    /// All fields in this message, associated with their respective values.
663    pub map: HashMap<String, Value>,
664}
665
666impl StreamId {
667    /// Converts a `Value::Array` into a `StreamId`.
668    fn from_array_value(v: &Value) -> RedisResult<Self> {
669        let mut stream_id = StreamId::default();
670        if let Value::Array(ref values) = *v {
671            if let Some(v) = values.first() {
672                stream_id.id = from_redis_value(v)?;
673            }
674            if let Some(v) = values.get(1) {
675                stream_id.map = from_redis_value(v)?;
676            }
677        }
678
679        Ok(stream_id)
680    }
681
682    /// Fetches value of a given field and converts it to the specified
683    /// type.
684    pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
685        match self.map.get(key) {
686            Some(x) => from_redis_value(x).ok(),
687            None => None,
688        }
689    }
690
691    /// Does the message contain a particular field?
692    pub fn contains_key(&self, key: &str) -> bool {
693        self.map.contains_key(key)
694    }
695
696    /// Returns how many field/value pairs exist in this message.
697    pub fn len(&self) -> usize {
698        self.map.len()
699    }
700
701    /// Returns true if there are no field/value pairs in this message.
702    pub fn is_empty(&self) -> bool {
703        self.len() == 0
704    }
705}
706
707type SACRows = Vec<HashMap<String, HashMap<String, Value>>>;
708
709impl FromRedisValue for StreamAutoClaimReply {
710    fn from_redis_value(v: &Value) -> RedisResult<Self> {
711        match *v {
712            Value::Array(ref items) => {
713                if let 2..=3 = items.len() {
714                    let deleted_ids = if let Some(o) = items.get(2) {
715                        from_redis_value(o)?
716                    } else {
717                        Vec::new()
718                    };
719
720                    let claimed: Vec<StreamId> = match &items[1] {
721                        // JUSTID response
722                        Value::Array(x)
723                            if matches!(x.first(), None | Some(Value::BulkString(_))) =>
724                        {
725                            let ids: Vec<String> = from_redis_value(&items[1])?;
726
727                            ids.into_iter()
728                                .map(|id| StreamId {
729                                    id,
730                                    ..Default::default()
731                                })
732                                .collect()
733                        }
734                        // full response
735                        Value::Array(x) if matches!(x.first(), Some(Value::Array(_))) => {
736                            let rows: SACRows = from_redis_value(&items[1])?;
737
738                            rows.into_iter()
739                                .flat_map(|id_row| {
740                                    id_row.into_iter().map(|(id, map)| StreamId { id, map })
741                                })
742                                .collect()
743                        }
744                        _ => invalid_type_error!("Incorrect type", &items[1]),
745                    };
746
747                    Ok(Self {
748                        next_stream_id: from_redis_value(&items[0])?,
749                        claimed,
750                        deleted_ids,
751                    })
752                } else {
753                    invalid_type_error!("Wrong number of entries in array response", v)
754                }
755            }
756            _ => invalid_type_error!("Not a array response", v),
757        }
758    }
759}
760
761type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
762impl FromRedisValue for StreamReadReply {
763    fn from_redis_value(v: &Value) -> RedisResult<Self> {
764        let rows: SRRows = from_redis_value(v)?;
765        let keys = rows
766            .into_iter()
767            .flat_map(|row| {
768                row.into_iter().map(|(key, entry)| {
769                    let ids = entry
770                        .into_iter()
771                        .flat_map(|id_row| id_row.into_iter().map(|(id, map)| StreamId { id, map }))
772                        .collect();
773                    StreamKey { key, ids }
774                })
775            })
776            .collect();
777        Ok(StreamReadReply { keys })
778    }
779}
780
781impl FromRedisValue for StreamRangeReply {
782    fn from_redis_value(v: &Value) -> RedisResult<Self> {
783        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
784        let ids: Vec<StreamId> = rows
785            .into_iter()
786            .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
787            .collect();
788        Ok(StreamRangeReply { ids })
789    }
790}
791
792impl FromRedisValue for StreamClaimReply {
793    fn from_redis_value(v: &Value) -> RedisResult<Self> {
794        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
795        let ids: Vec<StreamId> = rows
796            .into_iter()
797            .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
798            .collect();
799        Ok(StreamClaimReply { ids })
800    }
801}
802
803type SPRInner = (
804    usize,
805    Option<String>,
806    Option<String>,
807    Vec<Option<(String, String)>>,
808);
809impl FromRedisValue for StreamPendingReply {
810    fn from_redis_value(v: &Value) -> RedisResult<Self> {
811        let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
812
813        if count == 0 {
814            Ok(StreamPendingReply::Empty)
815        } else {
816            let mut result = StreamPendingData::default();
817
818            let start_id = start
819                .ok_or_else(|| Error::other("IllegalState: Non-zero pending expects start id"))?;
820
821            let end_id =
822                end.ok_or_else(|| Error::other("IllegalState: Non-zero pending expects end id"))?;
823
824            result.count = count;
825            result.start_id = start_id;
826            result.end_id = end_id;
827
828            result.consumers = consumer_data
829                .into_iter()
830                .flatten()
831                .map(|(name, pending)| StreamInfoConsumer {
832                    name,
833                    pending: pending.parse().unwrap_or_default(),
834                    ..Default::default()
835                })
836                .collect();
837
838            Ok(StreamPendingReply::Data(result))
839        }
840    }
841}
842
843impl FromRedisValue for StreamPendingCountReply {
844    fn from_redis_value(v: &Value) -> RedisResult<Self> {
845        let mut reply = StreamPendingCountReply::default();
846        match v {
847            Value::Array(outer_tuple) => {
848                for outer in outer_tuple {
849                    match outer {
850                        Value::Array(inner_tuple) => match &inner_tuple[..] {
851                            [Value::BulkString(id_bytes), Value::BulkString(consumer_bytes), Value::Int(last_delivered_ms_u64), Value::Int(times_delivered_u64)] =>
852                            {
853                                let id = String::from_utf8(id_bytes.to_vec())?;
854                                let consumer = String::from_utf8(consumer_bytes.to_vec())?;
855                                let last_delivered_ms = *last_delivered_ms_u64 as usize;
856                                let times_delivered = *times_delivered_u64 as usize;
857                                reply.ids.push(StreamPendingId {
858                                    id,
859                                    consumer,
860                                    last_delivered_ms,
861                                    times_delivered,
862                                });
863                            }
864                            _ => fail!((
865                                crate::types::ErrorKind::TypeError,
866                                "Cannot parse redis data (3)"
867                            )),
868                        },
869                        _ => fail!((
870                            crate::types::ErrorKind::TypeError,
871                            "Cannot parse redis data (2)"
872                        )),
873                    }
874                }
875            }
876            _ => fail!((
877                crate::types::ErrorKind::TypeError,
878                "Cannot parse redis data (1)"
879            )),
880        };
881        Ok(reply)
882    }
883}
884
885impl FromRedisValue for StreamInfoStreamReply {
886    fn from_redis_value(v: &Value) -> RedisResult<Self> {
887        let map: HashMap<String, Value> = from_redis_value(v)?;
888        let mut reply = StreamInfoStreamReply::default();
889        if let Some(v) = &map.get("last-generated-id") {
890            reply.last_generated_id = from_redis_value(v)?;
891        }
892        if let Some(v) = &map.get("radix-tree-nodes") {
893            reply.radix_tree_keys = from_redis_value(v)?;
894        }
895        if let Some(v) = &map.get("groups") {
896            reply.groups = from_redis_value(v)?;
897        }
898        if let Some(v) = &map.get("length") {
899            reply.length = from_redis_value(v)?;
900        }
901        if let Some(v) = &map.get("first-entry") {
902            reply.first_entry = StreamId::from_array_value(v)?;
903        }
904        if let Some(v) = &map.get("last-entry") {
905            reply.last_entry = StreamId::from_array_value(v)?;
906        }
907        Ok(reply)
908    }
909}
910
911impl FromRedisValue for StreamInfoConsumersReply {
912    fn from_redis_value(v: &Value) -> RedisResult<Self> {
913        let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
914        let mut reply = StreamInfoConsumersReply::default();
915        for map in consumers {
916            let mut c = StreamInfoConsumer::default();
917            if let Some(v) = &map.get("name") {
918                c.name = from_redis_value(v)?;
919            }
920            if let Some(v) = &map.get("pending") {
921                c.pending = from_redis_value(v)?;
922            }
923            if let Some(v) = &map.get("idle") {
924                c.idle = from_redis_value(v)?;
925            }
926            reply.consumers.push(c);
927        }
928
929        Ok(reply)
930    }
931}
932
933impl FromRedisValue for StreamInfoGroupsReply {
934    fn from_redis_value(v: &Value) -> RedisResult<Self> {
935        let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
936        let mut reply = StreamInfoGroupsReply::default();
937        for map in groups {
938            let mut g = StreamInfoGroup::default();
939            if let Some(v) = &map.get("name") {
940                g.name = from_redis_value(v)?;
941            }
942            if let Some(v) = &map.get("pending") {
943                g.pending = from_redis_value(v)?;
944            }
945            if let Some(v) = &map.get("consumers") {
946                g.consumers = from_redis_value(v)?;
947            }
948            if let Some(v) = &map.get("last-delivered-id") {
949                g.last_delivered_id = from_redis_value(v)?;
950            }
951            if let Some(v) = &map.get("entries-read") {
952                g.entries_read = if let Value::Nil = v {
953                    None
954                } else {
955                    Some(from_redis_value(v)?)
956                };
957            }
958            if let Some(v) = &map.get("lag") {
959                g.lag = if let Value::Nil = v {
960                    None
961                } else {
962                    Some(from_redis_value(v)?)
963                };
964            }
965            reply.groups.push(g);
966        }
967        Ok(reply)
968    }
969}
970
971#[cfg(test)]
972mod tests {
973    use super::*;
974
975    fn assert_command_eq(object: impl ToRedisArgs, expected: &[u8]) {
976        let mut out: Vec<Vec<u8>> = Vec::new();
977
978        object.write_redis_args(&mut out);
979
980        let mut cmd: Vec<u8> = Vec::new();
981
982        out.iter_mut().for_each(|item| {
983            cmd.append(item);
984            cmd.push(b' ');
985        });
986
987        cmd.pop();
988
989        assert_eq!(cmd, expected);
990    }
991
992    mod stream_auto_claim_reply {
993        use super::*;
994        use crate::Value;
995
996        #[test]
997        fn short_response() {
998            let value = Value::Array(vec![Value::BulkString("1713465536578-0".into())]);
999
1000            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1001
1002            assert!(reply.is_err());
1003        }
1004
1005        #[test]
1006        fn parses_none_claimed_response() {
1007            let value = Value::Array(vec![
1008                Value::BulkString("0-0".into()),
1009                Value::Array(vec![]),
1010                Value::Array(vec![]),
1011            ]);
1012
1013            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1014
1015            assert!(reply.is_ok());
1016
1017            let reply = reply.unwrap();
1018
1019            assert_eq!(reply.next_stream_id.as_str(), "0-0");
1020            assert_eq!(reply.claimed.len(), 0);
1021            assert_eq!(reply.deleted_ids.len(), 0);
1022        }
1023
1024        #[test]
1025        fn parses_response() {
1026            let value = Value::Array(vec![
1027                Value::BulkString("1713465536578-0".into()),
1028                Value::Array(vec![
1029                    Value::Array(vec![
1030                        Value::BulkString("1713465533411-0".into()),
1031                        // Both RESP2 and RESP3 expose this map as an array of key/values
1032                        Value::Array(vec![
1033                            Value::BulkString("name".into()),
1034                            Value::BulkString("test".into()),
1035                            Value::BulkString("other".into()),
1036                            Value::BulkString("whaterver".into()),
1037                        ]),
1038                    ]),
1039                    Value::Array(vec![
1040                        Value::BulkString("1713465536069-0".into()),
1041                        Value::Array(vec![
1042                            Value::BulkString("name".into()),
1043                            Value::BulkString("another test".into()),
1044                            Value::BulkString("other".into()),
1045                            Value::BulkString("something".into()),
1046                        ]),
1047                    ]),
1048                ]),
1049                Value::Array(vec![Value::BulkString("123456789-0".into())]),
1050            ]);
1051
1052            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1053
1054            assert!(reply.is_ok());
1055
1056            let reply = reply.unwrap();
1057
1058            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1059            assert_eq!(reply.claimed.len(), 2);
1060            assert_eq!(reply.claimed[0].id.as_str(), "1713465533411-0");
1061            assert!(
1062                matches!(reply.claimed[0].map.get("name"), Some(Value::BulkString(v)) if v == "test".as_bytes())
1063            );
1064            assert_eq!(reply.claimed[1].id.as_str(), "1713465536069-0");
1065            assert_eq!(reply.deleted_ids.len(), 1);
1066            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1067        }
1068
1069        #[test]
1070        fn parses_v6_response() {
1071            let value = Value::Array(vec![
1072                Value::BulkString("1713465536578-0".into()),
1073                Value::Array(vec![
1074                    Value::Array(vec![
1075                        Value::BulkString("1713465533411-0".into()),
1076                        Value::Array(vec![
1077                            Value::BulkString("name".into()),
1078                            Value::BulkString("test".into()),
1079                            Value::BulkString("other".into()),
1080                            Value::BulkString("whaterver".into()),
1081                        ]),
1082                    ]),
1083                    Value::Array(vec![
1084                        Value::BulkString("1713465536069-0".into()),
1085                        Value::Array(vec![
1086                            Value::BulkString("name".into()),
1087                            Value::BulkString("another test".into()),
1088                            Value::BulkString("other".into()),
1089                            Value::BulkString("something".into()),
1090                        ]),
1091                    ]),
1092                ]),
1093                // V6 and lower lack the deleted_ids array
1094            ]);
1095
1096            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1097
1098            assert!(reply.is_ok());
1099
1100            let reply = reply.unwrap();
1101
1102            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1103            assert_eq!(reply.claimed.len(), 2);
1104            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1105            assert!(ids.contains(&"1713465533411-0"));
1106            assert!(ids.contains(&"1713465536069-0"));
1107            assert_eq!(reply.deleted_ids.len(), 0);
1108        }
1109
1110        #[test]
1111        fn parses_justid_response() {
1112            let value = Value::Array(vec![
1113                Value::BulkString("1713465536578-0".into()),
1114                Value::Array(vec![
1115                    Value::BulkString("1713465533411-0".into()),
1116                    Value::BulkString("1713465536069-0".into()),
1117                ]),
1118                Value::Array(vec![Value::BulkString("123456789-0".into())]),
1119            ]);
1120
1121            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1122
1123            assert!(reply.is_ok());
1124
1125            let reply = reply.unwrap();
1126
1127            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1128            assert_eq!(reply.claimed.len(), 2);
1129            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1130            assert!(ids.contains(&"1713465533411-0"));
1131            assert!(ids.contains(&"1713465536069-0"));
1132            assert_eq!(reply.deleted_ids.len(), 1);
1133            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1134        }
1135
1136        #[test]
1137        fn parses_v6_justid_response() {
1138            let value = Value::Array(vec![
1139                Value::BulkString("1713465536578-0".into()),
1140                Value::Array(vec![
1141                    Value::BulkString("1713465533411-0".into()),
1142                    Value::BulkString("1713465536069-0".into()),
1143                ]),
1144                // V6 and lower lack the deleted_ids array
1145            ]);
1146
1147            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1148
1149            assert!(reply.is_ok());
1150
1151            let reply = reply.unwrap();
1152
1153            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1154            assert_eq!(reply.claimed.len(), 2);
1155            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1156            assert!(ids.contains(&"1713465533411-0"));
1157            assert!(ids.contains(&"1713465536069-0"));
1158            assert_eq!(reply.deleted_ids.len(), 0);
1159        }
1160    }
1161
1162    mod stream_trim_options {
1163        use super::*;
1164
1165        #[test]
1166        fn maxlen_trim() {
1167            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10);
1168
1169            assert_command_eq(options, b"MAXLEN ~ 10");
1170        }
1171
1172        #[test]
1173        fn maxlen_exact_trim() {
1174            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10);
1175
1176            assert_command_eq(options, b"MAXLEN = 10");
1177        }
1178
1179        #[test]
1180        fn maxlen_trim_limit() {
1181            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10).limit(5);
1182
1183            assert_command_eq(options, b"MAXLEN ~ 10 LIMIT 5");
1184        }
1185        #[test]
1186        fn minid_trim_limit() {
1187            let options = StreamTrimOptions::minid(StreamTrimmingMode::Exact, "123456-7").limit(5);
1188
1189            assert_command_eq(options, b"MINID = 123456-7 LIMIT 5");
1190        }
1191    }
1192
1193    mod stream_add_options {
1194        use super::*;
1195
1196        #[test]
1197        fn the_default() {
1198            let options = StreamAddOptions::default();
1199
1200            assert_command_eq(options, b"");
1201        }
1202
1203        #[test]
1204        fn with_maxlen_trim() {
1205            let options = StreamAddOptions::default()
1206                .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1207
1208            assert_command_eq(options, b"MAXLEN = 10");
1209        }
1210
1211        #[test]
1212        fn with_nomkstream() {
1213            let options = StreamAddOptions::default().nomkstream();
1214
1215            assert_command_eq(options, b"NOMKSTREAM");
1216        }
1217
1218        #[test]
1219        fn with_nomkstream_and_maxlen_trim() {
1220            let options = StreamAddOptions::default()
1221                .nomkstream()
1222                .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1223
1224            assert_command_eq(options, b"NOMKSTREAM MAXLEN = 10");
1225        }
1226    }
1227}