redis/client.rs
1use std::time::Duration;
2
3#[cfg(feature = "aio")]
4use crate::aio::{AsyncPushSender, DefaultAsyncDNSResolver};
5#[cfg(feature = "aio")]
6use crate::io::AsyncDNSResolver;
7use crate::{
8 connection::{connect, Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo},
9 types::{RedisResult, Value},
10};
11#[cfg(feature = "aio")]
12use std::pin::Pin;
13
14#[cfg(feature = "tls-rustls")]
15use crate::tls::{inner_build_with_tls, TlsCertificates};
16
17#[cfg(feature = "cache-aio")]
18use crate::caching::CacheConfig;
19#[cfg(all(
20 feature = "cache-aio",
21 any(feature = "connection-manager", feature = "cluster-async")
22))]
23use crate::caching::CacheManager;
24
25/// The client type.
26#[derive(Debug, Clone)]
27pub struct Client {
28 pub(crate) connection_info: ConnectionInfo,
29}
30
31/// The client acts as connector to the redis server. By itself it does not
32/// do much other than providing a convenient way to fetch a connection from
33/// it. In the future the plan is to provide a connection pool in the client.
34///
35/// When opening a client a URL in the following format should be used:
36///
37/// ```plain
38/// redis://host:port/db
39/// ```
40///
41/// Example usage::
42///
43/// ```rust,no_run
44/// let client = redis::Client::open("redis://127.0.0.1/").unwrap();
45/// let con = client.get_connection().unwrap();
46/// ```
47impl Client {
48 /// Connects to a redis server and returns a client. This does not
49 /// actually open a connection yet but it does perform some basic
50 /// checks on the URL that might make the operation fail.
51 pub fn open<T: IntoConnectionInfo>(params: T) -> RedisResult<Client> {
52 Ok(Client {
53 connection_info: params.into_connection_info()?,
54 })
55 }
56
57 /// Instructs the client to actually connect to redis and returns a
58 /// connection object. The connection object can be used to send
59 /// commands to the server. This can fail with a variety of errors
60 /// (like unreachable host) so it's important that you handle those
61 /// errors.
62 pub fn get_connection(&self) -> RedisResult<Connection> {
63 connect(&self.connection_info, None)
64 }
65
66 /// Instructs the client to actually connect to redis with specified
67 /// timeout and returns a connection object. The connection object
68 /// can be used to send commands to the server. This can fail with
69 /// a variety of errors (like unreachable host) so it's important
70 /// that you handle those errors.
71 pub fn get_connection_with_timeout(&self, timeout: Duration) -> RedisResult<Connection> {
72 connect(&self.connection_info, Some(timeout))
73 }
74
75 /// Returns a reference of client connection info object.
76 pub fn get_connection_info(&self) -> &ConnectionInfo {
77 &self.connection_info
78 }
79
80 /// Constructs a new `Client` with parameters necessary to create a TLS connection.
81 ///
82 /// - `conn_info` - URL using the `rediss://` scheme.
83 /// - `tls_certs` - `TlsCertificates` structure containing:
84 /// - `client_tls` - Optional `ClientTlsConfig` containing byte streams for
85 /// - `client_cert` - client's byte stream containing client certificate in PEM format
86 /// - `client_key` - client's byte stream containing private key in PEM format
87 /// - `root_cert` - Optional byte stream yielding PEM formatted file for root certificates.
88 ///
89 /// If `ClientTlsConfig` ( cert+key pair ) is not provided, then client-side authentication is not enabled.
90 /// If `root_cert` is not provided, then system root certificates are used instead.
91 ///
92 /// # Examples
93 ///
94 /// ```no_run
95 /// use std::{fs::File, io::{BufReader, Read}};
96 ///
97 /// use redis::{Client, AsyncTypedCommands as _, TlsCertificates, ClientTlsConfig};
98 ///
99 /// async fn do_redis_code(
100 /// url: &str,
101 /// root_cert_file: &str,
102 /// cert_file: &str,
103 /// key_file: &str
104 /// ) -> redis::RedisResult<()> {
105 /// let root_cert_file = File::open(root_cert_file).expect("cannot open private cert file");
106 /// let mut root_cert_vec = Vec::new();
107 /// BufReader::new(root_cert_file)
108 /// .read_to_end(&mut root_cert_vec)
109 /// .expect("Unable to read ROOT cert file");
110 ///
111 /// let cert_file = File::open(cert_file).expect("cannot open private cert file");
112 /// let mut client_cert_vec = Vec::new();
113 /// BufReader::new(cert_file)
114 /// .read_to_end(&mut client_cert_vec)
115 /// .expect("Unable to read client cert file");
116 ///
117 /// let key_file = File::open(key_file).expect("cannot open private key file");
118 /// let mut client_key_vec = Vec::new();
119 /// BufReader::new(key_file)
120 /// .read_to_end(&mut client_key_vec)
121 /// .expect("Unable to read client key file");
122 ///
123 /// let client = Client::build_with_tls(
124 /// url,
125 /// TlsCertificates {
126 /// client_tls: Some(ClientTlsConfig{
127 /// client_cert: client_cert_vec,
128 /// client_key: client_key_vec,
129 /// }),
130 /// root_cert: Some(root_cert_vec),
131 /// }
132 /// )
133 /// .expect("Unable to build client");
134 ///
135 /// let connection_info = client.get_connection_info();
136 ///
137 /// println!(">>> connection info: {connection_info:?}");
138 ///
139 /// let mut con = client.get_multiplexed_async_connection().await?;
140 ///
141 /// con.set("key1", b"foo").await?;
142 ///
143 /// redis::cmd("SET")
144 /// .arg(&["key2", "bar"])
145 /// .exec_async(&mut con)
146 /// .await?;
147 ///
148 /// let result = redis::cmd("MGET")
149 /// .arg(&["key1", "key2"])
150 /// .query_async(&mut con)
151 /// .await;
152 /// assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
153 /// println!("Result from MGET: {result:?}");
154 ///
155 /// Ok(())
156 /// }
157 /// ```
158 #[cfg(feature = "tls-rustls")]
159 pub fn build_with_tls<C: IntoConnectionInfo>(
160 conn_info: C,
161 tls_certs: TlsCertificates,
162 ) -> RedisResult<Client> {
163 let connection_info = conn_info.into_connection_info()?;
164
165 inner_build_with_tls(connection_info, &tls_certs)
166 }
167}
168
/// Cache setup carried by [`AsyncConnectionConfig`]: either a cache
/// configuration or a cache manager.
#[cfg(feature = "cache-aio")]
#[derive(Clone)]
pub(crate) enum Cache {
    /// A cache configuration, as set through `set_cache_config`.
    Config(CacheConfig),
    /// A cache manager, as set through `set_cache_manager`. Only compiled
    /// with the `connection-manager` or `cluster-async` feature.
    #[cfg(any(feature = "connection-manager", feature = "cluster-async"))]
    Manager(CacheManager),
}
176
/// Default response timeout used by [`AsyncConnectionConfig`]: 500 milliseconds.
#[cfg(feature = "aio")]
pub(crate) const DEFAULT_RESPONSE_TIMEOUT: Option<Duration> = Some(Duration::from_millis(500));
/// Default connection timeout: 1 second.
#[cfg(any(feature = "aio", feature = "cluster"))]
pub(crate) const DEFAULT_CONNECTION_TIMEOUT: Option<Duration> = Some(Duration::from_secs(1));
181
/// Options for the creation of an async connection.
#[cfg(feature = "aio")]
#[derive(Clone)]
pub struct AsyncConnectionConfig {
    /// Maximum time to wait for a response from the server; `None` means no limit.
    pub(crate) response_timeout: Option<Duration>,
    /// Maximum time to wait for a connection to be established; `None` means no limit.
    pub(crate) connection_timeout: Option<Duration>,
    /// Optional sender for push values, shared behind an `Arc`.
    pub(crate) push_sender: Option<std::sync::Arc<dyn AsyncPushSender>>,
    /// Optional cache setup (a configuration or a manager).
    #[cfg(feature = "cache-aio")]
    pub(crate) cache: Option<Cache>,
    /// Optional custom DNS resolver; when unset the default resolver is used.
    pub(crate) dns_resolver: Option<std::sync::Arc<dyn AsyncDNSResolver>>,
}
195
#[cfg(feature = "aio")]
impl Default for AsyncConnectionConfig {
    /// Builds the default options: the crate's default response and
    /// connection timeouts, with no push sender, cache, or DNS resolver set.
    fn default() -> Self {
        AsyncConnectionConfig {
            response_timeout: DEFAULT_RESPONSE_TIMEOUT,
            connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
            push_sender: None,
            #[cfg(feature = "cache-aio")]
            cache: None,
            dns_resolver: None,
        }
    }
}
209
#[cfg(feature = "aio")]
impl AsyncConnectionConfig {
    /// Creates a new instance of the options holding the default values
    /// (identical to `AsyncConnectionConfig::default()`).
    pub fn new() -> Self {
        Default::default()
    }

