redis/pipeline.rs
1#![macro_use]
2
3#[cfg(feature = "cache-aio")]
4use crate::cmd::CommandCacheConfig;
5use crate::cmd::{cmd, cmd_len, Cmd};
6use crate::connection::ConnectionLike;
7use crate::errors::ErrorKind;
8use crate::types::{from_redis_value, FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value};
9
10/// Represents a redis command pipeline.
11#[derive(Clone)]
12pub struct Pipeline {
13 pub(crate) commands: Vec<Cmd>,
14 pub(crate) transaction_mode: bool,
15 pub(crate) ignored_commands: HashSet<usize>,
16 pub(crate) ignore_errors: bool,
17}
18
19/// A pipeline allows you to send multiple commands in one go to the
20/// redis server. API wise it's very similar to just using a command
21/// but it allows multiple commands to be chained and some features such
22/// as iteration are not available.
23///
24/// Basic example:
25///
26/// ```rust,no_run
27/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
28/// # let mut con = client.get_connection().unwrap();
29/// let ((k1, k2),) : ((i32, i32),) = redis::pipe()
30/// .cmd("SET").arg("key_1").arg(42).ignore()
31/// .cmd("SET").arg("key_2").arg(43).ignore()
32/// .cmd("MGET").arg(&["key_1", "key_2"]).query(&mut con).unwrap();
33/// ```
34///
35/// As you can see with `cmd` you can start a new command. By default
36/// each command produces a value but for some you can ignore them by
37/// calling `ignore` on the command. That way it will be skipped in the
38/// return value which is useful for `SET` commands and others, which
39/// do not have a useful return value.
40impl Pipeline {
41 /// Creates an empty pipeline. For consistency with the `cmd`
42 /// api a `pipe` function is provided as alias.
43 pub fn new() -> Pipeline {
44 Self::with_capacity(0)
45 }
46
47 /// Creates an empty pipeline with pre-allocated capacity.
48 pub fn with_capacity(capacity: usize) -> Pipeline {
49 Pipeline {
50 commands: Vec::with_capacity(capacity),
51 transaction_mode: false,
52 ignored_commands: HashSet::new(),
53 ignore_errors: false,
54 }
55 }
56
57 /// This enables atomic mode. In atomic mode the whole pipeline is
58 /// enclosed in `MULTI`/`EXEC`. From the user's point of view nothing
59 /// changes however. This is easier than using `MULTI`/`EXEC` yourself
60 /// as the format does not change.
61 ///
62 /// ```rust,no_run
63 /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
64 /// # let mut con = client.get_connection().unwrap();
65 /// let (k1, k2) : (i32, i32) = redis::pipe()
66 /// .atomic()
67 /// .cmd("GET").arg("key_1")
68 /// .cmd("GET").arg("key_2").query(&mut con).unwrap();
69 /// ```
70 #[inline]
71 pub fn atomic(&mut self) -> &mut Pipeline {
72 self.transaction_mode = true;
73 self
74 }
75
76 /// Configures the pipeline to return partial results even if some commands fail.
77 ///
78 /// By default, if any command within the pipeline returns an error from the Redis server,
79 /// the `query()` will only return errors, and not the results of the successful calls.
80 ///
81 /// When set:
82 /// 1. The pipeline executes all commands.
83 /// 2. `query()` returns `Ok` (unless there is a connection/IO error).
84 /// 3. The results of individual commands are returned in the collection. If a command failed,
85 /// its corresponding element in the collection will be an `Err`.
86 ///
87 /// This allows handling partial successes where some commands succeed and others fail.
88 ///
89 /// **Note on ignored commands:** If you use `.ignore()` on a command, its result is discarded
90 /// regardless of success or failure. If `ignore_errors()` is set, errors from ignored
91 /// commands are also silently discarded and do not affect the returned result.
92 ///
93 /// **Note on Transactions (`atomic()`):** When used with `atomic()`, this method allows you to inspect
94 /// the results of individual commands within the transaction. Redis transactions do not rollback on
95 /// response errors (like `Path .path not exists`), so some commands may succeed while others fail. Using `ignore_errors()`
96 /// enables you to see which commands in the transaction succeeded and which failed, rather than receiving
97 /// a single aggregate error for the entire transaction.
98 ///
99 /// **Return Type:** When enabling this, you must use a collection of `RedisResult<T>`
100 /// (e.g., `Vec<RedisResult<T>>`) or `Value` to capture both successes and failures.
101 ///
102 /// ```rust,no_run
103 /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
104 /// # let mut con = client.get_connection().unwrap();
105 /// let mut pipe = redis::pipe();
106 /// pipe.set("key_a", 1)
107 /// .hset("key_a", "field", "val") // This will fail (WRONGTYPE)
108 /// .get("key_a");
109 ///
110 /// // Note the return type: Vec<RedisResult<i32>>
111 /// let results: Vec<redis::RedisResult<i32>> = pipe
112 /// .ignore_errors()
113 /// .query(&mut con)
114 /// .unwrap(); // The query itself succeeds
115 ///
116 /// assert!(results[0].is_ok()); // set succeeded
117 /// assert!(results[1].is_err()); // hset failed
118 /// assert!(results[2].is_ok()); // get succeeded
119 /// ```
120 #[inline]
121 pub fn ignore_errors(&mut self) -> &mut Pipeline {
122 self.ignore_errors = true;
123 self
124 }
125
126 /// Returns `true` if the pipeline is in transaction mode (aka atomic mode).
127 pub fn is_transaction(&self) -> bool {
128 self.transaction_mode
129 }
130
131 /// Returns the encoded pipeline commands.
132 pub fn get_packed_pipeline(&self) -> Vec<u8> {
133 encode_pipeline(&self.commands, self.transaction_mode)
134 }
135
136 /// Returns the number of commands currently queued by the usr in the pipeline.
137 ///
138 /// Depending on its configuration (e.g. `atomic`), the pipeline may send more commands to the server than the returned length
139 pub fn len(&self) -> usize {
140 self.commands.len()
141 }
142
143 /// Returns `true` is the pipeline contains no elements.
144 pub fn is_empty(&self) -> bool {
145 self.commands.is_empty()
146 }
147
148 /// Executes the pipeline and fetches the return values. Since most
149 /// pipelines return different types it's recommended to use tuple
150 /// matching to process the results:
151 ///
152 /// ```rust,no_run
153 /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
154 /// # let mut con = client.get_connection().unwrap();
155 /// let (k1, k2) : (i32, i32) = redis::pipe()
156 /// .cmd("SET").arg("key_1").arg(42).ignore()
157 /// .cmd("SET").arg("key_2").arg(43).ignore()
158 /// .cmd("GET").arg("key_1")
159 /// .cmd("GET").arg("key_2").query(&mut con).unwrap();
160 /// ```
161 ///
162 /// NOTE: A Pipeline object may be reused after `query()` with all the commands as were inserted
163 /// to them. In order to clear a Pipeline object with minimal memory released/allocated,
164 /// it is necessary to call the `clear()` before inserting new commands.
165 ///
166 /// NOTE: This method returns a collection of results, even when there's only a single response.
167 /// Make sure to wrap single-result values in a collection. For example:
168 ///
169 /// ```rust,no_run
170 /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
171 /// # let mut con = client.get_connection().unwrap();
172 /// let (k1,): (i32,) = redis::pipe()
173 /// .cmd("SET").arg("key_1").arg(42).ignore()
174 /// .cmd("GET").arg("key_1").query(&mut con).unwrap();
175 /// ```
176 #[inline]
177 pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
178 if !con.supports_pipelining() {
179 fail!((
180 ErrorKind::Client,
181 "This connection does not support pipelining."
182 ));
183 }
184
185 let response = if self.transaction_mode {
186 con.req_packed_commands(
187 &encode_pipeline(&self.commands, true),
188 self.commands.len() + 1,
189 1,
190 )?
191 } else {
192 con.req_packed_commands(
193 &encode_pipeline(&self.commands, false),
194 0,
195 self.commands.len(),
196 )?
197 };
198
199 self.complete_request(response)
200 }
201
202 /// Async version of [Self::query].
203 #[inline]
204 #[cfg(feature = "aio")]
205 pub async fn query_async<T: FromRedisValue>(
206 &self,
207 con: &mut impl crate::aio::ConnectionLike,
208 ) -> RedisResult<T> {
209 let response = if self.transaction_mode {
210 con.req_packed_commands(self, self.commands.len() + 1, 1)
211 .await?
212 } else {
213 con.req_packed_commands(self, 0, self.commands.len())
214 .await?
215 };
216
217 self.complete_request(response)
218 }
219
220 /// This is an alternative to [Self::query] that can be used if you want to be able to handle a
221 /// command's success or failure but don't care about the command's response. For example,
222 /// this is useful for "SET" commands for which the response's content is not important.
223 /// It avoids the need to define generic bounds for ().
224 #[inline]
225 pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
226 self.query::<()>(con)
227 }
228
229 /// This is an alternative to [Self::query_async] that can be used if you want to be able to handle a
230 /// command's success or failure but don't care about the command's response. For example,
231 /// this is useful for "SET" commands for which the response's content is not important.
232 /// It avoids the need to define generic bounds for ().
233 #[cfg(feature = "aio")]
234 pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
235 self.query_async::<()>(con).await
236 }
237
238 fn complete_request<T: FromRedisValue>(&self, mut response: Vec<Value>) -> RedisResult<T> {
239 let response = if self.is_transaction() {
240 match response.pop() {
241 Some(Value::Nil) => {
242 return Ok(from_redis_value(Value::Nil)?);
243 }
244 Some(Value::Array(items)) => items,
245 _ => {
246 return Err((
247 ErrorKind::UnexpectedReturnType,
248 "Invalid response when parsing multi response",
249 )
250 .into());
251 }
252 }
253 } else {
254 response
255 };
256
257 self.compose_response(response)
258 }
259}
260
261fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
262 let mut rv = vec![];
263 write_pipeline(&mut rv, cmds, atomic);
264 rv
265}
266
267fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Cmd], atomic: bool) {
268 let cmds_len = cmds.iter().map(cmd_len).sum();
269
270 if atomic {
271 let multi = cmd("MULTI");
272 let exec = cmd("EXEC");
273 rv.reserve(cmd_len(&multi) + cmd_len(&exec) + cmds_len);
274
275 multi.write_packed_command_preallocated(rv);
276 for cmd in cmds {
277 cmd.write_packed_command_preallocated(rv);
278 }
279 exec.write_packed_command_preallocated(rv);
280 } else {
281 rv.reserve(cmds_len);
282
283 for cmd in cmds {
284 cmd.write_packed_command_preallocated(rv);
285 }
286 }
287}
288
289// Macro to implement shared methods between Pipeline and ClusterPipeline
290macro_rules! implement_pipeline_commands {
291 ($struct_name:ident) => {
292 impl $struct_name {
293 /// Adds a command to the cluster pipeline.
294 #[inline]
295 pub fn add_command(&mut self, cmd: Cmd) -> &mut Self {
296 self.commands.push(cmd);
297 self
298 }
299
300 /// Starts a new command. Functions such as `arg` then become
301 /// available to add more arguments to that command.
302 #[inline]
303 pub fn cmd(&mut self, name: &str) -> &mut Self {
304 self.add_command(cmd(name))
305 }
306
307 /// Returns an iterator over all the commands currently in this pipeline
308 pub fn cmd_iter(&self) -> impl Iterator<Item = &Cmd> {
309 self.commands.iter()
310 }
311
312 /// Instructs the pipeline to ignore the return value of this command.
313 ///
314 /// On any successful result the value from this command is thrown away.
315 /// This makes result processing through tuples much easier because you
316 /// do not need to handle all the items you do not care about.
317 /// If any command received an error from the server, no result will be ignored,
318 /// so that the user could retrace which command failed.
319 #[inline]
320 pub fn ignore(&mut self) -> &mut Self {
321 match self.commands.len() {
322 0 => true,
323 x => self.ignored_commands.insert(x - 1),
324 };
325 self
326 }
327
328 /// Adds an argument to the last started command. This works similar
329 /// to the `arg` method of the `Cmd` object.
330 ///
331 /// Note that this function fails the task if executed on an empty pipeline.
332 #[inline]
333 pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Self {
334 {
335 let cmd = self.get_last_command();
336 cmd.arg(arg);
337 }
338 self
339 }
340
341 /// Clear a pipeline object's internal data structure.
342 ///
343 /// This allows reusing a pipeline object as a clear object while performing a minimal
344 /// amount of memory released/reallocated.
345 #[inline]
346 pub fn clear(&mut self) {
347 self.commands.clear();
348 self.ignored_commands.clear();
349 }
350
351 #[inline]
352 fn get_last_command(&mut self) -> &mut Cmd {
353 let idx = match self.commands.len() {
354 0 => panic!("No command on stack"),
355 x => x - 1,
356 };
357 &mut self.commands[idx]
358 }
359
360 fn filter_ignored_results(&self, resp: Vec<Value>) -> Vec<Value> {
361 resp.into_iter()
362 .enumerate()
363 .filter_map(|(index, result)| {
364 (!self.ignored_commands.contains(&index)).then(|| result)
365 })
366 .collect()
367 }
368
369 fn compose_response<T: FromRedisValue>(&self, response: Vec<Value>) -> RedisResult<T> {
370 if self.ignore_errors {
371 return Ok(from_redis_value(Value::Array(
372 self.filter_ignored_results(response),
373 ))?);
374 }
375
376 let server_errors: Vec<_> = response
377 .iter()
378 .enumerate()
379 .filter_map(|(index, value)| match value {
380 Value::ServerError(error) => Some((index, error.clone())),
381 _ => None,
382 })
383 .collect();
384
385 if server_errors.is_empty() {
386 Ok(from_redis_value(
387 Value::Array(self.filter_ignored_results(response)).extract_error()?,
388 )?)
389 } else {
390 Err(crate::RedisError::pipeline(server_errors))
391 }
392 }
393 }
394
395 impl Default for $struct_name {
396 fn default() -> Self {
397 Self::new()
398 }
399 }
400 };
401}
402
403implement_pipeline_commands!(Pipeline);
404
405// Defines caching related functions for Pipeline, ClusterPipeline isn't supported yet.
406impl Pipeline {
407 /// Changes caching behaviour for latest command in the pipeline.
408 #[cfg(feature = "cache-aio")]
409 #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
410 pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Self {
411 let cmd = self.get_last_command();
412 cmd.set_cache_config(command_cache_config);
413 self
414 }
415
416 #[cfg(feature = "cluster-async")]
417 pub(crate) fn into_cmd_iter(self) -> impl Iterator<Item = Cmd> {
418 self.commands.into_iter()
419 }
420}
421
#[cfg(test)]
mod tests {
    use crate::{
        errors::{RedisError, Repr, ServerError},
        pipe, ServerErrorKind,
    };

