1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_util::{ready, SinkExt, Stream, StreamExt};
7use tokio::io::{AsyncRead, AsyncWrite};
8use tokio_util::codec::Decoder;
9
10use crate::{
11 cmd, errors::closed_connection_error, parser::ValueCodec, FromRedisValue, RedisConnectionInfo,
12 RedisResult, Value,
13};
14
15use super::setup_connection;
16
/// A connection that has been put into `MONITOR` mode.
///
/// Built by `Monitor::new`, which sends the `MONITOR` command over the framed
/// connection and then stores that connection here. Read typed messages with
/// [`Monitor::on_message`] (borrowing) or [`Monitor::into_on_message`] (consuming).
pub struct Monitor {
    // Type-erased stream of raw replies; each item is either a `Value` or the
    // error produced while reading it.
    stream: Box<dyn Stream<Item = RedisResult<Value>> + Send + Sync + Unpin>,
}
21
22impl Monitor {
23 pub(crate) async fn new<C>(
24 connection_info: &RedisConnectionInfo,
25 stream: C,
26 ) -> RedisResult<Self>
27 where
28 C: Unpin + AsyncRead + AsyncWrite + Send + Sync + 'static,
29 {
30 let mut codec = ValueCodec::default().framed(stream);
31 setup_connection(
32 &mut codec,
33 connection_info,
34 #[cfg(feature = "cache-aio")]
35 None,
36 )
37 .await?;
38 codec.send(cmd("MONITOR").get_packed_command()).await?;
39 codec.next().await.ok_or_else(closed_connection_error)??;
40 let stream = Box::new(codec);
41
42 Ok(Self { stream })
43 }
44
45 pub fn on_message<'a, T: FromRedisValue + 'a>(&'a mut self) -> impl Stream<Item = T> + 'a {
47 MonitorStreamRef {
48 monitor: self,
49 _phantom: std::marker::PhantomData,
50 }
51 }
52
53 pub fn into_on_message<T: FromRedisValue>(self) -> impl Stream<Item = T> {
55 MonitorStream {
56 stream: self.stream,
57 _phantom: std::marker::PhantomData,
58 }
59 }
60}
61
/// Owning stream adapter returned by `Monitor::into_on_message`.
///
/// Holds the boxed reply stream taken out of a `Monitor` and yields items of
/// type `T`.
struct MonitorStream<T> {
    stream: Box<dyn Stream<Item = RedisResult<Value>> + Send + Sync + Unpin>,
    // `T` only appears as the stream's item type; no `T` value is stored.
    _phantom: std::marker::PhantomData<T>,
}
// Explicit impl so the adapter is `Unpin` for every `T`; the automatic impl
// would only apply when `T: Unpin` because of the `PhantomData<T>` field.
// `poll_next` mutates fields through `Pin<&mut Self>`, which needs `Self: Unpin`.
impl<T> Unpin for MonitorStream<T> {}
67
68fn convert_value<T>(value: RedisResult<Value>) -> Option<T>
69where
70 T: FromRedisValue,
71{
72 value.ok().and_then(|value| T::from_redis_value(value).ok())
73}
74
75impl<T> Stream for MonitorStream<T>
76where
77 T: FromRedisValue,
78{
79 type Item = T;
80
81 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
82 Poll::Ready(ready!(self.stream.poll_next_unpin(cx)).and_then(convert_value))
83 }
84}
85
/// Borrowing stream adapter returned by `Monitor::on_message`.
///
/// Keeps a mutable borrow of the `Monitor` for `'a` and yields items of type
/// `T` read from the monitor's reply stream.
struct MonitorStreamRef<'a, T> {
    monitor: &'a mut Monitor,
    // `T` only appears as the stream's item type; no `T` value is stored.
    _phantom: std::marker::PhantomData<T>,
}
// Explicit impl so the adapter is `Unpin` for every `T`; the automatic impl
// would only apply when `T: Unpin` because of the `PhantomData<T>` field.
// `poll_next` mutates fields through `Pin<&mut Self>`, which needs `Self: Unpin`.
impl<T> Unpin for MonitorStreamRef<'_, T> {}
91
92impl<T> Stream for MonitorStreamRef<'_, T>
93where
94 T: FromRedisValue,
95{
96 type Item = T;
97
98 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
99 Poll::Ready(ready!(self.monitor.stream.poll_next_unpin(cx)).and_then(convert_value))
100 }
101}