Skip to main content

headless_lms_server/programs/
service_info_fetcher.rs

1use std::env;
2
3use crate::config::program_config::ProgramConfig;
4use crate::{domain::models_requests, setup_tracing};
5use anyhow::Result;
6use dotenvy::dotenv;
7use futures::stream::{self, StreamExt};
8use headless_lms_models::{
9    exercise_service_info::{ExerciseServiceInfo, fetch_and_upsert_service_info},
10    exercise_services::ExerciseService,
11};
12use sqlx::PgPool;
13use tokio::time::{Duration, sleep};
14use tracing::info;
15
16const N: usize = 10;
17
18pub async fn main() -> anyhow::Result<()> {
19    // Setting the sqlx log level to warn stops sql statements being printed to the console.
20    // This is useful here since this is being run in a loop in background and the sql statements
21    // would create a lot of noise to the log.
22    // TODO: Audit that the environment access only happens in single-threaded code.
23    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
24    dotenv().ok();
25    setup_tracing()?;
26
27    let database_url = ProgramConfig::database_url_with_default();
28    let db_pool = PgPool::connect(&database_url).await?;
29
30    let mut conn = db_pool.acquire().await?;
31
32    loop {
33        let exercise_services =
34            headless_lms_models::exercise_services::get_exercise_services(&mut conn).await?;
35        debug!(
36            "Fetching and updating statuses from {} services",
37            exercise_services.len()
38        );
39        let iter_stream = stream::iter(exercise_services.iter().map(|exercise_service| {
40            do_fetch_and_upsert_service_info(db_pool.clone(), exercise_service)
41        }));
42        // Run N futures concurrently
43        let buffer_unordered = iter_stream.buffer_unordered(N);
44        let results = buffer_unordered.collect::<Vec<_>>().await;
45        let (succeeded, failed) = results.into_iter().partition::<Vec<_>, _>(|o| o.is_ok());
46        info!(
47            "Fetching and updating statuses complete. Succeeded: {}, failed: {}",
48            succeeded.len(),
49            failed.len()
50        );
51        sleep(Duration::from_secs(60)).await;
52    }
53}
54
55pub async fn do_fetch_and_upsert_service_info(
56    pool: PgPool,
57    exercise_service: &ExerciseService,
58) -> Result<ExerciseServiceInfo> {
59    let mut conn = pool.acquire().await?;
60    Ok(fetch_and_upsert_service_info(
61        &mut conn,
62        exercise_service,
63        models_requests::fetch_service_info,
64    )
65    .await?)
66}