redis/
cmd.rs

1#[cfg(feature = "aio")]
2use futures_util::{
3    future::BoxFuture,
4    task::{Context, Poll},
5    Stream, StreamExt,
6};
7#[cfg(feature = "aio")]
8use std::pin::Pin;
9#[cfg(feature = "cache-aio")]
10use std::time::Duration;
11use std::{fmt, io, io::Write};
12
13use crate::pipeline::Pipeline;
14use crate::types::{from_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs};
15use crate::{connection::ConnectionLike, ParsingError};
16
/// An argument to a redis command.
///
/// The payload type `D` depends on where the value is used: [`Cmd`] stores
/// `Arg<usize>` (the offset that marks the end of the argument in its data
/// buffer), while [`Cmd::args_iter`] yields `Arg<&[u8]>` (the argument bytes).
#[derive(Clone, PartialEq, Debug)]
#[non_exhaustive]
pub enum Arg<D> {
    /// A normal argument
    Simple(D),
    /// A cursor argument created from `cursor_arg()`.
    ///
    /// The variant carries no payload; the cursor value is supplied separately
    /// when the command is encoded.
    Cursor,
}
26
/// CommandCacheConfig is used to define caching behaviour of individual commands.
///
/// A configuration is attached to a command with [`Cmd::set_cache_config`].
///
/// # Example
/// ```rust
/// use std::time::Duration;
/// use redis::{CommandCacheConfig, Cmd};
///
/// let ttl = Duration::from_secs(120); // 2 minutes TTL
/// let config = CommandCacheConfig::new()
///     .set_enable_cache(true)
///     .set_client_side_ttl(ttl);
/// let command = Cmd::new().arg("GET").arg("key").set_cache_config(config);
/// ```
#[cfg(feature = "cache-aio")]
#[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
#[derive(Clone)]
pub struct CommandCacheConfig {
    /// Whether caching is enabled for the command.
    pub(crate) enable_cache: bool,
    /// Custom client side TTL; `None` until `set_client_side_ttl` is called.
    pub(crate) client_side_ttl: Option<Duration>,
}
46
#[cfg(feature = "cache-aio")]
impl CommandCacheConfig {
    /// Builds a configuration that has caching enabled and no client side TTL.
    pub fn new() -> Self {
        let enable_cache = true;
        let client_side_ttl = None;
        Self {
            enable_cache,
            client_side_ttl,
        }
    }

    /// Chooses whether the cache should be used for the command.
    /// Disabling cache for specific command when using [crate::caching::CacheMode::All] will not work.
    pub fn set_enable_cache(self, enable_cache: bool) -> Self {
        Self {
            enable_cache,
            ..self
        }
    }

