headless_lms_server/controllers/main_frontend/
playground_views.rsuse crate::prelude::*;
use actix::{
Actor, ActorContext, Addr, AsyncContext, Handler, Message, SpawnHandle, StreamHandler,
};
use actix_web_actors::ws;
use models::exercise_task_gradings::ExerciseTaskGradingResult;
use once_cell::sync::OnceCell;
use std::{
collections::HashMap,
sync::RwLock,
time::{Duration, Instant},
};
const PING_INTERVAL: Duration = Duration::from_secs(10);
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
static WS_CONNECTIONS: WsConnections = WsConnections::new();
struct WsConnections(OnceCell<RwLock<HashMap<Uuid, Addr<ClientConnection>>>>);
impl WsConnections {
const fn new() -> Self {
Self(OnceCell::new())
}
fn get(&self, id: Uuid) -> Option<Addr<ClientConnection>> {
let lock = self
.0
.get_or_init(Default::default)
.read()
.expect("should never panic with the lock");
lock.get(&id).cloned()
}
fn register(&self, id: Uuid, addr: Addr<ClientConnection>) {
let mut lock = self
.0
.get_or_init(Default::default)
.write()
.expect("should never panic with the lock");
lock.insert(id, addr);
}
fn unregister(&self, id: Uuid) {
if let Some(connections) = self.0.get() {
let mut lock = connections
.write()
.expect("should never panic with the lock");
lock.remove(&id);
}
}
}
struct ClientConnection {
client_id: Uuid,
last_pong: Instant,
ping_handle: Option<SpawnHandle>,
}
impl ClientConnection {
fn new() -> Self {
Self {
client_id: Uuid::new_v4(),
last_pong: Instant::now(),
ping_handle: None,
}
}
}
impl Actor for ClientConnection {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let id = self.client_id;
let ping_handle = ctx.run_interval(PING_INTERVAL, move |socket, ctx| {
if socket.last_pong.elapsed() > CONNECTION_TIMEOUT {
WS_CONNECTIONS.unregister(id);
ctx.text(
serde_json::to_string(&PlaygroundViewsMessage::TimedOut)
.expect("should never panic"),
);
ctx.stop();
} else {
ctx.ping(b"ping");
}
});
WS_CONNECTIONS.register(id, ctx.address());
ctx.text(
serde_json::to_string(&PlaygroundViewsMessage::Registered(id))
.expect("should never fail"),
);
self.ping_handle = Some(ping_handle);
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for ClientConnection {
fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match item {
Ok(ws::Message::Ping(_)) => {
ctx.pong(b"pong");
}
Ok(ws::Message::Pong(_)) => {
self.last_pong = Instant::now();
}
_ => {}
};
}
}
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct PlaygroundSubmissionMessage {
grading_result: ExerciseTaskGradingResult,
}
impl Handler<PlaygroundSubmissionMessage> for ClientConnection {
type Result = ();
fn handle(
&mut self,
msg: PlaygroundSubmissionMessage,
ctx: &mut Self::Context,
) -> Self::Result {
ctx.text(
serde_json::to_string(&PlaygroundViewsMessage::ExerciseTaskGradingResult(
msg.grading_result,
))
.expect("should never fail"),
);
}
}
#[derive(Debug, Serialize, Message)]
#[rtype(result = "()")]
#[cfg_attr(feature = "ts_rs", derive(TS))]
#[serde(tag = "tag", content = "data")]
pub enum PlaygroundViewsMessage {
TimedOut,
Registered(Uuid),
ExerciseTaskGradingResult(ExerciseTaskGradingResult),
}
async fn websocket(
req: HttpRequest,
stream: web::Payload,
) -> Result<HttpResponse, ControllerError> {
ws::start(ClientConnection::new(), &req, stream).map_err(Into::into)
}
async fn receive_grading(
websocket_id: web::Path<Uuid>,
grading_result: web::Json<ExerciseTaskGradingResult>,
) -> Result<HttpResponse, ControllerError> {
if let Some(conn) = WS_CONNECTIONS.get(*websocket_id) {
conn.do_send(PlaygroundSubmissionMessage {
grading_result: grading_result.into_inner(),
});
}
Ok(HttpResponse::Ok().finish())
}
pub fn _add_routes(cfg: &mut ServiceConfig) {
cfg.route("/ws", web::get().to(websocket))
.route("/grading/{websocket_id}", web::post().to(receive_grading));
}