1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use std::{env, usize};

use crate::{domain::models_requests, setup_tracing};
use anyhow::Result;
use dotenv::dotenv;
use futures::stream::{self, StreamExt};
use headless_lms_models::{
    exercise_service_info::{fetch_and_upsert_service_info, ExerciseServiceInfo},
    exercise_services::ExerciseService,
};
use sqlx::PgPool;
use tokio::time::{sleep, Duration};
use tracing::info;

/// Maximum number of exercise service info fetches that are allowed to be in flight at the
/// same time; passed to `buffer_unordered` in `main`.
const N: usize = 10;

pub async fn main() -> anyhow::Result<()> {
    // Setting the sqlx log level to warn stops sql statements being printed to the console.
    // This is useful here since this is being run in a loop in background and the sql statements
    // would create a lot of noise to the log.
    env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn");
    dotenv().ok();
    setup_tracing()?;

    let database_url = env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
    let db_pool = PgPool::connect(&database_url).await?;

    let mut conn = db_pool.acquire().await?;

    loop {
        let exercise_services =
            headless_lms_models::exercise_services::get_exercise_services(&mut conn).await?;
        debug!(
            "Fetching and updating statuses from {} services",
            exercise_services.len()
        );
        let iter_stream = stream::iter(exercise_services.iter().map(|exercise_service| {
            do_fetch_and_upsert_service_info(db_pool.clone(), exercise_service)
        }));
        // Run N futures concurrently
        let buffer_unordered = iter_stream.buffer_unordered(N);
        let results = buffer_unordered.collect::<Vec<_>>().await;
        let (succeeded, failed) = results.into_iter().partition::<Vec<_>, _>(|o| o.is_ok());
        info!(
            "Fetching and updating statuses complete. Succeeded: {}, failed: {}",
            succeeded.len(),
            failed.len()
        );
        sleep(Duration::from_secs(60)).await;
    }
}

/// Fetches the service info for a single exercise service and upserts it.
///
/// A dedicated connection is acquired from `pool` for the duration of the call, which lets
/// several invocations run concurrently without sharing a connection. The work itself is
/// delegated to `fetch_and_upsert_service_info`, using `models_requests::fetch_service_info`
/// as the fetcher.
///
/// # Errors
///
/// Returns an error if acquiring a connection fails or if `fetch_and_upsert_service_info`
/// fails.
pub async fn do_fetch_and_upsert_service_info(
    pool: PgPool,
    exercise_service: &ExerciseService,
) -> Result<ExerciseServiceInfo> {
    let mut connection = pool.acquire().await?;
    let fetcher = models_requests::fetch_service_info;
    // `?` converts the callee's error into `anyhow::Error` before the value is re-wrapped.
    let service_info =
        fetch_and_upsert_service_info(&mut connection, exercise_service, fetcher).await?;
    Ok(service_info)
}