    /// Each connection attempt to the server will time out after `connection_timeout`.
    ///
    /// Set `None` if you don't want the connection attempt to time out.
    pub fn set_connection_timeout(self, connection_timeout: Option<Duration>) -> Self {
        Self {
            connection_timeout,
            ..self
        }
    }

    /// The new connection will time out operations after `response_timeout` has passed.
    ///
    /// Set `None` if you don't want requests to time out.
    pub fn set_response_timeout(self, response_timeout: Option<Duration>) -> Self {
        Self {
            response_timeout,
            ..self
        }
    }

    /// Sets the sender for push values.
    ///
    /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
    /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::set_protocol] function.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use redis::AsyncConnectionConfig;
    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    /// let config = AsyncConnectionConfig::new().set_push_sender(tx);
    /// ```
    ///
    /// ```rust
    /// # use std::sync::{Mutex, Arc};
    /// # use redis::AsyncConnectionConfig;
    /// let messages = Arc::new(Mutex::new(Vec::new()));
    /// let config = AsyncConnectionConfig::new().set_push_sender(move |msg|{
    ///     let Ok(mut messages) = messages.lock() else {
    ///         return Err(redis::aio::SendError);
    ///     };
    ///     messages.push(msg);
    ///     Ok(())
    /// });
    /// ```
    pub fn set_push_sender(self, sender: impl AsyncPushSender) -> Self {
        let shared: std::sync::Arc<dyn AsyncPushSender> = std::sync::Arc::new(sender);
        self.set_push_sender_internal(shared)
    }

    /// Stores an already shared push sender in the options.
    pub(crate) fn set_push_sender_internal(
        self,
        sender: std::sync::Arc<dyn AsyncPushSender>,
    ) -> Self {
        Self {
            push_sender: Some(sender),
            ..self
        }
    }

    /// Sets cache config for MultiplexedConnection, check CacheConfig for more details.
    #[cfg(feature = "cache-aio")]
    pub fn set_cache_config(self, cache_config: CacheConfig) -> Self {
        Self {
            cache: Some(Cache::Config(cache_config)),
            ..self
        }
    }

    /// Stores a cache manager in the options, replacing any previously set cache.
    #[cfg(all(
        feature = "cache-aio",
        any(feature = "connection-manager", feature = "cluster-async")
    ))]
    pub(crate) fn set_cache_manager(self, cache_manager: CacheManager) -> Self {
        Self {
            cache: Some(Cache::Manager(cache_manager)),
            ..self
        }
    }

    /// Set the DNS resolver for the underlying TCP connection.
    ///
    /// The parameter resolver must implement the [`crate::io::AsyncDNSResolver`] trait.
    pub fn set_dns_resolver(self, dns_resolver: impl AsyncDNSResolver) -> Self {
        let shared: std::sync::Arc<dyn AsyncDNSResolver> = std::sync::Arc::new(dns_resolver);
        self.set_dns_resolver_internal(shared)
    }

    /// Stores an already shared DNS resolver in the options.
    pub(super) fn set_dns_resolver_internal(
        self,
        dns_resolver: std::sync::Arc<dyn AsyncDNSResolver>,
    ) -> Self {
        Self {
            dns_resolver: Some(dns_resolver),
            ..self
        }
    }
}
301
/// To enable async support you need to choose one of the supported runtimes and activate its
/// corresponding feature: `tokio-comp` or `smol-comp`
#[cfg(feature = "aio")]
#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
impl Client {
    /// Returns an async multiplexed connection from the client, created with
    /// the default [`AsyncConnectionConfig`].
    #[cfg(feature = "aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
    pub async fn get_multiplexed_async_connection(
        &self,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        let default_config = AsyncConnectionConfig::new();
        self.get_multiplexed_async_connection_with_config(&default_config)
            .await
    }

