headless_lms_server/controllers/main_frontend/
playground_views.rs1use crate::prelude::*;
4use actix::{
5 Actor, ActorContext, Addr, AsyncContext, Handler, Message, SpawnHandle, StreamHandler,
6};
7use actix_web_actors::ws;
8use models::exercise_task_gradings::ExerciseTaskGradingResult;
9use once_cell::sync::OnceCell;
10use std::{
11 collections::HashMap,
12 sync::RwLock,
13 time::{Duration, Instant},
14};
15use utoipa::OpenApi;
16
17const PING_INTERVAL: Duration = Duration::from_secs(10);
19const CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
21
22static WS_CONNECTIONS: WsConnections = WsConnections::new();
24
25#[derive(OpenApi)]
26#[openapi(paths(websocket, receive_grading))]
27pub(crate) struct MainFrontendPlaygroundViewsApiDoc;
28
29struct WsConnections(OnceCell<RwLock<HashMap<Uuid, Addr<ClientConnection>>>>);
31
32impl WsConnections {
33 const fn new() -> Self {
34 Self(OnceCell::new())
35 }
36
37 fn get(&self, id: Uuid) -> Option<Addr<ClientConnection>> {
38 let lock = self
39 .0
40 .get_or_init(Default::default)
41 .read()
42 .unwrap_or_else(|poisoned| poisoned.into_inner());
43 lock.get(&id).cloned()
44 }
45
46 fn register(&self, id: Uuid, addr: Addr<ClientConnection>) {
47 let mut lock = self
48 .0
49 .get_or_init(Default::default)
50 .write()
51 .unwrap_or_else(|poisoned| poisoned.into_inner());
52 lock.insert(id, addr);
53 }
54
55 fn unregister(&self, id: Uuid) {
56 if let Some(connections) = self.0.get() {
57 let mut lock = connections
58 .write()
59 .unwrap_or_else(|poisoned| poisoned.into_inner());
60 lock.remove(&id);
61 }
62 }
63}
64
65struct ClientConnection {
67 client_id: Uuid,
68 last_pong: Instant,
69 ping_handle: Option<SpawnHandle>,
71}
72
73impl ClientConnection {
74 fn new() -> Self {
75 Self {
76 client_id: Uuid::new_v4(),
77 last_pong: Instant::now(),
78 ping_handle: None,
79 }
80 }
81}
82
83impl Actor for ClientConnection {
84 type Context = ws::WebsocketContext<Self>;
85
86 fn started(&mut self, ctx: &mut Self::Context) {
87 let id = self.client_id;
88 let ping_handle = ctx.run_interval(PING_INTERVAL, move |socket, ctx| {
90 if socket.last_pong.elapsed() > CONNECTION_TIMEOUT {
91 WS_CONNECTIONS.unregister(id);
93 if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::TimedOut) {
94 ctx.text(text);
95 } else {
96 error!("Failed to serialize PlaygroundViewsMessage::TimedOut");
97 }
98 ctx.stop();
99 } else {
100 ctx.ping(b"ping");
101 }
102 });
103 WS_CONNECTIONS.register(id, ctx.address());
104 if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::Registered(id)) {
106 ctx.text(text);
107 } else {
108 error!("Failed to serialize PlaygroundViewsMessage::Registered");
109 }
110 self.ping_handle = Some(ping_handle);
111 }
112}
113
114impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for ClientConnection {
115 fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
116 match item {
118 Ok(ws::Message::Ping(_)) => {
119 ctx.pong(b"pong");
120 }
121 Ok(ws::Message::Pong(_)) => {
122 self.last_pong = Instant::now();
123 }
124 _ => {}
125 };
126 }
127}
128
129#[derive(Debug, Message)]
132#[rtype(result = "()")]
133struct PlaygroundSubmissionMessage {
134 grading_result: ExerciseTaskGradingResult,
135}
136
137impl Handler<PlaygroundSubmissionMessage> for ClientConnection {
138 type Result = ();
139
140 fn handle(
141 &mut self,
142 msg: PlaygroundSubmissionMessage,
143 ctx: &mut Self::Context,
144 ) -> Self::Result {
145 if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::ExerciseTaskGradingResult(
147 msg.grading_result,
148 )) {
149 ctx.text(text);
150 } else {
151 error!("Failed to serialize PlaygroundViewsMessage::ExerciseTaskGradingResult");
152 }
153 }
154}
155
156#[derive(Debug, Serialize, Message)]
158#[rtype(result = "()")]
159#[serde(tag = "tag", content = "data")]
160pub enum PlaygroundViewsMessage {
161 TimedOut,
163 Registered(Uuid),
165 ExerciseTaskGradingResult(ExerciseTaskGradingResult),
167}
168
169#[utoipa::path(
171 get,
172 path = "/ws",
173 operation_id = "getPlaygroundViewsWebsocket",
174 tag = "playground-views",
175 responses(
176 (status = 101, description = "WebSocket connection upgraded")
177 )
178)]
179async fn websocket(
180 req: HttpRequest,
181 stream: web::Payload,
182) -> Result<HttpResponse, ControllerError> {
183 ws::start(ClientConnection::new(), &req, stream).map_err(Into::into)
185}
186
187#[utoipa::path(
190 post,
191 path = "/grading/{websocket_id}",
192 operation_id = "receivePlaygroundGrading",
193 tag = "playground-views",
194 params(
195 ("websocket_id" = Uuid, Path, description = "Playground websocket id")
196 ),
197 request_body = ExerciseTaskGradingResult,
198 responses(
199 (status = 200, description = "Grading forwarded to websocket client")
200 )
201)]
202async fn receive_grading(
203 websocket_id: web::Path<Uuid>,
204 grading_result: web::Json<ExerciseTaskGradingResult>,
205) -> Result<HttpResponse, ControllerError> {
206 if let Some(conn) = WS_CONNECTIONS.get(*websocket_id) {
208 conn.do_send(PlaygroundSubmissionMessage {
209 grading_result: grading_result.into_inner(),
210 });
211 }
212 Ok(HttpResponse::Ok().finish())
213}
214
215pub fn _add_routes(cfg: &mut ServiceConfig) {
216 cfg.route("/ws", web::get().to(websocket))
217 .route("/grading/{websocket_id}", web::post().to(receive_grading));
218}