Skip to main content

headless_lms_server/controllers/main_frontend/
playground_views.rs

1//! Handles playground-views-related functionality, in particular the websocket connections used to update the grading for services like tmc.
2
3use crate::{
4    domain::models_requests::{JwtKey, PlaygroundGradingCallbackClaim},
5    prelude::*,
6};
7use actix::{
8    Actor, ActorContext, Addr, AsyncContext, Handler, Message, SpawnHandle, StreamHandler,
9};
10use actix_web_actors::ws;
11use models::exercise_task_gradings::ExerciseTaskGradingResult;
12use once_cell::sync::OnceCell;
13use std::{
14    collections::HashMap,
15    sync::RwLock,
16    time::{Duration, Instant},
17};
18use utoipa::OpenApi;
19
20// How often each client is pinged; clients are expected to answer every ping with a pong.
21const PING_INTERVAL: Duration = Duration::from_secs(10);
22// If no pong has arrived from a client for this long, its connection is considered dead and is dropped.
23const CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
24
25// Registry of the open websocket connections, keyed by client id, so that the handler that receives updated gradings can look up the connection to forward them to.
26static WS_CONNECTIONS: WsConnections = WsConnections::new();
27
// OpenAPI description for this module; it lists the `websocket` and `receive_grading` handlers as its paths.
28#[derive(OpenApi)]
29#[openapi(paths(websocket, receive_grading))]
30pub(crate) struct MainFrontendPlaygroundViewsApiDoc;
31
32// a simple RwLock should be fine since we're not expecting a large amount of ws connections for this page
33struct WsConnections(OnceCell<RwLock<HashMap<Uuid, Addr<ClientConnection>>>>);
34
35impl WsConnections {
36    const fn new() -> Self {
37        Self(OnceCell::new())
38    }
39
40    fn get(&self, id: Uuid) -> Option<Addr<ClientConnection>> {
41        let lock = self
42            .0
43            .get_or_init(Default::default)
44            .read()
45            .unwrap_or_else(|poisoned| poisoned.into_inner());
46        lock.get(&id).cloned()
47    }
48
49    fn register(&self, id: Uuid, addr: Addr<ClientConnection>) {
50        let mut lock = self
51            .0
52            .get_or_init(Default::default)
53            .write()
54            .unwrap_or_else(|poisoned| poisoned.into_inner());
55        lock.insert(id, addr);
56    }
57
58    fn unregister(&self, id: Uuid) {
59        if let Some(connections) = self.0.get() {
60            let mut lock = connections
61                .write()
62                .unwrap_or_else(|poisoned| poisoned.into_inner());
63            lock.remove(&id);
64        }
65    }
66}
67
68// models a single client's websocket connection
69struct ClientConnection {
70    client_id: Uuid,
71    playground_grading_callback_claim: String,
72    last_pong: Instant,
73    // stores the task that continuously pings the client
74    ping_handle: Option<SpawnHandle>,
75}
76
77impl ClientConnection {
78    fn new(client_id: Uuid, playground_grading_callback_claim: String) -> Self {
79        Self {
80            client_id,
81            playground_grading_callback_claim,
82            last_pong: Instant::now(),
83            ping_handle: None,
84        }
85    }
86}
87
88impl Actor for ClientConnection {
89    type Context = ws::WebsocketContext<Self>;
90
91    fn started(&mut self, ctx: &mut Self::Context) {
92        let id = self.client_id;
93        // start the task that continuously pings the client
94        let ping_handle = ctx.run_interval(PING_INTERVAL, move |socket, ctx| {
95            if socket.last_pong.elapsed() > CONNECTION_TIMEOUT {
96                // timed out
97                WS_CONNECTIONS.unregister(id);
98                if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::TimedOut) {
99                    ctx.text(text);
100                } else {
101                    error!("Failed to serialize PlaygroundViewsMessage::TimedOut");
102                }
103                ctx.stop();
104            } else {
105                ctx.ping(b"ping");
106            }
107        });
108        WS_CONNECTIONS.register(id, ctx.address());
109        // inform the client of their id
110        if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::Registered {
111            websocket_id: id,
112            playground_grading_callback_claim: self.playground_grading_callback_claim.clone(),
113        }) {
114            ctx.text(text);
115        } else {
116            error!("Failed to serialize PlaygroundViewsMessage::Registered");
117        }
118        self.ping_handle = Some(ping_handle);
119    }
120}
121
122impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for ClientConnection {
123    fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
124        // currently, no messages are expected from the frontend other than pings
125        match item {
126            Ok(ws::Message::Ping(_)) => {
127                ctx.pong(b"pong");
128            }
129            Ok(ws::Message::Pong(_)) => {
130                self.last_pong = Instant::now();
131            }
132            _ => {}
133        };
134    }
135}
136
137// this is a little awkward, but this is a message sent from the function that handles incoming grading updates
138// to the ClientConnection actor, so that it can then pass the message on to the client
139#[derive(Debug, Message)]
140#[rtype(result = "()")]
141struct PlaygroundSubmissionMessage {
142    grading_result: ExerciseTaskGradingResult,
143}
144
145impl Handler<PlaygroundSubmissionMessage> for ClientConnection {
146    type Result = ();
147
148    fn handle(
149        &mut self,
150        msg: PlaygroundSubmissionMessage,
151        ctx: &mut Self::Context,
152    ) -> Self::Result {
153        // pass on the message to the client
154        if let Ok(text) = serde_json::to_string(&PlaygroundViewsMessage::ExerciseTaskGradingResult(
155            msg.grading_result,
156        )) {
157            ctx.text(text);
158        } else {
159            error!("Failed to serialize PlaygroundViewsMessage::ExerciseTaskGradingResult");
160        }
161    }
162}
163
164/// Every message the server sends to a client over the playground-views websocket connection. Serialized with the variant name under the `tag` key and the variant's payload, if any, under the `data` key.
165#[derive(Debug, Serialize, Message)]
166#[rtype(result = "()")]
167#[serde(tag = "tag", content = "data")]
168pub enum PlaygroundViewsMessage {
169    /// Sent right before the server drops a connection because no pong arrived within the connection timeout.
170    TimedOut,
171    /// Sent once when the server has accepted a new websocket connection: tells the client its connection id and hands it the signed grading callback claim created for that connection.
172    Registered {
173        websocket_id: Uuid,
174        playground_grading_callback_claim: String,
175    },
176    /// An updated grading that the server received from an exercise service and is passing on to the client.
177    ExerciseTaskGradingResult(ExerciseTaskGradingResult),
178}
179
180/// Starts a new websocket connection.
181#[utoipa::path(
182    get,
183    path = "/ws",
184    operation_id = "getPlaygroundViewsWebsocket",
185    tag = "playground-views",
186    responses(
187        (status = 101, description = "WebSocket connection upgraded")
188    )
189)]
190async fn websocket(
191    req: HttpRequest,
192    stream: web::Payload,
193    jwt_key: web::Data<JwtKey>,
194) -> Result<HttpResponse, ControllerError> {
195    let client_id = Uuid::new_v4();
196    let playground_grading_callback_claim =
197        PlaygroundGradingCallbackClaim::expiring_in_1_day(client_id)
198            .sign(jwt_key.as_ref())
199            .map_err(|err| {
200                controller_err!(
201                    InternalServerError,
202                    "Failed to sign playground grading callback claim".to_string(),
203                    err
204                )
205            })?;
206    // start websocket connection
207    ws::start(
208        ClientConnection::new(client_id, playground_grading_callback_claim),
209        &req,
210        stream,
211    )
212    .map_err(Into::into)
213}
214
215/// playground-views passes a URL pointing to this route to an exercise service when sending submissions so that if
216/// the service has an update for a pending grading, it will be sent here and passed on to through the websocket
217#[utoipa::path(
218    post,
219    path = "/grading/{websocket_id}",
220    operation_id = "receivePlaygroundGrading",
221    tag = "playground-views",
222    params(
223        ("websocket_id" = Uuid, Path, description = "Playground websocket id")
224    ),
225    request_body = ExerciseTaskGradingResult,
226    responses(
227        (status = 200, description = "Grading forwarded to websocket client")
228    )
229)]
230async fn receive_grading(
231    websocket_id: web::Path<Uuid>,
232    grading_result: web::Json<ExerciseTaskGradingResult>,
233    playground_grading_callback_claim: PlaygroundGradingCallbackClaim,
234) -> Result<HttpResponse, ControllerError> {
235    if playground_grading_callback_claim.websocket_id() != *websocket_id {
236        return Err(controller_err!(
237            BadRequest,
238            "Playground grading callback claim did not match websocket id".to_string()
239        ));
240    }
241    // send grading result to the corresponding websocket connection
242    if let Some(conn) = WS_CONNECTIONS.get(*websocket_id) {
243        conn.do_send(PlaygroundSubmissionMessage {
244            grading_result: grading_result.into_inner(),
245        });
246    }
247    Ok(HttpResponse::Ok().finish())
248}
249
250pub fn _add_routes(cfg: &mut ServiceConfig) {
251    cfg.route("/ws", web::get().to(websocket))
252        .route("/grading/{websocket_id}", web::post().to(receive_grading));
253}