headless_lms_server/controllers/main_frontend/
playground_views.rs

1//! Handles playground-views-related functionality, in particular the websocket connections used to update the grading for services like tmc.
2
3use crate::prelude::*;
4use actix::{
5    Actor, ActorContext, Addr, AsyncContext, Handler, Message, SpawnHandle, StreamHandler,
6};
7use actix_web_actors::ws;
8use models::exercise_task_gradings::ExerciseTaskGradingResult;
9use once_cell::sync::OnceCell;
10use std::{
11    collections::HashMap,
12    sync::RwLock,
13    time::{Duration, Instant},
14};
15
/// Interval at which the server pings each connected client.
/// Clients are supposed to answer every ping with a pong.
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// If no pong has been received from a client for this long, the connection is dropped.
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
20
21// stores all the ws connections so that they can be fetched by the handler that receives updated gradings
22static WS_CONNECTIONS: WsConnections = WsConnections::new();
23
24// a simple RwLock should be fine since we're not expecting a large amount of ws connections for this page
25struct WsConnections(OnceCell<RwLock<HashMap<Uuid, Addr<ClientConnection>>>>);
26
27impl WsConnections {
28    const fn new() -> Self {
29        Self(OnceCell::new())
30    }
31
32    fn get(&self, id: Uuid) -> Option<Addr<ClientConnection>> {
33        let lock = self
34            .0
35            .get_or_init(Default::default)
36            .read()
37            .unwrap_or_else(|poisoned| poisoned.into_inner());
38        lock.get(&id).cloned()
39    }
40
41    fn register(&self, id: Uuid, addr: Addr<ClientConnection>) {
42        let mut lock = self
43            .0
44            .get_or_init(Default::default)
45            .write()
46            .unwrap_or_else(|poisoned| poisoned.into_inner());
47        lock.insert(id, addr);
48    }
49
50    fn unregister(&self, id: Uuid) {
51        if let Some(connections) = self.0.get() {
52            let mut lock = connections
53                .write()
54                .unwrap_or_else(|poisoned| poisoned.into_inner());
55            lock.remove(&id);
56        }
57    }
58}
59
60// models a single client's websocket connection
61struct ClientConnection {
62    client_id: Uuid,
63    last_pong: Instant,
64    // stores the task that continuously pings the client
65    ping_handle: Option<SpawnHandle>,
66}
67
68impl ClientConnection {
69    fn new() -> Self {
70        Self {
71            client_id: Uuid::new_v4(),
72            last_pong: Instant::now(),
73            ping_handle: None,
74        }
75    }
76}
77
78impl Actor for ClientConnection {
79    type Context = ws::WebsocketContext<Self>;
80
81    fn started(&mut self, ctx: &mut Self::Context) {
82        let id = self.client_id;
83        // start the task that continuously pings the client
84        let ping_handle = ctx.run_interval(PING_INTERVAL, move |socket, ctx| {
85            if socket.last_pong.elapsed() > CONNECTION_TIMEOUT {
86                // timed out
87                WS_CONNECTIONS.unregister(id);
88                if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::TimedOut) {
89                    ctx.text(text);
90                } else {
91                    error!("Failed to serialize PlaygroundViewsMessage::TimedOut");
92                }
93                ctx.stop();
94            } else {
95                ctx.ping(b"ping");
96            }
97        });
98        WS_CONNECTIONS.register(id, ctx.address());
99        // inform the client of their id
100        if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::Registered(id)) {
101            ctx.text(text);
102        } else {
103            error!("Failed to serialize PlaygroundViewsMessage::Registered");
104        }
105        self.ping_handle = Some(ping_handle);
106    }
107}
108
109impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for ClientConnection {
110    fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
111        // currently, no messages are expected from the frontend other than pings
112        match item {
113            Ok(ws::Message::Ping(_)) => {
114                ctx.pong(b"pong");
115            }
116            Ok(ws::Message::Pong(_)) => {
117                self.last_pong = Instant::now();
118            }
119            _ => {}
120        };
121    }
122}
123
124// this is a little awkward, but this is a message sent from the function that handles incoming grading updates
125// to the ClientConnection actor, so that it can then pass the message on to the client
126#[derive(Debug, Message)]
127#[rtype(result = "()")]
128struct PlaygroundSubmissionMessage {
129    grading_result: ExerciseTaskGradingResult,
130}
131
132impl Handler<PlaygroundSubmissionMessage> for ClientConnection {
133    type Result = ();
134
135    fn handle(
136        &mut self,
137        msg: PlaygroundSubmissionMessage,
138        ctx: &mut Self::Context,
139    ) -> Self::Result {
140        // pass on the message to the client
141        if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::ExerciseTaskGradingResult(
142            msg.grading_result,
143        )) {
144            ctx.text(text);
145        } else {
146            error!("Failed to serialize PlaygroundViewsMessage::ExerciseTaskGradingResult");
147        }
148    }
149}
150
151/// The message type for all messages sent from the server to the client from the playgrounds-views websocket connection.
152#[derive(Debug, Serialize, Message)]
153#[rtype(result = "()")]
154#[cfg_attr(feature = "ts_rs", derive(TS))]
155#[serde(tag = "tag", content = "data")]
156pub enum PlaygroundViewsMessage {
157    /// Server did not receive a pong for a certain period so the connection timed out.
158    TimedOut,
159    /// Server accepted a new websocket connection and is informing the new client of their connection id.
160    Registered(Uuid),
161    /// Server received an updated grading from an exercise service and is passing it on to the client.
162    ExerciseTaskGradingResult(ExerciseTaskGradingResult),
163}
164
165/// Starts a new websocket connection.
166async fn websocket(
167    req: HttpRequest,
168    stream: web::Payload,
169) -> Result<HttpResponse, ControllerError> {
170    // start websocket connection
171    ws::start(ClientConnection::new(), &req, stream).map_err(Into::into)
172}
173
174/// playground-views passes a URL pointing to this route to an exercise service when sending submissions so that if
175/// the service has an update for a pending grading, it will be sent here and passed on to through the websocket
176async fn receive_grading(
177    websocket_id: web::Path<Uuid>,
178    grading_result: web::Json<ExerciseTaskGradingResult>,
179) -> Result<HttpResponse, ControllerError> {
180    // send grading result to the corresponding websocket connection
181    if let Some(conn) = WS_CONNECTIONS.get(*websocket_id) {
182        conn.do_send(PlaygroundSubmissionMessage {
183            grading_result: grading_result.into_inner(),
184        });
185    }
186    Ok(HttpResponse::Ok().finish())
187}
188
189pub fn _add_routes(cfg: &mut ServiceConfig) {
190    cfg.route("/ws", web::get().to(websocket))
191        .route("/grading/{websocket_id}", web::post().to(receive_grading));
192}