// redis/aio/connection_manager.rs

1use super::{AsyncPushSender, HandleContainer, RedisFuture};
2#[cfg(feature = "cache-aio")]
3use crate::caching::CacheManager;
4use crate::{
5    aio::{ConnectionLike, MultiplexedConnection, Runtime},
6    check_resp3,
7    client::{DEFAULT_CONNECTION_TIMEOUT, DEFAULT_RESPONSE_TIMEOUT},
8    cmd,
9    errors::RedisError,
10    subscription_tracker::{SubscriptionAction, SubscriptionTracker},
11    types::{RedisResult, Value},
12    AsyncConnectionConfig, Client, Cmd, Pipeline, PushInfo, PushKind, ToRedisArgs,
13};
14use arc_swap::ArcSwap;
15use backon::{ExponentialBuilder, Retryable};
16use futures_channel::oneshot;
17use futures_util::future::{self, BoxFuture, FutureExt, Shared};
18use std::sync::{Arc, Weak};
19use std::time::Duration;
20use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
21use tokio::sync::Mutex;
22
23type OptionalPushSender = Option<Arc<dyn AsyncPushSender>>;
24
25/// The configuration for reconnect mechanism and request timing for the [ConnectionManager]
26#[derive(Clone)]
27pub struct ConnectionManagerConfig {
28    /// The resulting duration is calculated by taking the base to the `n`-th power,
29    /// where `n` denotes the number of past attempts.
30    exponent_base: f32,
31    /// The minimal delay for reconnection attempts
32    min_delay: Duration,
33    /// Apply a maximum delay between connection attempts. The delay between attempts won't be longer than max_delay milliseconds.
34    max_delay: Option<Duration>,
35    /// number_of_retries times, with an exponentially increasing delay
36    number_of_retries: usize,
37    /// The new connection will time out operations after `response_timeout` has passed.
38    response_timeout: Option<Duration>,
39    /// Each connection attempt to the server will time out after `connection_timeout`.
40    connection_timeout: Option<Duration>,
41    /// sender channel for push values
42    push_sender: Option<Arc<dyn AsyncPushSender>>,
43    /// if true, the manager should resubscribe automatically to all pubsub channels after reconnect.
44    resubscribe_automatically: bool,
45    #[cfg(feature = "cache-aio")]
46    pub(crate) cache_config: Option<crate::caching::CacheConfig>,
47}
48
49impl std::fmt::Debug for ConnectionManagerConfig {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
51        let &Self {
52            exponent_base,
53            min_delay,
54            number_of_retries,
55            max_delay,
56            response_timeout,
57            connection_timeout,
58            push_sender,
59            resubscribe_automatically,
60            #[cfg(feature = "cache-aio")]
61            cache_config,
62        } = &self;
63        let mut str = f.debug_struct("ConnectionManagerConfig");
64        str.field("exponent_base", &exponent_base)
65            .field("min_delay", &min_delay)
66            .field("max_delay", &max_delay)
67            .field("number_of_retries", &number_of_retries)
68            .field("response_timeout", &response_timeout)
69            .field("connection_timeout", &connection_timeout)
70            .field("resubscribe_automatically", &resubscribe_automatically)
71            .field(
72                "push_sender",
73                if push_sender.is_some() {
74                    &"set"
75                } else {
76                    &"not set"
77                },
78            );
79
80        #[cfg(feature = "cache-aio")]
81        str.field("cache_config", &cache_config);
82
83        str.finish()
84    }
85}
86
87impl ConnectionManagerConfig {
88    const DEFAULT_CONNECTION_RETRY_EXPONENT_BASE: f32 = 2.0;
89    const DEFAULT_CONNECTION_RETRY_MIN_DELAY: Duration = Duration::from_millis(100);
90    const DEFAULT_NUMBER_OF_CONNECTION_RETRIES: usize = 6;
91
92    /// Creates a new instance of the options with nothing set
93    pub fn new() -> Self {
94        Self::default()
95    }
96
97    /// Returns the minimum delay between connection attempts.
98    pub fn min_delay(&self) -> Duration {
99        self.min_delay
100    }
101
102    /// Returns the maximum delay between connection attempts.
103    pub fn max_delay(&self) -> Option<Duration> {
104        self.max_delay
105    }
106
107    /// Returns the base used for calculating the exponential backoff between retries.
108    pub fn exponent_base(&self) -> f32 {
109        self.exponent_base
110    }
111
112    /// Returns the maximum number of connection retry attempts.
113    pub fn number_of_retries(&self) -> usize {
114        self.number_of_retries
115    }
116
117    /// Returns the timeout applied to command responses.
118    ///
119    /// If `None`, responses do not time out.
120    pub fn response_timeout(&self) -> Option<Duration> {
121        self.response_timeout
122    }
123
124    /// Returns the timeout applied to establishing a new connection.
125    ///
126    /// If `None`, connection attempts to do not time out.
127    pub fn connection_timeout(&self) -> Option<Duration> {
128        self.connection_timeout
129    }
130
131    /// Returns `true` if automatic resubscription is enabled after reconnecting.
132    pub fn automatic_resubscription(&self) -> bool {
133        self.resubscribe_automatically
134    }
135
136    /// Returns the current cache configuration, if caching is enabled.
137    #[cfg(feature = "cache-aio")]
138    pub fn cache_config(&self) -> Option<&crate::caching::CacheConfig> {
139        self.cache_config.as_ref()
140    }
141
142    /// Set the minimal delay for reconnect attempts.
143    pub fn set_min_delay(mut self, min_delay: Duration) -> ConnectionManagerConfig {
144        self.min_delay = min_delay;
145        self
146    }
147
148    /// Apply a maximum delay between connection attempts. The delay between attempts won't be longer than max_delay milliseconds.
149    pub fn set_max_delay(mut self, time: Duration) -> ConnectionManagerConfig {
150        self.max_delay = Some(time);
151        self
152    }
153
154    /// The resulting duration is calculated by taking the base to the `n`-th power,
155    /// where `n` denotes the number of past attempts.
156    pub fn set_exponent_base(mut self, base: f32) -> ConnectionManagerConfig {
157        self.exponent_base = base;
158        self
159    }
160
161    /// number_of_retries times, with an exponentially increasing delay.
162    pub fn set_number_of_retries(mut self, amount: usize) -> ConnectionManagerConfig {
163        self.number_of_retries = amount;
164        self
165    }
166
167    /// The new connection will time out operations after `response_timeout` has passed.
168    ///
169    /// Set `None` if you don't want requests to time out.
170    pub fn set_response_timeout(mut self, duration: Option<Duration>) -> ConnectionManagerConfig {
171        self.response_timeout = duration;
172        self
173    }
174
175    /// Each connection attempt to the server will time out after `connection_timeout`.
176    ///
177    /// Set `None` if you don't want the connection attempt to time out.
178    pub fn set_connection_timeout(mut self, duration: Option<Duration>) -> ConnectionManagerConfig {
179        self.connection_timeout = duration;
180        self
181    }
182
183    /// Sets sender sender for push values.
184    ///
185    /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
186    /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::set_protocol] function.
187    ///
188    /// # Examples
189    ///
190    /// ```rust
191    /// # use redis::aio::ConnectionManagerConfig;
192    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
193    /// let config = ConnectionManagerConfig::new().set_push_sender(tx);
194    /// ```
195    ///
196    /// ```rust
197    /// # use std::sync::{Mutex, Arc};
198    /// # use redis::aio::ConnectionManagerConfig;
199    /// let messages = Arc::new(Mutex::new(Vec::new()));
200    /// let config = ConnectionManagerConfig::new().set_push_sender(move |msg|{
201    ///     let Ok(mut messages) = messages.lock() else {
202    ///         return Err(redis::aio::SendError);
203    ///     };
204    ///     messages.push(msg);
205    ///     Ok(())
206    /// });
207    /// ```
208    pub fn set_push_sender(mut self, sender: impl AsyncPushSender) -> Self {
209        self.push_sender = Some(Arc::new(sender));
210        self
211    }
212
213    /// Configures the connection manager to automatically resubscribe to all pubsub channels after reconnecting.
214    pub fn set_automatic_resubscription(mut self) -> Self {
215        self.resubscribe_automatically = true;
216        self
217    }
218
219    /// Set the cache behavior.
220    #[cfg(feature = "cache-aio")]
221    pub fn set_cache_config(self, cache_config: crate::caching::CacheConfig) -> Self {
222        Self {
223            cache_config: Some(cache_config),
224            ..self
225        }
226    }
227}
228
229impl Default for ConnectionManagerConfig {
230    fn default() -> Self {
231        Self {
232            exponent_base: Self::DEFAULT_CONNECTION_RETRY_EXPONENT_BASE,
233            min_delay: Self::DEFAULT_CONNECTION_RETRY_MIN_DELAY,
234            max_delay: None,
235            number_of_retries: Self::DEFAULT_NUMBER_OF_CONNECTION_RETRIES,
236            response_timeout: DEFAULT_RESPONSE_TIMEOUT,
237            connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
238            push_sender: None,
239            resubscribe_automatically: false,
240            #[cfg(feature = "cache-aio")]
241            cache_config: None,
242        }
243    }
244}
245
246struct Internals {
247    /// Information used for the connection. This is needed to be able to reconnect.
248    client: Client,
249    /// The connection future.
250    ///
251    /// The `ArcSwap` is required to be able to replace the connection
252    /// without making the `ConnectionManager` mutable.
253    connection: ArcSwap<SharedRedisFuture<MultiplexedConnection>>,
254
255    runtime: Runtime,
256    retry_strategy: ExponentialBuilder,
257    connection_config: AsyncConnectionConfig,
258    subscription_tracker: Option<Mutex<SubscriptionTracker>>,
259    #[cfg(feature = "cache-aio")]
260    cache_manager: Option<CacheManager>,
261    _task_handle: HandleContainer,
262}
263
264/// A `ConnectionManager` is a proxy that wraps a [multiplexed
265/// connection][multiplexed-connection] and automatically reconnects to the
266/// server when necessary.
267///
268/// Like the [`MultiplexedConnection`][multiplexed-connection], this
269/// manager can be cloned, allowing requests to be sent concurrently on
270/// the same underlying connection (tcp/unix socket).
271///
272/// ## Behavior
273///
274/// - When creating an instance of the `ConnectionManager`, an initial
275///   connection will be established and awaited. Connection errors will be
276///   returned directly.
277/// - When a command sent to the server fails with an error that represents
278///   a "connection dropped" condition, that error will be passed on to the
279///   user, but it will trigger a reconnection in the background.
280/// - The reconnect code will atomically swap the current (dead) connection
281///   with a future that will eventually resolve to a `MultiplexedConnection`
282///   or to a `RedisError`
283/// - All commands that are issued after the reconnect process has been
284///   initiated, will have to await the connection future.
285/// - If reconnecting fails, all pending commands will be failed as well. A
286///   new reconnection attempt will be triggered if the error is an I/O error.
287/// - If the connection manager uses RESP3 connection,it actively listens to updates from the
288///   server, and so it will cause the manager to reconnect after a disconnection, even if the manager was unused at
289///   the time of the disconnect.
290///
291/// [multiplexed-connection]: struct.MultiplexedConnection.html
292#[derive(Clone)]
293pub struct ConnectionManager(Arc<Internals>);
294
295impl std::fmt::Debug for ConnectionManager {
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        f.debug_struct("ConnectionManager")
298            .field("client", &self.0.client)
299            .field("retry_strategy", &self.0.retry_strategy)
300            .finish()
301    }
302}
303
304/// Type alias for a shared boxed future that will resolve to a `RedisResult`.
305type SharedRedisFuture<T> = Shared<BoxFuture<'static, RedisResult<T>>>;
306
/// Inspect a command result and start a reconnect when the error is unrecoverable.
///
/// `$result` is only inspected, never consumed; `$current` is the connection guard
/// that was active when the command was sent.
macro_rules! reconnect_if_dropped {
    ($self:expr, $result:expr, $current:expr) => {
        match $result {
            Err(ref e) if e.is_unrecoverable_error() => {
                Self::reconnect(Arc::downgrade(&$self.0), $current);
            }
            _ => {}
        }
    };
}
317
/// Inspect a connection result. Any error is returned from the enclosing function;
/// an I/O error additionally starts a reconnect first.
macro_rules! reconnect_if_io_error {
    ($self:expr, $result:expr, $current:expr) => {
        match $result {
            Err(err) => {
                if err.is_io_error() {
                    Self::reconnect(Arc::downgrade(&$self.0), $current);
                }
                return Err(err);
            }
            _ => {}
        }
    };
}
330
331impl ConnectionManager {
332    /// Connect to the server and store the connection inside the returned `ConnectionManager`.
333    ///
334    /// This requires the `connection-manager` feature, which will also pull in
335    /// the Tokio executor.
336    pub async fn new(client: Client) -> RedisResult<Self> {
337        let config = ConnectionManagerConfig::new();
338
339        Self::new_with_config(client, config).await
340    }
341
342    /// Connect to the server and store the connection inside the returned `ConnectionManager`.
343    ///
344    /// This requires the `connection-manager` feature, which will also pull in
345    /// the Tokio executor.
346    ///
347    /// In case of reconnection issues, the manager will retry reconnection
348    /// number_of_retries times, with an exponentially increasing delay, calculated as
349    /// min(max_delay, rand(0 .. min_delay * (exponent_base ^ current-try))).
350    ///
351    /// The new connection will time out operations after `response_timeout` has passed.
352    /// Each connection attempt to the server will time out after `connection_timeout`.
353    pub async fn new_with_config(
354        client: Client,
355        config: ConnectionManagerConfig,
356    ) -> RedisResult<Self> {
357        // Create a MultiplexedConnection and wait for it to be established
358        let runtime = Runtime::locate();
359
360        if config.resubscribe_automatically && config.push_sender.is_none() {
361            return Err((crate::ErrorKind::Client, "Cannot set resubscribe_automatically without setting a push sender to receive messages.").into());
362        }
363
364        let mut retry_strategy = ExponentialBuilder::default()
365            .with_factor(config.exponent_base)
366            .with_min_delay(config.min_delay)
367            .with_max_times(config.number_of_retries)
368            .with_jitter();
369        if let Some(max_delay) = config.max_delay {
370            retry_strategy = retry_strategy.with_max_delay(max_delay);
371        }
372
373        let mut connection_config = AsyncConnectionConfig::new()
374            .set_connection_timeout(config.connection_timeout)
375            .set_response_timeout(config.response_timeout);
376
377        #[cfg(feature = "cache-aio")]
378        let cache_manager = config
379            .cache_config
380            .as_ref()
381            .map(|cache_config| CacheManager::new(*cache_config));
382        #[cfg(feature = "cache-aio")]
383        if let Some(cache_manager) = cache_manager.as_ref() {
384            connection_config = connection_config.set_cache_manager(cache_manager.clone());
385        }
386
387        let (oneshot_sender, oneshot_receiver) = oneshot::channel();
388        let _task_handle = HandleContainer::new(
389            runtime.spawn(Self::check_for_disconnect_pushes(oneshot_receiver)),
390        );
391
392        let mut components_for_reconnection_on_push = None;
393        if let Some(push_sender) = config.push_sender.clone() {
394            check_resp3!(
395                client.connection_info.redis.protocol,
396                "Can only pass push sender to a connection using RESP3"
397            );
398
399            let (internal_sender, internal_receiver) = unbounded_channel();
400            components_for_reconnection_on_push = Some((internal_receiver, Some(push_sender)));
401
402            connection_config =
403                connection_config.set_push_sender_internal(Arc::new(internal_sender));
404        } else if client.connection_info.redis.protocol.supports_resp3() {
405            let (internal_sender, internal_receiver) = unbounded_channel();
406            components_for_reconnection_on_push = Some((internal_receiver, None));
407
408            connection_config =
409                connection_config.set_push_sender_internal(Arc::new(internal_sender));
410        }
411
412        let connection =
413            Self::new_connection(&client, retry_strategy, &connection_config, None).await?;
414        let subscription_tracker = if config.resubscribe_automatically {
415            Some(Mutex::new(SubscriptionTracker::default()))
416        } else {
417            None
418        };
419
420        let new_self = Self(Arc::new(Internals {
421            client,
422            connection: ArcSwap::from_pointee(future::ok(connection).boxed().shared()),
423            runtime,
424            retry_strategy,
425            connection_config,
426            subscription_tracker,
427            #[cfg(feature = "cache-aio")]
428            cache_manager,
429            _task_handle,
430        }));
431
432        if let Some((internal_receiver, external_sender)) = components_for_reconnection_on_push {
433            oneshot_sender
434                .send((
435                    Arc::downgrade(&new_self.0),
436                    internal_receiver,
437                    external_sender,
438                ))
439                .map_err(|_| {
440                    crate::RedisError::from((
441                        crate::ErrorKind::Client,
442                        "Failed to set automatic resubscription",
443                    ))
444                })?;
445        };
446
447        Ok(new_self)
448    }
449
450    async fn new_connection(
451        client: &Client,
452        exponential_backoff: ExponentialBuilder,
453        connection_config: &AsyncConnectionConfig,
454        additional_commands: Option<Pipeline>,
455    ) -> RedisResult<MultiplexedConnection> {
456        let connection_config = connection_config.clone();
457        let get_conn = || async {
458            client
459                .get_multiplexed_async_connection_with_config(&connection_config)
460                .await
461        };
462        let mut conn = get_conn
463            .retry(exponential_backoff)
464            .sleep(|duration| async move { Runtime::locate().sleep(duration).await })
465            .await?;
466        if let Some(pipeline) = additional_commands {
467            // TODO - should we ignore these failures?
468            let _ = pipeline.exec_async(&mut conn).await;
469        }
470        Ok(conn)
471    }
472
473    /// Reconnect and overwrite the old connection.
474    ///
475    /// The `current` guard points to the shared future that was active
476    /// when the connection loss was detected.
477    fn reconnect(
478        internals: Weak<Internals>,
479        current: arc_swap::Guard<Arc<SharedRedisFuture<MultiplexedConnection>>>,
480    ) {
481        let Some(internals) = internals.upgrade() else {
482            return;
483        };
484        let internals_clone = internals.clone();
485        #[cfg(not(feature = "cache-aio"))]
486        let connection_config = internals.connection_config.clone();
487        #[cfg(feature = "cache-aio")]
488        let mut connection_config = internals.connection_config.clone();
489        #[cfg(feature = "cache-aio")]
490        if let Some(manager) = internals.cache_manager.as_ref() {
491            let new_cache_manager = manager.clone_and_increase_epoch();
492            connection_config = connection_config.set_cache_manager(new_cache_manager);
493        }
494        let new_connection: SharedRedisFuture<MultiplexedConnection> = async move {
495            let additional_commands = match &internals_clone.subscription_tracker {
496                Some(subscription_tracker) => Some(
497                    subscription_tracker
498                        .lock()
499                        .await
500                        .get_subscription_pipeline(),
501                ),
502                None => None,
503            };
504
505            let con = Self::new_connection(
506                &internals_clone.client,
507                internals_clone.retry_strategy,
508                &connection_config,
509                additional_commands,
510            )
511            .await?;
512            Ok(con)
513        }
514        .boxed()
515        .shared();
516
517        // Update the connection in the connection manager
518        let new_connection_arc = Arc::new(new_connection.clone());
519        let prev = internals
520            .connection
521            .compare_and_swap(&current, new_connection_arc);
522
523        // If the swap happened...
524        if Arc::ptr_eq(&prev, &current) {
525            // ...start the connection attempt immediately but do not wait on it.
526            internals.runtime.spawn(new_connection.map(|_| ())).detach();
527        }
528    }
529
530    async fn check_for_disconnect_pushes(
531        receiver: oneshot::Receiver<(
532            Weak<Internals>,
533            UnboundedReceiver<PushInfo>,
534            OptionalPushSender,
535        )>,
536    ) {
537        let Ok((this, mut internal_receiver, external_sender)) = receiver.await else {
538            return;
539        };
540        while let Some(push_info) = internal_receiver.recv().await {
541            if push_info.kind == PushKind::Disconnection {
542                let Some(internals) = this.upgrade() else {
543                    return;
544                };
545                Self::reconnect(Arc::downgrade(&internals), internals.connection.load());
546            }
547            if let Some(sender) = external_sender.as_ref() {
548                let _ = sender.send(push_info);
549            }
550        }
551    }
552
553    /// Sends an already encoded (packed) command into the TCP socket and
554    /// reads the single response from it.
555    pub async fn send_packed_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
556        // Clone connection to avoid having to lock the ArcSwap in write mode
557        let guard = self.0.connection.load();
558        let connection_result = (**guard).clone().await.map_err(|e| e.clone());
559        reconnect_if_io_error!(self, connection_result, guard);
560        let result = connection_result?.send_packed_command(cmd).await;
561        reconnect_if_dropped!(self, &result, guard);
562        result
563    }
564
565    /// Sends multiple already encoded (packed) command into the TCP socket
566    /// and reads `count` responses from it.  This is used to implement
567    /// pipelining.
568    pub async fn send_packed_commands(
569        &mut self,
570        cmd: &crate::Pipeline,
571        offset: usize,
572        count: usize,
573    ) -> RedisResult<Vec<Value>> {
574        // Clone shared connection future to avoid having to lock the ArcSwap in write mode
575        let guard = self.0.connection.load();
576        let connection_result = (**guard).clone().await.map_err(|e| e.clone());
577        reconnect_if_io_error!(self, connection_result, guard);
578        let result = connection_result?
579            .send_packed_commands(cmd, offset, count)
580            .await;
581        reconnect_if_dropped!(self, &result, guard);
582        result
583    }
584
585    async fn update_subscription_tracker(
586        &self,
587        action: SubscriptionAction,
588        args: impl ToRedisArgs,
589    ) {
590        let Some(subscription_tracker) = &self.0.subscription_tracker else {
591            return;
592        };
593        let args = args.to_redis_args().into_iter();
594        subscription_tracker
595            .lock()
596            .await
597            .update_with_request(action, args);
598    }
599
600    /// Subscribes to a new channel(s).
601    ///
602    /// Updates from the sender will be sent on the push sender that was passed to the manager.
603    /// If the manager was configured without a push sender, the connection won't be able to pass messages back to the user.
604    ///
605    /// This method is only available when the connection is using RESP3 protocol, and will return an error otherwise.
606    /// It should be noted that unless [ConnectionManagerConfig::set_automatic_resubscription] was called,
607    /// the subscription will be removed on a disconnect and must be re-subscribed.
608    ///  
609    /// ```rust,no_run
610    /// # async fn func() -> redis::RedisResult<()> {
611    /// let client = redis::Client::open("redis://127.0.0.1/?protocol=resp3").unwrap();
612    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
613    /// let config = redis::aio::ConnectionManagerConfig::new().set_push_sender(tx);
614    /// let mut con = client.get_connection_manager_with_config(config).await?;
615    /// con.psubscribe("channel*_1").await?;
616    /// con.psubscribe(&["channel*_2", "channel*_3"]).await?;
617    /// # Ok(())
618    /// # }
619    /// ```
620    pub async fn subscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
621        check_resp3!(self.0.client.connection_info.redis.protocol);
622        let mut cmd = cmd("SUBSCRIBE");
623        cmd.arg(&channel_name);
624        cmd.exec_async(self).await?;
625        self.update_subscription_tracker(SubscriptionAction::Subscribe, channel_name)
626            .await;
627
628        Ok(())
629    }
630
631    /// Unsubscribes from channel(s).
632    ///
633    /// This method is only available when the connection is using RESP3 protocol, and will return an error otherwise.
634    pub async fn unsubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
635        check_resp3!(self.0.client.connection_info.redis.protocol);
636        let mut cmd = cmd("UNSUBSCRIBE");
637        cmd.arg(&channel_name);
638        cmd.exec_async(self).await?;
639        self.update_subscription_tracker(SubscriptionAction::Unsubscribe, channel_name)
640            .await;
641        Ok(())
642    }
643
644    /// Subscribes to new channel(s) with pattern(s).
645    ///
646    /// Updates from the sender will be sent on the push sender that was passed to the manager.
647    /// If the manager was configured without a push sender, the manager won't be able to pass messages back to the user.
648    ///
649    /// This method is only available when the connection is using RESP3 protocol, and will return an error otherwise.
650    /// It should be noted that unless [ConnectionManagerConfig::set_automatic_resubscription] was called,
651    /// the subscription will be removed on a disconnect and must be re-subscribed.
652    ///
653    /// ```rust,no_run
654    /// # async fn func() -> redis::RedisResult<()> {
655    /// let client = redis::Client::open("redis://127.0.0.1/?protocol=resp3").unwrap();
656    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
657    /// let config = redis::aio::ConnectionManagerConfig::new().set_push_sender(tx);
658    /// let mut con = client.get_connection_manager_with_config(config).await?;
659    /// con.psubscribe("channel*_1").await?;
660    /// con.psubscribe(&["channel*_2", "channel*_3"]).await?;
661    /// # Ok(())
662    /// # }
663    /// ```
664    pub async fn psubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
665        check_resp3!(self.0.client.connection_info.redis.protocol);
666        let mut cmd = cmd("PSUBSCRIBE");
667        cmd.arg(&channel_pattern);
668        cmd.exec_async(self).await?;
669        self.update_subscription_tracker(SubscriptionAction::PSubscribe, channel_pattern)
670            .await;
671        Ok(())
672    }
673
674    /// Unsubscribes from channel pattern(s).
675    ///
676    /// This method is only available when the connection is using RESP3 protocol, and will return an error otherwise.
677    pub async fn punsubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
678        check_resp3!(self.0.client.connection_info.redis.protocol);
679        let mut cmd = cmd("PUNSUBSCRIBE");
680        cmd.arg(&channel_pattern);
681        cmd.exec_async(self).await?;
682        self.update_subscription_tracker(SubscriptionAction::PUnsubscribe, channel_pattern)
683            .await;
684        Ok(())
685    }
686
687    /// Gets [`crate::caching::CacheStatistics`] for current connection if caching is enabled.
688    #[cfg(feature = "cache-aio")]
689    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
690    pub fn get_cache_statistics(&self) -> Option<crate::caching::CacheStatistics> {
691        self.0.cache_manager.as_ref().map(|cm| cm.statistics())
692    }
693}
694
695impl ConnectionLike for ConnectionManager {
696    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
697        (async move { self.send_packed_command(cmd).await }).boxed()
698    }
699
700    fn req_packed_commands<'a>(
701        &'a mut self,
702        cmd: &'a crate::Pipeline,
703        offset: usize,
704        count: usize,
705    ) -> RedisFuture<'a, Vec<Value>> {
706        (async move { self.send_packed_commands(cmd, offset, count).await }).boxed()
707    }
708
709    fn get_db(&self) -> i64 {
710        self.0.client.connection_info().redis.db
711    }
712}