redis/
cmd.rs

1#[cfg(feature = "aio")]
2use futures_util::{
3    future::BoxFuture,
4    task::{Context, Poll},
5    Stream, StreamExt,
6};
7#[cfg(feature = "aio")]
8use std::pin::Pin;
9#[cfg(feature = "cache-aio")]
10use std::time::Duration;
11use std::{fmt, io};
12
13use crate::connection::ConnectionLike;
14use crate::pipeline::Pipeline;
15use crate::types::{from_owned_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs};
16
/// An argument to a redis command
///
/// `D` is the payload carried by [`Arg::Simple`]. Inside a [`Cmd`] it is a
/// `usize` end offset into the command's data buffer; [`Cmd::args_iter`]
/// yields it as a `&[u8]` slice of that buffer.
#[derive(Clone)]
pub enum Arg<D> {
    /// A normal argument
    Simple(D),
    /// A cursor argument created from `cursor_arg()`
    ///
    /// The variant is only a placeholder: the cursor value itself is kept on
    /// the command and substituted when the command is packed.
    Cursor,
}
25
/// CommandCacheConfig is used to define caching behaviour of individual commands.
/// # Example
/// ```rust
/// use std::time::Duration;
/// use redis::{CommandCacheConfig, Cmd};
///
/// let ttl = Duration::from_secs(120); // 2 minutes TTL
/// let config = CommandCacheConfig::new()
///     .set_enable_cache(true)
///     .set_client_side_ttl(ttl);
/// let command = Cmd::new().arg("GET").arg("key").set_cache_config(config);
/// ```
#[cfg(feature = "cache-aio")]
#[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
#[derive(Clone)]
pub struct CommandCacheConfig {
    // Whether caching is enabled for the command; `new()` starts with `true`.
    pub(crate) enable_cache: bool,
    // Optional per-command client side TTL; `None` until `set_client_side_ttl` is called.
    pub(crate) client_side_ttl: Option<Duration>,
}
45
#[cfg(feature = "cache-aio")]
impl CommandCacheConfig {
    /// Creates new CommandCacheConfig with enable_cache as true and without client_side_ttl.
    pub fn new() -> Self {
        let enable_cache = true;
        let client_side_ttl = None;
        Self {
            enable_cache,
            client_side_ttl,
        }
    }

    /// Sets whether the cache should be enabled or not.
    /// Disabling cache for specific command when using [crate::caching::CacheMode::All] will not work.
    ///
    /// Consumes the config and returns it with only `enable_cache` replaced.
    pub fn set_enable_cache(self, enable_cache: bool) -> Self {
        Self {
            enable_cache,
            ..self
        }
    }

