1use itertools::multiunzip;
2use serde_json::json;
3use utoipa::ToSchema;
4
5use crate::prelude::*;
6
7#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, ToSchema)]
8
9pub struct UserMarketingConsent {
10 pub id: Uuid,
11 pub course_id: Uuid,
12 pub course_language_group_id: Uuid,
13 pub user_id: Uuid,
14 pub user_mailchimp_id: Option<String>,
15 pub consent: bool,
16 pub email_subscription_in_mailchimp: Option<String>,
17 pub created_at: DateTime<Utc>,
18 pub updated_at: DateTime<Utc>,
19 pub deleted_at: Option<DateTime<Utc>>,
20 pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
21}
22
23#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
24
25pub struct UserMarketingConsentWithDetails {
26 pub id: Uuid,
27 pub course_id: Uuid,
28 pub course_language_group_id: Uuid,
29 pub user_id: Uuid,
30 pub user_mailchimp_id: Option<String>,
31 pub consent: bool,
32 pub email_subscription_in_mailchimp: Option<String>,
33 pub created_at: DateTime<Utc>,
34 pub updated_at: DateTime<Utc>,
35 pub deleted_at: Option<DateTime<Utc>>,
36 pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
37 pub first_name: Option<String>,
38 pub last_name: Option<String>,
39 pub email: String,
40 pub course_name: String,
41 pub locale: Option<String>,
42 pub completed_course_at: Option<DateTime<Utc>>,
43 pub research_consent: Option<bool>,
44 pub country: Option<String>,
45}
46
47#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
48
49pub struct UserEmailSubscription {
50 pub user_id: Uuid,
51 pub email: String,
52 pub email_subscription_in_mailchimp: Option<String>,
53 pub user_mailchimp_id: Option<String>,
54}
55
56#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
57
58pub struct MarketingMailingListAccessToken {
59 pub id: Uuid,
60 pub course_id: Uuid,
61 pub mailchimp_mailing_list_id: String,
62 pub course_language_group_id: Uuid,
63 pub server_prefix: String,
64 pub access_token: String,
65 pub created_at: DateTime<Utc>,
66 pub updated_at: DateTime<Utc>,
67 pub deleted_at: Option<DateTime<Utc>>,
68}
69
70#[derive(Debug, FromRow)]
71pub struct MailchimpCourseTag {
72 pub id: Uuid,
73 pub marketing_mailing_list_access_token_id: Uuid,
74 pub course_language_group_id: Uuid,
75 pub tag_name: String,
76 pub tag_id: String,
77 pub created_at: DateTime<Utc>,
78 pub updated_at: DateTime<Utc>,
79 pub deleted_at: Option<DateTime<Utc>>,
80}
81
82#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
83
84pub struct MailchimpLanguageCodeMapping {
85 pub id: Uuid,
86 pub marketing_mailing_list_access_token_id: Uuid,
87 pub our_language_code: String,
88 pub mailchimp_language_code: String,
89 pub created_at: DateTime<Utc>,
90 pub updated_at: DateTime<Utc>,
91 pub deleted_at: Option<DateTime<Utc>>,
92}
93
94pub async fn upsert_marketing_consent(
95 conn: &mut PgConnection,
96 course_id: Uuid,
97 course_language_group_id: Uuid,
98 user_id: &Uuid,
99 email_subscription: &str,
100 marketing_consent: bool,
101) -> sqlx::Result<Uuid> {
102 let result = sqlx::query!(
103 r#"
104 INSERT INTO user_marketing_consents (user_id, course_id, course_language_group_id, consent, email_subscription_in_mailchimp)
105 VALUES ($1, $2, $3, $4, $5)
106 ON CONFLICT (user_id, course_language_group_id)
107 DO UPDATE
108 SET
109 course_id = $2,
110 consent = $4,
111 email_subscription_in_mailchimp = $5
112 RETURNING id
113 "#,
114 user_id,
115 course_id,
116 course_language_group_id,
117 marketing_consent,
118 email_subscription
119 )
120 .fetch_one(conn)
121 .await?;
122
123 Ok(result.id)
124}
125
126pub async fn fetch_user_marketing_consent(
127 conn: &mut PgConnection,
128 course_id: Uuid,
129 user_id: &Uuid,
130) -> sqlx::Result<UserMarketingConsent> {
131 let result = sqlx::query_as!(
132 UserMarketingConsent,
133 "
134 SELECT *
135 FROM user_marketing_consents
136 WHERE user_id = $1 AND course_id = $2
137 ",
138 user_id,
139 course_id,
140 )
141 .fetch_one(conn)
142 .await?;
143
144 Ok(result)
145}
146
147pub async fn fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
149 conn: &mut PgConnection,
150 course_language_group_id: Uuid,
151) -> sqlx::Result<Vec<UserMarketingConsentWithDetails>> {
152 let result = sqlx::query_as!(
153 UserMarketingConsentWithDetails,
154 r#"
155 SELECT
156 umc.id,
157 umc.course_id,
158 umc.course_language_group_id,
159 umc.user_id,
160 umc.user_mailchimp_id,
161 umc.consent,
162 umc.email_subscription_in_mailchimp,
163 umc.created_at,
164 umc.updated_at,
165 umc.deleted_at,
166 umc.synced_to_mailchimp_at,
167 u.first_name AS first_name,
168 u.last_name AS last_name,
169 u.country AS country,
170 u.email AS email,
171 c.name AS course_name,
172 COALESCE(
173 mlcm.mailchimp_language_code,
174 c.language_code
175 ) AS locale,
176 CASE WHEN cmc.passed IS NOT NULL THEN cmc.completion_date ELSE NULL END AS completed_course_at,
177 COALESCE(csfa.research_consent, urc.research_consent) AS research_consent
178 FROM user_marketing_consents AS umc
179 JOIN user_details AS u ON u.user_id = umc.user_id
180 JOIN courses AS c ON c.id = umc.course_id
181 LEFT JOIN course_module_completions AS cmc
182 ON cmc.user_id = umc.user_id AND cmc.course_id = umc.course_id
183 LEFT JOIN course_specific_consent_form_answers AS csfa
184 ON csfa.course_id = umc.course_id AND csfa.user_id = umc.user_id
185 LEFT JOIN user_research_consents AS urc
186 ON urc.user_id = umc.user_id
187 LEFT JOIN mailchimp_course_tags AS tags
188 ON tags.course_language_group_id = umc.course_language_group_id
189 LEFT JOIN marketing_mailing_list_access_tokens AS mmlat
190 ON mmlat.course_language_group_id = umc.course_language_group_id AND mmlat.deleted_at IS NULL
191 LEFT JOIN mailchimp_language_code_mappings AS mlcm
192 ON mlcm.marketing_mailing_list_access_token_id = mmlat.id
193 AND mlcm.our_language_code = c.language_code
194 AND mlcm.deleted_at IS NULL
195 WHERE umc.course_language_group_id = $1
196 AND (
197 umc.synced_to_mailchimp_at IS NULL
198 OR umc.synced_to_mailchimp_at < umc.updated_at
199 OR csfa.updated_at > umc.synced_to_mailchimp_at
200 OR urc.updated_at > umc.synced_to_mailchimp_at
201 OR cmc.updated_at > umc.synced_to_mailchimp_at
202 OR u.updated_at > umc.synced_to_mailchimp_at
203 OR c.updated_at > umc.synced_to_mailchimp_at
204 OR mlcm.updated_at > umc.synced_to_mailchimp_at
205 OR EXISTS (
206 SELECT 1
207 FROM mailchimp_course_tags
208 WHERE mailchimp_course_tags.course_language_group_id = umc.course_language_group_id
209 AND mailchimp_course_tags.updated_at > umc.synced_to_mailchimp_at
210 AND deleted_at IS NULL
211 )
212 )
213 "#,
214 course_language_group_id
215 )
216 .fetch_all(conn)
217 .await?;
218
219 Ok(result)
220}
221
222pub async fn fetch_user_marketing_consents_with_details_by_user_ids(
223 conn: &mut PgConnection,
224 course_language_group_id: Uuid,
225 user_ids: &[Uuid],
226) -> sqlx::Result<Vec<UserMarketingConsentWithDetails>> {
227 if user_ids.is_empty() {
228 return Ok(vec![]);
229 }
230
231 let result = sqlx::query_as!(
232 UserMarketingConsentWithDetails,
233 r#"
234 SELECT
235 umc.id,
236 umc.course_id,
237 umc.course_language_group_id,
238 umc.user_id,
239 umc.user_mailchimp_id,
240 umc.consent,
241 umc.email_subscription_in_mailchimp,
242 umc.created_at,
243 umc.updated_at,
244 umc.deleted_at,
245 umc.synced_to_mailchimp_at,
246 u.first_name AS first_name,
247 u.last_name AS last_name,
248 u.country AS country,
249 u.email AS email,
250 c.name AS course_name,
251 COALESCE(
252 mlcm.mailchimp_language_code,
253 c.language_code
254 ) AS locale,
255 CASE WHEN cmc.passed IS NOT NULL THEN cmc.completion_date ELSE NULL END AS completed_course_at,
256 COALESCE(csfa.research_consent, urc.research_consent) AS research_consent
257 FROM user_marketing_consents AS umc
258 JOIN user_details AS u ON u.user_id = umc.user_id
259 JOIN courses AS c ON c.id = umc.course_id
260 LEFT JOIN course_module_completions AS cmc
261 ON cmc.user_id = umc.user_id AND cmc.course_id = umc.course_id
262 LEFT JOIN course_specific_consent_form_answers AS csfa
263 ON csfa.course_id = umc.course_id AND csfa.user_id = umc.user_id
264 LEFT JOIN user_research_consents AS urc
265 ON urc.user_id = umc.user_id
266 LEFT JOIN marketing_mailing_list_access_tokens AS mmlat
267 ON mmlat.course_language_group_id = umc.course_language_group_id AND mmlat.deleted_at IS NULL
268 LEFT JOIN mailchimp_language_code_mappings AS mlcm
269 ON mlcm.marketing_mailing_list_access_token_id = mmlat.id
270 AND mlcm.our_language_code = c.language_code
271 AND mlcm.deleted_at IS NULL
272 WHERE umc.course_language_group_id = $1
273 AND umc.user_id = ANY($2::uuid[])
274 AND umc.deleted_at IS NULL
275 "#,
276 course_language_group_id,
277 user_ids
278 )
279 .fetch_all(conn)
280 .await?;
281
282 Ok(result)
283}
284
285pub async fn fetch_user_mailchimp_id_mapping(
287 conn: &mut PgConnection,
288 course_language_group_id: Uuid,
289 user_ids: &[Uuid],
290) -> sqlx::Result<std::collections::HashMap<Uuid, String>> {
291 if user_ids.is_empty() {
292 return Ok(std::collections::HashMap::new());
293 }
294 let rows = sqlx::query!(
295 r#"
296 SELECT user_id, user_mailchimp_id
297 FROM user_marketing_consents
298 WHERE course_language_group_id = $1 AND user_id = ANY($2::uuid[]) AND deleted_at IS NULL
299 "#,
300 course_language_group_id,
301 user_ids
302 )
303 .fetch_all(conn)
304 .await?;
305
306 let map = rows
307 .into_iter()
308 .filter_map(|r| r.user_mailchimp_id.map(|id| (r.user_id, id)))
309 .collect();
310 Ok(map)
311}
312
313pub async fn fetch_all_unsynced_updated_emails(
315 conn: &mut PgConnection,
316 course_language_group_id: Uuid,
317) -> sqlx::Result<Vec<UserEmailSubscription>> {
318 let result = sqlx::query_as!(
319 UserEmailSubscription,
320 "
321 SELECT
322 umc.user_id,
323 u.email AS email,
324 umc.email_subscription_in_mailchimp,
325 umc.user_mailchimp_id
326 FROM user_marketing_consents AS umc
327 JOIN user_details AS u ON u.user_id = umc.user_id
328 WHERE umc.course_language_group_id = $1
329 AND umc.synced_to_mailchimp_at < u.updated_at
330 ",
331 course_language_group_id
332 )
333 .fetch_all(conn)
334 .await?;
335
336 Ok(result)
337}
338
339pub async fn update_synced_to_mailchimp_at_to_all_synced_users(
341 conn: &mut PgConnection,
342 ids: &[Uuid],
343) -> ModelResult<()> {
344 sqlx::query!(
345 "
346UPDATE user_marketing_consents
347SET synced_to_mailchimp_at = now()
348WHERE user_id IN (
349 SELECT UNNEST($1::uuid [])
350 )
351",
352 &ids
353 )
354 .execute(conn)
355 .await?;
356 Ok(())
357}
358
359pub async fn update_user_mailchimp_id_at_to_all_synced_users(
361 pool: &mut PgConnection,
362 user_contact_pairs: Vec<(String, String)>,
363) -> ModelResult<()> {
364 let (user_ids_raw, user_mailchimp_ids): (Vec<_>, Vec<_>) =
365 user_contact_pairs.into_iter().unzip();
366
367 let user_ids: Vec<Uuid> = user_ids_raw
369 .into_iter()
370 .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
371 .collect();
372
373 sqlx::query!(
374 "
375UPDATE user_marketing_consents
376SET user_mailchimp_id = updated_data.user_mailchimp_id
377FROM (
378 SELECT UNNEST($1::uuid[]) AS user_id, UNNEST($2::text[]) AS user_mailchimp_id
379) AS updated_data
380WHERE user_marketing_consents.user_id = updated_data.user_id
381",
382 &user_ids,
383 &user_mailchimp_ids
384 )
385 .execute(pool)
386 .await?;
387 Ok(())
388}
389
390pub async fn update_unsubscribed_users_from_mailchimp_in_bulk(
392 conn: &mut PgConnection,
393 mailchimp_data: Vec<(String, String, String, String)>,
394) -> anyhow::Result<()> {
395 let (
396 user_ids_raw,
397 timestamps_raw,
398 course_language_group_ids_raw,
399 email_subscriptions_in_mailchimp,
400 ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>) = multiunzip(mailchimp_data);
401
402 let user_ids: Vec<Uuid> = user_ids_raw
403 .into_iter()
404 .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
405 .collect();
406
407 let timestamps: Vec<DateTime<Utc>> = timestamps_raw
408 .into_iter()
409 .filter_map(|ts| {
410 DateTime::parse_from_rfc3339(&ts)
411 .ok()
412 .map(|dt| dt.with_timezone(&Utc))
413 })
414 .collect();
415
416 let course_language_group_ids: Vec<Uuid> = course_language_group_ids_raw
417 .into_iter()
418 .filter_map(|lang_id| Uuid::parse_str(&lang_id).ok())
419 .collect();
420
421 sqlx::query!(
422 "
423 UPDATE user_marketing_consents
424 SET consent = false,
425 email_subscription_in_mailchimp = updated_data.email_subscription_in_mailchimp,
426 synced_to_mailchimp_at = updated_data.last_updated
427 FROM (
428 SELECT UNNEST($1::Uuid[]) AS user_id,
429 UNNEST($2::timestamptz[]) AS last_updated,
430 UNNEST($3::Uuid[]) AS course_language_group_id,
431 UNNEST($4::text[]) AS email_subscription_in_mailchimp
432
433 ) AS updated_data
434 WHERE user_marketing_consents.user_id = updated_data.user_id
435 AND user_marketing_consents.consent = true
436 AND user_marketing_consents.synced_to_mailchimp_at < updated_data.last_updated
437 AND user_marketing_consents.course_language_group_id = updated_data.course_language_group_id
438 ",
439 &user_ids,
440 ×tamps,
441 &course_language_group_ids,
442 &email_subscriptions_in_mailchimp
443 )
444 .execute(conn)
445 .await?;
446 Ok(())
447}
448
449pub async fn fetch_all_marketing_mailing_list_access_tokens(
450 conn: &mut PgConnection,
451) -> sqlx::Result<Vec<MarketingMailingListAccessToken>> {
452 let results = sqlx::query_as!(
453 MarketingMailingListAccessToken,
454 "
455 SELECT
456 id,
457 course_id,
458 course_language_group_id,
459 server_prefix,
460 access_token,
461 mailchimp_mailing_list_id,
462 created_at,
463 updated_at,
464 deleted_at
465 FROM marketing_mailing_list_access_tokens
466 "
467 )
468 .fetch_all(conn)
469 .await?;
470
471 Ok(results)
472}
473
474pub async fn fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
475 conn: &mut PgConnection,
476 course_language_group_id: Uuid,
477 marketing_mailing_access_token_id: Uuid,
478) -> sqlx::Result<Vec<serde_json::Value>> {
479 let results = sqlx::query!(
480 "
481 SELECT
482 tag_name,
483 tag_id
484 FROM mailchimp_course_tags
485 WHERE course_language_group_id = $1
486 AND marketing_mailing_list_access_token_id = $2
487 AND deleted_at IS NULL
488 ",
489 course_language_group_id,
490 marketing_mailing_access_token_id
491 )
492 .fetch_all(conn)
493 .await?;
494
495 let tag_objects: Vec<serde_json::Value> = results
496 .into_iter()
497 .filter_map(|row| {
498 let tag_name = row.tag_name.trim();
499 let tag_id = row.tag_id.trim();
500
501 if tag_name.is_empty() {
502 return None;
503 }
504
505 Some(json!({
506 "name": tag_name,
507 "id": tag_id,
508 "status": "active"
509 }))
510 })
511 .collect();
512
513 Ok(tag_objects)
514}
515
516pub async fn upsert_tag(
517 conn: &mut PgConnection,
518 course_language_group_id: Uuid,
519 marketing_mailing_access_token_id: Uuid,
520 tag_id: String,
521 tag_name: String,
522) -> sqlx::Result<()> {
523 sqlx::query!(
524 "
525 INSERT INTO mailchimp_course_tags (
526 tag_name,
527 tag_id,
528 course_language_group_id,
529 marketing_mailing_list_access_token_id
530 )
531 VALUES ($1, $2, $3, $4)
532 ON CONFLICT (course_language_group_id, tag_name)
533 DO UPDATE
534 SET
535 tag_name = EXCLUDED.tag_name
536 ",
537 tag_name,
538 tag_id,
539 course_language_group_id,
540 marketing_mailing_access_token_id,
541 )
542 .execute(&mut *conn)
543 .await?;
544 Ok(())
545}
546
547pub async fn delete_tag(
548 conn: &mut PgConnection,
549 deleted_tag_id: String,
550 course_language_group_id: Uuid,
551) -> sqlx::Result<()> {
552 sqlx::query!(
553 "
554 UPDATE mailchimp_course_tags
555 SET deleted_at = now()
556 WHERE tag_id = $1
557 AND course_language_group_id = $2
558 AND deleted_at IS NULL
559 ",
560 deleted_tag_id,
561 course_language_group_id
562 )
563 .execute(&mut *conn)
564 .await?;
565
566 Ok(())
567}