Skip to main content

headless_lms_server/programs/
sync_tmc_users.rs

1/*!
2Syncs tmc users
3*/
4use std::env;
5
6use crate::config::program_config::ProgramConfig;
7use crate::setup_tracing;
8use anyhow::Context;
9
10use chrono::DateTime;
11use dotenvy::dotenv;
12use headless_lms_models as models;
13use models::users::{get_users_ids_in_db_from_upstream_ids, update_email_for_user};
14
15use serde::{Deserialize, Serialize};
16use sqlx::{PgConnection, PgPool};
17
18const URL: &str = "https://tmc.mooc.fi/api/v8/users/recently_changed_user_details";
19
20#[derive(Debug, Serialize, Deserialize)]
21pub struct TMCRecentChanges {
22    pub changes: Vec<Change>,
23}
24
25#[derive(Debug, Serialize, Deserialize)]
26pub struct Change {
27    pub change_type: String,
28    pub new_value: Option<String>,
29    pub old_value: Option<String>,
30    pub created_at: String,
31    pub id: i32,
32    pub user_id: Option<i32>,
33}
34
35pub async fn main() -> anyhow::Result<()> {
36    // TODO: Audit that the environment access only happens in single-threaded code.
37    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
38    dotenv().ok();
39    setup_tracing()?;
40    let database_url = ProgramConfig::database_url_with_default();
41    let recent_changes = fetch_recently_changed_user_details().await?;
42    let db_pool = PgPool::connect(&database_url).await?;
43    let mut conn = db_pool.acquire().await?;
44    delete_users(&mut conn, &recent_changes).await?;
45    update_users(&mut conn, &recent_changes).await?;
46    Ok(())
47}
48
49pub async fn update_users(
50    conn: &mut PgConnection,
51    recent_changes: &TMCRecentChanges,
52) -> anyhow::Result<()> {
53    let email_update_list = recent_changes
54        .changes
55        .iter()
56        .filter(|c| c.change_type == "email_changed")
57        .collect::<Vec<_>>();
58
59    info!("Updating emails for {} users", email_update_list.len());
60
61    // Parse dates and filter out invalid entries
62    let mut parsed_updates: Vec<(DateTime<chrono::FixedOffset>, &Change)> = Vec::new();
63    for change in &email_update_list {
64        match DateTime::parse_from_rfc3339(change.created_at.as_str()) {
65            Ok(date) => parsed_updates.push((date, change)),
66            Err(e) => {
67                error!("Error converting date: '{}'", change.created_at);
68                error!("Error: {}", e);
69                return Err(anyhow::anyhow!(
70                    "Error converting date: '{}'",
71                    change.created_at
72                ));
73            }
74        }
75    }
76
77    parsed_updates.sort_by_key(|a| a.0);
78
79    let email_update_list: Vec<&Change> = parsed_updates
80        .into_iter()
81        .map(|(_, change)| change)
82        .collect();
83
84    for change in email_update_list {
85        if let Some(user_id) = change.user_id {
86            match update_email_for_user(
87                &mut *conn,
88                &user_id,
89                change.new_value.as_deref().unwrap_or("unknown").to_string(),
90            )
91            .await
92            {
93                Ok(email) => email,
94                Err(e) => {
95                    error!("Error updating user with id {}", user_id);
96                    error!("Error: {}", e);
97                }
98            };
99        };
100    }
101
102    info!("Update done");
103    Ok(())
104}
105
106pub async fn delete_users(
107    conn: &mut PgConnection,
108    recent_changes: &TMCRecentChanges,
109) -> anyhow::Result<()> {
110    let to_delete = recent_changes
111        .changes
112        .iter()
113        .filter(|c| c.change_type == "deleted")
114        .filter_map(|c| c.user_id)
115        .collect::<Vec<_>>();
116    info!("Making sure {} users are deleted", to_delete.len());
117    let user_ids_in_db = get_users_ids_in_db_from_upstream_ids(&mut *conn, &to_delete).await?;
118    info!("{} users need to be deleted", to_delete.len());
119    for id in user_ids_in_db {
120        models::users::delete_user(&mut *conn, id).await?;
121    }
122    info!("Deletions done");
123    Ok(())
124}
125
126pub async fn fetch_recently_changed_user_details() -> anyhow::Result<TMCRecentChanges> {
127    let access_token = ProgramConfig::required("TMC_ACCESS_TOKEN")?;
128    let ratelimit_api_key = ProgramConfig::required("RATELIMIT_PROTECTION_SAFE_API_KEY")?;
129    let client = reqwest::Client::new();
130    let res = client
131        .get(URL)
132        .header("RATELIMIT-PROTECTION-SAFE-API-KEY", ratelimit_api_key)
133        .header(reqwest::header::CONTENT_TYPE, "application/json")
134        .header(reqwest::header::ACCEPT, "application/json")
135        .bearer_auth(&access_token)
136        .send()
137        .await
138        .context("Failed to send request to https://tmc.mooc.fi")?;
139    if res.status().is_success() {
140        let res: TMCRecentChanges = res.json().await?;
141        info!("Fetched {} changes", res.changes.len());
142        Ok(res)
143    } else {
144        let response_body = res.bytes().await?.to_vec();
145        let response_body_string = String::from_utf8_lossy(&response_body);
146        error!(
147            ?response_body_string,
148            "Failed to fetch recently changed user details",
149        );
150        Err(anyhow::anyhow!(
151            "Failed to get recently changed user details from TMC"
152        ))
153    }
154}