    use super::*;

    /// Builds a four-command pipeline in which the replies of the commands at
    /// index 1 and 2 are marked as ignored.
    fn test_pipe() -> Pipeline {
        let mut pipeline = pipe();
        pipeline.cmd("FOO");
        pipeline.cmd("BAR").ignore();
        pipeline.cmd("baz").ignore();
        pipeline.cmd("barvaz");
        pipeline
    }

    /// A `CrossSlot` server error reply without detail text.
    fn server_error() -> Value {
        let repr = Repr::Known {
            kind: ServerErrorKind::CrossSlot,
            detail: None,
        };
        Value::ServerError(ServerError(repr))
    }

    #[test]
    fn test_pipeline_passes_values_only_from_non_ignored_commands() {
        let replies = vec![Value::Int(1), Value::Int(2), Value::Int(3), Value::Okay];
        let result = test_pipe().complete_request(replies);

        // Only the replies at index 0 and 3 belong to non-ignored commands.
        let expected = vec![String::from("1"), String::from("OK")];
        assert_eq!(result, Ok(expected));
    }

    #[test]
    fn test_pipeline_passes_errors_from_ignored_commands() {
        let replies = vec![Value::Okay, server_error(), Value::Okay, server_error()];
        let message = test_pipe()
            .compose_response::<Vec<Value>>(replies)
            .unwrap_err()
            .to_string();

        // Index 1 belongs to an ignored command, index 3 to a regular one.
        for needle in ["Index 1", "Index 3"] {
            assert!(message.contains(needle), "{message}");
        }
    }

    #[test]
    fn test_pipeline_passes_response_from_each_commands_despite_errors() {
        let mut pipeline = test_pipe();
        pipeline.ignore_errors();

        let replies = vec![Value::Okay, server_error(), Value::Okay, server_error()];
        let result: Vec<RedisResult<Value>> = pipeline.compose_response(replies).unwrap();

        assert_eq!(result[0], Ok::<Value, RedisError>(Value::Okay));
        assert!(matches!(
            &result[1],
            Err(e) if e.to_string().contains("CrossSlot")
        ))
    }
}
485}