    /// Sets custom client side time to live (TTL).
    ///
    /// Consumes the config and returns it with only `client_side_ttl` replaced.
    pub fn set_client_side_ttl(self, client_side_ttl: Duration) -> Self {
        Self {
            client_side_ttl: Some(client_side_ttl),
            ..self
        }
    }
}
#[cfg(feature = "cache-aio")]
impl Default for CommandCacheConfig {
    /// Same configuration as [`CommandCacheConfig::new`].
    fn default() -> Self {
        CommandCacheConfig::new()
    }
}
75
/// Represents redis commands.
///
/// The bytes of all simple arguments are stored back to back in `data`;
/// `args` records where each argument ends so it can be sliced out again.
#[derive(Clone)]
pub struct Cmd {
    // Concatenated bytes of every simple argument, in the order they were added.
    pub(crate) data: Vec<u8>,
    // Arg::Simple contains the offset that marks the end of the argument
    args: Vec<Arg<usize>>,
    // Cursor value written in place of `Arg::Cursor` when the command is packed.
    // `Some` once `cursor_arg` was called or an iterator stored a returned cursor.
    cursor: Option<u64>,
    // If it's true command's response won't be read from socket. Useful for Pub/Sub.
    no_response: bool,
    // Per-command cache settings; `None` until `set_cache_config` is called.
    #[cfg(feature = "cache-aio")]
    cache: Option<CommandCacheConfig>,
}
88
#[cfg_attr(
    not(feature = "safe_iterators"),
    deprecated(
        note = "Deprecated due to the fact that this implementation silently stops at the first value that can't be converted to T. Enable the feature `safe_iterators` for a safe version."
    )
)]
/// Represents a redis iterator.
///
/// With the `safe_iterators` feature the iterator yields `RedisResult<T>`;
/// without it, it yields `T` and ends at the first error.
pub struct Iter<'a, T: FromRedisValue> {
    // The checked iterator that does the actual work in both feature modes.
    iter: CheckedIter<'a, T>,
}
99
100#[cfg(not(feature = "safe_iterators"))]
101impl<T: FromRedisValue> Iterator for Iter<'_, T> {
102    type Item = T;
103
104    #[inline]
105    fn next(&mut self) -> Option<T> {
106        // use the checked iterator, but keep the behavior of the deprecated
107        // iterator.  This will return silently `None` if an error occurs.
108        self.iter.next()?.ok()
109    }
110}
111
#[cfg(feature = "safe_iterators")]
impl<T: FromRedisValue> Iterator for Iter<'_, T> {
    type Item = RedisResult<T>;

    /// Forwards to the checked iterator, so errors are surfaced to the caller.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let Self { iter } = self;
        iter.next()
    }
}
121
/// Represents a safe(r) redis iterator.
///
/// Unlike the legacy iterator behaviour it yields `RedisResult<T>`, so errors
/// are passed on instead of silently ending the iteration.
struct CheckedIter<'a, T: FromRedisValue> {
    // Already fetched and converted items that have not been yielded yet.
    batch: std::vec::IntoIter<RedisResult<T>>,
    // Connection used to request the next batch when `batch` runs dry.
    con: &'a mut (dyn ConnectionLike + 'a),
    // The command being iterated; its `cursor` is updated after every batch.
    cmd: Cmd,
}
128
129impl<T: FromRedisValue> Iterator for CheckedIter<'_, T> {
130    type Item = RedisResult<T>;
131
132    #[inline]
133    fn next(&mut self) -> Option<RedisResult<T>> {
134        // we need to do this in a loop until we produce at least one item
135        // or we find the actual end of the iteration.  This is necessary
136        // because with filtering an iterator it is possible that a whole
137        // chunk is not matching the pattern and thus yielding empty results.
138        loop {
139            if let Some(value) = self.batch.next() {
140                return Some(value);
141            };
142
143            if self.cmd.cursor? == 0 {
144                return None;
145            }
146
147            let (cursor, batch) = match self
148                .con
149                .req_packed_command(&self.cmd.get_packed_command())
150                .and_then(from_owned_redis_value::<(u64, _)>)
151            {
152                Ok((cursor, values)) => (cursor, T::from_each_owned_redis_values(values)),
153                Err(e) => return Some(Err(e)),
154            };
155
156            self.cmd.cursor = Some(cursor);
157            self.batch = batch.into_iter();
158        }
159    }
160}
161
162#[cfg(feature = "aio")]
163use crate::aio::ConnectionLike as AsyncConnection;
164
/// The inner future of AsyncIter
///
/// Holds the same state as the synchronous checked iterator, but talks to an
/// async connection.
#[cfg(feature = "aio")]
struct AsyncIterInner<'a, T: FromRedisValue + 'a> {
    // Already fetched and converted items that have not been yielded yet.
    batch: std::vec::IntoIter<RedisResult<T>>,
    // Async connection used to request the next batch when `batch` runs dry.
    con: &'a mut (dyn AsyncConnection + Send + 'a),
    // The command being iterated; its `cursor` is updated after every batch.
    cmd: Cmd,
}
172
/// Represents the state of AsyncIter
#[cfg(feature = "aio")]
enum IterOrFuture<'a, T: FromRedisValue + 'a> {
    // Idle: no request for the next item is in flight.
    Iter(AsyncIterInner<'a, T>),
    // A `next_item` call is in flight; the future owns the inner iterator and
    // hands it back together with the produced item.
    Future(BoxFuture<'a, (AsyncIterInner<'a, T>, Option<RedisResult<T>>)>),
    // Placeholder left behind while `poll_next` has moved the state out.
    Empty,
}
180
/// Represents a redis iterator that can be used with async connections.
///
/// Implements `Stream`; the item type is `RedisResult<T>` with the
/// `safe_iterators` feature and `T` without it.
#[cfg(feature = "aio")]
#[cfg_attr(
    all(feature = "aio", not(feature = "safe_iterators")),
    deprecated(
        note = "Deprecated due to the fact that this implementation silently stops at the first value that can't be converted to T. Enable the feature `safe_iterators` for a safe version."
    )
)]
pub struct AsyncIter<'a, T: FromRedisValue + 'a> {
    // Either the idle inner iterator or the in-flight future that owns it.
    inner: IterOrFuture<'a, T>,
}
192
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + 'a> AsyncIterInner<'a, T> {
    /// Returns the next buffered item, requesting further batches from the
    /// server while the cursor is set and non-zero.
    async fn next_item(&mut self) -> Option<RedisResult<T>> {
        // A filtered scan can return a batch with no matching elements, so we
        // have to keep requesting batches until either an item shows up or
        // the server reports the end of the iteration.
        loop {
            if let item @ Some(_) = self.batch.next() {
                return item;
            }

            // No cursor at all, or a cursor of zero: nothing more to fetch.
            match self.cmd.cursor {
                None | Some(0) => return None,
                Some(_) => {}
            }

            let reply = self
                .con
                .req_packed_command(&self.cmd)
                .await
                .and_then(from_owned_redis_value::<(u64, _)>);

            match reply {
                Ok((next_cursor, raw_items)) => {
                    let converted = T::from_each_owned_redis_values(raw_items);
                    self.cmd.cursor = Some(next_cursor);
                    self.batch = converted.into_iter();
                }
                // The cursor is left untouched when the request fails.
                Err(err) => return Some(Err(err)),
            }
        }
    }
}
224
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + 'a + Unpin + Send> AsyncIter<'a, T> {
    /// Waits for the next item of the iteration, or `None` once it is finished.
    ///
    /// Conversion and connection errors are returned as `Some(Err(..))`.
    ///
    /// ```rust,no_run
    /// # use redis::AsyncCommands;
    /// # async fn scan_set() -> redis::RedisResult<()> {
    /// # let client = redis::Client::open("redis://127.0.0.1/")?;
    /// # let mut con = client.get_multiplexed_async_connection().await?;
    /// let _: () = con.sadd("my_set", 42i32).await?;
    /// let _: () = con.sadd("my_set", 43i32).await?;
    /// let mut iter: redis::AsyncIter<i32> = con.sscan("my_set").await?;
    /// while let Some(element) = iter.next_item().await {
    ///     let element = element?;
    ///     assert!(element == 42 || element == 43);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "safe_iterators")]
    #[inline]
    pub async fn next_item(&mut self) -> Option<RedisResult<T>> {
        // Thin convenience wrapper over the `Stream` implementation.
        self.next().await
    }

    /// Waits for the next item of the iteration, or `None` once it is finished.
    ///
    /// In this mode an error also ends the iteration with `None`.
    ///
    /// ```rust,no_run
    /// # use redis::AsyncCommands;
    /// # async fn scan_set() -> redis::RedisResult<()> {
    /// # let client = redis::Client::open("redis://127.0.0.1/")?;
    /// # let mut con = client.get_multiplexed_async_connection().await?;
    /// let _: () = con.sadd("my_set", 42i32).await?;
    /// let _: () = con.sadd("my_set", 43i32).await?;
    /// let mut iter: redis::AsyncIter<i32> = con.sscan("my_set").await?;
    /// while let Some(element) = iter.next_item().await {
    ///     assert!(element == 42 || element == 43);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(not(feature = "safe_iterators"))]
    #[inline]
    pub async fn next_item(&mut self) -> Option<T> {
        // Thin convenience wrapper over the `Stream` implementation.
        self.next().await
    }
}
268
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + Unpin + Send + 'a> Stream for AsyncIter<'a, T> {
    #[cfg(feature = "safe_iterators")]
    type Item = RedisResult<T>;
    #[cfg(not(feature = "safe_iterators"))]
    type Item = T;

    /// Drives the request for the next item.
    ///
    /// When idle, a boxed future that owns the inner iterator is created and
    /// polled right away; a pending future is stored and resumed on the next
    /// poll.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // Take the state out (leaving `Empty` behind) and turn it into the
        // future to poll: either the one already in flight or a fresh one.
        let mut pending: BoxFuture<'a, (AsyncIterInner<'a, T>, Option<RedisResult<T>>)> =
            match std::mem::replace(&mut this.inner, IterOrFuture::Empty) {
                IterOrFuture::Future(in_flight) => in_flight,
                IterOrFuture::Iter(mut iter) => Box::pin(async move {
                    let next_item = iter.next_item().await;
                    (iter, next_item)
                }),
                IterOrFuture::Empty => unreachable!(),
            };

        match pending.as_mut().poll(cx) {
            Poll::Pending => {
                // Keep the future around so the next poll resumes it.
                this.inner = IterOrFuture::Future(pending);
                Poll::Pending
            }
            Poll::Ready((iter, value)) => {
                // The future gave the inner iterator back; return to idle.
                this.inner = IterOrFuture::Iter(iter);

                #[cfg(feature = "safe_iterators")]
                return Poll::Ready(value);
                #[cfg(not(feature = "safe_iterators"))]
                Poll::Ready(value.and_then(Result::ok))
            }
        }
    }
}
306
/// Returns the number of decimal digits needed to print `v` (`0` needs one).
fn countdigits(v: usize) -> usize {
    // `checked_ilog10` is `None` only for zero, which still prints one digit.
    v.checked_ilog10().map_or(1, |exp| exp as usize + 1)
}
327
328#[inline]
329fn bulklen(len: usize) -> usize {
330    1 + countdigits(len) + 2 + len + 2
331}
332
333fn args_len<'a, I>(args: I, cursor: u64) -> usize
334where
335    I: IntoIterator<Item = Arg<&'a [u8]>> + ExactSizeIterator,
336{
337    let mut totlen = 1 + countdigits(args.len()) + 2;
338    for item in args {
339        totlen += bulklen(match item {
340            Arg::Cursor => countdigits(cursor as usize),
341            Arg::Simple(val) => val.len(),
342        });
343    }
344    totlen
345}
346
347pub(crate) fn cmd_len(cmd: &Cmd) -> usize {
348    args_len(cmd.args_iter(), cmd.cursor.unwrap_or(0))
349}
350
351fn encode_command<'a, I>(args: I, cursor: u64) -> Vec<u8>
352where
353    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
354{
355    let mut cmd = Vec::new();
356    write_command_to_vec(&mut cmd, args, cursor);
357    cmd
358}
359
360fn write_command_to_vec<'a, I>(cmd: &mut Vec<u8>, args: I, cursor: u64)
361where
362    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
363{
364    let totlen = args_len(args.clone(), cursor);
365
366    cmd.reserve(totlen);
367
368    write_command(cmd, args, cursor).unwrap()
369}
370
371fn write_command<'a, I>(cmd: &mut (impl ?Sized + io::Write), args: I, cursor: u64) -> io::Result<()>
372where
373    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
374{
375    let mut buf = ::itoa::Buffer::new();
376
377    cmd.write_all(b"*")?;
378    let s = buf.format(args.len());
379    cmd.write_all(s.as_bytes())?;
380    cmd.write_all(b"\r\n")?;
381
382    let mut cursor_bytes = itoa::Buffer::new();
383    for item in args {
384        let bytes = match item {
385            Arg::Cursor => cursor_bytes.format(cursor).as_bytes(),
386            Arg::Simple(val) => val,
387        };
388
389        cmd.write_all(b"$")?;
390        let s = buf.format(bytes.len());
391        cmd.write_all(s.as_bytes())?;
392        cmd.write_all(b"\r\n")?;
393
394        cmd.write_all(bytes)?;
395        cmd.write_all(b"\r\n")?;
396    }
397    Ok(())
398}
399
400impl RedisWrite for Cmd {
401    fn write_arg(&mut self, arg: &[u8]) {
402        self.data.extend_from_slice(arg);
403        self.args.push(Arg::Simple(self.data.len()));
404    }
405
406    fn write_arg_fmt(&mut self, arg: impl fmt::Display) {
407        use std::io::Write;
408        write!(self.data, "{arg}").unwrap();
409        self.args.push(Arg::Simple(self.data.len()));
410    }
411
412    fn writer_for_next_arg(&mut self) -> impl std::io::Write + '_ {
413        struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
414        impl Drop for CmdBufferedArgGuard<'_> {
415            fn drop(&mut self) {
416                self.0.args.push(Arg::Simple(self.0.data.len()));
417            }
418        }
419        impl std::io::Write for CmdBufferedArgGuard<'_> {
420            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
421                self.0.data.extend_from_slice(buf);
422                Ok(buf.len())
423            }
424
425            fn flush(&mut self) -> std::io::Result<()> {
426                Ok(())
427            }
428        }
429
430        CmdBufferedArgGuard(self)
431    }
432
433    fn reserve_space_for_args(&mut self, additional: impl IntoIterator<Item = usize>) {
434        let mut capacity = 0;
435        let mut args = 0;
436        for add in additional {
437            capacity += add;
438            args += 1;
439        }
440        self.data.reserve(capacity);
441        self.args.reserve(args);
442    }
443
444    #[cfg(feature = "bytes")]
445    fn bufmut_for_next_arg(&mut self, capacity: usize) -> impl bytes::BufMut + '_ {
446        self.data.reserve(capacity);
447        struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
448        impl Drop for CmdBufferedArgGuard<'_> {
449            fn drop(&mut self) {
450                self.0.args.push(Arg::Simple(self.0.data.len()));
451            }
452        }
453        unsafe impl bytes::BufMut for CmdBufferedArgGuard<'_> {
454            fn remaining_mut(&self) -> usize {
455                self.0.data.remaining_mut()
456            }
457
458            unsafe fn advance_mut(&mut self, cnt: usize) {
459                self.0.data.advance_mut(cnt);
460            }
461
462            fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
463                self.0.data.chunk_mut()
464            }
465
466            // Vec specializes these methods, so we do too
467            fn put<T: bytes::buf::Buf>(&mut self, src: T)
468            where
469                Self: Sized,
470            {
471                self.0.data.put(src);
472            }
473
474            fn put_slice(&mut self, src: &[u8]) {
475                self.0.data.put_slice(src);
476            }
477
478            fn put_bytes(&mut self, val: u8, cnt: usize) {
479                self.0.data.put_bytes(val, cnt);
480            }
481        }
482
483        CmdBufferedArgGuard(self)
484    }
485}
486
487impl Default for Cmd {
488    fn default() -> Cmd {
489        Cmd::new()
490    }
491}
492
493/// A command acts as a builder interface to creating encoded redis
494/// requests.  This allows you to easily assemble a packed command
495/// by chaining arguments together.
496///
497/// Basic example:
498///
499/// ```rust
500/// redis::Cmd::new().arg("SET").arg("my_key").arg(42);
501/// ```
502///
503/// There is also a helper function called `cmd` which makes it a
504/// tiny bit shorter:
505///
506/// ```rust
507/// redis::cmd("SET").arg("my_key").arg(42);
508/// ```
509///
510/// Because Rust currently does not have an ideal system
511/// for lifetimes of temporaries, sometimes you need to hold on to
512/// the initially generated command:
513///
514/// ```rust,no_run
515/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
516/// # let mut con = client.get_connection().unwrap();
517/// let mut cmd = redis::cmd("SMEMBERS");
518/// let mut iter : redis::Iter<i32> = cmd.arg("my_set").clone().iter(&mut con).unwrap();
519/// ```
520impl Cmd {
521    /// Creates a new empty command.
522    pub fn new() -> Cmd {
523        Cmd {
524            data: vec![],
525            args: vec![],
526            cursor: None,
527            no_response: false,
528            #[cfg(feature = "cache-aio")]
529            cache: None,
530        }
531    }
532
533    /// Creates a new empty command, with at least the requested capacity.
534    pub fn with_capacity(arg_count: usize, size_of_data: usize) -> Cmd {
535        Cmd {
536            data: Vec::with_capacity(size_of_data),
537            args: Vec::with_capacity(arg_count),
538            cursor: None,
539            no_response: false,
540            #[cfg(feature = "cache-aio")]
541            cache: None,
542        }
543    }
544
545    /// Get the capacities for the internal buffers.
546    #[cfg(test)]
547    #[allow(dead_code)]
548    pub(crate) fn capacity(&self) -> (usize, usize) {
549        (self.args.capacity(), self.data.capacity())
550    }
551
552    /// Clears the command, resetting it completely.
553    ///
554    /// This is equivalent to [`Cmd::new`], except the buffer capacity is kept.
555    ///
556    /// # Examples
557    ///
558    /// ```rust,no_run
559    /// # use redis::{Client, Cmd};
560    /// # let client = Client::open("redis://127.0.0.1/").unwrap();
561    /// # let mut con = client.get_connection().expect("Failed to connect to Redis");
562    /// let mut cmd = Cmd::new();
563    /// cmd.arg("SET").arg("foo").arg("42");
564    /// cmd.query::<()>(&mut con).expect("Query failed");
565    /// cmd.clear();
566    /// // This reuses the allocations of the previous command
567    /// cmd.arg("SET").arg("bar").arg("42");
568    /// cmd.query::<()>(&mut con).expect("Query failed");
569    /// ```
570    pub fn clear(&mut self) {
571        self.data.clear();
572        self.args.clear();
573        self.cursor = None;
574        self.no_response = false;
575        #[cfg(feature = "cache-aio")]
576        {
577            self.cache = None;
578        }
579    }
580
581    /// Appends an argument to the command.  The argument passed must
582    /// be a type that implements `ToRedisArgs`.  Most primitive types as
583    /// well as vectors of primitive types implement it.
584    ///
585    /// For instance all of the following are valid:
586    ///
587    /// ```rust,no_run
588    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
589    /// # let mut con = client.get_connection().unwrap();
590    /// redis::cmd("SET").arg(&["my_key", "my_value"]);
591    /// redis::cmd("SET").arg("my_key").arg(42);
592    /// redis::cmd("SET").arg("my_key").arg(b"my_value");
593    /// ```
594    #[inline]
595    pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
596        arg.write_redis_args(self);
597        self
598    }
599
600    /// Works similar to `arg` but adds a cursor argument.  This is always
601    /// an integer and also flips the command implementation to support a
602    /// different mode for the iterators where the iterator will ask for
603    /// another batch of items when the local data is exhausted.
604    ///
605    /// ```rust,no_run
606    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
607    /// # let mut con = client.get_connection().unwrap();
608    /// let mut cmd = redis::cmd("SSCAN");
609    /// let mut iter : redis::Iter<isize> =
610    ///     cmd.arg("my_set").cursor_arg(0).clone().iter(&mut con).unwrap();
611    /// for x in iter {
612    ///     // do something with the item
613    /// }
614    /// ```
615    #[inline]
616    pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
617        assert!(!self.in_scan_mode());
618        self.cursor = Some(cursor);
619        self.args.push(Arg::Cursor);
620        self
621    }
622
623    /// Returns the packed command as a byte vector.
624    ///
625    /// This is a wrapper around [`write_packed_command`] that creates a [`Vec`] to write to.
626    ///
627    /// [`write_packed_command`]: Self::write_packed_command
628    #[inline]
629    pub fn get_packed_command(&self) -> Vec<u8> {
630        let mut cmd = Vec::new();
631        self.write_packed_command(&mut cmd);
632        cmd
633    }
634
635    /// Writes the packed command to `dst`.
636    ///
637    /// This will *append* the packed command.
638    ///
639    /// See also [`get_packed_command`].
640    ///
641    /// [`get_packed_command`]: Self::get_packed_command.
642    #[inline]
643    pub fn write_packed_command(&self, dst: &mut Vec<u8>) {
644        write_command_to_vec(dst, self.args_iter(), self.cursor.unwrap_or(0))
645    }
646
647    pub(crate) fn write_packed_command_preallocated(&self, cmd: &mut Vec<u8>) {
648        write_command(cmd, self.args_iter(), self.cursor.unwrap_or(0)).unwrap()
649    }
650
651    /// Returns true if the command is in scan mode.
652    #[inline]
653    pub fn in_scan_mode(&self) -> bool {
654        self.cursor.is_some()
655    }
656
657    /// Sends the command as query to the connection and converts the
658    /// result to the target redis value.  This is the general way how
659    /// you can retrieve data.
660    #[inline]
661    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
662        match con.req_command(self) {
663            Ok(val) => from_owned_redis_value(val.extract_error()?),
664            Err(e) => Err(e),
665        }
666    }
667
668    /// Async version of `query`.
669    #[inline]
670    #[cfg(feature = "aio")]
671    pub async fn query_async<T: FromRedisValue>(
672        &self,
673        con: &mut impl crate::aio::ConnectionLike,
674    ) -> RedisResult<T> {
675        let val = con.req_packed_command(self).await?;
676        from_owned_redis_value(val.extract_error()?)
677    }
678
679    /// Sets the cursor and converts the passed value to a batch used by the
680    /// iterators.
681    fn set_cursor_and_get_batch<T: FromRedisValue>(
682        &mut self,
683        value: crate::Value,
684    ) -> RedisResult<Vec<RedisResult<T>>> {
685        let (cursor, values) = if value.looks_like_cursor() {
686            let (cursor, values) = from_owned_redis_value::<(u64, _)>(value)?;
687            (cursor, values)
688        } else {
689            (0, from_owned_redis_value(value)?)
690        };
691
692        self.cursor = Some(cursor);
693
694        Ok(T::from_each_owned_redis_values(values))
695    }
696
697    /// Similar to `query()` but returns an iterator over the items of the
698    /// bulk result or iterator.  In normal mode this is not in any way more
699    /// efficient than just querying into a `Vec<T>` as it's internally
700    /// implemented as buffering into a vector.  This however is useful when
701    /// `cursor_arg` was used in which case the iterator will query for more
702    /// items until the server side cursor is exhausted.
703    ///
704    /// This is useful for commands such as `SSCAN`, `SCAN` and others.
705    ///
706    /// One speciality of this function is that it will check if the response
707    /// looks like a cursor or not and always just looks at the payload.
708    /// This way you can use the function the same for responses in the
709    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
710    /// tuple of cursor and list).
711    #[inline]
712    pub fn iter<T: FromRedisValue>(
713        mut self,
714        con: &mut dyn ConnectionLike,
715    ) -> RedisResult<Iter<'_, T>> {
716        let rv = con.req_command(&self)?;
717
718        let batch = self.set_cursor_and_get_batch(rv)?;
719
720        Ok(Iter {
721            iter: CheckedIter {
722                batch: batch.into_iter(),
723                con,
724                cmd: self,
725            },
726        })
727    }
728
729    /// Similar to `iter()` but returns an AsyncIter over the items of the
730    /// bulk result or iterator.  A [futures::Stream](https://docs.rs/futures/0.3.3/futures/stream/trait.Stream.html)
731    /// is implemented on AsyncIter. In normal mode this is not in any way more
732    /// efficient than just querying into a `Vec<T>` as it's internally
733    /// implemented as buffering into a vector.  This however is useful when
734    /// `cursor_arg` was used in which case the stream will query for more
735    /// items until the server side cursor is exhausted.
736    ///
737    /// This is useful for commands such as `SSCAN`, `SCAN` and others in async contexts.
738    ///
739    /// One speciality of this function is that it will check if the response
740    /// looks like a cursor or not and always just looks at the payload.
741    /// This way you can use the function the same for responses in the
742    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
743    /// tuple of cursor and list).
744    #[cfg(feature = "aio")]
745    #[inline]
746    pub async fn iter_async<'a, T: FromRedisValue + 'a>(
747        mut self,
748        con: &'a mut (dyn AsyncConnection + Send),
749    ) -> RedisResult<AsyncIter<'a, T>> {
750        let rv = con.req_packed_command(&self).await?;
751
752        let batch = self.set_cursor_and_get_batch(rv)?;
753
754        Ok(AsyncIter {
755            inner: IterOrFuture::Iter(AsyncIterInner {
756                batch: batch.into_iter(),
757                con,
758                cmd: self,
759            }),
760        })
761    }
762
763    /// This is a shortcut to `query()` that does not return a value and
764    /// will fail the task if the query fails because of an error.  This is
765    /// mainly useful in examples and for simple commands like setting
766    /// keys.
767    ///
768    /// This is equivalent to a call of query like this:
769    ///
770    /// ```rust,no_run
771    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
772    /// # let mut con = client.get_connection().unwrap();
773    /// redis::cmd("PING").query::<()>(&mut con).unwrap();
774    /// ```
775    #[inline]
776    #[deprecated(note = "Use Cmd::exec + unwrap, instead")]
777    pub fn execute(&self, con: &mut dyn ConnectionLike) {
778        self.exec(con).unwrap();
779    }
780
781    /// This is an alternative to `query`` that can be used if you want to be able to handle a
782    /// command's success or failure but don't care about the command's response. For example,
783    /// this is useful for "SET" commands for which the response's content is not important.
784    /// It avoids the need to define generic bounds for ().
785    #[inline]
786    pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
787        self.query::<()>(con)
788    }
789
790    /// This is an alternative to `query_async` that can be used if you want to be able to handle a
791    /// command's success or failure but don't care about the command's response. For example,
792    /// this is useful for "SET" commands for which the response's content is not important.
793    /// It avoids the need to define generic bounds for ().
794    #[cfg(feature = "aio")]
795    pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
796        self.query_async::<()>(con).await
797    }
798
799    /// Returns an iterator over the arguments in this command (including the command name itself)
800    pub fn args_iter(&self) -> impl Clone + ExactSizeIterator<Item = Arg<&[u8]>> {
801        let mut prev = 0;
802        self.args.iter().map(move |arg| match *arg {
803            Arg::Simple(i) => {
804                let arg = Arg::Simple(&self.data[prev..i]);
805                prev = i;
806                arg
807            }
808
809            Arg::Cursor => Arg::Cursor,
810        })
811    }
812
813    // Get a reference to the argument at `idx`
814    #[cfg(any(feature = "cluster", feature = "cache-aio"))]
815    pub(crate) fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
816        if idx >= self.args.len() {
817            return None;
818        }
819
820        let start = if idx == 0 {
821            0
822        } else {
823            match self.args[idx - 1] {
824                Arg::Simple(n) => n,
825                _ => 0,
826            }
827        };
828        let end = match self.args[idx] {
829            Arg::Simple(n) => n,
830            _ => 0,
831        };
832        if start == 0 && end == 0 {
833            return None;
834        }
835        Some(&self.data[start..end])
836    }
837
838    /// Client won't read and wait for results. Currently only used for Pub/Sub commands in RESP3.
839    #[inline]
840    pub fn set_no_response(&mut self, nr: bool) -> &mut Cmd {
841        self.no_response = nr;
842        self
843    }
844
845    /// Check whether command's result will be waited for.
846    #[inline]
847    pub fn is_no_response(&self) -> bool {
848        self.no_response
849    }
850
851    /// Changes caching behaviour for this specific command.
852    #[cfg(feature = "cache-aio")]
853    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
854    pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Cmd {
855        self.cache = Some(command_cache_config);
856        self
857    }
858
859    #[cfg(feature = "cache-aio")]
860    #[inline]
861    pub(crate) fn get_cache_config(&self) -> &Option<CommandCacheConfig> {
862        &self.cache
863    }
864}
865
866/// Shortcut function to creating a command with a single argument.
867///
868/// The first argument of a redis command is always the name of the command
869/// which needs to be a string.  This is the recommended way to start a
870/// command pipe.
871///
872/// ```rust
873/// redis::cmd("PING");
874/// ```
875pub fn cmd(name: &str) -> Cmd {
876    let mut rv = Cmd::new();
877    rv.arg(name);
878    rv
879}
880
881/// Packs a bunch of commands into a request.
882///
883/// This is generally a quite useless function as this functionality is
884/// nicely wrapped through the `Cmd` object, but in some cases it can be
885/// useful.  The return value of this can then be send to the low level
886/// `ConnectionLike` methods.
887///
888/// Example:
889///
890/// ```rust
891/// # use redis::ToRedisArgs;
892/// let mut args = vec![];
893/// args.extend("SET".to_redis_args());
894/// args.extend("my_key".to_redis_args());
895/// args.extend(42.to_redis_args());
896/// let cmd = redis::pack_command(&args);
897/// assert_eq!(cmd, b"*3\r\n$3\r\nSET\r\n$6\r\nmy_key\r\n$2\r\n42\r\n".to_vec());
898/// ```
899pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
900    encode_command(args.iter().map(|x| Arg::Simple(&x[..])), 0)
901}
902
/// Shortcut for creating a new pipeline.
///
/// Equivalent to calling `Pipeline::new()`.
pub fn pipe() -> Pipeline {
    Pipeline::new()
}
907
#[cfg(test)]
mod tests {
    use super::Cmd;
    #[cfg(feature = "bytes")]
    use bytes::BufMut;

