headless_lms_server/controllers/main_frontend/
playground_views.rs

1//! Handles playground-views-related functionality, in particular the websocket connections used to update the grading for services like tmc.
2
3use crate::prelude::*;
4use actix::{
5    Actor, ActorContext, Addr, AsyncContext, Handler, Message, SpawnHandle, StreamHandler,
6};
7use actix_web_actors::ws;
8use models::exercise_task_gradings::ExerciseTaskGradingResult;
9use once_cell::sync::OnceCell;
10use std::{
11    collections::HashMap,
12    sync::RwLock,
13    time::{Duration, Instant},
14};
15use utoipa::OpenApi;
16
// Interval of the per-connection ping timer: on every tick the server either pings the client
// or, if the client has been silent for too long, drops the connection. Clients are expected to
// answer each ping with a pong, and...
const PING_INTERVAL: Duration = Duration::from_secs(10);
// ...if no pong has been received for this duration, the connection is dropped.
// The check runs on the ping timer, so the drop happens on the first tick after the timeout.
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
21
// Registry of all the ws connections, keyed by connection id, so that they can be fetched by
// the handler that receives updated gradings.
static WS_CONNECTIONS: WsConnections = WsConnections::new();
24
// OpenAPI document for this module; it lists the `websocket` and `receive_grading` operations.
#[derive(OpenApi)]
#[openapi(paths(websocket, receive_grading))]
pub(crate) struct MainFrontendPlaygroundViewsApiDoc;
28
// Maps a connection id to the address of the actor that owns that connection.
// The map sits behind a `OnceCell` so that it can be created lazily, which lets the registry be
// constructed in a `const fn`.
// A simple RwLock should be fine since we're not expecting a large amount of ws connections for this page.
struct WsConnections(OnceCell<RwLock<HashMap<Uuid, Addr<ClientConnection>>>>);
31
32impl WsConnections {
33    const fn new() -> Self {
34        Self(OnceCell::new())
35    }
36
37    fn get(&self, id: Uuid) -> Option<Addr<ClientConnection>> {
38        let lock = self
39            .0
40            .get_or_init(Default::default)
41            .read()
42            .unwrap_or_else(|poisoned| poisoned.into_inner());
43        lock.get(&id).cloned()
44    }
45
46    fn register(&self, id: Uuid, addr: Addr<ClientConnection>) {
47        let mut lock = self
48            .0
49            .get_or_init(Default::default)
50            .write()
51            .unwrap_or_else(|poisoned| poisoned.into_inner());
52        lock.insert(id, addr);
53    }
54
55    fn unregister(&self, id: Uuid) {
56        if let Some(connections) = self.0.get() {
57            let mut lock = connections
58                .write()
59                .unwrap_or_else(|poisoned| poisoned.into_inner());
60            lock.remove(&id);
61        }
62    }
63}
64
// models a single client's websocket connection
struct ClientConnection {
    // id under which this connection is stored in WS_CONNECTIONS; it is also sent to the client
    // when the connection starts
    client_id: Uuid,
    // time the last pong was received from the client (initialised to the creation time);
    // compared against CONNECTION_TIMEOUT to detect dead connections
    last_pong: Instant,
    // stores the task that continuously pings the client; None until the actor has started
    ping_handle: Option<SpawnHandle>,
}
72
73impl ClientConnection {
74    fn new() -> Self {
75        Self {
76            client_id: Uuid::new_v4(),
77            last_pong: Instant::now(),
78            ping_handle: None,
79        }
80    }
81}
82
83impl Actor for ClientConnection {
84    type Context = ws::WebsocketContext<Self>;
85
86    fn started(&mut self, ctx: &mut Self::Context) {
87        let id = self.client_id;
88        // start the task that continuously pings the client
89        let ping_handle = ctx.run_interval(PING_INTERVAL, move |socket, ctx| {
90            if socket.last_pong.elapsed() > CONNECTION_TIMEOUT {
91                // timed out
92                WS_CONNECTIONS.unregister(id);
93                if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::TimedOut) {
94                    ctx.text(text);
95                } else {
96                    error!("Failed to serialize PlaygroundViewsMessage::TimedOut");
97                }
98                ctx.stop();
99            } else {
100                ctx.ping(b"ping");
101            }
102        });
103        WS_CONNECTIONS.register(id, ctx.address());
104        // inform the client of their id
105        if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::Registered(id)) {
106            ctx.text(text);
107        } else {
108            error!("Failed to serialize PlaygroundViewsMessage::Registered");
109        }
110        self.ping_handle = Some(ping_handle);
111    }
112}
113
114impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for ClientConnection {
115    fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
116        // currently, no messages are expected from the frontend other than pings
117        match item {
118            Ok(ws::Message::Ping(_)) => {
119                ctx.pong(b"pong");
120            }
121            Ok(ws::Message::Pong(_)) => {
122                self.last_pong = Instant::now();
123            }
124            _ => {}
125        };
126    }
127}
128
// Internal actor message (not sent over the wire as-is): this is a little awkward, but it is sent
// from the function that handles incoming grading updates to the ClientConnection actor, so that
// the actor can then pass the grading on to the client. The handler returns nothing.
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct PlaygroundSubmissionMessage {
    // the grading received from the exercise service, forwarded unchanged
    grading_result: ExerciseTaskGradingResult,
}
136
137impl Handler<PlaygroundSubmissionMessage> for ClientConnection {
138    type Result = ();
139
140    fn handle(
141        &mut self,
142        msg: PlaygroundSubmissionMessage,
143        ctx: &mut Self::Context,
144    ) -> Self::Result {
145        // pass on the message to the client
146        if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::ExerciseTaskGradingResult(
147            msg.grading_result,
148        )) {
149            ctx.text(text);
150        } else {
151            error!("Failed to serialize PlaygroundViewsMessage::ExerciseTaskGradingResult");
152        }
153    }
154}
155
/// The message type for all messages sent from the server to the client over the playground-views websocket connection.
///
/// Serialized with serde's adjacent tagging: the variant name is written to the `tag` field and
/// the variant's payload, if it has one, to the `data` field.
#[derive(Debug, Serialize, Message)]
#[rtype(result = "()")]
#[serde(tag = "tag", content = "data")]
pub enum PlaygroundViewsMessage {
    /// Server did not receive a pong for a certain period so the connection timed out.
    TimedOut,
    /// Server accepted a new websocket connection and is informing the new client of their connection id.
    Registered(Uuid),
    /// Server received an updated grading from an exercise service and is passing it on to the client.
    ExerciseTaskGradingResult(ExerciseTaskGradingResult),
}
168
169/// Starts a new websocket connection.
170#[utoipa::path(
171    get,
172    path = "/ws",
173    operation_id = "getPlaygroundViewsWebsocket",
174    tag = "playground-views",
175    responses(
176        (status = 101, description = "WebSocket connection upgraded")
177    )
178)]
179async fn websocket(
180    req: HttpRequest,
181    stream: web::Payload,
182) -> Result<HttpResponse, ControllerError> {
183    // start websocket connection
184    ws::start(ClientConnection::new(), &req, stream).map_err(Into::into)
185}
186
187/// playground-views passes a URL pointing to this route to an exercise service when sending submissions so that if
188/// the service has an update for a pending grading, it will be sent here and passed on to through the websocket
189#[utoipa::path(
190    post,
191    path = "/grading/{websocket_id}",
192    operation_id = "receivePlaygroundGrading",
193    tag = "playground-views",
194    params(
195        ("websocket_id" = Uuid, Path, description = "Playground websocket id")
196    ),
197    request_body = ExerciseTaskGradingResult,
198    responses(
199        (status = 200, description = "Grading forwarded to websocket client")
200    )
201)]
202async fn receive_grading(
203    websocket_id: web::Path<Uuid>,
204    grading_result: web::Json<ExerciseTaskGradingResult>,
205) -> Result<HttpResponse, ControllerError> {
206    // send grading result to the corresponding websocket connection
207    if let Some(conn) = WS_CONNECTIONS.get(*websocket_id) {
208        conn.do_send(PlaygroundSubmissionMessage {
209            grading_result: grading_result.into_inner(),
210        });
211    }
212    Ok(HttpResponse::Ok().finish())
213}
214
215pub fn _add_routes(cfg: &mut ServiceConfig) {
216    cfg.route("/ws", web::get().to(websocket))
217        .route("/grading/{websocket_id}", web::post().to(receive_grading));
218}