redis/pipeline.rs
#![macro_use]

#[cfg(feature = "cache-aio")]
use crate::cmd::CommandCacheConfig;
use crate::cmd::{cmd, cmd_len, Cmd};
use crate::connection::ConnectionLike;
use crate::types::{
    from_owned_redis_value, ErrorKind, FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value,
};

/// Represents a redis command pipeline.
#[derive(Clone)]
pub struct Pipeline {
    pub(crate) commands: Vec<Cmd>,
    pub(crate) transaction_mode: bool,
    pub(crate) ignored_commands: HashSet<usize>,
}

/// A pipeline allows you to send multiple commands in one go to the
/// redis server. API wise it's very similar to just using a command
/// but it allows multiple commands to be chained and some features such
/// as iteration are not available.
///
/// Basic example:
///
/// ```rust,no_run
/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
/// # let mut con = client.get_connection().unwrap();
/// let ((k1, k2),) : ((i32, i32),) = redis::pipe()
///     .cmd("SET").arg("key_1").arg(42).ignore()
///     .cmd("SET").arg("key_2").arg(43).ignore()
///     .cmd("MGET").arg(&["key_1", "key_2"]).query(&mut con).unwrap();
/// ```
///
/// As you can see with `cmd` you can start a new command. By default
/// each command produces a value but for some you can ignore them by
/// calling `ignore` on the command. That way it will be skipped in the
/// return value which is useful for `SET` commands and others, which
/// do not have a useful return value.
impl Pipeline {
    /// Creates an empty pipeline. For consistency with the `cmd`
    /// API, a `pipe` function is provided as an alias.
    pub fn new() -> Pipeline {
        Self::with_capacity(0)
    }

    /// Creates an empty pipeline with pre-allocated capacity.
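    ///
    /// For example (a minimal, illustrative sketch; the capacity only
    /// pre-allocates the internal command buffer):
    ///
    /// ```rust
    /// let mut pipe = redis::Pipeline::with_capacity(2);
    /// pipe.cmd("SET").arg("key_1").arg(42).ignore()
    ///     .cmd("GET").arg("key_1");
    /// assert_eq!(pipe.len(), 2);
    /// ```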
    pub fn with_capacity(capacity: usize) -> Pipeline {
        Pipeline {
            commands: Vec::with_capacity(capacity),
            transaction_mode: false,
            ignored_commands: HashSet::new(),
        }
    }

    /// This enables atomic mode. In atomic mode the whole pipeline is
    /// enclosed in `MULTI`/`EXEC`. From the user's point of view nothing
    /// changes however. This is easier than using `MULTI`/`EXEC` yourself
    /// as the format does not change.
    ///
    /// ```rust,no_run
    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
    /// # let mut con = client.get_connection().unwrap();
    /// let (k1, k2) : (i32, i32) = redis::pipe()
    ///     .atomic()
    ///     .cmd("GET").arg("key_1")
    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
    /// ```
    #[inline]
    pub fn atomic(&mut self) -> &mut Pipeline {
        self.transaction_mode = true;
        self
    }

    #[cfg(feature = "aio")]
    pub(crate) fn is_transaction(&self) -> bool {
        self.transaction_mode
    }

    /// Returns the encoded pipeline commands.
    pub fn get_packed_pipeline(&self) -> Vec<u8> {
        encode_pipeline(&self.commands, self.transaction_mode)
    }

    /// Returns the number of commands currently queued by the user in the pipeline.
    ///
    /// Depending on its configuration (e.g. `atomic`), the pipeline may send more commands to the server than the returned length.
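    ///
    /// For example (a minimal sketch, no server required):
    ///
    /// ```rust
    /// let mut pipe = redis::pipe();
    /// pipe.cmd("PING").cmd("PING");
    /// // Two user commands are queued; with `atomic()` the encoded pipeline
    /// // would additionally carry `MULTI` and `EXEC`.
    /// assert_eq!(pipe.len(), 2);
    /// ```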
    pub fn len(&self) -> usize {
        self.commands.len()
    }

    /// Returns `true` if the pipeline contains no elements.
    pub fn is_empty(&self) -> bool {
        self.commands.is_empty()
    }

    fn execute_pipelined(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
        self.make_pipeline_results(con.req_packed_commands(
            &encode_pipeline(&self.commands, false),
            0,
            self.commands.len(),
        )?)
    }

    fn execute_transaction(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
        let mut resp = con.req_packed_commands(
            &encode_pipeline(&self.commands, true),
            self.commands.len() + 1,
            1,
        )?;

        match resp.pop() {
            Some(Value::Nil) => Ok(Value::Nil),
            Some(Value::Array(items)) => self.make_pipeline_results(items),
            _ => fail!((
                ErrorKind::ResponseError,
                "Invalid response when parsing multi response"
            )),
        }
    }

    /// Executes the pipeline and fetches the return values. Since most
    /// pipelines return different types it's recommended to use tuple
    /// matching to process the results:
    ///
    /// ```rust,no_run
    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
    /// # let mut con = client.get_connection().unwrap();
    /// let (k1, k2) : (i32, i32) = redis::pipe()
    ///     .cmd("SET").arg("key_1").arg(42).ignore()
    ///     .cmd("SET").arg("key_2").arg(43).ignore()
    ///     .cmd("GET").arg("key_1")
    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
    /// ```
    ///
    /// NOTE: A Pipeline object may be reused after `query()`; all of its commands remain in place.
    /// To clear a Pipeline object while releasing/allocating as little memory as possible, call
    /// `clear()` before inserting new commands.
    #[inline]
    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
        if !con.supports_pipelining() {
            fail!((
                ErrorKind::ResponseError,
                "This connection does not support pipelining."
            ));
        }
        let value = if self.commands.is_empty() {
            Value::Array(vec![])
        } else if self.transaction_mode {
            self.execute_transaction(con)?
        } else {
            self.execute_pipelined(con)?
        };

        from_owned_redis_value(value.extract_error()?)
    }

    #[cfg(feature = "aio")]
    async fn execute_pipelined_async<C>(&self, con: &mut C) -> RedisResult<Value>
    where
        C: crate::aio::ConnectionLike,
    {
        let value = con
            .req_packed_commands(self, 0, self.commands.len())
            .await?;
        self.make_pipeline_results(value)
    }

    #[cfg(feature = "aio")]
    async fn execute_transaction_async<C>(&self, con: &mut C) -> RedisResult<Value>
    where
        C: crate::aio::ConnectionLike,
    {
        let mut resp = con
            .req_packed_commands(self, self.commands.len() + 1, 1)
            .await?;
        match resp.pop() {
            Some(Value::Nil) => Ok(Value::Nil),
            Some(Value::Array(items)) => self.make_pipeline_results(items),
            _ => Err((
                ErrorKind::ResponseError,
                "Invalid response when parsing multi response",
            )
                .into()),
        }
    }

    /// Async version of `query`.
    #[inline]
    #[cfg(feature = "aio")]
    pub async fn query_async<T: FromRedisValue>(
        &self,
        con: &mut impl crate::aio::ConnectionLike,
    ) -> RedisResult<T> {
        let value = if self.commands.is_empty() {
            return from_owned_redis_value(Value::Array(vec![]));
        } else if self.transaction_mode {
            self.execute_transaction_async(con).await?
        } else {
            self.execute_pipelined_async(con).await?
        };
        from_owned_redis_value(value.extract_error()?)
    }

    /// This is a shortcut to `query()` that does not return a value and
    /// will panic if the query of the pipeline fails.
    ///
    /// This is equivalent to a call of query like this:
    ///
    /// ```rust,no_run
    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
    /// # let mut con = client.get_connection().unwrap();
    /// redis::pipe().cmd("PING").query::<()>(&mut con).unwrap();
    /// ```
    ///
    /// NOTE: A Pipeline object may be reused after `query()`; all of its commands remain in place.
    /// To clear a Pipeline object while releasing/allocating as little memory as possible, call
    /// `clear()` before inserting new commands.
    #[inline]
    #[deprecated(note = "Use Pipeline::exec + unwrap, instead")]
    pub fn execute(&self, con: &mut dyn ConnectionLike) {
        self.exec(con).unwrap();
    }

    /// This is an alternative to `query` that can be used if you want to be able to handle a
    /// command's success or failure but don't care about the command's response. For example,
    /// this is useful for "SET" commands for which the response's content is not important.
    /// It avoids the need to define generic bounds for ().
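    ///
    /// For example (a minimal, illustrative sketch; assumes a reachable Redis as in the
    /// other examples):
    ///
    /// ```rust,no_run
    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
    /// # let mut con = client.get_connection().unwrap();
    /// redis::pipe()
    ///     .cmd("SET").arg("key_1").arg(42)
    ///     .cmd("SET").arg("key_2").arg(43)
    ///     .exec(&mut con)
    ///     .unwrap();
    /// ```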
    #[inline]
    pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
        self.query::<()>(con)
    }

    /// This is an alternative to `query_async` that can be used if you want to be able to handle a
    /// command's success or failure but don't care about the command's response. For example,
    /// this is useful for "SET" commands for which the response's content is not important.
    /// It avoids the need to define generic bounds for ().
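    ///
    /// For example (a minimal, illustrative sketch; assumes the `aio` feature and a
    /// multiplexed async connection):
    ///
    /// ```rust,no_run
    /// # async fn example() -> redis::RedisResult<()> {
    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
    /// # let mut con = client.get_multiplexed_async_connection().await?;
    /// redis::pipe()
    ///     .cmd("SET").arg("key_1").arg(42)
    ///     .exec_async(&mut con)
    ///     .await?;
    /// # Ok(()) }
    /// ```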
    #[cfg(feature = "aio")]
    pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
        self.query_async::<()>(con).await
    }
}

// Packs the given commands into a single RESP byte buffer, optionally
// wrapping them in `MULTI`/`EXEC` when `atomic` is set.
fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
    let mut rv = vec![];
    write_pipeline(&mut rv, cmds, atomic);
    rv
}

fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Cmd], atomic: bool) {
    let cmds_len = cmds.iter().map(cmd_len).sum();

    if atomic {
        let multi = cmd("MULTI");
        let exec = cmd("EXEC");
        rv.reserve(cmd_len(&multi) + cmd_len(&exec) + cmds_len);

        multi.write_packed_command_preallocated(rv);
        for cmd in cmds {
            cmd.write_packed_command_preallocated(rv);
        }
        exec.write_packed_command_preallocated(rv);
    } else {
        rv.reserve(cmds_len);

        for cmd in cmds {
            cmd.write_packed_command_preallocated(rv);
        }
    }
}

// Macro to implement shared methods between Pipeline and ClusterPipeline
macro_rules! implement_pipeline_commands {
    ($struct_name:ident) => {
        impl $struct_name {
            /// Adds a command to the pipeline.
            #[inline]
            pub fn add_command(&mut self, cmd: Cmd) -> &mut Self {
                self.commands.push(cmd);
                self
            }

            /// Starts a new command. Functions such as `arg` then become
            /// available to add more arguments to that command.
            #[inline]
            pub fn cmd(&mut self, name: &str) -> &mut Self {
                self.add_command(cmd(name))
            }

            /// Returns an iterator over all the commands currently in this pipeline.
            pub fn cmd_iter(&self) -> impl Iterator<Item = &Cmd> {
                self.commands.iter()
            }

            /// Instructs the pipeline to ignore the return value of this command.
            /// It will still be ensured that it is not an error, but any successful
            /// result is just thrown away. This makes result processing through
            /// tuples much easier because you do not need to handle all the items
            /// you do not care about.
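            ///
            /// For example (an illustrative sketch using the plain `redis::pipe()`
            /// constructor; assumes a reachable Redis):
            ///
            /// ```rust,no_run
            /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
            /// # let mut con = client.get_connection().unwrap();
            /// // Only the GET reply is returned; the SET reply is still checked
            /// // for errors but then discarded.
            /// let (value,): (i32,) = redis::pipe()
            ///     .cmd("SET").arg("key_1").arg(42).ignore()
            ///     .cmd("GET").arg("key_1")
            ///     .query(&mut con).unwrap();
            /// ```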
            #[inline]
            pub fn ignore(&mut self) -> &mut Self {
                match self.commands.len() {
                    0 => true,
                    x => self.ignored_commands.insert(x - 1),
                };
                self
            }

            /// Adds an argument to the last started command. This works similarly
            /// to the `arg` method of the `Cmd` object.
            ///
            /// Note that this function panics if executed on an empty pipeline.
            #[inline]
            pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Self {
                {
                    let cmd = self.get_last_command();
                    cmd.arg(arg);
                }
                self
            }

            /// Clears a pipeline object's internal data structures.
            ///
            /// This allows reusing a pipeline object for new commands while keeping the
            /// amount of memory released and reallocated to a minimum.
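            ///
            /// For example (a minimal sketch, no server required):
            ///
            /// ```rust
            /// let mut pipe = redis::pipe();
            /// pipe.cmd("PING");
            /// pipe.clear();
            /// assert!(pipe.is_empty());
            /// ```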
            #[inline]
            pub fn clear(&mut self) {
                self.commands.clear();
                self.ignored_commands.clear();
            }

            #[inline]
            fn get_last_command(&mut self) -> &mut Cmd {
                let idx = match self.commands.len() {
                    0 => panic!("No command on stack"),
                    x => x - 1,
                };
                &mut self.commands[idx]
            }

            // Builds the final pipeline result: error replies are propagated via
            // `extract_error_vec`, and replies of commands marked with `ignore()`
            // are dropped from the returned `Value::Array`.
            fn make_pipeline_results(&self, resp: Vec<Value>) -> RedisResult<Value> {
                let resp = Value::extract_error_vec(resp)?;

                let mut rv = Vec::with_capacity(resp.len() - self.ignored_commands.len());
                for (idx, result) in resp.into_iter().enumerate() {
                    if !self.ignored_commands.contains(&idx) {
                        rv.push(result);
                    }
                }
                Ok(Value::Array(rv))
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self::new()
            }
        }
    };
}

implement_pipeline_commands!(Pipeline);

// Defines caching-related functions for Pipeline; ClusterPipeline isn't supported yet.
impl Pipeline {
    /// Changes the caching behaviour for the latest command in the pipeline.
    #[cfg(feature = "cache-aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
    pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Self {
        let cmd = self.get_last_command();
        cmd.set_cache_config(command_cache_config);
        self
    }
}