    use crate::RedisWrite;
    use std::io::Write;

    /// Builds a command holding two arguments, the `no_response` flag and a
    /// cursor argument, so that `clear()` has every kind of state to reset.
    fn populated_cmd() -> Cmd {
        let mut cmd = Cmd::new();
        cmd.arg("key").arg("value");
        cmd.set_no_response(true);
        cmd.cursor_arg(24);
        cmd
    }

    /// Asserts that everything in `cmd` was reset while the capacity of its
    /// buffers is still there.
    fn assert_cleared(cmd: &Cmd) {
        assert!(cmd.data.is_empty());
        assert!(cmd.data.capacity() > 0);
        assert!(cmd.args.is_empty());
        assert!(cmd.args.capacity() > 0);
        assert_eq!(cmd.cursor, None);
        assert!(!cmd.no_response);
    }

    /// Appends one argument to `cmd` by pushing every chunk through a single
    /// writer obtained from `writer_for_next_arg` and flushing it afterwards.
    fn stream_arg(cmd: &mut Cmd, chunks: &[&[u8]]) {
        let mut writer = cmd.writer_for_next_arg();
        for &chunk in chunks {
            writer.write_all(chunk).unwrap();
        }
        writer.flush().unwrap();
    }

    /// Builds the reference command: one `write_arg` call per entry of `args`.
    fn cmd_from_write_arg(args: &[&[u8]]) -> Cmd {
        let mut cmd = Cmd::new();
        for &arg in args {
            cmd.write_arg(arg);
        }
        cmd
    }