    /// Installs a custom client side time to live (TTL).
    pub fn set_client_side_ttl(self, client_side_ttl: Duration) -> Self {
        Self {
            client_side_ttl: Some(client_side_ttl),
            ..self
        }
    }
}
#[cfg(feature = "cache-aio")]
impl Default for CommandCacheConfig {
    /// Same as [`CommandCacheConfig::new`].
    fn default() -> Self {
        CommandCacheConfig::new()
    }
}
76
/// Represents redis commands.
///
/// A command is built by appending arguments; the bytes of all simple
/// arguments are stored back to back in one buffer and split again by offset.
#[derive(Clone)]
pub struct Cmd {
    // Bytes of all simple arguments, concatenated in the order they were added.
    pub(crate) data: Vec<u8>,
    // Arg::Simple contains the offset that marks the end of the argument
    args: Vec<Arg<usize>>,
    // Current cursor value; `None` until a cursor is set.
    cursor: Option<u64>,
    // If it's true command's response won't be read from socket. Useful for Pub/Sub.
    no_response: bool,
    // Per-command caching configuration, if one was set.
    #[cfg(feature = "cache-aio")]
    cache: Option<CommandCacheConfig>,
}
89
/// Represents a redis iterator.
///
/// Returned by [`Cmd::iter`]; every item is a `RedisResult<T>`.
pub struct Iter<'a, T: FromRedisValue> {
    // The iterator that does the actual work; `Iter` only delegates to it.
    iter: CheckedIter<'a, T>,
}
94impl<T: FromRedisValue> Iterator for Iter<'_, T> {
95    type Item = RedisResult<T>;
96
97    #[inline]
98    fn next(&mut self) -> Option<RedisResult<T>> {
99        self.iter.next()
100    }
101}
102
/// Represents a safe(r) redis iterator.
///
/// Items are served from the current batch; once it is exhausted and the
/// cursor is not `0`, the command is sent again to fetch the next batch.
struct CheckedIter<'a, T: FromRedisValue> {
    // Items of the most recently fetched batch that were not handed out yet.
    batch: std::vec::IntoIter<Result<T, ParsingError>>,
    // Connection used to request further batches.
    con: &'a mut (dyn ConnectionLike + 'a),
    // The command that is re-sent; its cursor is updated after every batch.
    cmd: Cmd,
}
109
110impl<T: FromRedisValue> Iterator for CheckedIter<'_, T> {
111    type Item = RedisResult<T>;
112
113    #[inline]
114    fn next(&mut self) -> Option<RedisResult<T>> {
115        // we need to do this in a loop until we produce at least one item
116        // or we find the actual end of the iteration.  This is necessary
117        // because with filtering an iterator it is possible that a whole
118        // chunk is not matching the pattern and thus yielding empty results.
119        loop {
120            if let Some(value) = self.batch.next() {
121                return Some(value.map_err(|err| err.into()));
122            };
123
124            if self.cmd.cursor? == 0 {
125                return None;
126            }
127
128            let (cursor, batch) = match self
129                .con
130                .req_packed_command(&self.cmd.get_packed_command())
131                .and_then(|val| Ok(from_redis_value::<(u64, _)>(val)?))
132            {
133                Ok((cursor, values)) => (cursor, T::from_each_redis_values(values)),
134                Err(e) => return Some(Err(e)),
135            };
136
137            self.cmd.cursor = Some(cursor);
138            self.batch = batch.into_iter();
139        }
140    }
141}
142
143#[cfg(feature = "aio")]
144use crate::aio::ConnectionLike as AsyncConnection;
145
/// The inner future of AsyncIter
///
/// Holds the same state as the synchronous `CheckedIter`, but on top of an
/// async connection.
#[cfg(feature = "aio")]
struct AsyncIterInner<'a, T: FromRedisValue + 'a> {
    // Items of the most recently fetched batch that were not handed out yet.
    batch: std::vec::IntoIter<Result<T, ParsingError>>,
    // Async connection used to request further batches.
    con: &'a mut (dyn AsyncConnection + Send + 'a),
    // The command that is re-sent; its cursor is updated after every batch.
    cmd: Cmd,
}
153
/// Represents the state of AsyncIter
#[cfg(feature = "aio")]
enum IterOrFuture<'a, T: FromRedisValue + 'a> {
    /// Idle: the inner iterator is available and no request is in flight.
    Iter(AsyncIterInner<'a, T>),
    /// A fetch of the next item is in progress; on completion the future
    /// yields the inner iterator back together with the item.
    Future(BoxFuture<'a, (AsyncIterInner<'a, T>, Option<RedisResult<T>>)>),
    /// Placeholder used while the state is moved out during a poll.
    Empty,
}
161
/// Represents a redis iterator that can be used with async connections.
///
/// Returned by [`Cmd::iter_async`]. Items can be pulled with
/// [`AsyncIter::next_item`] or through the `Stream` implementation.
#[cfg(feature = "aio")]
pub struct AsyncIter<'a, T: FromRedisValue + 'a> {
    // Either the idle inner iterator or the in-flight future that owns it.
    inner: IterOrFuture<'a, T>,
}
167
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + 'a> AsyncIterInner<'a, T> {
    /// Yields the next buffered item, fetching further batches as needed.
    ///
    /// A fetched batch may be empty (for example when a filter pattern
    /// matches nothing in a chunk), so fetching is repeated until an item is
    /// available, the cursor reports the end of the iteration, or an error
    /// occurs.
    async fn next_item(&mut self) -> Option<RedisResult<T>> {
        loop {
            // Serve from the current batch first.
            if let Some(item) = self.batch.next() {
                return Some(item.map_err(Into::into));
            }

            // No cursor at all, or a cursor of 0, means there is nothing left to fetch.
            match self.cmd.cursor {
                None | Some(0) => return None,
                Some(_) => {}
            }

            let response = self.con.req_packed_command(&self.cmd).await;
            let parsed = response.and_then(|val| Ok(from_redis_value::<(u64, _)>(val)?));

            match parsed {
                Ok((next_cursor, items)) => {
                    let next_batch = T::from_each_redis_values(items);
                    self.cmd.cursor = Some(next_cursor);
                    self.batch = next_batch.into_iter();
                }
                Err(e) => return Some(Err(e)),
            }
        }
    }
}
199
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + 'a + Unpin + Send> AsyncIter<'a, T> {
    /// Returns the next item of the iterator, or `None` once it is exhausted.
    ///
    /// This is a convenience wrapper that advances the `Stream`
    /// implementation of `AsyncIter` by one item.
    ///
    /// ```rust,no_run
    /// # use redis::AsyncCommands;
    /// # async fn scan_set() -> redis::RedisResult<()> {
    /// # let client = redis::Client::open("redis://127.0.0.1/")?;
    /// # let mut con = client.get_multiplexed_async_connection().await?;
    /// let _: () = con.sadd("my_set", 42i32).await?;
    /// let _: () = con.sadd("my_set", 43i32).await?;
    /// let mut iter: redis::AsyncIter<i32> = con.sscan("my_set").await?;
    /// while let Some(element) = iter.next_item().await {
    ///     let element = element?;
    ///     assert!(element == 42 || element == 43);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[inline]
    pub async fn next_item(&mut self) -> Option<RedisResult<T>> {
        StreamExt::next(self).await
    }
}
222
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + Unpin + Send + 'a> Stream for AsyncIter<'a, T> {
    type Item = RedisResult<T>;

    /// Polls for the next item, switching `inner` between the `Iter` and
    /// `Future` states as a fetch is started and completed.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // Move the current state out, leaving `Empty` behind as a placeholder;
        // each arm below stores a real state again before it returns.
        let inner = std::mem::replace(&mut this.inner, IterOrFuture::Empty);
        match inner {
            IterOrFuture::Iter(mut iter) => {
                // The future owns the inner iterator while it runs and hands
                // it back together with the fetched item.
                let fut = async move {
                    let next_item = iter.next_item().await;
                    (iter, next_item)
                };
                this.inner = IterOrFuture::Future(Box::pin(fut));
                // Poll again right away so the new future is actually driven.
                Pin::new(this).poll_next(cx)
            }
            IterOrFuture::Future(mut fut) => match fut.as_mut().poll(cx) {
                Poll::Pending => {
                    // Not finished yet: keep the future for the next poll.
                    this.inner = IterOrFuture::Future(fut);
                    Poll::Pending
                }
                Poll::Ready((iter, value)) => {
                    // Finished: go back to the idle state and report the item.
                    this.inner = IterOrFuture::Iter(iter);

                    Poll::Ready(value)
                }
            },
            // `Empty` is only stored for the duration of the code above.
            IterOrFuture::Empty => unreachable!(),
        }
    }
}
254
/// Returns the number of decimal digits needed to print `v` (`0` needs one).
///
/// Four digits are consumed per loop round, so large values need few
/// divisions.
fn countdigits(mut v: usize) -> usize {
    let mut digits = 1;
    loop {
        match v {
            0..=9 => return digits,
            10..=99 => return digits + 1,
            100..=999 => return digits + 2,
            1000..=9999 => return digits + 3,
            _ => {
                v /= 10_000;
                digits += 4;
            }
        }
    }
}
275
276#[inline]
277fn bulklen(len: usize) -> usize {
278    1 + countdigits(len) + 2 + len + 2
279}
280
281fn args_len<'a, I>(args: I, cursor: u64) -> usize
282where
283    I: IntoIterator<Item = Arg<&'a [u8]>> + ExactSizeIterator,
284{
285    let mut totlen = 1 + countdigits(args.len()) + 2;
286    for item in args {
287        totlen += bulklen(match item {
288            Arg::Cursor => countdigits(cursor as usize),
289            Arg::Simple(val) => val.len(),
290        });
291    }
292    totlen
293}
294
295pub(crate) fn cmd_len(cmd: &Cmd) -> usize {
296    args_len(cmd.args_iter(), cmd.cursor.unwrap_or(0))
297}
298
299fn encode_command<'a, I>(args: I, cursor: u64) -> Vec<u8>
300where
301    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
302{
303    let mut cmd = Vec::new();
304    write_command_to_vec(&mut cmd, args, cursor);
305    cmd
306}
307
308fn write_command_to_vec<'a, I>(cmd: &mut Vec<u8>, args: I, cursor: u64)
309where
310    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
311{
312    let totlen = args_len(args.clone(), cursor);
313
314    cmd.reserve(totlen);
315
316    write_command(cmd, args, cursor).unwrap()
317}
318
319fn write_command<'a, I>(cmd: &mut (impl ?Sized + Write), args: I, cursor: u64) -> io::Result<()>
320where
321    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
322{
323    let mut buf = ::itoa::Buffer::new();
324
325    cmd.write_all(b"*")?;
326    let s = buf.format(args.len());
327    cmd.write_all(s.as_bytes())?;
328    cmd.write_all(b"\r\n")?;
329
330    let mut cursor_bytes = itoa::Buffer::new();
331    for item in args {
332        let bytes = match item {
333            Arg::Cursor => cursor_bytes.format(cursor).as_bytes(),
334            Arg::Simple(val) => val,
335        };
336
337        cmd.write_all(b"$")?;
338        let s = buf.format(bytes.len());
339        cmd.write_all(s.as_bytes())?;
340        cmd.write_all(b"\r\n")?;
341
342        cmd.write_all(bytes)?;
343        cmd.write_all(b"\r\n")?;
344    }
345    Ok(())
346}
347
348impl RedisWrite for Cmd {
349    fn write_arg(&mut self, arg: &[u8]) {
350        self.data.extend_from_slice(arg);
351        self.args.push(Arg::Simple(self.data.len()));
352    }
353
354    fn write_arg_fmt(&mut self, arg: impl fmt::Display) {
355        write!(self.data, "{arg}").unwrap();
356        self.args.push(Arg::Simple(self.data.len()));
357    }
358
359    fn writer_for_next_arg(&mut self) -> impl Write + '_ {
360        struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
361        impl Drop for CmdBufferedArgGuard<'_> {
362            fn drop(&mut self) {
363                self.0.args.push(Arg::Simple(self.0.data.len()));
364            }
365        }
366        impl Write for CmdBufferedArgGuard<'_> {
367            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
368                self.0.data.extend_from_slice(buf);
369                Ok(buf.len())
370            }
371
372            fn flush(&mut self) -> std::io::Result<()> {
373                Ok(())
374            }
375        }
376
377        CmdBufferedArgGuard(self)
378    }
379
380    fn reserve_space_for_args(&mut self, additional: impl IntoIterator<Item = usize>) {
381        let mut capacity = 0;
382        let mut args = 0;
383        for add in additional {
384            capacity += add;
385            args += 1;
386        }
387        self.data.reserve(capacity);
388        self.args.reserve(args);
389    }
390
    /// Returns a `BufMut` whose output becomes the next argument.
    ///
    /// `capacity` bytes are reserved in the data buffer up front. The argument
    /// is completed when the returned value is dropped.
    #[cfg(feature = "bytes")]
    fn bufmut_for_next_arg(&mut self, capacity: usize) -> impl bytes::BufMut + '_ {
        self.data.reserve(capacity);
        // Guard that records the end of the argument once writing is done.
        struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
        impl Drop for CmdBufferedArgGuard<'_> {
            fn drop(&mut self) {
                // Whatever was appended to `data` so far forms the argument.
                self.0.args.push(Arg::Simple(self.0.data.len()));
            }
        }
        // Every method below forwards to the same method on the command's
        // data buffer.
        // NOTE(review): soundness of this `unsafe impl` presumably rests on the
        // `BufMut` implementation of `Vec<u8>` it delegates to — confirm.
        unsafe impl bytes::BufMut for CmdBufferedArgGuard<'_> {
            fn remaining_mut(&self) -> usize {
                self.0.data.remaining_mut()
            }

            unsafe fn advance_mut(&mut self, cnt: usize) {
                self.0.data.advance_mut(cnt);
            }

            fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
                self.0.data.chunk_mut()
            }

            // Vec specializes these methods, so we do too
            fn put<T: bytes::buf::Buf>(&mut self, src: T)
            where
                Self: Sized,
            {
                self.0.data.put(src);
            }

            fn put_slice(&mut self, src: &[u8]) {
                self.0.data.put_slice(src);
            }

            fn put_bytes(&mut self, val: u8, cnt: usize) {
                self.0.data.put_bytes(val, cnt);
            }
        }

        CmdBufferedArgGuard(self)
    }
