redis/aio/
monitor.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_util::{ready, SinkExt, Stream, StreamExt};
7use tokio::io::{AsyncRead, AsyncWrite};
8use tokio_util::codec::Decoder;
9
10use crate::{
11    cmd, parser::ValueCodec, types::closed_connection_error, FromRedisValue, RedisConnectionInfo,
12    RedisResult, Value,
13};
14
15use super::setup_connection;
16
17/// Represents a `Monitor` connection.
18pub struct Monitor {
19    stream: Box<dyn Stream<Item = RedisResult<Value>> + Send + Sync + Unpin>,
20}
21
22impl Monitor {
23    pub(crate) async fn new<C>(
24        connection_info: &RedisConnectionInfo,
25        stream: C,
26    ) -> RedisResult<Self>
27    where
28        C: Unpin + AsyncRead + AsyncWrite + Send + Sync + 'static,
29    {
30        let mut codec = ValueCodec::default().framed(stream);
31        setup_connection(
32            &mut codec,
33            connection_info,
34            #[cfg(feature = "cache-aio")]
35            None,
36        )
37        .await?;
38        codec.send(cmd("MONITOR").get_packed_command()).await?;
39        codec.next().await.ok_or_else(closed_connection_error)??;
40        let stream = Box::new(codec);
41
42        Ok(Self { stream })
43    }
44
45    /// Deliver the MONITOR command to this [`Monitor`]ing wrapper.
46    #[deprecated(note = "A monitor now sends the MONITOR command automatically")]
47    pub async fn monitor(&mut self) -> RedisResult<()> {
48        Ok(())
49    }
50
51    /// Returns [`Stream`] of [`FromRedisValue`] values from this [`Monitor`]ing connection
52    pub fn on_message<'a, T: FromRedisValue + 'a>(&'a mut self) -> impl Stream<Item = T> + 'a {
53        MonitorStreamRef {
54            monitor: self,
55            _phantom: std::marker::PhantomData,
56        }
57    }
58
59    /// Returns [`Stream`] of [`FromRedisValue`] values from this [`Monitor`]ing connection
60    pub fn into_on_message<T: FromRedisValue>(self) -> impl Stream<Item = T> {
61        MonitorStream {
62            stream: self.stream,
63            _phantom: std::marker::PhantomData,
64        }
65    }
66}
67
68struct MonitorStream<T> {
69    stream: Box<dyn Stream<Item = RedisResult<Value>> + Send + Sync + Unpin>,
70    _phantom: std::marker::PhantomData<T>,
71}
72impl<T> Unpin for MonitorStream<T> {}
73
74fn convert_value<T>(value: RedisResult<Value>) -> Option<T>
75where
76    T: FromRedisValue,
77{
78    value
79        .ok()
80        .and_then(|value| T::from_owned_redis_value(value).ok())
81}
82
83impl<T> Stream for MonitorStream<T>
84where
85    T: FromRedisValue,
86{
87    type Item = T;
88
89    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90        Poll::Ready(ready!(self.stream.poll_next_unpin(cx)).and_then(convert_value))
91    }
92}
93
94struct MonitorStreamRef<'a, T> {
95    monitor: &'a mut Monitor,
96    _phantom: std::marker::PhantomData<T>,
97}
98impl<T> Unpin for MonitorStreamRef<'_, T> {}
99
100impl<T> Stream for MonitorStreamRef<'_, T>
101where
102    T: FromRedisValue,
103{
104    type Item = T;
105
106    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
107        Poll::Ready(ready!(self.monitor.stream.poll_next_unpin(cx)).and_then(convert_value))
108    }
109}