    /// Returns an async multiplexed connection from the client, created
    /// according to `config`.
    ///
    /// The runtime is picked via `Runtime::locate()`.
    #[cfg(feature = "aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
    pub async fn get_multiplexed_async_connection_with_config(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        match Runtime::locate() {
            #[cfg(feature = "tokio-comp")]
            runtime @ Runtime::Tokio => {
                self.get_multiplexed_async_connection_inner_with_timeout::<crate::aio::tokio::Tokio>(
                    config, runtime,
                )
                .await
            }

            #[cfg(feature = "smol-comp")]
            runtime @ Runtime::Smol => {
                self.get_multiplexed_async_connection_inner_with_timeout::<crate::aio::smol::Smol>(
                    config, runtime,
                )
                .await
            }
        }
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    pub async fn get_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
        // The manager takes ownership of its own copy of the client.
        let client = self.clone();
        crate::aio::ConnectionManager::new(client).await
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the
    /// client, created with the given `config`.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    pub async fn get_connection_manager_with_config(
        &self,
        config: crate::aio::ConnectionManagerConfig,
    ) -> RedisResult<crate::aio::ConnectionManager> {
        // The manager takes ownership of its own copy of the client.
        let client = self.clone();
        crate::aio::ConnectionManager::new_with_config(client, config).await
    }