432}
433
434impl Default for Cmd {
435    fn default() -> Cmd {
436        Cmd::new()
437    }
438}
439
440/// A command acts as a builder interface to creating encoded redis
441/// requests.  This allows you to easily assemble a packed command
442/// by chaining arguments together.
443///
444/// Basic example:
445///
446/// ```rust
447/// redis::Cmd::new().arg("SET").arg("my_key").arg(42);
448/// ```
449///
450/// There is also a helper function called `cmd` which makes it a
451/// tiny bit shorter:
452///
453/// ```rust
454/// redis::cmd("SET").arg("my_key").arg(42);
455/// ```
456///
457/// Because Rust currently does not have an ideal system
458/// for lifetimes of temporaries, sometimes you need to hold on to
459/// the initially generated command:
460///
461/// ```rust,no_run
462/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
463/// # let mut con = client.get_connection().unwrap();
464/// let mut cmd = redis::cmd("SMEMBERS");
465/// let mut iter : redis::Iter<i32> = cmd.arg("my_set").clone().iter(&mut con).unwrap();
466/// ```
467impl Cmd {
468    /// Creates a new empty command.
469    pub fn new() -> Cmd {
470        Cmd {
471            data: vec![],
472            args: vec![],
473            cursor: None,
474            no_response: false,
475            #[cfg(feature = "cache-aio")]
476            cache: None,
477        }
478    }
479
480    /// Creates a new empty command, with at least the requested capacity.
481    pub fn with_capacity(arg_count: usize, size_of_data: usize) -> Cmd {
482        Cmd {
483            data: Vec::with_capacity(size_of_data),
484            args: Vec::with_capacity(arg_count),
485            cursor: None,
486            no_response: false,
487            #[cfg(feature = "cache-aio")]
488            cache: None,
489        }
490    }
491
492    /// Get the capacities for the internal buffers.
493    #[cfg(test)]
494    #[allow(dead_code)]
495    pub(crate) fn capacity(&self) -> (usize, usize) {
496        (self.args.capacity(), self.data.capacity())
497    }
498
499    /// Clears the command, resetting it completely.
500    ///
501    /// This is equivalent to [`Cmd::new`], except the buffer capacity is kept.
502    ///
503    /// # Examples
504    ///
505    /// ```rust,no_run
506    /// # use redis::{Client, Cmd};
507    /// # let client = Client::open("redis://127.0.0.1/").unwrap();
508    /// # let mut con = client.get_connection().expect("Failed to connect to Redis");
509    /// let mut cmd = Cmd::new();
510    /// cmd.arg("SET").arg("foo").arg("42");
511    /// cmd.query::<()>(&mut con).expect("Query failed");
512    /// cmd.clear();
513    /// // This reuses the allocations of the previous command
514    /// cmd.arg("SET").arg("bar").arg("42");
515    /// cmd.query::<()>(&mut con).expect("Query failed");
516    /// ```
517    pub fn clear(&mut self) {
518        self.data.clear();
519        self.args.clear();
520        self.cursor = None;
521        self.no_response = false;
522        #[cfg(feature = "cache-aio")]
523        {
524            self.cache = None;
525        }
526    }
527
    /// Appends an argument to the command.
    ///
    /// The argument passed must be a type that implements `ToRedisArgs`.
    /// Most primitive types as well as vectors of primitive types implement
    /// it. The command itself is returned so that calls can be chained.
    ///
    /// For instance all of the following are valid:
    ///
    /// ```rust,no_run
    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
    /// # let mut con = client.get_connection().unwrap();
    /// redis::cmd("SET").arg(&["my_key", "my_value"]);
    /// redis::cmd("SET").arg("my_key").arg(42);
    /// redis::cmd("SET").arg("my_key").arg(b"my_value");
    /// ```
    #[inline]
    pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
        // The value writes itself into this command.
        arg.write_redis_args(self);
        self
    }
