redis/
pipeline.rs

1#![macro_use]
2
3#[cfg(feature = "cache-aio")]
4use crate::cmd::CommandCacheConfig;
5use crate::cmd::{cmd, cmd_len, Cmd};
6use crate::connection::ConnectionLike;
7use crate::errors::ErrorKind;
8use crate::types::{from_redis_value, FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value};
9
10/// Represents a redis command pipeline.
11#[derive(Clone)]
12pub struct Pipeline {
13    pub(crate) commands: Vec<Cmd>,
14    pub(crate) transaction_mode: bool,
15    pub(crate) ignored_commands: HashSet<usize>,
16    pub(crate) ignore_errors: bool,
17}
18
19/// A pipeline allows you to send multiple commands in one go to the
20/// redis server.  API wise it's very similar to just using a command
21/// but it allows multiple commands to be chained and some features such
22/// as iteration are not available.
23///
24/// Basic example:
25///
26/// ```rust,no_run
27/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
28/// # let mut con = client.get_connection().unwrap();
29/// let ((k1, k2),) : ((i32, i32),) = redis::pipe()
30///     .cmd("SET").arg("key_1").arg(42).ignore()
31///     .cmd("SET").arg("key_2").arg(43).ignore()
32///     .cmd("MGET").arg(&["key_1", "key_2"]).query(&mut con).unwrap();
33/// ```
34///
35/// As you can see with `cmd` you can start a new command.  By default
36/// each command produces a value but for some you can ignore them by
37/// calling `ignore` on the command.  That way it will be skipped in the
38/// return value which is useful for `SET` commands and others, which
39/// do not have a useful return value.
40impl Pipeline {
41    /// Creates an empty pipeline.  For consistency with the `cmd`
42    /// api a `pipe` function is provided as alias.
43    pub fn new() -> Pipeline {
44        Self::with_capacity(0)
45    }
46
47    /// Creates an empty pipeline with pre-allocated capacity.
48    pub fn with_capacity(capacity: usize) -> Pipeline {
49        Pipeline {
50            commands: Vec::with_capacity(capacity),
51            transaction_mode: false,
52            ignored_commands: HashSet::new(),
53            ignore_errors: false,
54        }
55    }
56
57    /// This enables atomic mode.  In atomic mode the whole pipeline is
58    /// enclosed in `MULTI`/`EXEC`.  From the user's point of view nothing
59    /// changes however.  This is easier than using `MULTI`/`EXEC` yourself
60    /// as the format does not change.
61    ///
62    /// ```rust,no_run
63    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
64    /// # let mut con = client.get_connection().unwrap();
65    /// let (k1, k2) : (i32, i32) = redis::pipe()
66    ///     .atomic()
67    ///     .cmd("GET").arg("key_1")
68    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
69    /// ```
70    #[inline]
71    pub fn atomic(&mut self) -> &mut Pipeline {
72        self.transaction_mode = true;
73        self
74    }
75
76    /// Configures the pipeline to return partial results even if some commands fail.
77    ///
78    /// By default, if any command within the pipeline returns an error from the Redis server,
79    /// the `query()` will only return errors, and not the results of the successful calls.
80    ///
81    /// When set:
82    /// 1. The pipeline executes all commands.
83    /// 2. `query()` returns `Ok` (unless there is a connection/IO error).
84    /// 3. The results of individual commands are returned in the collection. If a command failed,
85    ///    its corresponding element in the collection will be an `Err`.
86    ///
87    /// This allows handling partial successes where some commands succeed and others fail.
88    ///
89    /// **Note on ignored commands:** If you use `.ignore()` on a command, its result is discarded
90    /// regardless of success or failure. If `ignore_errors()` is set, errors from ignored
91    /// commands are also silently discarded and do not affect the returned result.
92    ///
93    /// **Note on Transactions (`atomic()`):** When used with `atomic()`, this method allows you to inspect
94    /// the results of individual commands within the transaction. Redis transactions do not rollback on
95    /// response errors (like `Path .path not exists`), so some commands may succeed while others fail. Using `ignore_errors()`
96    /// enables you to see which commands in the transaction succeeded and which failed, rather than receiving
97    /// a single aggregate error for the entire transaction.
98    ///
99    /// **Return Type:** When enabling this, you must use a collection of `RedisResult<T>`
100    /// (e.g., `Vec<RedisResult<T>>`) or `Value` to capture both successes and failures.
101    ///
102    /// ```rust,no_run
103    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
104    /// # let mut con = client.get_connection().unwrap();
105    /// let mut pipe = redis::pipe();
106    /// pipe.set("key_a", 1)
107    ///     .hset("key_a", "field", "val") // This will fail (WRONGTYPE)
108    ///     .get("key_a");
109    ///
110    /// // Note the return type: Vec<RedisResult<i32>>
111    /// let results: Vec<redis::RedisResult<i32>> = pipe
112    ///     .ignore_errors()
113    ///     .query(&mut con)
114    ///     .unwrap(); // The query itself succeeds
115    ///
116    /// assert!(results[0].is_ok()); // set succeeded
117    /// assert!(results[1].is_err()); // hset failed
118    /// assert!(results[2].is_ok()); // get succeeded
119    /// ```
120    #[inline]
121    pub fn ignore_errors(&mut self) -> &mut Pipeline {
122        self.ignore_errors = true;
123        self
124    }
125
126    /// Returns `true` if the pipeline is in transaction mode (aka atomic mode).
127    pub fn is_transaction(&self) -> bool {
128        self.transaction_mode
129    }
130
131    /// Returns the encoded pipeline commands.
132    pub fn get_packed_pipeline(&self) -> Vec<u8> {
133        encode_pipeline(&self.commands, self.transaction_mode)
134    }
135
136    /// Returns the number of commands currently queued by the usr in the pipeline.
137    ///
138    /// Depending on its configuration (e.g. `atomic`), the pipeline may send more commands to the server than the returned length
139    pub fn len(&self) -> usize {
140        self.commands.len()
141    }
142
143    /// Returns `true` is the pipeline contains no elements.
144    pub fn is_empty(&self) -> bool {
145        self.commands.is_empty()
146    }
147
148    /// Executes the pipeline and fetches the return values.  Since most
149    /// pipelines return different types it's recommended to use tuple
150    /// matching to process the results:
151    ///
152    /// ```rust,no_run
153    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
154    /// # let mut con = client.get_connection().unwrap();
155    /// let (k1, k2) : (i32, i32) = redis::pipe()
156    ///     .cmd("SET").arg("key_1").arg(42).ignore()
157    ///     .cmd("SET").arg("key_2").arg(43).ignore()
158    ///     .cmd("GET").arg("key_1")
159    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
160    /// ```
161    ///
162    /// NOTE: A Pipeline object may be reused after `query()` with all the commands as were inserted
163    ///       to them. In order to clear a Pipeline object with minimal memory released/allocated,
164    ///       it is necessary to call the `clear()` before inserting new commands.
165    ///
166    /// NOTE: This method returns a collection of results, even when there's only a single response.
167    ///       Make sure to wrap single-result values in a collection. For example:
168    ///
169    /// ```rust,no_run
170    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
171    /// # let mut con = client.get_connection().unwrap();
172    /// let (k1,): (i32,) = redis::pipe()
173    ///     .cmd("SET").arg("key_1").arg(42).ignore()
174    ///     .cmd("GET").arg("key_1").query(&mut con).unwrap();
175    /// ```
176    #[inline]
177    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
178        if !con.supports_pipelining() {
179            fail!((
180                ErrorKind::Client,
181                "This connection does not support pipelining."
182            ));
183        }
184
185        let response = if self.transaction_mode {
186            con.req_packed_commands(
187                &encode_pipeline(&self.commands, true),
188                self.commands.len() + 1,
189                1,
190            )?
191        } else {
192            con.req_packed_commands(
193                &encode_pipeline(&self.commands, false),
194                0,
195                self.commands.len(),
196            )?
197        };
198
199        self.complete_request(response)
200    }
201
202    /// Async version of [Self::query].
203    #[inline]
204    #[cfg(feature = "aio")]
205    pub async fn query_async<T: FromRedisValue>(
206        &self,
207        con: &mut impl crate::aio::ConnectionLike,
208    ) -> RedisResult<T> {
209        let response = if self.transaction_mode {
210            con.req_packed_commands(self, self.commands.len() + 1, 1)
211                .await?
212        } else {
213            con.req_packed_commands(self, 0, self.commands.len())
214                .await?
215        };
216
217        self.complete_request(response)
218    }
219
220    /// This is an alternative to [Self::query] that can be used if you want to be able to handle a
221    /// command's success or failure but don't care about the command's response. For example,
222    /// this is useful for "SET" commands for which the response's content is not important.
223    /// It avoids the need to define generic bounds for ().
224    #[inline]
225    pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
226        self.query::<()>(con)
227    }
228
229    /// This is an alternative to [Self::query_async] that can be used if you want to be able to handle a
230    /// command's success or failure but don't care about the command's response. For example,
231    /// this is useful for "SET" commands for which the response's content is not important.
232    /// It avoids the need to define generic bounds for ().
233    #[cfg(feature = "aio")]
234    pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
235        self.query_async::<()>(con).await
236    }
237
238    fn complete_request<T: FromRedisValue>(&self, mut response: Vec<Value>) -> RedisResult<T> {
239        let response = if self.is_transaction() {
240            match response.pop() {
241                Some(Value::Nil) => {
242                    return Ok(from_redis_value(Value::Nil)?);
243                }
244                Some(Value::Array(items)) => items,
245                _ => {
246                    return Err((
247                        ErrorKind::UnexpectedReturnType,
248                        "Invalid response when parsing multi response",
249                    )
250                        .into());
251                }
252            }
253        } else {
254            response
255        };
256
257        self.compose_response(response)
258    }
259}
260
261fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
262    let mut rv = vec![];
263    write_pipeline(&mut rv, cmds, atomic);
264    rv
265}
266
267fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Cmd], atomic: bool) {
268    let cmds_len = cmds.iter().map(cmd_len).sum();
269
270    if atomic {
271        let multi = cmd("MULTI");
272        let exec = cmd("EXEC");
273        rv.reserve(cmd_len(&multi) + cmd_len(&exec) + cmds_len);
274
275        multi.write_packed_command_preallocated(rv);
276        for cmd in cmds {
277            cmd.write_packed_command_preallocated(rv);
278        }
279        exec.write_packed_command_preallocated(rv);
280    } else {
281        rv.reserve(cmds_len);
282
283        for cmd in cmds {
284            cmd.write_packed_command_preallocated(rv);
285        }
286    }
287}
288
289// Macro to implement shared methods between Pipeline and ClusterPipeline
290macro_rules! implement_pipeline_commands {
291    ($struct_name:ident) => {
292        impl $struct_name {
293            /// Adds a command to the cluster pipeline.
294            #[inline]
295            pub fn add_command(&mut self, cmd: Cmd) -> &mut Self {
296                self.commands.push(cmd);
297                self
298            }
299
300            /// Starts a new command. Functions such as `arg` then become
301            /// available to add more arguments to that command.
302            #[inline]
303            pub fn cmd(&mut self, name: &str) -> &mut Self {
304                self.add_command(cmd(name))
305            }
306
307            /// Returns an iterator over all the commands currently in this pipeline
308            pub fn cmd_iter(&self) -> impl Iterator<Item = &Cmd> {
309                self.commands.iter()
310            }
311
312            /// Instructs the pipeline to ignore the return value of this command.
313            ///
314            /// On any successful result the value from this command is thrown away.
315            /// This makes result processing through tuples much easier because you
316            /// do not need to handle all the items you do not care about.
317            /// If any command received an error from the server, no result will be ignored,
318            /// so that the user could retrace which command failed.
319            #[inline]
320            pub fn ignore(&mut self) -> &mut Self {
321                match self.commands.len() {
322                    0 => true,
323                    x => self.ignored_commands.insert(x - 1),
324                };
325                self
326            }
327
328            /// Adds an argument to the last started command. This works similar
329            /// to the `arg` method of the `Cmd` object.
330            ///
331            /// Note that this function fails the task if executed on an empty pipeline.
332            #[inline]
333            pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Self {
334                {
335                    let cmd = self.get_last_command();
336                    cmd.arg(arg);
337                }
338                self
339            }
340
341            /// Clear a pipeline object's internal data structure.
342            ///
343            /// This allows reusing a pipeline object as a clear object while performing a minimal
344            /// amount of memory released/reallocated.
345            #[inline]
346            pub fn clear(&mut self) {
347                self.commands.clear();
348                self.ignored_commands.clear();
349            }
350
351            #[inline]
352            fn get_last_command(&mut self) -> &mut Cmd {
353                let idx = match self.commands.len() {
354                    0 => panic!("No command on stack"),
355                    x => x - 1,
356                };
357                &mut self.commands[idx]
358            }
359
360            fn filter_ignored_results(&self, resp: Vec<Value>) -> Vec<Value> {
361                resp.into_iter()
362                    .enumerate()
363                    .filter_map(|(index, result)| {
364                        (!self.ignored_commands.contains(&index)).then(|| result)
365                    })
366                    .collect()
367            }
368
369            fn compose_response<T: FromRedisValue>(&self, response: Vec<Value>) -> RedisResult<T> {
370                if self.ignore_errors {
371                    return Ok(from_redis_value(Value::Array(
372                        self.filter_ignored_results(response),
373                    ))?);
374                }
375
376                let server_errors: Vec<_> = response
377                    .iter()
378                    .enumerate()
379                    .filter_map(|(index, value)| match value {
380                        Value::ServerError(error) => Some((index, error.clone())),
381                        _ => None,
382                    })
383                    .collect();
384
385                if server_errors.is_empty() {
386                    Ok(from_redis_value(
387                        Value::Array(self.filter_ignored_results(response)).extract_error()?,
388                    )?)
389                } else {
390                    Err(crate::RedisError::pipeline(server_errors))
391                }
392            }
393        }
394
395        impl Default for $struct_name {
396            fn default() -> Self {
397                Self::new()
398            }
399        }
400    };
401}
402
403implement_pipeline_commands!(Pipeline);
404
405// Defines caching related functions for Pipeline, ClusterPipeline isn't supported yet.
406impl Pipeline {
407    /// Changes caching behaviour for latest command in the pipeline.
408    #[cfg(feature = "cache-aio")]
409    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
410    pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Self {
411        let cmd = self.get_last_command();
412        cmd.set_cache_config(command_cache_config);
413        self
414    }
415
416    #[cfg(feature = "cluster-async")]
417    pub(crate) fn into_cmd_iter(self) -> impl Iterator<Item = Cmd> {
418        self.commands.into_iter()
419    }
420}
421
422#[cfg(test)]
423mod tests {
424    use crate::{
425        errors::{RedisError, Repr, ServerError},
426        pipe, ServerErrorKind,
427    };
428
429    use super::*;
430
431    fn test_pipe() -> Pipeline {
432        let mut pipeline = pipe();
433        pipeline
434            .cmd("FOO")
435            .cmd("BAR")
436            .ignore()
437            .cmd("baz")
438            .ignore()
439            .cmd("barvaz");
440        pipeline
441    }
442
443    fn server_error() -> Value {
444        Value::ServerError(ServerError(Repr::Known {
445            kind: ServerErrorKind::CrossSlot,
446            detail: None,
447        }))
448    }
449
450    #[test]
451    fn test_pipeline_passes_values_only_from_non_ignored_commands() {
452        let pipeline = test_pipe();
453        let inputs = vec![Value::Int(1), Value::Int(2), Value::Int(3), Value::Okay];
454        let result = pipeline.complete_request(inputs);
455
456        let expected = vec!["1".to_string(), "OK".to_string()];
457        assert_eq!(result, Ok(expected));
458    }
459
460    #[test]
461    fn test_pipeline_passes_errors_from_ignored_commands() {
462        let pipeline = test_pipe();
463        let inputs = vec![Value::Okay, server_error(), Value::Okay, server_error()];
464        let error = pipeline.compose_response::<Vec<Value>>(inputs).unwrap_err();
465        let error_message = error.to_string();
466
467        assert!(error_message.contains("Index 1"), "{error_message}");
468        assert!(error_message.contains("Index 3"), "{error_message}");
469    }
470
471    #[test]
472    fn test_pipeline_passes_response_from_each_commands_despite_errors() {
473        let mut pipeline = test_pipe();
474        let inputs = vec![Value::Okay, server_error(), Value::Okay, server_error()];
475        let result = pipeline
476            .ignore_errors()
477            .compose_response::<Vec<RedisResult<Value>>>(inputs)
478            .unwrap();
479
480        assert_eq!(result[0], Ok::<Value, RedisError>(Value::Okay));
481        assert!(result[1]
482            .clone()
483            .is_err_and(|e| e.to_string().contains("CrossSlot")))
484    }
485}