    /// Establishes a multiplexed connection on runtime `T`, limiting the
    /// attempt to `config.connection_timeout` when one is set.
    ///
    /// An elapsed timeout is converted into the returned error; without a
    /// configured timeout the attempt is awaited directly.
    async fn get_multiplexed_async_connection_inner_with_timeout<T>(
        &self,
        config: &AsyncConnectionConfig,
        rt: Runtime,
    ) -> RedisResult<crate::aio::MultiplexedConnection>
    where
        T: crate::aio::RedisRuntime,
    {
        // Futures are lazy, so building this before choosing a branch does no work yet.
        let attempt = self.get_multiplexed_async_connection_inner::<T>(config);

        match config.connection_timeout {
            Some(limit) => match rt.timeout(limit, attempt).await {
                Ok(outcome) => outcome,
                Err(elapsed) => Err(elapsed.into()),
            },
            None => attempt.await,
        }
    }

    /// Creates the connection together with its driver future, spawns the
    /// driver on runtime `T`, and stores the spawned task's handle on the
    /// connection.
    async fn get_multiplexed_async_connection_inner<T>(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<crate::aio::MultiplexedConnection>
    where
        T: crate::aio::RedisRuntime,
    {
        let (mut multiplexed, driver_task) = self
            .create_multiplexed_async_connection_inner::<T>(config)
            .await?;
        multiplexed.set_task_handle(T::spawn(driver_task));
        Ok(multiplexed)
    }

    /// Opens the underlying stream, using the DNS resolver from `config` or
    /// the default one when none is set, and builds a `MultiplexedConnection`
    /// on top of it.
    ///
    /// Returns the connection together with a future; within this file that
    /// future is spawned by `get_multiplexed_async_connection_inner`.
    async fn create_multiplexed_async_connection_inner<T>(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<(
        crate::aio::MultiplexedConnection,
        impl std::future::Future<Output = ()>,
    )>
    where
        T: crate::aio::RedisRuntime,
    {
        let resolver: &dyn AsyncDNSResolver = match config.dns_resolver.as_deref() {
            Some(custom) => custom,
            None => &DefaultAsyncDNSResolver,
        };
        let stream = self.get_simple_async_connection::<T>(resolver).await?;

        crate::aio::MultiplexedConnection::new_with_config(
            &self.connection_info.redis,
            stream,
            config.clone(),
        )
        .await
    }

    /// Opens a simple async stream on whichever runtime `Runtime::locate()`
    /// reports, resolving addresses through `dns_resolver`.
    async fn get_simple_async_connection_dynamically(
        &self,
        dns_resolver: &dyn AsyncDNSResolver,
    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>> {
        match Runtime::locate() {
            #[cfg(feature = "tokio-comp")]
            Runtime::Tokio => {
                self.get_simple_async_connection::<crate::aio::tokio::Tokio>(dns_resolver)
                    .await
            }

            #[cfg(feature = "smol-comp")]
            Runtime::Smol => {
                self.get_simple_async_connection::<crate::aio::smol::Smol>(dns_resolver)
                    .await
            }
        }
    }

    /// Opens a simple async stream on runtime `T` and returns it boxed.
    async fn get_simple_async_connection<T>(
        &self,
        dns_resolver: &dyn AsyncDNSResolver,
    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>>
    where
        T: crate::aio::RedisRuntime,
    {
        let stream = crate::aio::connect_simple::<T>(&self.connection_info, dns_resolver).await?;
        Ok(stream.boxed())
    }

    /// Crate-internal accessor for the client's connection info.
    #[cfg(feature = "connection-manager")]
    pub(crate) fn connection_info(&self) -> &ConnectionInfo {
        &self.connection_info
    }

    /// Returns an async receiver for pub-sub messages.
    ///
    /// The underlying stream is opened with the default DNS resolver.
    #[cfg(feature = "aio")]
    // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later?
    pub async fn get_async_pubsub(&self) -> RedisResult<crate::aio::PubSub> {
        let stream = self
            .get_simple_async_connection_dynamically(&DefaultAsyncDNSResolver)
            .await?;

        crate::aio::PubSub::new(&self.connection_info.redis, stream).await
    }

    /// Returns an async receiver for monitor messages.
    ///
    /// The underlying stream is opened with the default DNS resolver.
    #[cfg(feature = "aio")]
    pub async fn get_async_monitor(&self) -> RedisResult<crate::aio::Monitor> {
        let stream = self
            .get_simple_async_connection_dynamically(&DefaultAsyncDNSResolver)
            .await?;

        crate::aio::Monitor::new(&self.connection_info.redis, stream).await
    }
}
514
515#[cfg(feature = "aio")]
516use crate::aio::Runtime;
517
518impl ConnectionLike for Client {
519 fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
520 self.get_connection()?.req_packed_command(cmd)
521 }
522
523 fn req_packed_commands(
524 &mut self,
525 cmd: &[u8],
526 offset: usize,
527 count: usize,
528 ) -> RedisResult<Vec<Value>> {
529 self.get_connection()?
530 .req_packed_commands(cmd, offset, count)
531 }
532
533 fn get_db(&self) -> i64 {
534 self.connection_info.redis.db
535 }
536
537 fn check_connection(&mut self) -> bool {
538 if let Ok(mut conn) = self.get_connection() {
539 conn.check_connection()
540 } else {
541 false
542 }
543 }
544
545 fn is_open(&self) -> bool {
546 if let Ok(conn) = self.get_connection() {
547 conn.is_open()
548 } else {
549 false
550 }
551 }
552}
553
#[cfg(test)]
mod test {
    use super::*;

    /// Regression test for issue 293: an IPv6 address carrying an interface
    /// (zone) suffix must be accepted when opening a client.
    #[test]
    fn regression_293_parse_ipv6_with_interface() {
        let opened = Client::open(("fe80::cafe:beef%eno1", 6379));
        assert!(
            opened.is_ok(),
            "an IPv6 address with an interface suffix should be accepted"
        );
    }
}