546
547    /// Takes the command out of the mutable reference and returns it as a value
548    ///
549    /// The referenced command is left empty.
550    pub fn take(&mut self) -> Self {
551        std::mem::take(self)
552    }
553
554    /// Works similar to `arg` but adds a cursor argument.
555    ///
556    /// This is always an integer and also flips the command implementation to support a
557    /// different mode for the iterators where the iterator will ask for
558    /// another batch of items when the local data is exhausted.
559    /// Calling this function more than once will overwrite the previous cursor with the latest set value.
560    ///
561    /// ```rust,no_run
562    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
563    /// # let mut con = client.get_connection().unwrap();
564    /// let mut cmd = redis::cmd("SSCAN");
565    /// let mut iter : redis::Iter<isize> =
566    ///     cmd.arg("my_set").cursor_arg(0).clone().iter(&mut con).unwrap();
567    /// for x in iter {
568    ///     // do something with the item
569    /// }
570    /// ```
571    #[inline]
572    pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
573        self.cursor = Some(cursor);
574        self.args.push(Arg::Cursor);
575        self
576    }
577
578    /// Returns the packed command as a byte vector.
579    ///
580    /// This is a wrapper around [`write_packed_command`] that creates a [`Vec`] to write to.
581    ///
582    /// [`write_packed_command`]: Self::write_packed_command
583    #[inline]
584    pub fn get_packed_command(&self) -> Vec<u8> {
585        let mut cmd = Vec::new();
586        if self.is_empty() {
587            return cmd;
588        }
589        self.write_packed_command(&mut cmd);
590        cmd
591    }
592
593    /// Writes the packed command to `dst`.
594    ///
595    /// This will *append* the packed command.
596    ///
597    /// See also [`get_packed_command`].
598    ///
599    /// [`get_packed_command`]: Self::get_packed_command.
600    #[inline]
601    pub fn write_packed_command(&self, dst: &mut Vec<u8>) {
602        write_command_to_vec(dst, self.args_iter(), self.cursor.unwrap_or(0))
603    }
604
605    pub(crate) fn write_packed_command_preallocated(&self, cmd: &mut Vec<u8>) {
606        write_command(cmd, self.args_iter(), self.cursor.unwrap_or(0)).unwrap()
607    }
608
    /// Returns true if the command is in scan mode.
    ///
    /// A command is in scan mode as soon as it has a cursor value.
    #[inline]
    pub fn in_scan_mode(&self) -> bool {
        self.cursor.is_some()
    }
