actix_web_actors/
context.rs

1use std::{
2    collections::VecDeque,
3    future::Future,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use actix::{
9    dev::{AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, ToEnvelope},
10    fut::ActorFuture,
11    Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle,
12};
13use actix_web::error::Error;
14use bytes::Bytes;
15use futures_core::Stream;
16use tokio::sync::oneshot::Sender;
17
18/// Execution context for HTTP actors
19///
20/// # Example
21///
22/// A demonstration of [server-sent events](https://developer.mozilla.org/docs/Web/API/Server-sent_events) using actors:
23///
24/// ```no_run
25/// use std::time::Duration;
26///
27/// use actix::{Actor, AsyncContext};
28/// use actix_web::{get, http::header, App, HttpResponse, HttpServer};
29/// use actix_web_actors::HttpContext;
30/// use bytes::Bytes;
31///
32/// struct MyActor {
33///     count: usize,
34/// }
35///
36/// impl Actor for MyActor {
37///     type Context = HttpContext<Self>;
38///
39///     fn started(&mut self, ctx: &mut Self::Context) {
40///         ctx.run_later(Duration::from_millis(100), Self::write);
41///     }
42/// }
43///
44/// impl MyActor {
45///     fn write(&mut self, ctx: &mut HttpContext<Self>) {
46///         self.count += 1;
47///         if self.count > 3 {
48///             ctx.write_eof()
49///         } else {
50///             ctx.write(Bytes::from(format!("event: count\ndata: {}\n\n", self.count)));
51///             ctx.run_later(Duration::from_millis(100), Self::write);
52///         }
53///     }
54/// }
55///
56/// #[get("/")]
57/// async fn index() -> HttpResponse {
58///     HttpResponse::Ok()
59///         .insert_header(header::ContentType(mime::TEXT_EVENT_STREAM))
60///         .streaming(HttpContext::create(MyActor { count: 0 }))
61/// }
62///
63/// #[actix_web::main]
64/// async fn main() -> std::io::Result<()> {
65///     HttpServer::new(|| App::new().service(index))
66///         .bind(("127.0.0.1", 8080))?
67///         .run()
68///         .await
69/// }
70/// ```
71pub struct HttpContext<A>
72where
73    A: Actor<Context = HttpContext<A>>,
74{
75    inner: ContextParts<A>,
76    stream: VecDeque<Option<Bytes>>,
77}
78
79impl<A> ActorContext for HttpContext<A>
80where
81    A: Actor<Context = Self>,
82{
83    fn stop(&mut self) {
84        self.inner.stop();
85    }
86    fn terminate(&mut self) {
87        self.inner.terminate()
88    }
89    fn state(&self) -> ActorState {
90        self.inner.state()
91    }
92}
93
94impl<A> AsyncContext<A> for HttpContext<A>
95where
96    A: Actor<Context = Self>,
97{
98    #[inline]
99    fn spawn<F>(&mut self, fut: F) -> SpawnHandle
100    where
101        F: ActorFuture<A, Output = ()> + 'static,
102    {
103        self.inner.spawn(fut)
104    }
105
106    #[inline]
107    fn wait<F>(&mut self, fut: F)
108    where
109        F: ActorFuture<A, Output = ()> + 'static,
110    {
111        self.inner.wait(fut)
112    }
113
114    #[doc(hidden)]
115    #[inline]
116    fn waiting(&self) -> bool {
117        self.inner.waiting()
118            || self.inner.state() == ActorState::Stopping
119            || self.inner.state() == ActorState::Stopped
120    }
121
122    #[inline]
123    fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
124        self.inner.cancel_future(handle)
125    }
126
127    #[inline]
128    fn address(&self) -> Addr<A> {
129        self.inner.address()
130    }
131}
132
133impl<A> HttpContext<A>
134where
135    A: Actor<Context = Self>,
136{
137    #[inline]
138    /// Create a new HTTP Context from a request and an actor
139    pub fn create(actor: A) -> impl Stream<Item = Result<Bytes, Error>> {
140        let mb = Mailbox::default();
141        let ctx = HttpContext {
142            inner: ContextParts::new(mb.sender_producer()),
143            stream: VecDeque::new(),
144        };
145        HttpContextFut::new(ctx, actor, mb)
146    }
147
148    /// Create a new HTTP Context
149    pub fn with_factory<F>(f: F) -> impl Stream<Item = Result<Bytes, Error>>
150    where
151        F: FnOnce(&mut Self) -> A + 'static,
152    {
153        let mb = Mailbox::default();
154        let mut ctx = HttpContext {
155            inner: ContextParts::new(mb.sender_producer()),
156            stream: VecDeque::new(),
157        };
158
159        let act = f(&mut ctx);
160        HttpContextFut::new(ctx, act, mb)
161    }
162}
163
164impl<A> HttpContext<A>
165where
166    A: Actor<Context = Self>,
167{
168    /// Write payload
169    #[inline]
170    pub fn write(&mut self, data: Bytes) {
171        self.stream.push_back(Some(data));
172    }
173
174    /// Indicate end of streaming payload. Also this method calls `Self::close`.
175    #[inline]
176    pub fn write_eof(&mut self) {
177        self.stream.push_back(None);
178    }
179
180    /// Handle of the running future
181    ///
182    /// SpawnHandle is the handle returned by `AsyncContext::spawn()` method.
183    pub fn handle(&self) -> SpawnHandle {
184        self.inner.curr_handle()
185    }
186}
187
188impl<A> AsyncContextParts<A> for HttpContext<A>
189where
190    A: Actor<Context = Self>,
191{
192    fn parts(&mut self) -> &mut ContextParts<A> {
193        &mut self.inner
194    }
195}
196
197struct HttpContextFut<A>
198where
199    A: Actor<Context = HttpContext<A>>,
200{
201    fut: ContextFut<A, HttpContext<A>>,
202}
203
204impl<A> HttpContextFut<A>
205where
206    A: Actor<Context = HttpContext<A>>,
207{
208    fn new(ctx: HttpContext<A>, act: A, mailbox: Mailbox<A>) -> Self {
209        let fut = ContextFut::new(ctx, act, mailbox);
210        HttpContextFut { fut }
211    }
212}
213
214impl<A> Stream for HttpContextFut<A>
215where
216    A: Actor<Context = HttpContext<A>>,
217{
218    type Item = Result<Bytes, Error>;
219
220    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
221        if self.fut.alive() {
222            let _ = Pin::new(&mut self.fut).poll(cx);
223        }
224
225        // frames
226        if let Some(data) = self.fut.ctx().stream.pop_front() {
227            Poll::Ready(data.map(Ok))
228        } else if self.fut.alive() {
229            Poll::Pending
230        } else {
231            Poll::Ready(None)
232        }
233    }
234}
235
236impl<A, M> ToEnvelope<A, M> for HttpContext<A>
237where
238    A: Actor<Context = HttpContext<A>> + Handler<M>,
239    M: Message + Send + 'static,
240    M::Result: Send,
241{
242    fn pack(msg: M, tx: Option<Sender<M::Result>>) -> Envelope<A> {
243        Envelope::new(msg, tx)
244    }
245}
246
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use actix_web::{
        http::StatusCode,
        test::{call_service, init_service, read_body, TestRequest},
        web, App, HttpResponse,
    };

    use super::*;

    /// Test actor that writes three numbered chunks and then ends the body.
    struct MyActor {
        count: usize,
    }

    impl Actor for MyActor {
        type Context = HttpContext<Self>;

        fn started(&mut self, ctx: &mut Self::Context) {
            // Schedule the first write shortly after start-up.
            ctx.run_later(Duration::from_millis(100), Self::write);
        }
    }

    impl MyActor {
        /// Emits the next chunk and reschedules itself, or ends the body after the third chunk.
        fn write(&mut self, ctx: &mut HttpContext<Self>) {
            self.count += 1;

            if self.count > 3 {
                ctx.write_eof();
                return;
            }

            ctx.write(Bytes::from(format!("LINE-{}", self.count)));
            ctx.run_later(Duration::from_millis(100), Self::write);
        }
    }

    #[actix_rt::test]
    async fn test_default_resource() {
        let app = App::new().service(web::resource("/test").to(|| async {
            HttpResponse::Ok().streaming(HttpContext::create(MyActor { count: 0 }))
        }));
        let srv = init_service(app).await;

        let req = TestRequest::with_uri("/test").to_request();
        let resp = call_service(&srv, req).await;
        assert_eq!(resp.status(), StatusCode::OK);

        // The body is the concatenation of the three chunks, in write order.
        let expected = Bytes::from_static(b"LINE-1LINE-2LINE-3");
        let body = read_body(resp).await;
        assert_eq!(body, expected);
    }
}
297}