    #[test]
    fn test_cmd_clean() {
        let mut cmd = populated_cmd();
        cmd.clear();

        assert_cleared(&cmd);
    }

    #[test]
    #[cfg(feature = "cache-aio")]
    fn test_cmd_clean_cache_aio() {
        let mut cmd = populated_cmd();
        cmd.set_cache_config(crate::CommandCacheConfig::default());
        cmd.clear();

        assert_cleared(&cmd);
        // The cache configuration has to be reset as well.
        assert!(cmd.cache.is_none());
    }

    /// A write split across multiple calls to `write` must produce the same
    /// result as a single call to `write_arg`.
    #[test]
    fn test_cmd_writer_for_next_arg() {
        let mut streamed = Cmd::new();
        stream_arg(&mut streamed, &[b"foo", b"bar"]);

        let direct = cmd_from_write_arg(&[b"foobar"]);

        assert_eq!(streamed.get_packed_command(), direct.get_packed_command());
    }

    /// Multiple writers on the same command must produce the same result as
    /// the matching sequence of `write_arg` calls.
    #[test]
    fn test_cmd_writer_for_next_arg_multiple() {
        let mut streamed = Cmd::new();
        stream_arg(&mut streamed, &[b"foo", b"bar"]);
        stream_arg(&mut streamed, &[b"baz", b"qux"]);

        let direct = cmd_from_write_arg(&[b"foobar", b"bazqux"]);

        assert_eq!(streamed.get_packed_command(), direct.get_packed_command());
    }

