Skip to main content

headless_lms_server/programs/
regrader.rs

1use std::{env, error::Error, sync::Arc, time::Duration};
2
3use crate::config::program_config::ProgramConfig;
4use crate::domain::models_requests::{self, JwtKey};
5use headless_lms_models as models;
6use models::library::regrading;
7use sqlx::PgPool;
8
9/**
10Starts a thread that will periodically send regrading submissions to the corresponding exercise services for regrading.
11*/
12pub async fn main() -> anyhow::Result<()> {
13    // TODO: Audit that the environment access only happens in single-threaded code.
14    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
15    dotenvy::dotenv().ok();
16    crate::setup_tracing()?;
17    let db_url = ProgramConfig::database_url_with_default();
18    let jwt_password = ProgramConfig::required("JWT_PASSWORD")?;
19    let jwt_key = Arc::new(JwtKey::new(&jwt_password)?);
20
21    let mut interval = tokio::time::interval(Duration::from_secs(10));
22    let mut ticks = 60;
23
24    // Since this is repeating every 10 seconds we can keep the connection open.
25    let db_pool = PgPool::connect(&db_url).await?;
26    let mut conn = db_pool.acquire().await?;
27    loop {
28        interval.tick().await;
29
30        ticks += 1;
31        // 60 10 second intervals = 10 minutes
32        if ticks > 60 {
33            // occasionally prints a reminder that the service is still running
34            ticks = 0;
35            tracing::info!("running the regrader");
36        }
37
38        let exercise_services_by_type =
39            models::exercise_service_info::get_upsert_all_exercise_services_by_type(
40                &mut conn,
41                models_requests::fetch_service_info,
42            )
43            .await?;
44        // do not stop the thread on error, report it and try again next tick
45        if let Err(err) = regrading::regrade(
46            &mut conn,
47            &exercise_services_by_type,
48            models_requests::make_grading_request_sender(Arc::clone(&jwt_key)),
49        )
50        .await
51        {
52            tracing::error!("Error in regrader: {}", err);
53
54            if let Some(sqlx::Error::Io(..)) =
55                err.source().and_then(|s| s.downcast_ref::<sqlx::Error>())
56            {
57                // this usually happens if the database is reset while running bin/dev etc.
58                tracing::info!(
59                    "regrader may have lost its connection to the db, trying to reconnect"
60                );
61                conn = db_pool.acquire().await?;
62            }
63        }
64    }
65}