headless_lms_server/controllers/main_frontend/
playground_views.rs1use crate::{
4 domain::models_requests::{JwtKey, PlaygroundGradingCallbackClaim},
5 prelude::*,
6};
7use actix::{
8 Actor, ActorContext, Addr, AsyncContext, Handler, Message, SpawnHandle, StreamHandler,
9};
10use actix_web_actors::ws;
11use models::exercise_task_gradings::ExerciseTaskGradingResult;
12use once_cell::sync::OnceCell;
13use std::{
14 collections::HashMap,
15 sync::RwLock,
16 time::{Duration, Instant},
17};
18use utoipa::OpenApi;
19
20const PING_INTERVAL: Duration = Duration::from_secs(10);
22const CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
24
25static WS_CONNECTIONS: WsConnections = WsConnections::new();
27
28#[derive(OpenApi)]
29#[openapi(paths(websocket, receive_grading))]
30pub(crate) struct MainFrontendPlaygroundViewsApiDoc;
31
32struct WsConnections(OnceCell<RwLock<HashMap<Uuid, Addr<ClientConnection>>>>);
34
35impl WsConnections {
36 const fn new() -> Self {
37 Self(OnceCell::new())
38 }
39
40 fn get(&self, id: Uuid) -> Option<Addr<ClientConnection>> {
41 let lock = self
42 .0
43 .get_or_init(Default::default)
44 .read()
45 .unwrap_or_else(|poisoned| poisoned.into_inner());
46 lock.get(&id).cloned()
47 }
48
49 fn register(&self, id: Uuid, addr: Addr<ClientConnection>) {
50 let mut lock = self
51 .0
52 .get_or_init(Default::default)
53 .write()
54 .unwrap_or_else(|poisoned| poisoned.into_inner());
55 lock.insert(id, addr);
56 }
57
58 fn unregister(&self, id: Uuid) {
59 if let Some(connections) = self.0.get() {
60 let mut lock = connections
61 .write()
62 .unwrap_or_else(|poisoned| poisoned.into_inner());
63 lock.remove(&id);
64 }
65 }
66}
67
68struct ClientConnection {
70 client_id: Uuid,
71 playground_grading_callback_claim: String,
72 last_pong: Instant,
73 ping_handle: Option<SpawnHandle>,
75}
76
77impl ClientConnection {
78 fn new(client_id: Uuid, playground_grading_callback_claim: String) -> Self {
79 Self {
80 client_id,
81 playground_grading_callback_claim,
82 last_pong: Instant::now(),
83 ping_handle: None,
84 }
85 }
86}
87
88impl Actor for ClientConnection {
89 type Context = ws::WebsocketContext<Self>;
90
91 fn started(&mut self, ctx: &mut Self::Context) {
92 let id = self.client_id;
93 let ping_handle = ctx.run_interval(PING_INTERVAL, move |socket, ctx| {
95 if socket.last_pong.elapsed() > CONNECTION_TIMEOUT {
96 WS_CONNECTIONS.unregister(id);
98 if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::TimedOut) {
99 ctx.text(text);
100 } else {
101 error!("Failed to serialize PlaygroundViewsMessage::TimedOut");
102 }
103 ctx.stop();
104 } else {
105 ctx.ping(b"ping");
106 }
107 });
108 WS_CONNECTIONS.register(id, ctx.address());
109 if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::Registered {
111 websocket_id: id,
112 playground_grading_callback_claim: self.playground_grading_callback_claim.clone(),
113 }) {
114 ctx.text(text);
115 } else {
116 error!("Failed to serialize PlaygroundViewsMessage::Registered");
117 }
118 self.ping_handle = Some(ping_handle);
119 }
120}
121
122impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for ClientConnection {
123 fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
124 match item {
126 Ok(ws::Message::Ping(_)) => {
127 ctx.pong(b"pong");
128 }
129 Ok(ws::Message::Pong(_)) => {
130 self.last_pong = Instant::now();
131 }
132 _ => {}
133 };
134 }
135}
136
137#[derive(Debug, Message)]
140#[rtype(result = "()")]
141struct PlaygroundSubmissionMessage {
142 grading_result: ExerciseTaskGradingResult,
143}
144
145impl Handler<PlaygroundSubmissionMessage> for ClientConnection {
146 type Result = ();
147
148 fn handle(
149 &mut self,
150 msg: PlaygroundSubmissionMessage,
151 ctx: &mut Self::Context,
152 ) -> Self::Result {
153 if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::ExerciseTaskGradingResult(
155 msg.grading_result,
156 )) {
157 ctx.text(text);
158 } else {
159 error!("Failed to serialize PlaygroundViewsMessage::ExerciseTaskGradingResult");
160 }
161 }
162}
163
164#[derive(Debug, Serialize, Message)]
166#[rtype(result = "()")]
167#[serde(tag = "tag", content = "data")]
168pub enum PlaygroundViewsMessage {
169 TimedOut,
171 Registered {
173 websocket_id: Uuid,
174 playground_grading_callback_claim: String,
175 },
176 ExerciseTaskGradingResult(ExerciseTaskGradingResult),
178}
179
180#[utoipa::path(
182 get,
183 path = "/ws",
184 operation_id = "getPlaygroundViewsWebsocket",
185 tag = "playground-views",
186 responses(
187 (status = 101, description = "WebSocket connection upgraded")
188 )
189)]
190async fn websocket(
191 req: HttpRequest,
192 stream: web::Payload,
193 jwt_key: web::Data<JwtKey>,
194) -> Result<HttpResponse, ControllerError> {
195 let client_id = Uuid::new_v4();
196 let playground_grading_callback_claim =
197 PlaygroundGradingCallbackClaim::expiring_in_1_day(client_id)
198 .sign(jwt_key.as_ref())
199 .map_err(|err| {
200 controller_err!(
201 InternalServerError,
202 "Failed to sign playground grading callback claim".to_string(),
203 err
204 )
205 })?;
206 ws::start(
208 ClientConnection::new(client_id, playground_grading_callback_claim),
209 &req,
210 stream,
211 )
212 .map_err(Into::into)
213}
214
215#[utoipa::path(
218 post,
219 path = "/grading/{websocket_id}",
220 operation_id = "receivePlaygroundGrading",
221 tag = "playground-views",
222 params(
223 ("websocket_id" = Uuid, Path, description = "Playground websocket id")
224 ),
225 request_body = ExerciseTaskGradingResult,
226 responses(
227 (status = 200, description = "Grading forwarded to websocket client")
228 )
229)]
230async fn receive_grading(
231 websocket_id: web::Path<Uuid>,
232 grading_result: web::Json<ExerciseTaskGradingResult>,
233 playground_grading_callback_claim: PlaygroundGradingCallbackClaim,
234) -> Result<HttpResponse, ControllerError> {
235 if playground_grading_callback_claim.websocket_id() != *websocket_id {
236 return Err(controller_err!(
237 BadRequest,
238 "Playground grading callback claim did not match websocket id".to_string()
239 ));
240 }
241 if let Some(conn) = WS_CONNECTIONS.get(*websocket_id) {
243 conn.do_send(PlaygroundSubmissionMessage {
244 grading_result: grading_result.into_inner(),
245 });
246 }
247 Ok(HttpResponse::Ok().finish())
248}
249
250pub fn _add_routes(cfg: &mut ServiceConfig) {
251 cfg.route("/ws", web::get().to(websocket))
252 .route("/grading/{websocket_id}", web::post().to(receive_grading));
253}