headless_lms_server/programs/
sync_tmc_users.rs

1/*!
2Syncs tmc users
3*/
4use std::env;
5
6use crate::setup_tracing;
7use anyhow::Context;
8
9use chrono::DateTime;
10use dotenv::dotenv;
11use headless_lms_models as models;
12use models::users::{get_users_ids_in_db_from_upstream_ids, update_email_for_user};
13
14use serde::{Deserialize, Serialize};
15use sqlx::{PgConnection, PgPool};
16
17const URL: &str = "https://tmc.mooc.fi/api/v8/users/recently_changed_user_details";
18
19#[derive(Debug, Serialize, Deserialize)]
20pub struct TMCRecentChanges {
21    pub changes: Vec<Change>,
22}
23
24#[derive(Debug, Serialize, Deserialize)]
25pub struct Change {
26    pub change_type: String,
27    pub new_value: Option<String>,
28    pub old_value: Option<String>,
29    pub created_at: String,
30    pub id: i32,
31    pub user_id: Option<i32>,
32}
33
34pub async fn main() -> anyhow::Result<()> {
35    // TODO: Audit that the environment access only happens in single-threaded code.
36    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
37    dotenv().ok();
38    setup_tracing()?;
39    let database_url = env::var("DATABASE_URL")
40        .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
41    let recent_changes = fetch_recently_changed_user_details().await?;
42    let db_pool = PgPool::connect(&database_url).await?;
43    let mut conn = db_pool.acquire().await?;
44    delete_users(&mut conn, &recent_changes).await?;
45    update_users(&mut conn, &recent_changes).await?;
46    Ok(())
47}
48
49pub async fn update_users(
50    conn: &mut PgConnection,
51    recent_changes: &TMCRecentChanges,
52) -> anyhow::Result<()> {
53    let mut email_update_list = recent_changes
54        .changes
55        .iter()
56        .filter(|c| c.change_type == "email_changed")
57        .collect::<Vec<_>>();
58
59    info!("Updating emails for {} users", email_update_list.len());
60    email_update_list.sort_by(|a, b| {
61        let date_a = match DateTime::parse_from_rfc3339(a.created_at.as_str()) {
62            Ok(val) => val,
63            Err(e) => {
64                error!("Error converting date: '{}'", a.created_at);
65                error!("Error: {}", e);
66                DateTime::parse_from_str("01.01.1450", "%d.%m.%Y").unwrap()
67            }
68        };
69
70        let date_b = match DateTime::parse_from_rfc3339(b.created_at.as_str()) {
71            Ok(val) => val,
72            Err(e) => {
73                error!("Error converting date: '{}'", b.created_at);
74                error!("Error: {}", e);
75                DateTime::parse_from_str("01.01.1450", "%d.%m.%Y").unwrap()
76            }
77        };
78
79        date_a.cmp(&date_b)
80    });
81
82    for change in email_update_list {
83        if let Some(user_id) = change.user_id {
84            match update_email_for_user(
85                &mut *conn,
86                &user_id,
87                change.new_value.as_deref().unwrap_or("unknown").to_string(),
88            )
89            .await
90            {
91                Ok(email) => email,
92                Err(e) => {
93                    error!("Error updating user with id {}", user_id);
94                    error!("Error: {}", e);
95                }
96            };
97        };
98    }
99
100    info!("Update done");
101    Ok(())
102}
103
104pub async fn delete_users(
105    conn: &mut PgConnection,
106    recent_changes: &TMCRecentChanges,
107) -> anyhow::Result<()> {
108    let to_delete = recent_changes
109        .changes
110        .iter()
111        .filter(|c| c.change_type == "deleted")
112        .filter_map(|c| c.user_id)
113        .collect::<Vec<_>>();
114    info!("Making sure {} users are deleted", to_delete.len());
115    let user_ids_in_db = get_users_ids_in_db_from_upstream_ids(&mut *conn, &to_delete).await?;
116    info!("{} users need to be deleted", to_delete.len());
117    for id in user_ids_in_db {
118        models::users::delete_user(&mut *conn, id).await?;
119    }
120    info!("Deletions done");
121    Ok(())
122}
123
124pub async fn fetch_recently_changed_user_details() -> anyhow::Result<TMCRecentChanges> {
125    let access_token = env::var("TMC_ACCESS_TOKEN").expect("TMC_ACCESS_TOKEN must be defined");
126    let ratelimit_api_key = env::var("RATELIMIT_PROTECTION_SAFE_API_KEY")
127        .expect("RATELIMIT_PROTECTION_SAFE_API_KEY must be defined");
128    let client = reqwest::Client::new();
129    let res = client
130        .get(URL)
131        .header("RATELIMIT-PROTECTION-SAFE-API-KEY", ratelimit_api_key)
132        .header(reqwest::header::CONTENT_TYPE, "application/json")
133        .header(reqwest::header::ACCEPT, "application/json")
134        .bearer_auth(&access_token)
135        .send()
136        .await
137        .context("Failed to send request to https://tmc.mooc.fi")?;
138    if res.status().is_success() {
139        let res: TMCRecentChanges = res.json().await?;
140        info!("Fetched {} changes", res.changes.len());
141        Ok(res)
142    } else {
143        let response_body = res.bytes().await?.to_vec();
144        let response_body_string = String::from_utf8_lossy(&response_body);
145        error!(
146            ?response_body_string,
147            "Failed to fetch recently changed user details",
148        );
149        Err(anyhow::anyhow!(
150            "Failed to get recently changed user details from TMC"
151        ))
152    }
153}