614
615    /// Sends the command as query to the connection and converts the
616    /// result to the target redis value.  This is the general way how
617    /// you can retrieve data.
618    #[inline]
619    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
620        match con.req_command(self) {
621            Ok(val) => Ok(from_redis_value(val.extract_error()?)?),
622            Err(e) => Err(e),
623        }
624    }
625
626    /// Async version of `query`.
627    #[inline]
628    #[cfg(feature = "aio")]
629    pub async fn query_async<T: FromRedisValue>(
630        &self,
631        con: &mut impl crate::aio::ConnectionLike,
632    ) -> RedisResult<T> {
633        let val = con.req_packed_command(self).await?;
634        Ok(from_redis_value(val.extract_error()?)?)
635    }
636
637    /// Sets the cursor and converts the passed value to a batch used by the
638    /// iterators.
639    fn set_cursor_and_get_batch<T: FromRedisValue>(
640        &mut self,
641        value: crate::Value,
642    ) -> RedisResult<Vec<Result<T, ParsingError>>> {
643        let (cursor, values) = if value.looks_like_cursor() {
644            let (cursor, values) = from_redis_value::<(u64, _)>(value)?;
645            (cursor, values)
646        } else {
647            (0, from_redis_value(value)?)
648        };
649
650        self.cursor = Some(cursor);
651
652        Ok(T::from_each_redis_values(values))
653    }
654
655    /// Similar to `query()` but returns an iterator over the items of the
656    /// bulk result or iterator.  In normal mode this is not in any way more
657    /// efficient than just querying into a `Vec<T>` as it's internally
658    /// implemented as buffering into a vector.  This however is useful when
659    /// `cursor_arg` was used in which case the iterator will query for more
660    /// items until the server side cursor is exhausted.
661    ///
662    /// This is useful for commands such as `SSCAN`, `SCAN` and others.
663    ///
664    /// One speciality of this function is that it will check if the response
665    /// looks like a cursor or not and always just looks at the payload.
666    /// This way you can use the function the same for responses in the
667    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
668    /// tuple of cursor and list).
669    #[inline]
670    pub fn iter<T: FromRedisValue>(
671        mut self,
672        con: &mut dyn ConnectionLike,
673    ) -> RedisResult<Iter<'_, T>> {
674        let rv = con.req_command(&self)?;
675
676        let batch = self.set_cursor_and_get_batch(rv)?;
677
678        Ok(Iter {
679            iter: CheckedIter {
680                batch: batch.into_iter(),
681                con,
682                cmd: self,
683            },
684        })
685    }
686
687    /// Similar to `iter()` but returns an AsyncIter over the items of the
688    /// bulk result or iterator.  A [futures::Stream](https://docs.rs/futures/0.3.3/futures/stream/trait.Stream.html)
689    /// is implemented on AsyncIter. In normal mode this is not in any way more
690    /// efficient than just querying into a `Vec<T>` as it's internally
691    /// implemented as buffering into a vector.  This however is useful when
692    /// `cursor_arg` was used in which case the stream will query for more
693    /// items until the server side cursor is exhausted.
694    ///
695    /// This is useful for commands such as `SSCAN`, `SCAN` and others in async contexts.
696    ///
697    /// One speciality of this function is that it will check if the response
698    /// looks like a cursor or not and always just looks at the payload.
699    /// This way you can use the function the same for responses in the
700    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
701    /// tuple of cursor and list).
702    #[cfg(feature = "aio")]
703    #[inline]
704    pub async fn iter_async<'a, T: FromRedisValue + 'a>(
705        mut self,
706        con: &'a mut (dyn AsyncConnection + Send),
707    ) -> RedisResult<AsyncIter<'a, T>> {
708        let rv = con.req_packed_command(&self).await?;
709
710        let batch = self.set_cursor_and_get_batch(rv)?;
711
712        Ok(AsyncIter {
713            inner: IterOrFuture::Iter(AsyncIterInner {
714                batch: batch.into_iter(),
715                con,
716                cmd: self,
717            }),
718        })
719    }
720
721    /// This is an alternative to `query`` that can be used if you want to be able to handle a
722    /// command's success or failure but don't care about the command's response. For example,
723    /// this is useful for "SET" commands for which the response's content is not important.
724    /// It avoids the need to define generic bounds for ().
725    #[inline]
726    pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
727        self.query::<()>(con)
728    }
729
730    /// This is an alternative to `query_async` that can be used if you want to be able to handle a
731    /// command's success or failure but don't care about the command's response. For example,
732    /// this is useful for "SET" commands for which the response's content is not important.
733    /// It avoids the need to define generic bounds for ().
734    #[cfg(feature = "aio")]
735    pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
736        self.query_async::<()>(con).await
737    }
738
739    /// Returns an iterator over the arguments in this command (including the command name itself)
740    pub fn args_iter(&self) -> impl Clone + ExactSizeIterator<Item = Arg<&[u8]>> {
741        let mut prev = 0;
742        self.args.iter().map(move |arg| match *arg {
743            Arg::Simple(i) => {
744                let arg = Arg::Simple(&self.data[prev..i]);
745                prev = i;
746                arg
747            }
748
749            Arg::Cursor => Arg::Cursor,
750        })
751    }
752
753    // Get a reference to the argument at `idx`
754    #[cfg(any(feature = "cluster", feature = "cache-aio"))]
755    pub(crate) fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
756        if idx >= self.args.len() {
757            return None;
758        }
759
760        let start = if idx == 0 {
761            0
762        } else {
763            match self.args[idx - 1] {
764                Arg::Simple(n) => n,
765                _ => 0,
766            }
767        };
768        let end = match self.args[idx] {
769            Arg::Simple(n) => n,
770            _ => 0,
771        };
772        if start == 0 && end == 0 {
773            return None;
774        }
775        Some(&self.data[start..end])
776    }
777
    /// Client won't read and wait for results. Currently only used for Pub/Sub commands in RESP3.
    ///
    /// This is mostly set internally. The user can set it if they know that a certain command doesn't return a response, or if they use an async connection and don't want to wait for the server response.
    /// For sync connections, setting this wrongly can affect the connection's correctness, and should be avoided.
    ///
    /// Returns the command itself so that calls can be chained.
    #[inline]
    pub fn set_no_response(&mut self, nr: bool) -> &mut Cmd {
        self.no_response = nr;
        self
    }
