headless_lms_server/programs/
service_info_fetcher.rs1use std::env;
2
3use crate::config::program_config::ProgramConfig;
4use crate::{domain::models_requests, setup_tracing};
5use anyhow::Result;
6use dotenvy::dotenv;
7use futures::stream::{self, StreamExt};
8use headless_lms_models::{
9 exercise_service_info::{ExerciseServiceInfo, fetch_and_upsert_service_info},
10 exercise_services::ExerciseService,
11};
12use sqlx::PgPool;
13use tokio::time::{Duration, sleep};
14use tracing::info;
15
16const N: usize = 10;
17
18pub async fn main() -> anyhow::Result<()> {
19 unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
24 dotenv().ok();
25 setup_tracing()?;
26
27 let database_url = ProgramConfig::database_url_with_default();
28 let db_pool = PgPool::connect(&database_url).await?;
29
30 let mut conn = db_pool.acquire().await?;
31
32 loop {
33 let exercise_services =
34 headless_lms_models::exercise_services::get_exercise_services(&mut conn).await?;
35 debug!(
36 "Fetching and updating statuses from {} services",
37 exercise_services.len()
38 );
39 let iter_stream = stream::iter(exercise_services.iter().map(|exercise_service| {
40 do_fetch_and_upsert_service_info(db_pool.clone(), exercise_service)
41 }));
42 let buffer_unordered = iter_stream.buffer_unordered(N);
44 let results = buffer_unordered.collect::<Vec<_>>().await;
45 let (succeeded, failed) = results.into_iter().partition::<Vec<_>, _>(|o| o.is_ok());
46 info!(
47 "Fetching and updating statuses complete. Succeeded: {}, failed: {}",
48 succeeded.len(),
49 failed.len()
50 );
51 sleep(Duration::from_secs(60)).await;
52 }
53}
54
55pub async fn do_fetch_and_upsert_service_info(
56 pool: PgPool,
57 exercise_service: &ExerciseService,
58) -> Result<ExerciseServiceInfo> {
59 let mut conn = pool.acquire().await?;
60 Ok(fetch_and_upsert_service_info(
61 &mut conn,
62 exercise_service,
63 models_requests::fetch_service_info,
64 )
65 .await?)
66}