    /// A writer that is only flushed must be equivalent to `write_arg(b"")`.
    #[test]
    fn test_cmd_writer_for_next_arg_empty() {
        let mut streamed = Cmd::new();
        stream_arg(&mut streamed, &[]);

        let direct = cmd_from_write_arg(&[b""]);

        assert_eq!(streamed.get_packed_command(), direct.get_packed_command());
    }

    /// Bytes put into one `BufMut` in several steps must produce the same
    /// result as a single call to `write_arg`.
    #[cfg(feature = "bytes")]
    #[test]
    fn test_cmd_bufmut_for_next_arg() {
        let mut buffered = Cmd::new();
        {
            let mut buf = buffered.bufmut_for_next_arg(6);
            buf.put_slice(b"foo");
            buf.put_slice(b"bar");
        }

        let direct = cmd_from_write_arg(&[b"foobar"]);

        assert_eq!(buffered.get_packed_command(), direct.get_packed_command());
    }

    /// Multiple `BufMut` writers on the same command must produce the same
    /// result as the matching sequence of `write_arg` calls.
    #[cfg(feature = "bytes")]
    #[test]
    fn test_cmd_bufmut_for_next_arg_multiple() {
        let mut buffered = Cmd::new();
        {
            let mut buf = buffered.bufmut_for_next_arg(6);
            buf.put_slice(b"foo");
            buf.put_slice(b"bar");
        }
        {
            let mut buf = buffered.bufmut_for_next_arg(6);
            buf.put_slice(b"baz");
            buf.put_slice(b"qux");
        }

        let direct = cmd_from_write_arg(&[b"foobar", b"bazqux"]);

        assert_eq!(buffered.get_packed_command(), direct.get_packed_command());
    }

