1use itertools::multiunzip;
2use serde_json::json;
3
4use crate::prelude::*;
5
/// A user's marketing consent as stored in the `user_marketing_consents` table.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct UserMarketingConsent {
    pub id: Uuid,
    /// Course recorded with the consent; overwritten every time the consent is upserted.
    pub course_id: Uuid,
    /// Together with `user_id` this is the conflict target used when upserting a consent.
    pub course_language_group_id: Uuid,
    pub user_id: Uuid,
    /// Mailchimp-side identifier stored for the user, if one has been stored.
    pub user_mailchimp_id: Option<String>,
    /// Whether the user currently consents to marketing.
    pub consent: bool,
    /// Email subscription status string stored alongside the consent.
    // NOTE(review): the set of valid values is not visible in this file.
    pub email_subscription_in_mailchimp: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
    /// When the row was last marked as synced to Mailchimp; `None` if that has never happened.
    pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
}
21
/// A marketing consent row enriched with the user, course, completion and research consent
/// details that the queries in this module join in.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct UserMarketingConsentWithDetails {
    pub id: Uuid,
    pub course_id: Uuid,
    pub course_language_group_id: Uuid,
    pub user_id: Uuid,
    pub user_mailchimp_id: Option<String>,
    pub consent: bool,
    pub email_subscription_in_mailchimp: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
    pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
    /// From `user_details.first_name`.
    pub first_name: Option<String>,
    /// From `user_details.last_name`.
    pub last_name: Option<String>,
    /// From `user_details.email`.
    pub email: String,
    /// From `courses.name`.
    pub course_name: String,
    /// The Mailchimp language code mapped to the course's language code when such a mapping
    /// exists, otherwise the course's own language code.
    pub locale: Option<String>,
    /// Completion date from `course_module_completions`, when a completion row was joined.
    pub completed_course_at: Option<DateTime<Utc>>,
    /// The course specific consent form answer when present, otherwise the user's general
    /// research consent.
    pub research_consent: Option<bool>,
    /// From `user_details.country`.
    pub country: Option<String>,
}
45
/// A user's current email together with the Mailchimp related fields of their marketing
/// consent, as returned by `fetch_all_unsynced_updated_emails`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct UserEmailSubscription {
    pub user_id: Uuid,
    /// From `user_details.email`.
    pub email: String,
    pub email_subscription_in_mailchimp: Option<String>,
    pub user_mailchimp_id: Option<String>,
}
54
/// A row of the `marketing_mailing_list_access_tokens` table.
// NOTE(review): judging by the field names this holds the credentials for one Mailchimp mailing
// list of a course language group; how the fields are used is not visible in this file.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct MarketingMailingListAccessToken {
    pub id: Uuid,
    pub course_id: Uuid,
    pub mailchimp_mailing_list_id: String,
    pub course_language_group_id: Uuid,
    pub server_prefix: String,
    pub access_token: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
}
68
/// A tag stored in the `mailchimp_course_tags` table.
// NOTE(review): no query visible in this section maps rows into this struct; the fields mirror
// the columns that `upsert_tag` and `delete_tag` write.
#[derive(Debug, FromRow)]
pub struct MailchimpCourseTag {
    pub id: Uuid,
    pub marketing_mailing_list_access_token_id: Uuid,
    pub course_language_group_id: Uuid,
    pub tag_name: String,
    pub tag_id: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    /// Set by `delete_tag`; tags with a value here are skipped when tags are fetched.
    pub deleted_at: Option<DateTime<Utc>>,
}
80
/// A mapping from one of our language codes to a Mailchimp language code, scoped to a mailing
/// list access token (table `mailchimp_language_code_mappings`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct MailchimpLanguageCodeMapping {
    pub id: Uuid,
    pub marketing_mailing_list_access_token_id: Uuid,
    /// Matched against `courses.language_code` by the consent queries in this module.
    pub our_language_code: String,
    /// Used as the `locale` of a consent when a mapping matches.
    pub mailchimp_language_code: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
}
92
93pub async fn upsert_marketing_consent(
94 conn: &mut PgConnection,
95 course_id: Uuid,
96 course_language_group_id: Uuid,
97 user_id: &Uuid,
98 email_subscription: &str,
99 marketing_consent: bool,
100) -> sqlx::Result<Uuid> {
101 let result = sqlx::query!(
102 r#"
103 INSERT INTO user_marketing_consents (user_id, course_id, course_language_group_id, consent, email_subscription_in_mailchimp)
104 VALUES ($1, $2, $3, $4, $5)
105 ON CONFLICT (user_id, course_language_group_id)
106 DO UPDATE
107 SET
108 course_id = $2,
109 consent = $4,
110 email_subscription_in_mailchimp = $5
111 RETURNING id
112 "#,
113 user_id,
114 course_id,
115 course_language_group_id,
116 marketing_consent,
117 email_subscription
118 )
119 .fetch_one(conn)
120 .await?;
121
122 Ok(result.id)
123}
124
125pub async fn fetch_user_marketing_consent(
126 conn: &mut PgConnection,
127 course_id: Uuid,
128 user_id: &Uuid,
129) -> sqlx::Result<UserMarketingConsent> {
130 let result = sqlx::query_as!(
131 UserMarketingConsent,
132 "
133 SELECT *
134 FROM user_marketing_consents
135 WHERE user_id = $1 AND course_id = $2
136 ",
137 user_id,
138 course_id,
139 )
140 .fetch_one(conn)
141 .await?;
142
143 Ok(result)
144}
145
146pub async fn fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
148 conn: &mut PgConnection,
149 course_language_group_id: Uuid,
150) -> sqlx::Result<Vec<UserMarketingConsentWithDetails>> {
151 let result = sqlx::query_as!(
152 UserMarketingConsentWithDetails,
153 r#"
154 SELECT
155 umc.id,
156 umc.course_id,
157 umc.course_language_group_id,
158 umc.user_id,
159 umc.user_mailchimp_id,
160 umc.consent,
161 umc.email_subscription_in_mailchimp,
162 umc.created_at,
163 umc.updated_at,
164 umc.deleted_at,
165 umc.synced_to_mailchimp_at,
166 u.first_name AS first_name,
167 u.last_name AS last_name,
168 u.country AS country,
169 u.email AS email,
170 c.name AS course_name,
171 COALESCE(
172 mlcm.mailchimp_language_code,
173 c.language_code
174 ) AS locale,
175 CASE WHEN cmc.passed IS NOT NULL THEN cmc.completion_date ELSE NULL END AS completed_course_at,
176 COALESCE(csfa.research_consent, urc.research_consent) AS research_consent
177 FROM user_marketing_consents AS umc
178 JOIN user_details AS u ON u.user_id = umc.user_id
179 JOIN courses AS c ON c.id = umc.course_id
180 LEFT JOIN course_module_completions AS cmc
181 ON cmc.user_id = umc.user_id AND cmc.course_id = umc.course_id
182 LEFT JOIN course_specific_consent_form_answers AS csfa
183 ON csfa.course_id = umc.course_id AND csfa.user_id = umc.user_id
184 LEFT JOIN user_research_consents AS urc
185 ON urc.user_id = umc.user_id
186 LEFT JOIN mailchimp_course_tags AS tags
187 ON tags.course_language_group_id = umc.course_language_group_id
188 LEFT JOIN marketing_mailing_list_access_tokens AS mmlat
189 ON mmlat.course_language_group_id = umc.course_language_group_id AND mmlat.deleted_at IS NULL
190 LEFT JOIN mailchimp_language_code_mappings AS mlcm
191 ON mlcm.marketing_mailing_list_access_token_id = mmlat.id
192 AND mlcm.our_language_code = c.language_code
193 AND mlcm.deleted_at IS NULL
194 WHERE umc.course_language_group_id = $1
195 AND (
196 umc.synced_to_mailchimp_at IS NULL
197 OR umc.synced_to_mailchimp_at < umc.updated_at
198 OR csfa.updated_at > umc.synced_to_mailchimp_at
199 OR urc.updated_at > umc.synced_to_mailchimp_at
200 OR cmc.updated_at > umc.synced_to_mailchimp_at
201 OR u.updated_at > umc.synced_to_mailchimp_at
202 OR c.updated_at > umc.synced_to_mailchimp_at
203 OR mlcm.updated_at > umc.synced_to_mailchimp_at
204 OR EXISTS (
205 SELECT 1
206 FROM mailchimp_course_tags
207 WHERE mailchimp_course_tags.course_language_group_id = umc.course_language_group_id
208 AND mailchimp_course_tags.updated_at > umc.synced_to_mailchimp_at
209 AND deleted_at IS NULL
210 )
211 )
212 "#,
213 course_language_group_id
214 )
215 .fetch_all(conn)
216 .await?;
217
218 Ok(result)
219}
220
221pub async fn fetch_user_marketing_consents_with_details_by_user_ids(
222 conn: &mut PgConnection,
223 course_language_group_id: Uuid,
224 user_ids: &[Uuid],
225) -> sqlx::Result<Vec<UserMarketingConsentWithDetails>> {
226 if user_ids.is_empty() {
227 return Ok(vec![]);
228 }
229
230 let result = sqlx::query_as!(
231 UserMarketingConsentWithDetails,
232 r#"
233 SELECT
234 umc.id,
235 umc.course_id,
236 umc.course_language_group_id,
237 umc.user_id,
238 umc.user_mailchimp_id,
239 umc.consent,
240 umc.email_subscription_in_mailchimp,
241 umc.created_at,
242 umc.updated_at,
243 umc.deleted_at,
244 umc.synced_to_mailchimp_at,
245 u.first_name AS first_name,
246 u.last_name AS last_name,
247 u.country AS country,
248 u.email AS email,
249 c.name AS course_name,
250 COALESCE(
251 mlcm.mailchimp_language_code,
252 c.language_code
253 ) AS locale,
254 CASE WHEN cmc.passed IS NOT NULL THEN cmc.completion_date ELSE NULL END AS completed_course_at,
255 COALESCE(csfa.research_consent, urc.research_consent) AS research_consent
256 FROM user_marketing_consents AS umc
257 JOIN user_details AS u ON u.user_id = umc.user_id
258 JOIN courses AS c ON c.id = umc.course_id
259 LEFT JOIN course_module_completions AS cmc
260 ON cmc.user_id = umc.user_id AND cmc.course_id = umc.course_id
261 LEFT JOIN course_specific_consent_form_answers AS csfa
262 ON csfa.course_id = umc.course_id AND csfa.user_id = umc.user_id
263 LEFT JOIN user_research_consents AS urc
264 ON urc.user_id = umc.user_id
265 LEFT JOIN marketing_mailing_list_access_tokens AS mmlat
266 ON mmlat.course_language_group_id = umc.course_language_group_id AND mmlat.deleted_at IS NULL
267 LEFT JOIN mailchimp_language_code_mappings AS mlcm
268 ON mlcm.marketing_mailing_list_access_token_id = mmlat.id
269 AND mlcm.our_language_code = c.language_code
270 AND mlcm.deleted_at IS NULL
271 WHERE umc.course_language_group_id = $1
272 AND umc.user_id = ANY($2::uuid[])
273 AND umc.deleted_at IS NULL
274 "#,
275 course_language_group_id,
276 user_ids
277 )
278 .fetch_all(conn)
279 .await?;
280
281 Ok(result)
282}
283
284pub async fn fetch_user_mailchimp_id_mapping(
286 conn: &mut PgConnection,
287 course_language_group_id: Uuid,
288 user_ids: &[Uuid],
289) -> sqlx::Result<std::collections::HashMap<Uuid, String>> {
290 if user_ids.is_empty() {
291 return Ok(std::collections::HashMap::new());
292 }
293 let rows = sqlx::query!(
294 r#"
295 SELECT user_id, user_mailchimp_id
296 FROM user_marketing_consents
297 WHERE course_language_group_id = $1 AND user_id = ANY($2::uuid[]) AND deleted_at IS NULL
298 "#,
299 course_language_group_id,
300 user_ids
301 )
302 .fetch_all(conn)
303 .await?;
304
305 let map = rows
306 .into_iter()
307 .filter_map(|r| r.user_mailchimp_id.map(|id| (r.user_id, id)))
308 .collect();
309 Ok(map)
310}
311
312pub async fn fetch_all_unsynced_updated_emails(
314 conn: &mut PgConnection,
315 course_language_group_id: Uuid,
316) -> sqlx::Result<Vec<UserEmailSubscription>> {
317 let result = sqlx::query_as!(
318 UserEmailSubscription,
319 "
320 SELECT
321 umc.user_id,
322 u.email AS email,
323 umc.email_subscription_in_mailchimp,
324 umc.user_mailchimp_id
325 FROM user_marketing_consents AS umc
326 JOIN user_details AS u ON u.user_id = umc.user_id
327 WHERE umc.course_language_group_id = $1
328 AND umc.synced_to_mailchimp_at < u.updated_at
329 ",
330 course_language_group_id
331 )
332 .fetch_all(conn)
333 .await?;
334
335 Ok(result)
336}
337
338pub async fn update_synced_to_mailchimp_at_to_all_synced_users(
340 conn: &mut PgConnection,
341 ids: &[Uuid],
342) -> ModelResult<()> {
343 sqlx::query!(
344 "
345UPDATE user_marketing_consents
346SET synced_to_mailchimp_at = now()
347WHERE user_id IN (
348 SELECT UNNEST($1::uuid [])
349 )
350",
351 &ids
352 )
353 .execute(conn)
354 .await?;
355 Ok(())
356}
357
358pub async fn update_user_mailchimp_id_at_to_all_synced_users(
360 pool: &mut PgConnection,
361 user_contact_pairs: Vec<(String, String)>,
362) -> ModelResult<()> {
363 let (user_ids_raw, user_mailchimp_ids): (Vec<_>, Vec<_>) =
364 user_contact_pairs.into_iter().unzip();
365
366 let user_ids: Vec<Uuid> = user_ids_raw
368 .into_iter()
369 .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
370 .collect();
371
372 sqlx::query!(
373 "
374UPDATE user_marketing_consents
375SET user_mailchimp_id = updated_data.user_mailchimp_id
376FROM (
377 SELECT UNNEST($1::uuid[]) AS user_id, UNNEST($2::text[]) AS user_mailchimp_id
378) AS updated_data
379WHERE user_marketing_consents.user_id = updated_data.user_id
380",
381 &user_ids,
382 &user_mailchimp_ids
383 )
384 .execute(pool)
385 .await?;
386 Ok(())
387}
388
389pub async fn update_unsubscribed_users_from_mailchimp_in_bulk(
391 conn: &mut PgConnection,
392 mailchimp_data: Vec<(String, String, String, String)>,
393) -> anyhow::Result<()> {
394 let (
395 user_ids_raw,
396 timestamps_raw,
397 course_language_group_ids_raw,
398 email_subscriptions_in_mailchimp,
399 ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>) = multiunzip(mailchimp_data);
400
401 let user_ids: Vec<Uuid> = user_ids_raw
402 .into_iter()
403 .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
404 .collect();
405
406 let timestamps: Vec<DateTime<Utc>> = timestamps_raw
407 .into_iter()
408 .filter_map(|ts| {
409 DateTime::parse_from_rfc3339(&ts)
410 .ok()
411 .map(|dt| dt.with_timezone(&Utc))
412 })
413 .collect();
414
415 let course_language_group_ids: Vec<Uuid> = course_language_group_ids_raw
416 .into_iter()
417 .filter_map(|lang_id| Uuid::parse_str(&lang_id).ok())
418 .collect();
419
420 sqlx::query!(
421 "
422 UPDATE user_marketing_consents
423 SET consent = false,
424 email_subscription_in_mailchimp = updated_data.email_subscription_in_mailchimp,
425 synced_to_mailchimp_at = updated_data.last_updated
426 FROM (
427 SELECT UNNEST($1::Uuid[]) AS user_id,
428 UNNEST($2::timestamptz[]) AS last_updated,
429 UNNEST($3::Uuid[]) AS course_language_group_id,
430 UNNEST($4::text[]) AS email_subscription_in_mailchimp
431
432 ) AS updated_data
433 WHERE user_marketing_consents.user_id = updated_data.user_id
434 AND user_marketing_consents.consent = true
435 AND user_marketing_consents.synced_to_mailchimp_at < updated_data.last_updated
436 AND user_marketing_consents.course_language_group_id = updated_data.course_language_group_id
437 ",
438 &user_ids,
439 ×tamps,
440 &course_language_group_ids,
441 &email_subscriptions_in_mailchimp
442 )
443 .execute(conn)
444 .await?;
445 Ok(())
446}
447
448pub async fn fetch_all_marketing_mailing_list_access_tokens(
449 conn: &mut PgConnection,
450) -> sqlx::Result<Vec<MarketingMailingListAccessToken>> {
451 let results = sqlx::query_as!(
452 MarketingMailingListAccessToken,
453 "
454 SELECT
455 id,
456 course_id,
457 course_language_group_id,
458 server_prefix,
459 access_token,
460 mailchimp_mailing_list_id,
461 created_at,
462 updated_at,
463 deleted_at
464 FROM marketing_mailing_list_access_tokens
465 "
466 )
467 .fetch_all(conn)
468 .await?;
469
470 Ok(results)
471}
472
473pub async fn fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
474 conn: &mut PgConnection,
475 course_language_group_id: Uuid,
476 marketing_mailing_access_token_id: Uuid,
477) -> sqlx::Result<Vec<serde_json::Value>> {
478 let results = sqlx::query!(
479 "
480 SELECT
481 tag_name,
482 tag_id
483 FROM mailchimp_course_tags
484 WHERE course_language_group_id = $1
485 AND marketing_mailing_list_access_token_id = $2
486 AND deleted_at IS NULL
487 ",
488 course_language_group_id,
489 marketing_mailing_access_token_id
490 )
491 .fetch_all(conn)
492 .await?;
493
494 let tag_objects: Vec<serde_json::Value> = results
495 .into_iter()
496 .filter_map(|row| {
497 let tag_name = row.tag_name.trim();
498 let tag_id = row.tag_id.trim();
499
500 if tag_name.is_empty() {
501 return None;
502 }
503
504 Some(json!({
505 "name": tag_name,
506 "id": tag_id,
507 "status": "active"
508 }))
509 })
510 .collect();
511
512 Ok(tag_objects)
513}
514
515pub async fn upsert_tag(
516 conn: &mut PgConnection,
517 course_language_group_id: Uuid,
518 marketing_mailing_access_token_id: Uuid,
519 tag_id: String,
520 tag_name: String,
521) -> sqlx::Result<()> {
522 sqlx::query!(
523 "
524 INSERT INTO mailchimp_course_tags (
525 tag_name,
526 tag_id,
527 course_language_group_id,
528 marketing_mailing_list_access_token_id
529 )
530 VALUES ($1, $2, $3, $4)
531 ON CONFLICT (course_language_group_id, tag_name)
532 DO UPDATE
533 SET
534 tag_name = EXCLUDED.tag_name
535 ",
536 tag_name,
537 tag_id,
538 course_language_group_id,
539 marketing_mailing_access_token_id,
540 )
541 .execute(&mut *conn)
542 .await?;
543 Ok(())
544}
545
546pub async fn delete_tag(
547 conn: &mut PgConnection,
548 deleted_tag_id: String,
549 course_language_group_id: Uuid,
550) -> sqlx::Result<()> {
551 sqlx::query!(
552 "
553 UPDATE mailchimp_course_tags
554 SET deleted_at = now()
555 WHERE tag_id = $1
556 AND course_language_group_id = $2
557 AND deleted_at IS NULL
558 ",
559 deleted_tag_id,
560 course_language_group_id
561 )
562 .execute(&mut *conn)
563 .await?;
564
565 Ok(())
566}