headless_lms_server/programs/
sync_tmc_users.rs1use std::env;
5
6use crate::setup_tracing;
7use anyhow::Context;
8
9use chrono::DateTime;
10use dotenv::dotenv;
11use headless_lms_models as models;
12use models::users::{get_users_ids_in_db_from_upstream_ids, update_email_for_user};
13
14use serde::{Deserialize, Serialize};
15use sqlx::{PgConnection, PgPool};
16
17const URL: &str = "https://tmc.mooc.fi/api/v8/users/recently_changed_user_details";
18
19#[derive(Debug, Serialize, Deserialize)]
20pub struct TMCRecentChanges {
21 pub changes: Vec<Change>,
22}
23
24#[derive(Debug, Serialize, Deserialize)]
25pub struct Change {
26 pub change_type: String,
27 pub new_value: Option<String>,
28 pub old_value: Option<String>,
29 pub created_at: String,
30 pub id: i32,
31 pub user_id: Option<i32>,
32}
33
34pub async fn main() -> anyhow::Result<()> {
35 unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
37 dotenv().ok();
38 setup_tracing()?;
39 let database_url = env::var("DATABASE_URL")
40 .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
41 let recent_changes = fetch_recently_changed_user_details().await?;
42 let db_pool = PgPool::connect(&database_url).await?;
43 let mut conn = db_pool.acquire().await?;
44 delete_users(&mut conn, &recent_changes).await?;
45 update_users(&mut conn, &recent_changes).await?;
46 Ok(())
47}
48
49pub async fn update_users(
50 conn: &mut PgConnection,
51 recent_changes: &TMCRecentChanges,
52) -> anyhow::Result<()> {
53 let email_update_list = recent_changes
54 .changes
55 .iter()
56 .filter(|c| c.change_type == "email_changed")
57 .collect::<Vec<_>>();
58
59 info!("Updating emails for {} users", email_update_list.len());
60
61 let mut parsed_updates: Vec<(DateTime<chrono::FixedOffset>, &Change)> = Vec::new();
63 for change in &email_update_list {
64 match DateTime::parse_from_rfc3339(change.created_at.as_str()) {
65 Ok(date) => parsed_updates.push((date, change)),
66 Err(e) => {
67 error!("Error converting date: '{}'", change.created_at);
68 error!("Error: {}", e);
69 return Err(anyhow::anyhow!(
70 "Error converting date: '{}'",
71 change.created_at
72 ));
73 }
74 }
75 }
76
77 parsed_updates.sort_by(|a, b| a.0.cmp(&b.0));
78
79 let email_update_list: Vec<&Change> = parsed_updates
80 .into_iter()
81 .map(|(_, change)| change)
82 .collect();
83
84 for change in email_update_list {
85 if let Some(user_id) = change.user_id {
86 match update_email_for_user(
87 &mut *conn,
88 &user_id,
89 change.new_value.as_deref().unwrap_or("unknown").to_string(),
90 )
91 .await
92 {
93 Ok(email) => email,
94 Err(e) => {
95 error!("Error updating user with id {}", user_id);
96 error!("Error: {}", e);
97 }
98 };
99 };
100 }
101
102 info!("Update done");
103 Ok(())
104}
105
106pub async fn delete_users(
107 conn: &mut PgConnection,
108 recent_changes: &TMCRecentChanges,
109) -> anyhow::Result<()> {
110 let to_delete = recent_changes
111 .changes
112 .iter()
113 .filter(|c| c.change_type == "deleted")
114 .filter_map(|c| c.user_id)
115 .collect::<Vec<_>>();
116 info!("Making sure {} users are deleted", to_delete.len());
117 let user_ids_in_db = get_users_ids_in_db_from_upstream_ids(&mut *conn, &to_delete).await?;
118 info!("{} users need to be deleted", to_delete.len());
119 for id in user_ids_in_db {
120 models::users::delete_user(&mut *conn, id).await?;
121 }
122 info!("Deletions done");
123 Ok(())
124}
125
126pub async fn fetch_recently_changed_user_details() -> anyhow::Result<TMCRecentChanges> {
127 let access_token = env::var("TMC_ACCESS_TOKEN").expect("TMC_ACCESS_TOKEN must be defined");
128 let ratelimit_api_key = env::var("RATELIMIT_PROTECTION_SAFE_API_KEY")
129 .expect("RATELIMIT_PROTECTION_SAFE_API_KEY must be defined");
130 let client = reqwest::Client::new();
131 let res = client
132 .get(URL)
133 .header("RATELIMIT-PROTECTION-SAFE-API-KEY", ratelimit_api_key)
134 .header(reqwest::header::CONTENT_TYPE, "application/json")
135 .header(reqwest::header::ACCEPT, "application/json")
136 .bearer_auth(&access_token)
137 .send()
138 .await
139 .context("Failed to send request to https://tmc.mooc.fi")?;
140 if res.status().is_success() {
141 let res: TMCRecentChanges = res.json().await?;
142 info!("Fetched {} changes", res.changes.len());
143 Ok(res)
144 } else {
145 let response_body = res.bytes().await?.to_vec();
146 let response_body_string = String::from_utf8_lossy(&response_body);
147 error!(
148 ?response_body_string,
149 "Failed to fetch recently changed user details",
150 );
151 Err(anyhow::anyhow!(
152 "Failed to get recently changed user details from TMC"
153 ))
154 }
155}