787
    /// Check whether command's result will be waited for.
    ///
    /// Returns `true` when the no-response flag is set, i.e. when the result
    /// will *not* be waited for.
    #[inline]
    pub fn is_no_response(&self) -> bool {
        self.no_response
    }
793
    /// Changes caching behaviour for this specific command.
    ///
    /// A configuration set earlier is replaced. Returns the command itself so
    /// that calls can be chained.
    #[cfg(feature = "cache-aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
    pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Cmd {
        self.cache = Some(command_cache_config);
        self
    }
801
    /// Returns the caching configuration of this command; `None` if none was set.
    #[cfg(feature = "cache-aio")]
    #[inline]
    pub(crate) fn get_cache_config(&self) -> &Option<CommandCacheConfig> {
        &self.cache
    }
807
    /// Returns true if the command has no arguments at all (not even a command name).
    pub(crate) fn is_empty(&self) -> bool {
        self.args.is_empty()
    }
811}
812
813/// Shortcut function to creating a command with a single argument.
814///
815/// The first argument of a redis command is always the name of the command
816/// which needs to be a string.  This is the recommended way to start a
817/// command pipe.
818///
819/// ```rust
820/// redis::cmd("PING");
821/// ```
822pub fn cmd(name: &str) -> Cmd {
823    let mut rv = Cmd::new();
824    rv.arg(name);
825    rv
826}
827
828/// Packs a bunch of commands into a request.
829///
830/// This is generally a quite useless function as this functionality is
831/// nicely wrapped through the `Cmd` object, but in some cases it can be
832/// useful.  The return value of this can then be send to the low level
833/// `ConnectionLike` methods.
834///
835/// Example:
836///
837/// ```rust
838/// # use redis::ToRedisArgs;
839/// let mut args = vec![];
840/// args.extend("SET".to_redis_args());
841/// args.extend("my_key".to_redis_args());
842/// args.extend(42.to_redis_args());
843/// let cmd = redis::pack_command(&args);
844/// assert_eq!(cmd, b"*3\r\n$3\r\nSET\r\n$6\r\nmy_key\r\n$2\r\n42\r\n".to_vec());
845/// ```
846pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
847    encode_command(args.iter().map(|x| Arg::Simple(&x[..])), 0)
848}
849
/// Shortcut for creating a new pipeline.
///
/// Equivalent to calling `Pipeline::new()`.
pub fn pipe() -> Pipeline {
    Pipeline::new()
}
854
855#[cfg(test)]
856mod tests {
857    use super::*;
858    #[cfg(feature = "bytes")]
859    use bytes::BufMut;
860
861    fn args_iter_to_str(cmd: &Cmd) -> Vec<String> {
862        cmd.args_iter()
863            .map(|arg| match arg {
864                Arg::Simple(bytes) => String::from_utf8(bytes.to_vec()).unwrap(),
865                Arg::Cursor => "CURSOR".to_string(),
866            })
867            .collect()
868    }
869
870    fn assert_arg_equality(c1: &Cmd, c2: &Cmd) {
871        let v1: Vec<_> = c1.args_iter().collect::<Vec<_>>();
872        let v2: Vec<_> = c2.args_iter().collect::<Vec<_>>();
873        assert_eq!(
874            v1,
875            v2,
876            "{:?} - {:?}",
877            args_iter_to_str(c1),
878            args_iter_to_str(c2)
879        );
880    }
881
882    fn assert_practical_equivalent(c1: Cmd, c2: Cmd) {
883        assert_eq!(c1.get_packed_command(), c2.get_packed_command());
884        assert_arg_equality(&c1, &c2);
885    }
886
    /// Arguments added through `cmd`, `write_arg_fmt` and `arg` are packed in
    /// order, and `cmd_len` matches the packed size.
    #[test]
    fn test_cmd_packed_command_simple_args() {
        let args: &[&[u8]] = &[b"phone", b"barz"];
        let mut cmd = cmd("key");
        cmd.write_arg_fmt("value");
        cmd.arg(42).arg(args);

        let packed_command = cmd.get_packed_command();
        // The precomputed length has to match what is actually written.
        assert_eq!(cmd_len(&cmd), packed_command.len());
        assert_eq!(
            packed_command,
            b"*5\r\n$3\r\nkey\r\n$5\r\nvalue\r\n$2\r\n42\r\n$5\r\nphone\r\n$4\r\nbarz\r\n",
            "{}",
            String::from_utf8(packed_command.clone()).unwrap()
        );
        // `args_iter` has to give back the same arguments, in order.
        let args_vec: Vec<&[u8]> = vec![b"key", b"value", b"42", b"phone", b"barz"];
        let args_vec: Vec<_> = args_vec.into_iter().map(Arg::Simple).collect();
        assert_eq!(cmd.args_iter().collect::<Vec<_>>(), args_vec);
    }
906
    /// A cursor argument is packed as its decimal value and shows up as
    /// `Arg::Cursor` in `args_iter`.
    #[test]
    fn test_cmd_packed_command_with_cursor() {
        let args: &[&[u8]] = &[b"phone", b"barz"];
        let mut cmd = cmd("key");
        cmd.arg("value").arg(42).arg(args).cursor_arg(512);

        let packed_command = cmd.get_packed_command();
        // The precomputed length has to account for the cursor as well.
        assert_eq!(cmd_len(&cmd), packed_command.len());
        assert_eq!(
            packed_command,
            b"*6\r\n$3\r\nkey\r\n$5\r\nvalue\r\n$2\r\n42\r\n$5\r\nphone\r\n$4\r\nbarz\r\n$3\r\n512\r\n",
            "{}",
            String::from_utf8(packed_command.clone()).unwrap()
        );
        // The cursor is the last argument and carries no bytes of its own.
        let args_vec: Vec<&[u8]> = vec![b"key", b"value", b"42", b"phone", b"barz"];
        let args_vec: Vec<_> = args_vec
            .into_iter()
            .map(Arg::Simple)
            .chain(std::iter::once(Arg::Cursor))
            .collect();
        assert_eq!(cmd.args_iter().collect::<Vec<_>>(), args_vec);
    }
929
930    #[test]
931    fn test_cmd_clean() {
932        let mut cmd = cmd("key");
933        cmd.arg("value")
934            .cursor_arg(24)
935            .set_no_response(true)
936            .clear();
937
938        // Everything should be reset, but the capacity should still be there
939        assert!(cmd.data.is_empty());
940        assert!(cmd.data.capacity() > 0);
941        assert!(cmd.is_empty());
942        assert!(cmd.args.capacity() > 0);
943        assert_eq!(cmd.cursor, None);
944        assert!(!cmd.no_response);
945        assert_practical_equivalent(cmd, Cmd::new());
946    }
947
948    #[test]
949    #[cfg(feature = "cache-aio")]
950    fn test_cmd_clean_cache_aio() {
951        let mut cmd = cmd("key");
952        cmd.arg("value")
953            .cursor_arg(24)
954            .set_cache_config(crate::CommandCacheConfig::default())
955            .set_no_response(true)
956            .clear();
957
958        // Everything should be reset, but the capacity should still be there
959        assert!(cmd.data.is_empty());
960        assert!(cmd.data.capacity() > 0);
961        assert!(cmd.is_empty());
962        assert!(cmd.args.capacity() > 0);
963        assert_eq!(cmd.cursor, None);
964        assert!(!cmd.no_response);
965        assert!(cmd.cache.is_none());
966    }
967
968    #[test]
969    fn test_cmd_writer_for_next_arg() {
970        // Test that a write split across multiple calls to `write` produces the
971        // same result as a single call to `write_arg`
972        let mut c1 = Cmd::new();
973        {
974            let mut c1_writer = c1.writer_for_next_arg();
975            c1_writer.write_all(b"foo").unwrap();
976            c1_writer.write_all(b"bar").unwrap();
977            c1_writer.flush().unwrap();
978        }
979
980        let mut c2 = Cmd::new();
981        c2.write_arg(b"foobar");
982
983        assert_practical_equivalent(c1, c2);
984    }
985
986    // Test that multiple writers to the same command produce the same
987    // result as the same multiple calls to `write_arg`
988    #[test]
989    fn test_cmd_writer_for_next_arg_multiple() {
990        let mut c1 = Cmd::new();
991        {
992            let mut c1_writer = c1.writer_for_next_arg();
993            c1_writer.write_all(b"foo").unwrap();
994            c1_writer.write_all(b"bar").unwrap();
995            c1_writer.flush().unwrap();
996        }
997        {
998            let mut c1_writer = c1.writer_for_next_arg();
999            c1_writer.write_all(b"baz").unwrap();
1000            c1_writer.write_all(b"qux").unwrap();
1001            c1_writer.flush().unwrap();
1002        }
1003
1004        let mut c2 = Cmd::new();
1005        c2.write_arg(b"foobar");
1006        c2.write_arg(b"bazqux");
1007
1008        assert_practical_equivalent(c1, c2);
1009    }
1010
1011    // Test that an "empty" write produces the equivalent to `write_arg(b"")`
1012    #[test]
1013    fn test_cmd_writer_for_next_arg_empty() {
1014        let mut c1 = Cmd::new();
1015        {
1016            let mut c1_writer = c1.writer_for_next_arg();
1017            c1_writer.flush().unwrap();
1018        }
1019
1020        let mut c2 = Cmd::new();
1021        c2.write_arg(b"");
1022
1023        assert_practical_equivalent(c1, c2);
1024    }
1025
1026    #[cfg(feature = "bytes")]
1027    /// Test that a write split across multiple calls to `write` produces the
1028    /// same result as a single call to `write_arg`
1029    #[test]
1030    fn test_cmd_bufmut_for_next_arg() {
1031        let mut c1 = Cmd::new();
1032        {
1033            let mut c1_writer = c1.bufmut_for_next_arg(6);
1034            c1_writer.put_slice(b"foo");
1035            c1_writer.put_slice(b"bar");
1036        }
1037
1038        let mut c2 = Cmd::new();
1039        c2.write_arg(b"foobar");
1040
1041        assert_practical_equivalent(c1, c2);
1042    }
1043
1044    #[cfg(feature = "bytes")]
1045    /// Test that multiple writers to the same command produce the same
1046    /// result as the same multiple calls to `write_arg`
1047    #[test]
1048    fn test_cmd_bufmut_for_next_arg_multiple() {
1049        let mut c1 = Cmd::new();
1050        {
1051            let mut c1_writer = c1.bufmut_for_next_arg(6);
1052            c1_writer.put_slice(b"foo");
1053            c1_writer.put_slice(b"bar");
1054        }
1055        {
1056            let mut c1_writer = c1.bufmut_for_next_arg(6);
1057            c1_writer.put_slice(b"baz");
1058            c1_writer.put_slice(b"qux");
1059        }
1060
1061        let mut c2 = Cmd::new();
1062        c2.write_arg(b"foobar");
1063        c2.write_arg(b"bazqux");
1064
1065        assert_practical_equivalent(c1, c2);
1066    }
1067
1068    #[cfg(feature = "bytes")]
1069    /// Test that an "empty" write produces the equivalent to `write_arg(b"")`
1070    #[test]
1071    fn test_cmd_bufmut_for_next_arg_empty() {
1072        let mut c1 = Cmd::new();
1073        {
1074            let _c1_writer = c1.bufmut_for_next_arg(0);
1075        }
1076
1077        let mut c2 = Cmd::new();
1078        c2.write_arg(b"");
1079
1080        assert_practical_equivalent(c1, c2);
1081    }
1082
1083    #[test]
1084    #[cfg(feature = "cluster")]
1085    fn test_cmd_arg_idx() {
1086        let mut c = Cmd::new();
1087        assert_eq!(c.arg_idx(0), None);
1088
1089        c.arg("SET");
1090        assert_eq!(c.arg_idx(0), Some(&b"SET"[..]));
1091        assert_eq!(c.arg_idx(1), None);
1092
1093        c.arg("foo").arg("42");
1094        assert_eq!(c.arg_idx(1), Some(&b"foo"[..]));
1095        assert_eq!(c.arg_idx(2), Some(&b"42"[..]));
1096        assert_eq!(c.arg_idx(3), None);
1097        assert_eq!(c.arg_idx(4), None);
1098    }
1099}