1use crate::prelude::*;
2
3#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
4
5pub struct User {
6 pub id: Uuid,
7 pub created_at: DateTime<Utc>,
8 pub updated_at: DateTime<Utc>,
9 pub deleted_at: Option<DateTime<Utc>>,
10 pub upstream_id: Option<i32>,
11 pub email_domain: Option<String>,
12}
13
14pub async fn insert(
15 conn: &mut PgConnection,
16 pkey_policy: PKeyPolicy<Uuid>,
17 email: &str,
18 first_name: Option<&str>,
19 last_name: Option<&str>,
20) -> ModelResult<Uuid> {
21 let mut tx = conn.begin().await?;
22 let email_domain = email.trim().split('@').next_back();
23 let res = sqlx::query!(
24 "
25INSERT INTO users (id, email_domain)
26VALUES ($1, $2)
27RETURNING id
28",
29 pkey_policy.into_uuid(),
30 email_domain
31 )
32 .fetch_one(&mut *tx)
33 .await?;
34
35 let _res2 = sqlx::query!(
36 "
37INSERT INTO user_details (user_id, email, first_name, last_name)
38VALUES ($1, $2, $3, $4)
39",
40 res.id,
41 email,
42 first_name,
43 last_name
44 )
45 .execute(&mut *tx)
46 .await?;
47 tx.commit().await?;
48 Ok(res.id)
49}
50
51pub async fn insert_with_upstream_id_and_moocfi_id(
52 conn: &mut PgConnection,
53 email: &str,
54 first_name: Option<&str>,
55 last_name: Option<&str>,
56 upstream_id: i32,
57 moocfi_id: Uuid,
58) -> ModelResult<User> {
59 info!("The user is not in the database yet, inserting");
60 let email_domain = email.trim().split('@').next_back();
61 let mut tx = conn.begin().await?;
62 let user = sqlx::query_as!(
63 User,
64 r#"
65INSERT INTO
66 users (id, upstream_id, email_domain)
67VALUES ($1, $2, $3)
68RETURNING *;
69 "#,
70 moocfi_id,
71 upstream_id,
72 email_domain
73 )
74 .fetch_one(&mut *tx)
75 .await?;
76
77 let _res2 = sqlx::query!(
78 "
79INSERT INTO user_details (user_id, email, first_name, last_name)
80VALUES ($1, $2, $3, $4)
81",
82 user.id,
83 email,
84 first_name,
85 last_name
86 )
87 .execute(&mut *tx)
88 .await?;
89 tx.commit().await?;
90 Ok(user)
91}
92
93pub async fn get_by_email(conn: &mut PgConnection, email: &str) -> ModelResult<User> {
95 let user = sqlx::query_as!(
96 User,
97 "
98SELECT users.*
99FROM user_details
100JOIN users ON (user_details.user_id = users.id)
101WHERE lower(user_details.email) = lower($1)
102 ",
103 email
104 )
105 .fetch_one(conn)
106 .await?;
107 Ok(user)
108}
109
110pub async fn get_by_id(conn: &mut PgConnection, id: Uuid) -> ModelResult<User> {
111 let user = sqlx::query_as!(
112 User,
113 "
114SELECT *
115FROM users
116WHERE id = $1
117 ",
118 id
119 )
120 .fetch_one(conn)
121 .await?;
122 Ok(user)
123}
124
125pub async fn find_by_upstream_id(
126 conn: &mut PgConnection,
127 upstream_id: i32,
128) -> ModelResult<Option<User>> {
129 let user = sqlx::query_as!(
130 User,
131 "SELECT * FROM users WHERE upstream_id = $1",
132 upstream_id
133 )
134 .fetch_optional(conn)
135 .await?;
136 Ok(user)
137}
138
139pub async fn get_all_user_ids_with_user_exercise_states_on_course(
141 conn: &mut PgConnection,
142 course_id: Uuid,
143) -> ModelResult<Vec<Uuid>> {
144 let res = sqlx::query!(
145 "
146SELECT DISTINCT user_id
147FROM user_exercise_states
148WHERE course_id = $1
149 AND deleted_at IS NULL
150 ",
151 course_id
152 )
153 .map(|x| x.user_id)
154 .fetch_all(conn)
155 .await?;
156 Ok(res)
157}
158
159pub async fn get_users_by_course_instance_enrollment(
160 conn: &mut PgConnection,
161 course_instance_id: Uuid,
162) -> ModelResult<Vec<User>> {
163 let res = sqlx::query_as!(
164 User,
165 "
166SELECT *
167FROM users
168WHERE id IN (
169 SELECT user_id
170 FROM course_instance_enrollments
171 WHERE course_instance_id = $1
172 AND deleted_at IS NULL
173 )
174",
175 course_instance_id,
176 )
177 .fetch_all(&mut *conn)
178 .await?;
179 Ok(res)
180}
181
182pub async fn get_users_ids_in_db_from_upstream_ids(
183 conn: &mut PgConnection,
184 upstream_ids: &[i32],
185) -> ModelResult<Vec<Uuid>> {
186 let res = sqlx::query!(
187 "
188SELECT id
189FROM users
190WHERE upstream_id IN (
191 SELECT UNNEST($1::integer [])
192 )
193AND deleted_at IS NULL
194",
195 upstream_ids,
196 )
197 .fetch_all(&mut *conn)
198 .await?;
199 Ok(res.iter().map(|x| x.id).collect::<Vec<_>>())
200}
201
202pub async fn update_email_for_user(
203 conn: &mut PgConnection,
204 upstream_id: &i32,
205 new_email: String,
206) -> ModelResult<()> {
207 info!("Updating user (Upstream id: {upstream_id})");
208 let mut tx = conn.begin().await?;
209
210 let user = sqlx::query_as!(
211 User,
212 "SELECT * FROM users WHERE upstream_id = $1",
213 upstream_id
214 )
215 .fetch_one(&mut *tx)
216 .await?;
217
218 sqlx::query!(
219 "UPDATE user_details SET email = $1 WHERE user_id = $2",
220 new_email,
221 user.id,
222 )
223 .execute(&mut *tx)
224 .await?;
225
226 let email_domain = new_email.trim().split('@').next_back();
227 sqlx::query!(
228 "UPDATE users SET email_domain = $1 WHERE id = $2",
229 email_domain,
230 user.id,
231 )
232 .execute(&mut *tx)
233 .await?;
234
235 tx.commit().await?;
236
237 info!("Email change succeeded");
238 Ok(())
239}
240
241pub async fn delete_user(conn: &mut PgConnection, id: Uuid) -> ModelResult<()> {
242 info!("Deleting user {id}");
243 let mut tx = conn.begin().await?;
244 crate::email_deliveries::soft_delete_unsent_retryable_deliveries_for_user(&mut tx, id).await?;
245 sqlx::query!("DELETE FROM user_details WHERE user_id = $1", id,)
246 .execute(&mut *tx)
247 .await?;
248 sqlx::query!("DELETE FROM user_passwords WHERE user_id = $1", id,)
249 .execute(&mut *tx)
250 .await?;
251 sqlx::query!(
252 "UPDATE users set deleted_at = now() WHERE id = $1 AND deleted_at IS NULL",
253 id,
254 )
255 .execute(&mut *tx)
256 .await?;
257 sqlx::query!(
258 "UPDATE roles set deleted_at = now() WHERE user_id = $1 AND deleted_at IS NULL",
259 id,
260 )
261 .execute(&mut *tx)
262 .await?;
263 tx.commit().await?;
264 info!("Deletion succeeded");
265 Ok(())
266}