headless_lms_server/programs/
service_info_fetcher.rs

1use std::env;
2
3use crate::{domain::models_requests, setup_tracing};
4use anyhow::Result;
5use dotenv::dotenv;
6use futures::stream::{self, StreamExt};
7use headless_lms_models::{
8    exercise_service_info::{ExerciseServiceInfo, fetch_and_upsert_service_info},
9    exercise_services::ExerciseService,
10};
11use sqlx::PgPool;
12use tokio::time::{Duration, sleep};
13use tracing::info;
14
/// Maximum number of per-service fetch futures that are run concurrently
/// (passed to `buffer_unordered` in `main`).
const N: usize = 10;
16
17pub async fn main() -> anyhow::Result<()> {
18    // Setting the sqlx log level to warn stops sql statements being printed to the console.
19    // This is useful here since this is being run in a loop in background and the sql statements
20    // would create a lot of noise to the log.
21    // TODO: Audit that the environment access only happens in single-threaded code.
22    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
23    dotenv().ok();
24    setup_tracing()?;
25
26    let database_url = env::var("DATABASE_URL")
27        .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
28    let db_pool = PgPool::connect(&database_url).await?;
29
30    let mut conn = db_pool.acquire().await?;
31
32    loop {
33        let exercise_services =
34            headless_lms_models::exercise_services::get_exercise_services(&mut conn).await?;
35        debug!(
36            "Fetching and updating statuses from {} services",
37            exercise_services.len()
38        );
39        let iter_stream = stream::iter(exercise_services.iter().map(|exercise_service| {
40            do_fetch_and_upsert_service_info(db_pool.clone(), exercise_service)
41        }));
42        // Run N futures concurrently
43        let buffer_unordered = iter_stream.buffer_unordered(N);
44        let results = buffer_unordered.collect::<Vec<_>>().await;
45        let (succeeded, failed) = results.into_iter().partition::<Vec<_>, _>(|o| o.is_ok());
46        info!(
47            "Fetching and updating statuses complete. Succeeded: {}, failed: {}",
48            succeeded.len(),
49            failed.len()
50        );
51        sleep(Duration::from_secs(60)).await;
52    }
53}
54
55pub async fn do_fetch_and_upsert_service_info(
56    pool: PgPool,
57    exercise_service: &ExerciseService,
58) -> Result<ExerciseServiceInfo> {
59    let mut conn = pool.acquire().await?;
60    Ok(fetch_and_upsert_service_info(
61        &mut conn,
62        exercise_service,
63        models_requests::fetch_service_info,
64    )
65    .await?)
66}