// headless_lms_server/controllers/main_frontend/playground_views.rs
use crate::prelude::*;
4use actix::{
5 Actor, ActorContext, Addr, AsyncContext, Handler, Message, SpawnHandle, StreamHandler,
6};
7use actix_web_actors::ws;
8use models::exercise_task_gradings::ExerciseTaskGradingResult;
9use once_cell::sync::OnceCell;
10use std::{
11 collections::HashMap,
12 sync::RwLock,
13 time::{Duration, Instant},
14};
15
/// Period of the per-connection interval that either pings the client or
/// checks whether the connection has timed out.
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// Maximum time allowed since the last pong before the connection is
/// considered dead and is stopped.
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
20
21static WS_CONNECTIONS: WsConnections = WsConnections::new();
23
24struct WsConnections(OnceCell<RwLock<HashMap<Uuid, Addr<ClientConnection>>>>);
26
27impl WsConnections {
28 const fn new() -> Self {
29 Self(OnceCell::new())
30 }
31
32 fn get(&self, id: Uuid) -> Option<Addr<ClientConnection>> {
33 let lock = self
34 .0
35 .get_or_init(Default::default)
36 .read()
37 .expect("should never panic with the lock");
38 lock.get(&id).cloned()
39 }
40
41 fn register(&self, id: Uuid, addr: Addr<ClientConnection>) {
42 let mut lock = self
43 .0
44 .get_or_init(Default::default)
45 .write()
46 .expect("should never panic with the lock");
47 lock.insert(id, addr);
48 }
49
50 fn unregister(&self, id: Uuid) {
51 if let Some(connections) = self.0.get() {
52 let mut lock = connections
53 .write()
54 .expect("should never panic with the lock");
55 lock.remove(&id);
56 }
57 }
58}
59
60struct ClientConnection {
62 client_id: Uuid,
63 last_pong: Instant,
64 ping_handle: Option<SpawnHandle>,
66}
67
68impl ClientConnection {
69 fn new() -> Self {
70 Self {
71 client_id: Uuid::new_v4(),
72 last_pong: Instant::now(),
73 ping_handle: None,
74 }
75 }
76}
77
78impl Actor for ClientConnection {
79 type Context = ws::WebsocketContext<Self>;
80
81 fn started(&mut self, ctx: &mut Self::Context) {
82 let id = self.client_id;
83 let ping_handle = ctx.run_interval(PING_INTERVAL, move |socket, ctx| {
85 if socket.last_pong.elapsed() > CONNECTION_TIMEOUT {
86 WS_CONNECTIONS.unregister(id);
88 ctx.text(
89 serde_json::to_string(&PlaygroundViewsMessage::TimedOut)
90 .expect("should never panic"),
91 );
92 ctx.stop();
93 } else {
94 ctx.ping(b"ping");
95 }
96 });
97 WS_CONNECTIONS.register(id, ctx.address());
98 ctx.text(
100 serde_json::to_string(&PlaygroundViewsMessage::Registered(id))
101 .expect("should never fail"),
102 );
103 self.ping_handle = Some(ping_handle);
104 }
105}
106
107impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for ClientConnection {
108 fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
109 match item {
111 Ok(ws::Message::Ping(_)) => {
112 ctx.pong(b"pong");
113 }
114 Ok(ws::Message::Pong(_)) => {
115 self.last_pong = Instant::now();
116 }
117 _ => {}
118 };
119 }
120}
121
122#[derive(Debug, Message)]
125#[rtype(result = "()")]
126struct PlaygroundSubmissionMessage {
127 grading_result: ExerciseTaskGradingResult,
128}
129
130impl Handler<PlaygroundSubmissionMessage> for ClientConnection {
131 type Result = ();
132
133 fn handle(
134 &mut self,
135 msg: PlaygroundSubmissionMessage,
136 ctx: &mut Self::Context,
137 ) -> Self::Result {
138 ctx.text(
140 serde_json::to_string(&PlaygroundViewsMessage::ExerciseTaskGradingResult(
141 msg.grading_result,
142 ))
143 .expect("should never fail"),
144 );
145 }
146}
147
148#[derive(Debug, Serialize, Message)]
150#[rtype(result = "()")]
151#[cfg_attr(feature = "ts_rs", derive(TS))]
152#[serde(tag = "tag", content = "data")]
153pub enum PlaygroundViewsMessage {
154 TimedOut,
156 Registered(Uuid),
158 ExerciseTaskGradingResult(ExerciseTaskGradingResult),
160}
161
162async fn websocket(
164 req: HttpRequest,
165 stream: web::Payload,
166) -> Result<HttpResponse, ControllerError> {
167 ws::start(ClientConnection::new(), &req, stream).map_err(Into::into)
169}
170
171async fn receive_grading(
174 websocket_id: web::Path<Uuid>,
175 grading_result: web::Json<ExerciseTaskGradingResult>,
176) -> Result<HttpResponse, ControllerError> {
177 if let Some(conn) = WS_CONNECTIONS.get(*websocket_id) {
179 conn.do_send(PlaygroundSubmissionMessage {
180 grading_result: grading_result.into_inner(),
181 });
182 }
183 Ok(HttpResponse::Ok().finish())
184}
185
186pub fn _add_routes(cfg: &mut ServiceConfig) {
187 cfg.route("/ws", web::get().to(websocket))
188 .route("/grading/{websocket_id}", web::post().to(receive_grading));
189}