redis/
client.rs

1use std::time::Duration;
2
3#[cfg(feature = "aio")]
4use crate::aio::{AsyncPushSender, DefaultAsyncDNSResolver};
5#[cfg(feature = "aio")]
6use crate::io::AsyncDNSResolver;
7use crate::{
8    connection::{connect, Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo},
9    types::{RedisResult, Value},
10};
11#[cfg(feature = "aio")]
12use std::pin::Pin;
13
14#[cfg(feature = "tls-rustls")]
15use crate::tls::{inner_build_with_tls, TlsCertificates};
16
17#[cfg(feature = "cache-aio")]
18use crate::caching::CacheConfig;
19#[cfg(all(
20    feature = "cache-aio",
21    any(feature = "connection-manager", feature = "cluster-async")
22))]
23use crate::caching::CacheManager;
24
25/// The client type.
26#[derive(Debug, Clone)]
27pub struct Client {
28    pub(crate) connection_info: ConnectionInfo,
29}
30
31/// The client acts as connector to the redis server.  By itself it does not
32/// do much other than providing a convenient way to fetch a connection from
33/// it.  In the future the plan is to provide a connection pool in the client.
34///
35/// When opening a client a URL in the following format should be used:
36///
37/// ```plain
38/// redis://host:port/db
39/// ```
40///
41/// Example usage::
42///
43/// ```rust,no_run
44/// let client = redis::Client::open("redis://127.0.0.1/").unwrap();
45/// let con = client.get_connection().unwrap();
46/// ```
47impl Client {
48    /// Connects to a redis server and returns a client.  This does not
49    /// actually open a connection yet but it does perform some basic
50    /// checks on the URL that might make the operation fail.
51    pub fn open<T: IntoConnectionInfo>(params: T) -> RedisResult<Client> {
52        Ok(Client {
53            connection_info: params.into_connection_info()?,
54        })
55    }
56
57    /// Instructs the client to actually connect to redis and returns a
58    /// connection object.  The connection object can be used to send
59    /// commands to the server.  This can fail with a variety of errors
60    /// (like unreachable host) so it's important that you handle those
61    /// errors.
62    pub fn get_connection(&self) -> RedisResult<Connection> {
63        connect(&self.connection_info, None)
64    }
65
66    /// Instructs the client to actually connect to redis with specified
67    /// timeout and returns a connection object.  The connection object
68    /// can be used to send commands to the server.  This can fail with
69    /// a variety of errors (like unreachable host) so it's important
70    /// that you handle those errors.
71    pub fn get_connection_with_timeout(&self, timeout: Duration) -> RedisResult<Connection> {
72        connect(&self.connection_info, Some(timeout))
73    }
74
75    /// Returns a reference of client connection info object.
76    pub fn get_connection_info(&self) -> &ConnectionInfo {
77        &self.connection_info
78    }
79
80    /// Constructs a new `Client` with parameters necessary to create a TLS connection.
81    ///
82    /// - `conn_info` - URL using the `rediss://` scheme.
83    /// - `tls_certs` - `TlsCertificates` structure containing:
84    ///     - `client_tls` - Optional `ClientTlsConfig` containing byte streams for
85    ///         - `client_cert` - client's byte stream containing client certificate in PEM format
86    ///         - `client_key` - client's byte stream containing private key in PEM format
87    ///     - `root_cert` - Optional byte stream yielding PEM formatted file for root certificates.
88    ///
89    /// If `ClientTlsConfig` ( cert+key pair ) is not provided, then client-side authentication is not enabled.
90    /// If `root_cert` is not provided, then system root certificates are used instead.
91    ///
92    /// # Examples
93    ///
94    /// ```no_run
95    /// use std::{fs::File, io::{BufReader, Read}};
96    ///
97    /// use redis::{Client, AsyncTypedCommands as _, TlsCertificates, ClientTlsConfig};
98    ///
99    /// async fn do_redis_code(
100    ///     url: &str,
101    ///     root_cert_file: &str,
102    ///     cert_file: &str,
103    ///     key_file: &str
104    /// ) -> redis::RedisResult<()> {
105    ///     let root_cert_file = File::open(root_cert_file).expect("cannot open private cert file");
106    ///     let mut root_cert_vec = Vec::new();
107    ///     BufReader::new(root_cert_file)
108    ///         .read_to_end(&mut root_cert_vec)
109    ///         .expect("Unable to read ROOT cert file");
110    ///
111    ///     let cert_file = File::open(cert_file).expect("cannot open private cert file");
112    ///     let mut client_cert_vec = Vec::new();
113    ///     BufReader::new(cert_file)
114    ///         .read_to_end(&mut client_cert_vec)
115    ///         .expect("Unable to read client cert file");
116    ///
117    ///     let key_file = File::open(key_file).expect("cannot open private key file");
118    ///     let mut client_key_vec = Vec::new();
119    ///     BufReader::new(key_file)
120    ///         .read_to_end(&mut client_key_vec)
121    ///         .expect("Unable to read client key file");
122    ///
123    ///     let client = Client::build_with_tls(
124    ///         url,
125    ///         TlsCertificates {
126    ///             client_tls: Some(ClientTlsConfig{
127    ///                 client_cert: client_cert_vec,
128    ///                 client_key: client_key_vec,
129    ///             }),
130    ///             root_cert: Some(root_cert_vec),
131    ///         }
132    ///     )
133    ///     .expect("Unable to build client");
134    ///
135    ///     let connection_info = client.get_connection_info();
136    ///
137    ///     println!(">>> connection info: {connection_info:?}");
138    ///
139    ///     let mut con = client.get_multiplexed_async_connection().await?;
140    ///
141    ///     con.set("key1", b"foo").await?;
142    ///
143    ///     redis::cmd("SET")
144    ///         .arg(&["key2", "bar"])
145    ///         .exec_async(&mut con)
146    ///         .await?;
147    ///
148    ///     let result = redis::cmd("MGET")
149    ///         .arg(&["key1", "key2"])
150    ///         .query_async(&mut con)
151    ///         .await;
152    ///     assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
153    ///     println!("Result from MGET: {result:?}");
154    ///
155    ///     Ok(())
156    /// }
157    /// ```
158    #[cfg(feature = "tls-rustls")]
159    pub fn build_with_tls<C: IntoConnectionInfo>(
160        conn_info: C,
161        tls_certs: TlsCertificates,
162    ) -> RedisResult<Client> {
163        let connection_info = conn_info.into_connection_info()?;
164
165        inner_build_with_tls(connection_info, &tls_certs)
166    }
167}
168
169#[cfg(feature = "cache-aio")]
170#[derive(Clone)]
171pub(crate) enum Cache {
172    Config(CacheConfig),
173    #[cfg(any(feature = "connection-manager", feature = "cluster-async"))]
174    Manager(CacheManager),
175}
176
177#[cfg(feature = "aio")]
178pub(crate) const DEFAULT_RESPONSE_TIMEOUT: Option<Duration> = Some(Duration::from_millis(500));
179#[cfg(any(feature = "aio", feature = "cluster"))]
180pub(crate) const DEFAULT_CONNECTION_TIMEOUT: Option<Duration> = Some(Duration::from_secs(1));
181
182/// Options for creation of async connection
183#[cfg(feature = "aio")]
184#[derive(Clone)]
185pub struct AsyncConnectionConfig {
186    /// Maximum time to wait for a response from the server
187    pub(crate) response_timeout: Option<Duration>,
188    /// Maximum time to wait for a connection to be established
189    pub(crate) connection_timeout: Option<Duration>,
190    pub(crate) push_sender: Option<std::sync::Arc<dyn AsyncPushSender>>,
191    #[cfg(feature = "cache-aio")]
192    pub(crate) cache: Option<Cache>,
193    pub(crate) dns_resolver: Option<std::sync::Arc<dyn AsyncDNSResolver>>,
194}
195
196#[cfg(feature = "aio")]
197impl Default for AsyncConnectionConfig {
198    fn default() -> Self {
199        Self {
200            response_timeout: DEFAULT_RESPONSE_TIMEOUT,
201            connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
202            push_sender: Default::default(),
203            #[cfg(feature = "cache-aio")]
204            cache: Default::default(),
205            dns_resolver: Default::default(),
206        }
207    }
208}
209
210#[cfg(feature = "aio")]
211impl AsyncConnectionConfig {
212    /// Creates a new instance of the options with nothing set
213    pub fn new() -> Self {
214        Self::default()
215    }
216
217    /// Each connection attempt to the server will time out after `connection_timeout`.
218    ///
219    /// Set `None` if you don't want the connection attempt to time out.
220    pub fn set_connection_timeout(mut self, connection_timeout: Option<Duration>) -> Self {
221        self.connection_timeout = connection_timeout;
222        self
223    }
224
225    /// The new connection will time out operations after `response_timeout` has passed.
226    ///
227    /// Set `None` if you don't want requests to time out.
228    pub fn set_response_timeout(mut self, response_timeout: Option<Duration>) -> Self {
229        self.response_timeout = response_timeout;
230        self
231    }
232
233    /// Sets sender sender for push values.
234    ///
235    /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
236    /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::set_protocol] function.
237    ///
238    /// # Examples
239    ///
240    /// ```rust
241    /// # use redis::AsyncConnectionConfig;
242    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
243    /// let config = AsyncConnectionConfig::new().set_push_sender(tx);
244    /// ```
245    ///
246    /// ```rust
247    /// # use std::sync::{Mutex, Arc};
248    /// # use redis::AsyncConnectionConfig;
249    /// let messages = Arc::new(Mutex::new(Vec::new()));
250    /// let config = AsyncConnectionConfig::new().set_push_sender(move |msg|{
251    ///     let Ok(mut messages) = messages.lock() else {
252    ///         return Err(redis::aio::SendError);
253    ///     };
254    ///     messages.push(msg);
255    ///     Ok(())
256    /// });
257    /// ```
258    pub fn set_push_sender(self, sender: impl AsyncPushSender) -> Self {
259        self.set_push_sender_internal(std::sync::Arc::new(sender))
260    }
261
262    pub(crate) fn set_push_sender_internal(
263        mut self,
264        sender: std::sync::Arc<dyn AsyncPushSender>,
265    ) -> Self {
266        self.push_sender = Some(sender);
267        self
268    }
269
270    /// Sets cache config for MultiplexedConnection, check CacheConfig for more details.
271    #[cfg(feature = "cache-aio")]
272    pub fn set_cache_config(mut self, cache_config: CacheConfig) -> Self {
273        self.cache = Some(Cache::Config(cache_config));
274        self
275    }
276
277    #[cfg(all(
278        feature = "cache-aio",
279        any(feature = "connection-manager", feature = "cluster-async")
280    ))]
281    pub(crate) fn set_cache_manager(mut self, cache_manager: CacheManager) -> Self {
282        self.cache = Some(Cache::Manager(cache_manager));
283        self
284    }
285
286    /// Set the DNS resolver for the underlying TCP connection.
287    ///
288    /// The parameter resolver must implement the [`crate::io::AsyncDNSResolver`] trait.
289    pub fn set_dns_resolver(self, dns_resolver: impl AsyncDNSResolver) -> Self {
290        self.set_dns_resolver_internal(std::sync::Arc::new(dns_resolver))
291    }
292
293    pub(super) fn set_dns_resolver_internal(
294        mut self,
295        dns_resolver: std::sync::Arc<dyn AsyncDNSResolver>,
296    ) -> Self {
297        self.dns_resolver = Some(dns_resolver);
298        self
299    }
300}
301
302/// To enable async support you need to chose one of the supported runtimes and active its
303/// corresponding feature: `tokio-comp` or `smol-comp`
304#[cfg(feature = "aio")]
305#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
306impl Client {
307    /// Returns an async connection from the client.
308    #[cfg(feature = "aio")]
309    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
310    pub async fn get_multiplexed_async_connection(
311        &self,
312    ) -> RedisResult<crate::aio::MultiplexedConnection> {
313        self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new())
314            .await
315    }
316
317    /// Returns an async connection from the client.
318    #[cfg(feature = "aio")]
319    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
320    pub async fn get_multiplexed_async_connection_with_config(
321        &self,
322        config: &AsyncConnectionConfig,
323    ) -> RedisResult<crate::aio::MultiplexedConnection> {
324        match Runtime::locate() {
325            #[cfg(feature = "tokio-comp")]
326            rt @ Runtime::Tokio => self
327                .get_multiplexed_async_connection_inner_with_timeout::<crate::aio::tokio::Tokio>(
328                    config, rt,
329                )
330                .await,
331
332            #[cfg(feature = "smol-comp")]
333            rt @ Runtime::Smol => {
334                self.get_multiplexed_async_connection_inner_with_timeout::<crate::aio::smol::Smol>(
335                    config, rt,
336                )
337                .await
338            }
339        }
340    }
341
342    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
343    ///
344    /// The connection manager wraps a
345    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
346    /// connection fails with a connection error, then a new connection is
347    /// established in the background and the error is returned to the caller.
348    ///
349    /// This means that on connection loss at least one command will fail, but
350    /// the connection will be re-established automatically if possible. Please
351    /// refer to the [`ConnectionManager`][connection-manager] docs for
352    /// detailed reconnecting behavior.
353    ///
354    /// A connection manager can be cloned, allowing requests to be sent concurrently
355    /// on the same underlying connection (tcp/unix socket).
356    ///
357    /// [connection-manager]: aio/struct.ConnectionManager.html
358    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
359    #[cfg(feature = "connection-manager")]
360    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
361    pub async fn get_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
362        crate::aio::ConnectionManager::new(self.clone()).await
363    }
364
365    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
366    ///
367    /// The connection manager wraps a
368    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
369    /// connection fails with a connection error, then a new connection is
370    /// established in the background and the error is returned to the caller.
371    ///
372    /// This means that on connection loss at least one command will fail, but
373    /// the connection will be re-established automatically if possible. Please
374    /// refer to the [`ConnectionManager`][connection-manager] docs for
375    /// detailed reconnecting behavior.
376    ///
377    /// A connection manager can be cloned, allowing requests to be sent concurrently
378    /// on the same underlying connection (tcp/unix socket).
379    ///
380    /// [connection-manager]: aio/struct.ConnectionManager.html
381    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
382    #[cfg(feature = "connection-manager")]
383    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
384    pub async fn get_connection_manager_with_config(
385        &self,
386        config: crate::aio::ConnectionManagerConfig,
387    ) -> RedisResult<crate::aio::ConnectionManager> {
388        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
389    }
390
391    async fn get_multiplexed_async_connection_inner_with_timeout<T>(
392        &self,
393        config: &AsyncConnectionConfig,
394        rt: Runtime,
395    ) -> RedisResult<crate::aio::MultiplexedConnection>
396    where
397        T: crate::aio::RedisRuntime,
398    {
399        let result = if let Some(connection_timeout) = config.connection_timeout {
400            rt.timeout(
401                connection_timeout,
402                self.get_multiplexed_async_connection_inner::<T>(config),
403            )
404            .await
405        } else {
406            Ok(self
407                .get_multiplexed_async_connection_inner::<T>(config)
408                .await)
409        };
410
411        match result {
412            Ok(Ok(connection)) => Ok(connection),
413            Ok(Err(e)) => Err(e),
414            Err(elapsed) => Err(elapsed.into()),
415        }
416    }
417
418    async fn get_multiplexed_async_connection_inner<T>(
419        &self,
420        config: &AsyncConnectionConfig,
421    ) -> RedisResult<crate::aio::MultiplexedConnection>
422    where
423        T: crate::aio::RedisRuntime,
424    {
425        let (mut connection, driver) = self
426            .create_multiplexed_async_connection_inner::<T>(config)
427            .await?;
428        let handle = T::spawn(driver);
429        connection.set_task_handle(handle);
430        Ok(connection)
431    }
432
433    async fn create_multiplexed_async_connection_inner<T>(
434        &self,
435        config: &AsyncConnectionConfig,
436    ) -> RedisResult<(
437        crate::aio::MultiplexedConnection,
438        impl std::future::Future<Output = ()>,
439    )>
440    where
441        T: crate::aio::RedisRuntime,
442    {
443        let resolver = config
444            .dns_resolver
445            .as_deref()
446            .unwrap_or(&DefaultAsyncDNSResolver);
447        let con = self.get_simple_async_connection::<T>(resolver).await?;
448        crate::aio::MultiplexedConnection::new_with_config(
449            &self.connection_info.redis,
450            con,
451            config.clone(),
452        )
453        .await
454    }
455
456    async fn get_simple_async_connection_dynamically(
457        &self,
458        dns_resolver: &dyn AsyncDNSResolver,
459    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>> {
460        match Runtime::locate() {
461            #[cfg(feature = "tokio-comp")]
462            Runtime::Tokio => {
463                self.get_simple_async_connection::<crate::aio::tokio::Tokio>(dns_resolver)
464                    .await
465            }
466
467            #[cfg(feature = "smol-comp")]
468            Runtime::Smol => {
469                self.get_simple_async_connection::<crate::aio::smol::Smol>(dns_resolver)
470                    .await
471            }
472        }
473    }
474
475    async fn get_simple_async_connection<T>(
476        &self,
477        dns_resolver: &dyn AsyncDNSResolver,
478    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>>
479    where
480        T: crate::aio::RedisRuntime,
481    {
482        Ok(
483            crate::aio::connect_simple::<T>(&self.connection_info, dns_resolver)
484                .await?
485                .boxed(),
486        )
487    }
488
489    #[cfg(feature = "connection-manager")]
490    pub(crate) fn connection_info(&self) -> &ConnectionInfo {
491        &self.connection_info
492    }
493
494    /// Returns an async receiver for pub-sub messages.
495    #[cfg(feature = "aio")]
496    // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later?
497    pub async fn get_async_pubsub(&self) -> RedisResult<crate::aio::PubSub> {
498        let connection = self
499            .get_simple_async_connection_dynamically(&DefaultAsyncDNSResolver)
500            .await?;
501
502        crate::aio::PubSub::new(&self.connection_info.redis, connection).await
503    }
504
505    /// Returns an async receiver for monitor messages.
506    #[cfg(feature = "aio")]
507    pub async fn get_async_monitor(&self) -> RedisResult<crate::aio::Monitor> {
508        let connection = self
509            .get_simple_async_connection_dynamically(&DefaultAsyncDNSResolver)
510            .await?;
511        crate::aio::Monitor::new(&self.connection_info.redis, connection).await
512    }
513}
514
515#[cfg(feature = "aio")]
516use crate::aio::Runtime;
517
518impl ConnectionLike for Client {
519    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
520        self.get_connection()?.req_packed_command(cmd)
521    }
522
523    fn req_packed_commands(
524        &mut self,
525        cmd: &[u8],
526        offset: usize,
527        count: usize,
528    ) -> RedisResult<Vec<Value>> {
529        self.get_connection()?
530            .req_packed_commands(cmd, offset, count)
531    }
532
533    fn get_db(&self) -> i64 {
534        self.connection_info.redis.db
535    }
536
537    fn check_connection(&mut self) -> bool {
538        if let Ok(mut conn) = self.get_connection() {
539            conn.check_connection()
540        } else {
541            false
542        }
543    }
544
545    fn is_open(&self) -> bool {
546        if let Ok(conn) = self.get_connection() {
547            conn.is_open()
548        } else {
549            false
550        }
551    }
552}
553
554#[cfg(test)]
555mod test {
556    use super::*;
557
558    #[test]
559    fn regression_293_parse_ipv6_with_interface() {
560        assert!(Client::open(("fe80::cafe:beef%eno1", 6379)).is_ok());
561    }
562}