headless_lms_server/controllers/main_frontend/
playground_views.rs1use crate::prelude::*;
4use actix::{
5 Actor, ActorContext, Addr, AsyncContext, Handler, Message, SpawnHandle, StreamHandler,
6};
7use actix_web_actors::ws;
8use models::exercise_task_gradings::ExerciseTaskGradingResult;
9use once_cell::sync::OnceCell;
10use std::{
11 collections::HashMap,
12 sync::RwLock,
13 time::{Duration, Instant},
14};
15
/// Period of the heartbeat task: how often the server pings each connected client.
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// Longest time a client may go without sending a pong before the heartbeat task closes
/// its connection.
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
20
21static WS_CONNECTIONS: WsConnections = WsConnections::new();
23
24struct WsConnections(OnceCell<RwLock<HashMap<Uuid, Addr<ClientConnection>>>>);
26
27impl WsConnections {
28 const fn new() -> Self {
29 Self(OnceCell::new())
30 }
31
32 fn get(&self, id: Uuid) -> Option<Addr<ClientConnection>> {
33 let lock = self
34 .0
35 .get_or_init(Default::default)
36 .read()
37 .unwrap_or_else(|poisoned| poisoned.into_inner());
38 lock.get(&id).cloned()
39 }
40
41 fn register(&self, id: Uuid, addr: Addr<ClientConnection>) {
42 let mut lock = self
43 .0
44 .get_or_init(Default::default)
45 .write()
46 .unwrap_or_else(|poisoned| poisoned.into_inner());
47 lock.insert(id, addr);
48 }
49
50 fn unregister(&self, id: Uuid) {
51 if let Some(connections) = self.0.get() {
52 let mut lock = connections
53 .write()
54 .unwrap_or_else(|poisoned| poisoned.into_inner());
55 lock.remove(&id);
56 }
57 }
58}
59
60struct ClientConnection {
62 client_id: Uuid,
63 last_pong: Instant,
64 ping_handle: Option<SpawnHandle>,
66}
67
68impl ClientConnection {
69 fn new() -> Self {
70 Self {
71 client_id: Uuid::new_v4(),
72 last_pong: Instant::now(),
73 ping_handle: None,
74 }
75 }
76}
77
78impl Actor for ClientConnection {
79 type Context = ws::WebsocketContext<Self>;
80
81 fn started(&mut self, ctx: &mut Self::Context) {
82 let id = self.client_id;
83 let ping_handle = ctx.run_interval(PING_INTERVAL, move |socket, ctx| {
85 if socket.last_pong.elapsed() > CONNECTION_TIMEOUT {
86 WS_CONNECTIONS.unregister(id);
88 if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::TimedOut) {
89 ctx.text(text);
90 } else {
91 error!("Failed to serialize PlaygroundViewsMessage::TimedOut");
92 }
93 ctx.stop();
94 } else {
95 ctx.ping(b"ping");
96 }
97 });
98 WS_CONNECTIONS.register(id, ctx.address());
99 if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::Registered(id)) {
101 ctx.text(text);
102 } else {
103 error!("Failed to serialize PlaygroundViewsMessage::Registered");
104 }
105 self.ping_handle = Some(ping_handle);
106 }
107}
108
109impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for ClientConnection {
110 fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
111 match item {
113 Ok(ws::Message::Ping(_)) => {
114 ctx.pong(b"pong");
115 }
116 Ok(ws::Message::Pong(_)) => {
117 self.last_pong = Instant::now();
118 }
119 _ => {}
120 };
121 }
122}
123
124#[derive(Debug, Message)]
127#[rtype(result = "()")]
128struct PlaygroundSubmissionMessage {
129 grading_result: ExerciseTaskGradingResult,
130}
131
132impl Handler<PlaygroundSubmissionMessage> for ClientConnection {
133 type Result = ();
134
135 fn handle(
136 &mut self,
137 msg: PlaygroundSubmissionMessage,
138 ctx: &mut Self::Context,
139 ) -> Self::Result {
140 if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::ExerciseTaskGradingResult(
142 msg.grading_result,
143 )) {
144 ctx.text(text);
145 } else {
146 error!("Failed to serialize PlaygroundViewsMessage::ExerciseTaskGradingResult");
147 }
148 }
149}
150
151#[derive(Debug, Serialize, Message)]
153#[rtype(result = "()")]
154#[cfg_attr(feature = "ts_rs", derive(TS))]
155#[serde(tag = "tag", content = "data")]
156pub enum PlaygroundViewsMessage {
157 TimedOut,
159 Registered(Uuid),
161 ExerciseTaskGradingResult(ExerciseTaskGradingResult),
163}
164
165async fn websocket(
167 req: HttpRequest,
168 stream: web::Payload,
169) -> Result<HttpResponse, ControllerError> {
170 ws::start(ClientConnection::new(), &req, stream).map_err(Into::into)
172}
173
174async fn receive_grading(
177 websocket_id: web::Path<Uuid>,
178 grading_result: web::Json<ExerciseTaskGradingResult>,
179) -> Result<HttpResponse, ControllerError> {
180 if let Some(conn) = WS_CONNECTIONS.get(*websocket_id) {
182 conn.do_send(PlaygroundSubmissionMessage {
183 grading_result: grading_result.into_inner(),
184 });
185 }
186 Ok(HttpResponse::Ok().finish())
187}
188
189pub fn _add_routes(cfg: &mut ServiceConfig) {
190 cfg.route("/ws", web::get().to(websocket))
191 .route("/grading/{websocket_id}", web::post().to(receive_grading));
192}