    /// A `BufMut` that never receives any bytes must be equivalent to
    /// `write_arg(b"")`.
    #[cfg(feature = "bytes")]
    #[test]
    fn test_cmd_bufmut_for_next_arg_empty() {
        let mut buffered = Cmd::new();
        {
            // Created and dropped without putting anything into it.
            let _untouched_buf = buffered.bufmut_for_next_arg(0);
        }

        let direct = cmd_from_write_arg(&[b""]);

        assert_eq!(buffered.get_packed_command(), direct.get_packed_command());
    }

    #[test]
    #[cfg(feature = "cluster")]
    fn test_cmd_arg_idx() {
        let mut cmd = Cmd::new();
        // A fresh command has no argument at index 0.
        assert_eq!(cmd.arg_idx(0), None);

        cmd.arg("SET");
        assert_eq!(cmd.arg_idx(0), Some(b"SET".as_slice()));
        assert_eq!(cmd.arg_idx(1), None);

        cmd.arg("foo").arg("42");
        assert_eq!(cmd.arg_idx(1), Some(b"foo".as_slice()));
        assert_eq!(cmd.arg_idx(2), Some(b"42".as_slice()));
        // Indices past the last argument yield `None`.
        for past_the_end in 3..=4 {
            assert_eq!(cmd.arg_idx(past_the_end), None);
        }
    }
}