headless_lms_server/controllers/main_frontend/
playground_views.rs

1//! Handles playground-views-related functionality, in particular the websocket connections used to update the grading for services like tmc.
2
3use crate::prelude::*;
4use actix::{
5    Actor, ActorContext, Addr, AsyncContext, Handler, Message, SpawnHandle, StreamHandler,
6};
7use actix_web_actors::ws;
8use models::exercise_task_gradings::ExerciseTaskGradingResult;
9use once_cell::sync::OnceCell;
10use std::{
11    collections::HashMap,
12    sync::RwLock,
13    time::{Duration, Instant},
14};
15
// Heartbeat: the server pings every client once per `PING_INTERVAL`, and the clients are
// supposed to respond to each ping with a pong, and...
const PING_INTERVAL: Duration = Duration::from_secs(10);
// ...if no pong has arrived for this duration, the connection is dropped.
// The check runs on the ping timer, so the drop happens on the first tick after the timeout has elapsed.
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
20
// Global registry of all the ws connections, keyed by connection id, so that they can be
// fetched by the handler that receives updated gradings.
static WS_CONNECTIONS: WsConnections = WsConnections::new();
23
/// Registry mapping a connection id to the address of the actor that owns that websocket connection.
///
/// The inner map is created lazily on first use (hence the `OnceCell`), which is what allows the
/// registry to be constructed in a `const fn` and stored in a `static`.
/// A simple `RwLock` should be fine since we're not expecting a large amount of ws connections for this page.
struct WsConnections(OnceCell<RwLock<HashMap<Uuid, Addr<ClientConnection>>>>);
26
27impl WsConnections {
28    const fn new() -> Self {
29        Self(OnceCell::new())
30    }
31
32    fn get(&self, id: Uuid) -> Option<Addr<ClientConnection>> {
33        let lock = self
34            .0
35            .get_or_init(Default::default)
36            .read()
37            .expect("should never panic with the lock");
38        lock.get(&id).cloned()
39    }
40
41    fn register(&self, id: Uuid, addr: Addr<ClientConnection>) {
42        let mut lock = self
43            .0
44            .get_or_init(Default::default)
45            .write()
46            .expect("should never panic with the lock");
47        lock.insert(id, addr);
48    }
49
50    fn unregister(&self, id: Uuid) {
51        if let Some(connections) = self.0.get() {
52            let mut lock = connections
53                .write()
54                .expect("should never panic with the lock");
55            lock.remove(&id);
56        }
57    }
58}
59
/// Models a single client's websocket connection.
struct ClientConnection {
    /// Id generated for this connection; the connection is registered under it and it is sent to the client.
    client_id: Uuid,
    /// When the latest pong was received from the client (starts out as the creation time);
    /// compared against `CONNECTION_TIMEOUT` to detect dead connections.
    last_pong: Instant,
    /// Stores the task that continuously pings the client; `None` until the actor has started.
    ping_handle: Option<SpawnHandle>,
}
67
68impl ClientConnection {
69    fn new() -> Self {
70        Self {
71            client_id: Uuid::new_v4(),
72            last_pong: Instant::now(),
73            ping_handle: None,
74        }
75    }
76}
77
78impl Actor for ClientConnection {
79    type Context = ws::WebsocketContext<Self>;
80
81    fn started(&mut self, ctx: &mut Self::Context) {
82        let id = self.client_id;
83        // start the task that continuously pings the client
84        let ping_handle = ctx.run_interval(PING_INTERVAL, move |socket, ctx| {
85            if socket.last_pong.elapsed() > CONNECTION_TIMEOUT {
86                // timed out
87                WS_CONNECTIONS.unregister(id);
88                ctx.text(
89                    serde_json::to_string(&PlaygroundViewsMessage::TimedOut)
90                        .expect("should never panic"),
91                );
92                ctx.stop();
93            } else {
94                ctx.ping(b"ping");
95            }
96        });
97        WS_CONNECTIONS.register(id, ctx.address());
98        // inform the client of their id
99        ctx.text(
100            serde_json::to_string(&PlaygroundViewsMessage::Registered(id))
101                .expect("should never fail"),
102        );
103        self.ping_handle = Some(ping_handle);
104    }
105}
106
107impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for ClientConnection {
108    fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
109        // currently, no messages are expected from the frontend other than pings
110        match item {
111            Ok(ws::Message::Ping(_)) => {
112                ctx.pong(b"pong");
113            }
114            Ok(ws::Message::Pong(_)) => {
115                self.last_pong = Instant::now();
116            }
117            _ => {}
118        };
119    }
120}
121
/// Internal actor message carrying an updated grading.
///
/// This is a little awkward, but this is a message sent from the function that handles incoming
/// grading updates to the `ClientConnection` actor, so that it can then pass the message on to the client.
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct PlaygroundSubmissionMessage {
    /// The grading that should be forwarded to the client.
    grading_result: ExerciseTaskGradingResult,
}
129
130impl Handler<PlaygroundSubmissionMessage> for ClientConnection {
131    type Result = ();
132
133    fn handle(
134        &mut self,
135        msg: PlaygroundSubmissionMessage,
136        ctx: &mut Self::Context,
137    ) -> Self::Result {
138        // pass on the message to the client
139        ctx.text(
140            serde_json::to_string(&PlaygroundViewsMessage::ExerciseTaskGradingResult(
141                msg.grading_result,
142            ))
143            .expect("should never fail"),
144        );
145    }
146}
147
/// The message type for all messages sent from the server to the client from the playgrounds-views websocket connection.
///
/// Per the `serde` attribute below, a message is serialized with the variant name under the
/// `tag` key and the variant's payload (if it has one) under the `data` key.
#[derive(Debug, Serialize, Message)]
#[rtype(result = "()")]
#[cfg_attr(feature = "ts_rs", derive(TS))]
#[serde(tag = "tag", content = "data")]
pub enum PlaygroundViewsMessage {
    /// Server did not receive a pong for a certain period so the connection timed out.
    TimedOut,
    /// Server accepted a new websocket connection and is informing the new client of their connection id.
    Registered(Uuid),
    /// Server received an updated grading from an exercise service and is passing it on to the client.
    ExerciseTaskGradingResult(ExerciseTaskGradingResult),
}
161
162/// Starts a new websocket connection.
163async fn websocket(
164    req: HttpRequest,
165    stream: web::Payload,
166) -> Result<HttpResponse, ControllerError> {
167    // start websocket connection
168    ws::start(ClientConnection::new(), &req, stream).map_err(Into::into)
169}
170
171/// playground-views passes a URL pointing to this route to an exercise service when sending submissions so that if
172/// the service has an update for a pending grading, it will be sent here and passed on to through the websocket
173async fn receive_grading(
174    websocket_id: web::Path<Uuid>,
175    grading_result: web::Json<ExerciseTaskGradingResult>,
176) -> Result<HttpResponse, ControllerError> {
177    // send grading result to the corresponding websocket connection
178    if let Some(conn) = WS_CONNECTIONS.get(*websocket_id) {
179        conn.do_send(PlaygroundSubmissionMessage {
180            grading_result: grading_result.into_inner(),
181        });
182    }
183    Ok(HttpResponse::Ok().finish())
184}
185
186pub fn _add_routes(cfg: &mut ServiceConfig) {
187    cfg.route("/ws", web::get().to(websocket))
188        .route("/grading/{websocket_id}", web::post().to(receive_grading));
189}