Skip to main content

headless_lms_models/
users.rs

1use crate::prelude::*;
2
3#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
4
5pub struct User {
6    pub id: Uuid,
7    pub created_at: DateTime<Utc>,
8    pub updated_at: DateTime<Utc>,
9    pub deleted_at: Option<DateTime<Utc>>,
10    pub upstream_id: Option<i32>,
11    pub email_domain: Option<String>,
12}
13
14pub async fn insert(
15    conn: &mut PgConnection,
16    pkey_policy: PKeyPolicy<Uuid>,
17    email: &str,
18    first_name: Option<&str>,
19    last_name: Option<&str>,
20) -> ModelResult<Uuid> {
21    let mut tx = conn.begin().await?;
22    let email_domain = email.trim().split('@').next_back();
23    let res = sqlx::query!(
24        "
25INSERT INTO users (id, email_domain)
26VALUES ($1, $2)
27RETURNING id
28",
29        pkey_policy.into_uuid(),
30        email_domain
31    )
32    .fetch_one(&mut *tx)
33    .await?;
34
35    let _res2 = sqlx::query!(
36        "
37INSERT INTO user_details (user_id, email, first_name, last_name)
38VALUES ($1, $2, $3, $4)
39",
40        res.id,
41        email,
42        first_name,
43        last_name
44    )
45    .execute(&mut *tx)
46    .await?;
47    tx.commit().await?;
48    Ok(res.id)
49}
50
51pub async fn insert_with_upstream_id_and_moocfi_id(
52    conn: &mut PgConnection,
53    email: &str,
54    first_name: Option<&str>,
55    last_name: Option<&str>,
56    upstream_id: i32,
57    moocfi_id: Uuid,
58) -> ModelResult<User> {
59    info!("The user is not in the database yet, inserting");
60    let email_domain = email.trim().split('@').next_back();
61    let mut tx = conn.begin().await?;
62    let user = sqlx::query_as!(
63        User,
64        r#"
65INSERT INTO
66  users (id, upstream_id, email_domain)
67VALUES ($1, $2, $3)
68RETURNING *;
69          "#,
70        moocfi_id,
71        upstream_id,
72        email_domain
73    )
74    .fetch_one(&mut *tx)
75    .await?;
76
77    let _res2 = sqlx::query!(
78        "
79INSERT INTO user_details (user_id, email, first_name, last_name)
80VALUES ($1, $2, $3, $4)
81",
82        user.id,
83        email,
84        first_name,
85        last_name
86    )
87    .execute(&mut *tx)
88    .await?;
89    tx.commit().await?;
90    Ok(user)
91}
92
93/// Looks up a user by email (case-insensitive) using the `lower(email)` index on `user_details`.
94pub async fn get_by_email(conn: &mut PgConnection, email: &str) -> ModelResult<User> {
95    let user = sqlx::query_as!(
96        User,
97        "
98SELECT users.*
99FROM user_details
100JOIN users ON (user_details.user_id = users.id)
101WHERE lower(user_details.email) = lower($1)
102        ",
103        email
104    )
105    .fetch_one(conn)
106    .await?;
107    Ok(user)
108}
109
110pub async fn get_by_id(conn: &mut PgConnection, id: Uuid) -> ModelResult<User> {
111    let user = sqlx::query_as!(
112        User,
113        "
114SELECT *
115FROM users
116WHERE id = $1
117        ",
118        id
119    )
120    .fetch_one(conn)
121    .await?;
122    Ok(user)
123}
124
125pub async fn find_by_upstream_id(
126    conn: &mut PgConnection,
127    upstream_id: i32,
128) -> ModelResult<Option<User>> {
129    let user = sqlx::query_as!(
130        User,
131        "SELECT * FROM users WHERE upstream_id = $1",
132        upstream_id
133    )
134    .fetch_optional(conn)
135    .await?;
136    Ok(user)
137}
138
139/// Includes all users who have returned an exercise on a course course instance
140pub async fn get_all_user_ids_with_user_exercise_states_on_course(
141    conn: &mut PgConnection,
142    course_id: Uuid,
143) -> ModelResult<Vec<Uuid>> {
144    let res = sqlx::query!(
145        "
146SELECT DISTINCT user_id
147FROM user_exercise_states
148WHERE course_id = $1
149  AND deleted_at IS NULL
150        ",
151        course_id
152    )
153    .map(|x| x.user_id)
154    .fetch_all(conn)
155    .await?;
156    Ok(res)
157}
158
159pub async fn get_users_by_course_instance_enrollment(
160    conn: &mut PgConnection,
161    course_instance_id: Uuid,
162) -> ModelResult<Vec<User>> {
163    let res = sqlx::query_as!(
164        User,
165        "
166SELECT *
167FROM users
168WHERE id IN (
169    SELECT user_id
170    FROM course_instance_enrollments
171    WHERE course_instance_id = $1
172      AND deleted_at IS NULL
173  )
174",
175        course_instance_id,
176    )
177    .fetch_all(&mut *conn)
178    .await?;
179    Ok(res)
180}
181
182pub async fn get_users_ids_in_db_from_upstream_ids(
183    conn: &mut PgConnection,
184    upstream_ids: &[i32],
185) -> ModelResult<Vec<Uuid>> {
186    let res = sqlx::query!(
187        "
188SELECT id
189FROM users
190WHERE upstream_id IN (
191    SELECT UNNEST($1::integer [])
192  )
193AND deleted_at IS NULL
194",
195        upstream_ids,
196    )
197    .fetch_all(&mut *conn)
198    .await?;
199    Ok(res.iter().map(|x| x.id).collect::<Vec<_>>())
200}
201
202pub async fn update_email_for_user(
203    conn: &mut PgConnection,
204    upstream_id: &i32,
205    new_email: String,
206) -> ModelResult<()> {
207    info!("Updating user (Upstream id: {upstream_id})");
208    let mut tx = conn.begin().await?;
209
210    let user = sqlx::query_as!(
211        User,
212        "SELECT * FROM users WHERE upstream_id = $1",
213        upstream_id
214    )
215    .fetch_one(&mut *tx)
216    .await?;
217
218    sqlx::query!(
219        "UPDATE user_details SET email = $1 WHERE user_id = $2",
220        new_email,
221        user.id,
222    )
223    .execute(&mut *tx)
224    .await?;
225
226    let email_domain = new_email.trim().split('@').next_back();
227    sqlx::query!(
228        "UPDATE users SET email_domain = $1 WHERE id = $2",
229        email_domain,
230        user.id,
231    )
232    .execute(&mut *tx)
233    .await?;
234
235    tx.commit().await?;
236
237    info!("Email change succeeded");
238    Ok(())
239}
240
241pub async fn delete_user(conn: &mut PgConnection, id: Uuid) -> ModelResult<()> {
242    info!("Deleting user {id}");
243    let mut tx = conn.begin().await?;
244    crate::email_deliveries::soft_delete_unsent_retryable_deliveries_for_user(&mut tx, id).await?;
245    sqlx::query!("DELETE FROM user_details WHERE user_id = $1", id,)
246        .execute(&mut *tx)
247        .await?;
248    sqlx::query!("DELETE FROM user_passwords WHERE user_id = $1", id,)
249        .execute(&mut *tx)
250        .await?;
251    sqlx::query!(
252        "UPDATE users set deleted_at = now() WHERE id = $1 AND deleted_at IS NULL",
253        id,
254    )
255    .execute(&mut *tx)
256    .await?;
257    sqlx::query!(
258        "UPDATE roles set deleted_at = now() WHERE user_id = $1 AND deleted_at IS NULL",
259        id,
260    )
261    .execute(&mut *tx)
262    .await?;
263    tx.commit().await?;
264    info!("Deletion succeeded");
265    Ok(())
266}