redis/
pipeline.rs

1#![macro_use]
2
3#[cfg(feature = "cache-aio")]
4use crate::cmd::CommandCacheConfig;
5use crate::cmd::{cmd, cmd_len, Cmd};
6use crate::connection::ConnectionLike;
7use crate::types::{
8    from_owned_redis_value, ErrorKind, FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value,
9};
10
/// Represents a redis command pipeline.
///
/// Commands are queued in insertion order and sent to the server together
/// when the pipeline is queried.
#[derive(Clone)]
pub struct Pipeline {
    /// Commands queued so far, in the order they were added.
    pub(crate) commands: Vec<Cmd>,
    /// When `true`, the queued commands are encoded between `MULTI` and `EXEC`.
    pub(crate) transaction_mode: bool,
    /// Zero-based positions in `commands` whose replies are left out of the result.
    pub(crate) ignored_commands: HashSet<usize>,
}
18
19/// A pipeline allows you to send multiple commands in one go to the
20/// redis server.  API wise it's very similar to just using a command
21/// but it allows multiple commands to be chained and some features such
22/// as iteration are not available.
23///
24/// Basic example:
25///
26/// ```rust,no_run
27/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
28/// # let mut con = client.get_connection().unwrap();
29/// let ((k1, k2),) : ((i32, i32),) = redis::pipe()
30///     .cmd("SET").arg("key_1").arg(42).ignore()
31///     .cmd("SET").arg("key_2").arg(43).ignore()
32///     .cmd("MGET").arg(&["key_1", "key_2"]).query(&mut con).unwrap();
33/// ```
34///
35/// As you can see with `cmd` you can start a new command.  By default
36/// each command produces a value but for some you can ignore them by
37/// calling `ignore` on the command.  That way it will be skipped in the
38/// return value which is useful for `SET` commands and others, which
39/// do not have a useful return value.
40impl Pipeline {
41    /// Creates an empty pipeline.  For consistency with the `cmd`
42    /// api a `pipe` function is provided as alias.
43    pub fn new() -> Pipeline {
44        Self::with_capacity(0)
45    }
46
47    /// Creates an empty pipeline with pre-allocated capacity.
48    pub fn with_capacity(capacity: usize) -> Pipeline {
49        Pipeline {
50            commands: Vec::with_capacity(capacity),
51            transaction_mode: false,
52            ignored_commands: HashSet::new(),
53        }
54    }
55
56    /// This enables atomic mode.  In atomic mode the whole pipeline is
57    /// enclosed in `MULTI`/`EXEC`.  From the user's point of view nothing
58    /// changes however.  This is easier than using `MULTI`/`EXEC` yourself
59    /// as the format does not change.
60    ///
61    /// ```rust,no_run
62    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
63    /// # let mut con = client.get_connection().unwrap();
64    /// let (k1, k2) : (i32, i32) = redis::pipe()
65    ///     .atomic()
66    ///     .cmd("GET").arg("key_1")
67    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
68    /// ```
69    #[inline]
70    pub fn atomic(&mut self) -> &mut Pipeline {
71        self.transaction_mode = true;
72        self
73    }
74
75    #[cfg(feature = "aio")]
76    pub(crate) fn is_transaction(&self) -> bool {
77        self.transaction_mode
78    }
79
80    /// Returns the encoded pipeline commands.
81    pub fn get_packed_pipeline(&self) -> Vec<u8> {
82        encode_pipeline(&self.commands, self.transaction_mode)
83    }
84
85    /// Returns the number of commands currently queued by the usr in the pipeline.
86    ///
87    /// Depending on its configuration (e.g. `atomic`), the pipeline may send more commands to the server than the returned length
88    pub fn len(&self) -> usize {
89        self.commands.len()
90    }
91
92    /// Returns `true` is the pipeline contains no elements.
93    pub fn is_empty(&self) -> bool {
94        self.commands.is_empty()
95    }
96
97    fn execute_pipelined(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
98        self.make_pipeline_results(con.req_packed_commands(
99            &encode_pipeline(&self.commands, false),
100            0,
101            self.commands.len(),
102        )?)
103    }
104
105    fn execute_transaction(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
106        let mut resp = con.req_packed_commands(
107            &encode_pipeline(&self.commands, true),
108            self.commands.len() + 1,
109            1,
110        )?;
111
112        match resp.pop() {
113            Some(Value::Nil) => Ok(Value::Nil),
114            Some(Value::Array(items)) => self.make_pipeline_results(items),
115            _ => fail!((
116                ErrorKind::ResponseError,
117                "Invalid response when parsing multi response"
118            )),
119        }
120    }
121
122    /// Executes the pipeline and fetches the return values.  Since most
123    /// pipelines return different types it's recommended to use tuple
124    /// matching to process the results:
125    ///
126    /// ```rust,no_run
127    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
128    /// # let mut con = client.get_connection().unwrap();
129    /// let (k1, k2) : (i32, i32) = redis::pipe()
130    ///     .cmd("SET").arg("key_1").arg(42).ignore()
131    ///     .cmd("SET").arg("key_2").arg(43).ignore()
132    ///     .cmd("GET").arg("key_1")
133    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
134    /// ```
135    ///
136    /// NOTE: A Pipeline object may be reused after `query()` with all the commands as were inserted
137    ///       to them. In order to clear a Pipeline object with minimal memory released/allocated,
138    ///       it is necessary to call the `clear()` before inserting new commands.
139    #[inline]
140    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
141        if !con.supports_pipelining() {
142            fail!((
143                ErrorKind::ResponseError,
144                "This connection does not support pipelining."
145            ));
146        }
147        let value = if self.commands.is_empty() {
148            Value::Array(vec![])
149        } else if self.transaction_mode {
150            self.execute_transaction(con)?
151        } else {
152            self.execute_pipelined(con)?
153        };
154
155        from_owned_redis_value(value.extract_error()?)
156    }
157
158    #[cfg(feature = "aio")]
159    async fn execute_pipelined_async<C>(&self, con: &mut C) -> RedisResult<Value>
160    where
161        C: crate::aio::ConnectionLike,
162    {
163        let value = con
164            .req_packed_commands(self, 0, self.commands.len())
165            .await?;
166        self.make_pipeline_results(value)
167    }
168
169    #[cfg(feature = "aio")]
170    async fn execute_transaction_async<C>(&self, con: &mut C) -> RedisResult<Value>
171    where
172        C: crate::aio::ConnectionLike,
173    {
174        let mut resp = con
175            .req_packed_commands(self, self.commands.len() + 1, 1)
176            .await?;
177        match resp.pop() {
178            Some(Value::Nil) => Ok(Value::Nil),
179            Some(Value::Array(items)) => self.make_pipeline_results(items),
180            _ => Err((
181                ErrorKind::ResponseError,
182                "Invalid response when parsing multi response",
183            )
184                .into()),
185        }
186    }
187
188    /// Async version of `query`.
189    #[inline]
190    #[cfg(feature = "aio")]
191    pub async fn query_async<T: FromRedisValue>(
192        &self,
193        con: &mut impl crate::aio::ConnectionLike,
194    ) -> RedisResult<T> {
195        let value = if self.commands.is_empty() {
196            return from_owned_redis_value(Value::Array(vec![]));
197        } else if self.transaction_mode {
198            self.execute_transaction_async(con).await?
199        } else {
200            self.execute_pipelined_async(con).await?
201        };
202        from_owned_redis_value(value.extract_error()?)
203    }
204
205    /// This is a shortcut to `query()` that does not return a value and
206    /// will fail the task if the query of the pipeline fails.
207    ///
208    /// This is equivalent to a call of query like this:
209    ///
210    /// ```rust,no_run
211    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
212    /// # let mut con = client.get_connection().unwrap();
213    /// redis::pipe().cmd("PING").query::<()>(&mut con).unwrap();
214    /// ```
215    ///
216    /// NOTE: A Pipeline object may be reused after `query()` with all the commands as were inserted
217    ///       to them. In order to clear a Pipeline object with minimal memory released/allocated,
218    ///       it is necessary to call the `clear()` before inserting new commands.
219    #[inline]
220    #[deprecated(note = "Use Cmd::exec + unwrap, instead")]
221    pub fn execute(&self, con: &mut dyn ConnectionLike) {
222        self.exec(con).unwrap();
223    }
224
225    /// This is an alternative to `query`` that can be used if you want to be able to handle a
226    /// command's success or failure but don't care about the command's response. For example,
227    /// this is useful for "SET" commands for which the response's content is not important.
228    /// It avoids the need to define generic bounds for ().
229    #[inline]
230    pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
231        self.query::<()>(con)
232    }
233
234    /// This is an alternative to `query_async` that can be used if you want to be able to handle a
235    /// command's success or failure but don't care about the command's response. For example,
236    /// this is useful for "SET" commands for which the response's content is not important.
237    /// It avoids the need to define generic bounds for ().
238    #[cfg(feature = "aio")]
239    pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
240        self.query_async::<()>(con).await
241    }
242}
243
244fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
245    let mut rv = vec![];
246    write_pipeline(&mut rv, cmds, atomic);
247    rv
248}
249
250fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Cmd], atomic: bool) {
251    let cmds_len = cmds.iter().map(cmd_len).sum();
252
253    if atomic {
254        let multi = cmd("MULTI");
255        let exec = cmd("EXEC");
256        rv.reserve(cmd_len(&multi) + cmd_len(&exec) + cmds_len);
257
258        multi.write_packed_command_preallocated(rv);
259        for cmd in cmds {
260            cmd.write_packed_command_preallocated(rv);
261        }
262        exec.write_packed_command_preallocated(rv);
263    } else {
264        rv.reserve(cmds_len);
265
266        for cmd in cmds {
267            cmd.write_packed_command_preallocated(rv);
268        }
269    }
270}
271
272// Macro to implement shared methods between Pipeline and ClusterPipeline
273macro_rules! implement_pipeline_commands {
274    ($struct_name:ident) => {
275        impl $struct_name {
276            /// Adds a command to the cluster pipeline.
277            #[inline]
278            pub fn add_command(&mut self, cmd: Cmd) -> &mut Self {
279                self.commands.push(cmd);
280                self
281            }
282
283            /// Starts a new command. Functions such as `arg` then become
284            /// available to add more arguments to that command.
285            #[inline]
286            pub fn cmd(&mut self, name: &str) -> &mut Self {
287                self.add_command(cmd(name))
288            }
289
290            /// Returns an iterator over all the commands currently in this pipeline
291            pub fn cmd_iter(&self) -> impl Iterator<Item = &Cmd> {
292                self.commands.iter()
293            }
294
295            /// Instructs the pipeline to ignore the return value of this command.
296            /// It will still be ensured that it is not an error, but any successful
297            /// result is just thrown away.  This makes result processing through
298            /// tuples much easier because you do not need to handle all the items
299            /// you do not care about.
300            #[inline]
301            pub fn ignore(&mut self) -> &mut Self {
302                match self.commands.len() {
303                    0 => true,
304                    x => self.ignored_commands.insert(x - 1),
305                };
306                self
307            }
308
309            /// Adds an argument to the last started command. This works similar
310            /// to the `arg` method of the `Cmd` object.
311            ///
312            /// Note that this function fails the task if executed on an empty pipeline.
313            #[inline]
314            pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Self {
315                {
316                    let cmd = self.get_last_command();
317                    cmd.arg(arg);
318                }
319                self
320            }
321
322            /// Clear a pipeline object's internal data structure.
323            ///
324            /// This allows reusing a pipeline object as a clear object while performing a minimal
325            /// amount of memory released/reallocated.
326            #[inline]
327            pub fn clear(&mut self) {
328                self.commands.clear();
329                self.ignored_commands.clear();
330            }
331
332            #[inline]
333            fn get_last_command(&mut self) -> &mut Cmd {
334                let idx = match self.commands.len() {
335                    0 => panic!("No command on stack"),
336                    x => x - 1,
337                };
338                &mut self.commands[idx]
339            }
340
341            fn make_pipeline_results(&self, resp: Vec<Value>) -> RedisResult<Value> {
342                let resp = Value::extract_error_vec(resp)?;
343
344                let mut rv = Vec::with_capacity(resp.len() - self.ignored_commands.len());
345                for (idx, result) in resp.into_iter().enumerate() {
346                    if !self.ignored_commands.contains(&idx) {
347                        rv.push(result);
348                    }
349                }
350                Ok(Value::Array(rv))
351            }
352        }
353
354        impl Default for $struct_name {
355            fn default() -> Self {
356                Self::new()
357            }
358        }
359    };
360}
361
362implement_pipeline_commands!(Pipeline);
363
364// Defines caching related functions for Pipeline, ClusterPipeline isn't supported yet.
365impl Pipeline {
366    /// Changes caching behaviour for latest command in the pipeline.
367    #[cfg(feature = "cache-aio")]
368    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
369    pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Self {
370        let cmd = self.get_last_command();
371        cmd.set_cache_config(command_cache_config);
372        self
373    }
374}