headless_lms_server/programs/
service_info_fetcher.rs1use std::env;
2
3use crate::{domain::models_requests, setup_tracing};
4use anyhow::Result;
5use dotenv::dotenv;
6use futures::stream::{self, StreamExt};
7use headless_lms_models::{
8 exercise_service_info::{ExerciseServiceInfo, fetch_and_upsert_service_info},
9 exercise_services::ExerciseService,
10};
11use sqlx::PgPool;
12use tokio::time::{Duration, sleep};
13use tracing::info;
14
/// Upper bound on how many per-service fetch-and-upsert futures are polled
/// concurrently; passed to `buffer_unordered` in `main`.
const N: usize = 10;
16
17pub async fn main() -> anyhow::Result<()> {
18 unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
23 dotenv().ok();
24 setup_tracing()?;
25
26 let database_url = env::var("DATABASE_URL")
27 .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
28 let db_pool = PgPool::connect(&database_url).await?;
29
30 let mut conn = db_pool.acquire().await?;
31
32 loop {
33 let exercise_services =
34 headless_lms_models::exercise_services::get_exercise_services(&mut conn).await?;
35 debug!(
36 "Fetching and updating statuses from {} services",
37 exercise_services.len()
38 );
39 let iter_stream = stream::iter(exercise_services.iter().map(|exercise_service| {
40 do_fetch_and_upsert_service_info(db_pool.clone(), exercise_service)
41 }));
42 let buffer_unordered = iter_stream.buffer_unordered(N);
44 let results = buffer_unordered.collect::<Vec<_>>().await;
45 let (succeeded, failed) = results.into_iter().partition::<Vec<_>, _>(|o| o.is_ok());
46 info!(
47 "Fetching and updating statuses complete. Succeeded: {}, failed: {}",
48 succeeded.len(),
49 failed.len()
50 );
51 sleep(Duration::from_secs(60)).await;
52 }
53}
54
55pub async fn do_fetch_and_upsert_service_info(
56 pool: PgPool,
57 exercise_service: &ExerciseService,
58) -> Result<ExerciseServiceInfo> {
59 let mut conn = pool.acquire().await?;
60 Ok(fetch_and_upsert_service_info(
61 &mut conn,
62 exercise_service,
63 models_requests::fetch_service_info,
64 )
65 .await?)
66}