actix_web_actors/
context.rs1use std::{
2 collections::VecDeque,
3 future::Future,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use actix::{
9 dev::{AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, ToEnvelope},
10 fut::ActorFuture,
11 Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle,
12};
13use actix_web::error::Error;
14use bytes::Bytes;
15use futures_core::Stream;
16use tokio::sync::oneshot::Sender;
17
18pub struct HttpContext<A>
72where
73 A: Actor<Context = HttpContext<A>>,
74{
75 inner: ContextParts<A>,
76 stream: VecDeque<Option<Bytes>>,
77}
78
79impl<A> ActorContext for HttpContext<A>
80where
81 A: Actor<Context = Self>,
82{
83 fn stop(&mut self) {
84 self.inner.stop();
85 }
86 fn terminate(&mut self) {
87 self.inner.terminate()
88 }
89 fn state(&self) -> ActorState {
90 self.inner.state()
91 }
92}
93
94impl<A> AsyncContext<A> for HttpContext<A>
95where
96 A: Actor<Context = Self>,
97{
98 #[inline]
99 fn spawn<F>(&mut self, fut: F) -> SpawnHandle
100 where
101 F: ActorFuture<A, Output = ()> + 'static,
102 {
103 self.inner.spawn(fut)
104 }
105
106 #[inline]
107 fn wait<F>(&mut self, fut: F)
108 where
109 F: ActorFuture<A, Output = ()> + 'static,
110 {
111 self.inner.wait(fut)
112 }
113
114 #[doc(hidden)]
115 #[inline]
116 fn waiting(&self) -> bool {
117 self.inner.waiting()
118 || self.inner.state() == ActorState::Stopping
119 || self.inner.state() == ActorState::Stopped
120 }
121
122 #[inline]
123 fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
124 self.inner.cancel_future(handle)
125 }
126
127 #[inline]
128 fn address(&self) -> Addr<A> {
129 self.inner.address()
130 }
131}
132
133impl<A> HttpContext<A>
134where
135 A: Actor<Context = Self>,
136{
137 #[inline]
138 pub fn create(actor: A) -> impl Stream<Item = Result<Bytes, Error>> {
140 let mb = Mailbox::default();
141 let ctx = HttpContext {
142 inner: ContextParts::new(mb.sender_producer()),
143 stream: VecDeque::new(),
144 };
145 HttpContextFut::new(ctx, actor, mb)
146 }
147
148 pub fn with_factory<F>(f: F) -> impl Stream<Item = Result<Bytes, Error>>
150 where
151 F: FnOnce(&mut Self) -> A + 'static,
152 {
153 let mb = Mailbox::default();
154 let mut ctx = HttpContext {
155 inner: ContextParts::new(mb.sender_producer()),
156 stream: VecDeque::new(),
157 };
158
159 let act = f(&mut ctx);
160 HttpContextFut::new(ctx, act, mb)
161 }
162}
163
164impl<A> HttpContext<A>
165where
166 A: Actor<Context = Self>,
167{
168 #[inline]
170 pub fn write(&mut self, data: Bytes) {
171 self.stream.push_back(Some(data));
172 }
173
174 #[inline]
176 pub fn write_eof(&mut self) {
177 self.stream.push_back(None);
178 }
179
180 pub fn handle(&self) -> SpawnHandle {
184 self.inner.curr_handle()
185 }
186}
187
188impl<A> AsyncContextParts<A> for HttpContext<A>
189where
190 A: Actor<Context = Self>,
191{
192 fn parts(&mut self) -> &mut ContextParts<A> {
193 &mut self.inner
194 }
195}
196
197struct HttpContextFut<A>
198where
199 A: Actor<Context = HttpContext<A>>,
200{
201 fut: ContextFut<A, HttpContext<A>>,
202}
203
204impl<A> HttpContextFut<A>
205where
206 A: Actor<Context = HttpContext<A>>,
207{
208 fn new(ctx: HttpContext<A>, act: A, mailbox: Mailbox<A>) -> Self {
209 let fut = ContextFut::new(ctx, act, mailbox);
210 HttpContextFut { fut }
211 }
212}
213
214impl<A> Stream for HttpContextFut<A>
215where
216 A: Actor<Context = HttpContext<A>>,
217{
218 type Item = Result<Bytes, Error>;
219
220 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
221 if self.fut.alive() {
222 let _ = Pin::new(&mut self.fut).poll(cx);
223 }
224
225 if let Some(data) = self.fut.ctx().stream.pop_front() {
227 Poll::Ready(data.map(Ok))
228 } else if self.fut.alive() {
229 Poll::Pending
230 } else {
231 Poll::Ready(None)
232 }
233 }
234}
235
236impl<A, M> ToEnvelope<A, M> for HttpContext<A>
237where
238 A: Actor<Context = HttpContext<A>> + Handler<M>,
239 M: Message + Send + 'static,
240 M::Result: Send,
241{
242 fn pack(msg: M, tx: Option<Sender<M::Result>>) -> Envelope<A> {
243 Envelope::new(msg, tx)
244 }
245}
246
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use actix_web::{
        http::StatusCode,
        test::{call_service, init_service, read_body, TestRequest},
        web, App, HttpResponse,
    };

    use super::*;

    /// Delay before the first write and between consecutive writes.
    const WRITE_DELAY: Duration = Duration::from_millis(100);

    /// Actor that emits three numbered lines and then ends the stream.
    struct MyActor {
        count: usize,
    }

    impl Actor for MyActor {
        type Context = HttpContext<Self>;

        fn started(&mut self, ctx: &mut Self::Context) {
            ctx.run_later(WRITE_DELAY, Self::write);
        }
    }

    impl MyActor {
        /// Bumps the counter; up to three it writes a line and reschedules
        /// itself, after that it queues end-of-stream.
        fn write(&mut self, ctx: &mut HttpContext<Self>) {
            self.count += 1;

            if self.count > 3 {
                ctx.write_eof();
                return;
            }

            ctx.write(Bytes::from(format!("LINE-{}", self.count)));
            ctx.run_later(WRITE_DELAY, Self::write);
        }
    }

    #[actix_rt::test]
    async fn test_default_resource() {
        let app = App::new().service(web::resource("/test").to(|| async {
            HttpResponse::Ok().streaming(HttpContext::create(MyActor { count: 0 }))
        }));
        let srv = init_service(app).await;

        let req = TestRequest::with_uri("/test").to_request();
        let resp = call_service(&srv, req).await;
        assert_eq!(resp.status(), StatusCode::OK);

        // The body is the concatenation of every chunk written by the actor.
        let body = read_body(resp).await;
        assert_eq!(body, Bytes::from_static(b"LINE-1LINE-2LINE-3"));
    }
}