headless_lms_models/
marketing_consents.rs

1use itertools::multiunzip;
2use serde_json::json;
3use utoipa::ToSchema;
4
5use crate::prelude::*;
6
7#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, ToSchema)]
8
9pub struct UserMarketingConsent {
10    pub id: Uuid,
11    pub course_id: Uuid,
12    pub course_language_group_id: Uuid,
13    pub user_id: Uuid,
14    pub user_mailchimp_id: Option<String>,
15    pub consent: bool,
16    pub email_subscription_in_mailchimp: Option<String>,
17    pub created_at: DateTime<Utc>,
18    pub updated_at: DateTime<Utc>,
19    pub deleted_at: Option<DateTime<Utc>>,
20    pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
21}
22
23#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
24
25pub struct UserMarketingConsentWithDetails {
26    pub id: Uuid,
27    pub course_id: Uuid,
28    pub course_language_group_id: Uuid,
29    pub user_id: Uuid,
30    pub user_mailchimp_id: Option<String>,
31    pub consent: bool,
32    pub email_subscription_in_mailchimp: Option<String>,
33    pub created_at: DateTime<Utc>,
34    pub updated_at: DateTime<Utc>,
35    pub deleted_at: Option<DateTime<Utc>>,
36    pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
37    pub first_name: Option<String>,
38    pub last_name: Option<String>,
39    pub email: String,
40    pub course_name: String,
41    pub locale: Option<String>,
42    pub completed_course_at: Option<DateTime<Utc>>,
43    pub research_consent: Option<bool>,
44    pub country: Option<String>,
45}
46
47#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
48
49pub struct UserEmailSubscription {
50    pub user_id: Uuid,
51    pub email: String,
52    pub email_subscription_in_mailchimp: Option<String>,
53    pub user_mailchimp_id: Option<String>,
54}
55
56#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
57
58pub struct MarketingMailingListAccessToken {
59    pub id: Uuid,
60    pub course_id: Uuid,
61    pub mailchimp_mailing_list_id: String,
62    pub course_language_group_id: Uuid,
63    pub server_prefix: String,
64    pub access_token: String,
65    pub created_at: DateTime<Utc>,
66    pub updated_at: DateTime<Utc>,
67    pub deleted_at: Option<DateTime<Utc>>,
68}
69
70#[derive(Debug, FromRow)]
71pub struct MailchimpCourseTag {
72    pub id: Uuid,
73    pub marketing_mailing_list_access_token_id: Uuid,
74    pub course_language_group_id: Uuid,
75    pub tag_name: String,
76    pub tag_id: String,
77    pub created_at: DateTime<Utc>,
78    pub updated_at: DateTime<Utc>,
79    pub deleted_at: Option<DateTime<Utc>>,
80}
81
82#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
83
84pub struct MailchimpLanguageCodeMapping {
85    pub id: Uuid,
86    pub marketing_mailing_list_access_token_id: Uuid,
87    pub our_language_code: String,
88    pub mailchimp_language_code: String,
89    pub created_at: DateTime<Utc>,
90    pub updated_at: DateTime<Utc>,
91    pub deleted_at: Option<DateTime<Utc>>,
92}
93
94pub async fn upsert_marketing_consent(
95    conn: &mut PgConnection,
96    course_id: Uuid,
97    course_language_group_id: Uuid,
98    user_id: &Uuid,
99    email_subscription: &str,
100    marketing_consent: bool,
101) -> sqlx::Result<Uuid> {
102    let result = sqlx::query!(
103        r#"
104      INSERT INTO user_marketing_consents (user_id, course_id, course_language_group_id, consent, email_subscription_in_mailchimp)
105      VALUES ($1, $2, $3, $4, $5)
106      ON CONFLICT (user_id, course_language_group_id)
107      DO UPDATE
108      SET
109        course_id = $2,
110        consent = $4,
111        email_subscription_in_mailchimp = $5
112      RETURNING id
113      "#,
114        user_id,
115        course_id,
116        course_language_group_id,
117        marketing_consent,
118        email_subscription
119    )
120    .fetch_one(conn)
121    .await?;
122
123    Ok(result.id)
124}
125
126pub async fn fetch_user_marketing_consent(
127    conn: &mut PgConnection,
128    course_id: Uuid,
129    user_id: &Uuid,
130) -> sqlx::Result<UserMarketingConsent> {
131    let result = sqlx::query_as!(
132        UserMarketingConsent,
133        "
134    SELECT *
135    FROM user_marketing_consents
136    WHERE user_id = $1 AND course_id = $2
137    ",
138        user_id,
139        course_id,
140    )
141    .fetch_one(conn)
142    .await?;
143
144    Ok(result)
145}
146
147/// Fetches all user marketing consents with detailed user information for a specific course language group, if they haven't been synced to Mailchimp or if there have been updates since the last sync.
148pub async fn fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
149    conn: &mut PgConnection,
150    course_language_group_id: Uuid,
151) -> sqlx::Result<Vec<UserMarketingConsentWithDetails>> {
152    let result = sqlx::query_as!(
153        UserMarketingConsentWithDetails,
154        r#"
155    SELECT
156        umc.id,
157        umc.course_id,
158        umc.course_language_group_id,
159        umc.user_id,
160        umc.user_mailchimp_id,
161        umc.consent,
162        umc.email_subscription_in_mailchimp,
163        umc.created_at,
164        umc.updated_at,
165        umc.deleted_at,
166        umc.synced_to_mailchimp_at,
167        u.first_name AS first_name,
168        u.last_name AS last_name,
169        u.country AS country,
170        u.email AS email,
171        c.name AS course_name,
172        COALESCE(
173            mlcm.mailchimp_language_code,
174            c.language_code
175        ) AS locale,
176        CASE WHEN cmc.passed IS NOT NULL THEN cmc.completion_date ELSE NULL END AS completed_course_at,
177        COALESCE(csfa.research_consent, urc.research_consent) AS research_consent
178    FROM user_marketing_consents AS umc
179    JOIN user_details AS u ON u.user_id = umc.user_id
180    JOIN courses AS c ON c.id = umc.course_id
181    LEFT JOIN course_module_completions AS cmc
182        ON cmc.user_id = umc.user_id AND cmc.course_id = umc.course_id
183    LEFT JOIN course_specific_consent_form_answers AS csfa
184        ON csfa.course_id = umc.course_id AND csfa.user_id = umc.user_id
185    LEFT JOIN user_research_consents AS urc
186        ON urc.user_id = umc.user_id
187    LEFT JOIN mailchimp_course_tags AS tags
188        ON tags.course_language_group_id = umc.course_language_group_id
189    LEFT JOIN marketing_mailing_list_access_tokens AS mmlat
190        ON mmlat.course_language_group_id = umc.course_language_group_id AND mmlat.deleted_at IS NULL
191    LEFT JOIN mailchimp_language_code_mappings AS mlcm
192        ON mlcm.marketing_mailing_list_access_token_id = mmlat.id
193        AND mlcm.our_language_code = c.language_code
194        AND mlcm.deleted_at IS NULL
195    WHERE umc.course_language_group_id = $1
196    AND (
197        umc.synced_to_mailchimp_at IS NULL
198        OR umc.synced_to_mailchimp_at < umc.updated_at
199        OR csfa.updated_at > umc.synced_to_mailchimp_at
200        OR urc.updated_at > umc.synced_to_mailchimp_at
201        OR cmc.updated_at > umc.synced_to_mailchimp_at
202        OR u.updated_at > umc.synced_to_mailchimp_at
203        OR c.updated_at > umc.synced_to_mailchimp_at
204        OR mlcm.updated_at > umc.synced_to_mailchimp_at
205        OR EXISTS (
206            SELECT 1
207            FROM mailchimp_course_tags
208            WHERE mailchimp_course_tags.course_language_group_id = umc.course_language_group_id
209            AND mailchimp_course_tags.updated_at > umc.synced_to_mailchimp_at
210            AND deleted_at IS NULL
211        )
212    )
213    "#,
214        course_language_group_id
215    )
216    .fetch_all(conn)
217    .await?;
218
219    Ok(result)
220}
221
222pub async fn fetch_user_marketing_consents_with_details_by_user_ids(
223    conn: &mut PgConnection,
224    course_language_group_id: Uuid,
225    user_ids: &[Uuid],
226) -> sqlx::Result<Vec<UserMarketingConsentWithDetails>> {
227    if user_ids.is_empty() {
228        return Ok(vec![]);
229    }
230
231    let result = sqlx::query_as!(
232        UserMarketingConsentWithDetails,
233        r#"
234    SELECT
235        umc.id,
236        umc.course_id,
237        umc.course_language_group_id,
238        umc.user_id,
239        umc.user_mailchimp_id,
240        umc.consent,
241        umc.email_subscription_in_mailchimp,
242        umc.created_at,
243        umc.updated_at,
244        umc.deleted_at,
245        umc.synced_to_mailchimp_at,
246        u.first_name AS first_name,
247        u.last_name AS last_name,
248        u.country AS country,
249        u.email AS email,
250        c.name AS course_name,
251        COALESCE(
252            mlcm.mailchimp_language_code,
253            c.language_code
254        ) AS locale,
255        CASE WHEN cmc.passed IS NOT NULL THEN cmc.completion_date ELSE NULL END AS completed_course_at,
256        COALESCE(csfa.research_consent, urc.research_consent) AS research_consent
257    FROM user_marketing_consents AS umc
258    JOIN user_details AS u ON u.user_id = umc.user_id
259    JOIN courses AS c ON c.id = umc.course_id
260    LEFT JOIN course_module_completions AS cmc
261        ON cmc.user_id = umc.user_id AND cmc.course_id = umc.course_id
262    LEFT JOIN course_specific_consent_form_answers AS csfa
263        ON csfa.course_id = umc.course_id AND csfa.user_id = umc.user_id
264    LEFT JOIN user_research_consents AS urc
265        ON urc.user_id = umc.user_id
266    LEFT JOIN marketing_mailing_list_access_tokens AS mmlat
267        ON mmlat.course_language_group_id = umc.course_language_group_id AND mmlat.deleted_at IS NULL
268    LEFT JOIN mailchimp_language_code_mappings AS mlcm
269        ON mlcm.marketing_mailing_list_access_token_id = mmlat.id
270        AND mlcm.our_language_code = c.language_code
271        AND mlcm.deleted_at IS NULL
272    WHERE umc.course_language_group_id = $1
273    AND umc.user_id = ANY($2::uuid[])
274    AND umc.deleted_at IS NULL
275    "#,
276        course_language_group_id,
277        user_ids
278    )
279    .fetch_all(conn)
280    .await?;
281
282    Ok(result)
283}
284
285/// Fetches user_id -> user_mailchimp_id for the given course language group and user IDs (e.g. after syncing to Mailchimp). Only includes entries with a non-null user_mailchimp_id.
286pub async fn fetch_user_mailchimp_id_mapping(
287    conn: &mut PgConnection,
288    course_language_group_id: Uuid,
289    user_ids: &[Uuid],
290) -> sqlx::Result<std::collections::HashMap<Uuid, String>> {
291    if user_ids.is_empty() {
292        return Ok(std::collections::HashMap::new());
293    }
294    let rows = sqlx::query!(
295        r#"
296    SELECT user_id, user_mailchimp_id
297    FROM user_marketing_consents
298    WHERE course_language_group_id = $1 AND user_id = ANY($2::uuid[]) AND deleted_at IS NULL
299    "#,
300        course_language_group_id,
301        user_ids
302    )
303    .fetch_all(conn)
304    .await?;
305
306    let map = rows
307        .into_iter()
308        .filter_map(|r| r.user_mailchimp_id.map(|id| (r.user_id, id)))
309        .collect();
310    Ok(map)
311}
312
313/// Fetches email, email subscription status and user ID for users whose details have been updated after their marketing consent was last synced to Mailchimp
314pub async fn fetch_all_unsynced_updated_emails(
315    conn: &mut PgConnection,
316    course_language_group_id: Uuid,
317) -> sqlx::Result<Vec<UserEmailSubscription>> {
318    let result = sqlx::query_as!(
319        UserEmailSubscription,
320        "
321    SELECT
322        umc.user_id,
323        u.email AS email,
324        umc.email_subscription_in_mailchimp,
325        umc.user_mailchimp_id
326    FROM user_marketing_consents AS umc
327    JOIN user_details AS u ON u.user_id = umc.user_id
328    WHERE umc.course_language_group_id = $1
329    AND umc.synced_to_mailchimp_at < u.updated_at
330    ",
331        course_language_group_id
332    )
333    .fetch_all(conn)
334    .await?;
335
336    Ok(result)
337}
338
339/// Used to update the synced_to_mailchimp_at to a list of users when they are successfully synced to mailchimp
340pub async fn update_synced_to_mailchimp_at_to_all_synced_users(
341    conn: &mut PgConnection,
342    ids: &[Uuid],
343) -> ModelResult<()> {
344    sqlx::query!(
345        "
346UPDATE user_marketing_consents
347SET synced_to_mailchimp_at = now()
348WHERE user_id IN (
349    SELECT UNNEST($1::uuid [])
350  )
351",
352        &ids
353    )
354    .execute(conn)
355    .await?;
356    Ok(())
357}
358
359/// Used to add the user_mailchimp_ids to a list of new users when they are successfully synced to mailchimp
360pub async fn update_user_mailchimp_id_at_to_all_synced_users(
361    pool: &mut PgConnection,
362    user_contact_pairs: Vec<(String, String)>,
363) -> ModelResult<()> {
364    let (user_ids_raw, user_mailchimp_ids): (Vec<_>, Vec<_>) =
365        user_contact_pairs.into_iter().unzip();
366
367    // Parse user_ids into Uuid
368    let user_ids: Vec<Uuid> = user_ids_raw
369        .into_iter()
370        .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
371        .collect();
372
373    sqlx::query!(
374        "
375UPDATE user_marketing_consents
376SET user_mailchimp_id = updated_data.user_mailchimp_id
377FROM (
378    SELECT UNNEST($1::uuid[]) AS user_id, UNNEST($2::text[]) AS user_mailchimp_id
379) AS updated_data
380WHERE user_marketing_consents.user_id = updated_data.user_id
381",
382        &user_ids,
383        &user_mailchimp_ids
384    )
385    .execute(pool)
386    .await?;
387    Ok(())
388}
389
390/// Updates user consents to false in bulk using Mailchimp data.
391pub async fn update_unsubscribed_users_from_mailchimp_in_bulk(
392    conn: &mut PgConnection,
393    mailchimp_data: Vec<(String, String, String, String)>,
394) -> anyhow::Result<()> {
395    let (
396        user_ids_raw,
397        timestamps_raw,
398        course_language_group_ids_raw,
399        email_subscriptions_in_mailchimp,
400    ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>) = multiunzip(mailchimp_data);
401
402    let user_ids: Vec<Uuid> = user_ids_raw
403        .into_iter()
404        .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
405        .collect();
406
407    let timestamps: Vec<DateTime<Utc>> = timestamps_raw
408        .into_iter()
409        .filter_map(|ts| {
410            DateTime::parse_from_rfc3339(&ts)
411                .ok()
412                .map(|dt| dt.with_timezone(&Utc))
413        })
414        .collect();
415
416    let course_language_group_ids: Vec<Uuid> = course_language_group_ids_raw
417        .into_iter()
418        .filter_map(|lang_id| Uuid::parse_str(&lang_id).ok())
419        .collect();
420
421    sqlx::query!(
422        "
423    UPDATE user_marketing_consents
424    SET consent = false,
425        email_subscription_in_mailchimp = updated_data.email_subscription_in_mailchimp,
426        synced_to_mailchimp_at = updated_data.last_updated
427    FROM (
428        SELECT UNNEST($1::Uuid[]) AS user_id,
429               UNNEST($2::timestamptz[]) AS last_updated,
430               UNNEST($3::Uuid[]) AS course_language_group_id,
431               UNNEST($4::text[]) AS email_subscription_in_mailchimp
432
433    ) AS updated_data
434    WHERE user_marketing_consents.user_id = updated_data.user_id
435      AND user_marketing_consents.consent = true
436      AND user_marketing_consents.synced_to_mailchimp_at < updated_data.last_updated
437      AND user_marketing_consents.course_language_group_id = updated_data.course_language_group_id
438    ",
439        &user_ids,
440        &timestamps,
441        &course_language_group_ids,
442        &email_subscriptions_in_mailchimp
443    )
444    .execute(conn)
445    .await?;
446    Ok(())
447}
448
449pub async fn fetch_all_marketing_mailing_list_access_tokens(
450    conn: &mut PgConnection,
451) -> sqlx::Result<Vec<MarketingMailingListAccessToken>> {
452    let results = sqlx::query_as!(
453        MarketingMailingListAccessToken,
454        "
455    SELECT
456      id,
457      course_id,
458      course_language_group_id,
459      server_prefix,
460      access_token,
461      mailchimp_mailing_list_id,
462      created_at,
463      updated_at,
464      deleted_at
465    FROM marketing_mailing_list_access_tokens
466    "
467    )
468    .fetch_all(conn)
469    .await?;
470
471    Ok(results)
472}
473
474pub async fn fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
475    conn: &mut PgConnection,
476    course_language_group_id: Uuid,
477    marketing_mailing_access_token_id: Uuid,
478) -> sqlx::Result<Vec<serde_json::Value>> {
479    let results = sqlx::query!(
480        "
481        SELECT
482          tag_name,
483          tag_id
484        FROM mailchimp_course_tags
485        WHERE course_language_group_id = $1
486        AND marketing_mailing_list_access_token_id = $2
487        AND deleted_at IS NULL
488        ",
489        course_language_group_id,
490        marketing_mailing_access_token_id
491    )
492    .fetch_all(conn)
493    .await?;
494
495    let tag_objects: Vec<serde_json::Value> = results
496        .into_iter()
497        .filter_map(|row| {
498            let tag_name = row.tag_name.trim();
499            let tag_id = row.tag_id.trim();
500
501            if tag_name.is_empty() {
502                return None;
503            }
504
505            Some(json!({
506                "name": tag_name,
507                "id": tag_id,
508                "status": "active"
509            }))
510        })
511        .collect();
512
513    Ok(tag_objects)
514}
515
516pub async fn upsert_tag(
517    conn: &mut PgConnection,
518    course_language_group_id: Uuid,
519    marketing_mailing_access_token_id: Uuid,
520    tag_id: String,
521    tag_name: String,
522) -> sqlx::Result<()> {
523    sqlx::query!(
524        "
525        INSERT INTO mailchimp_course_tags (
526          tag_name,
527          tag_id,
528          course_language_group_id,
529          marketing_mailing_list_access_token_id
530        )
531        VALUES ($1, $2, $3, $4)
532        ON CONFLICT (course_language_group_id, tag_name)
533        DO UPDATE
534        SET
535          tag_name = EXCLUDED.tag_name
536        ",
537        tag_name,
538        tag_id,
539        course_language_group_id,
540        marketing_mailing_access_token_id,
541    )
542    .execute(&mut *conn)
543    .await?;
544    Ok(())
545}
546
547pub async fn delete_tag(
548    conn: &mut PgConnection,
549    deleted_tag_id: String,
550    course_language_group_id: Uuid,
551) -> sqlx::Result<()> {
552    sqlx::query!(
553        "
554        UPDATE mailchimp_course_tags
555        SET deleted_at = now()
556        WHERE tag_id = $1
557        AND course_language_group_id = $2
558        AND deleted_at IS NULL
559        ",
560        deleted_tag_id,
561        course_language_group_id
562    )
563    .execute(&mut *conn)
564    .await?;
565
566    Ok(())
567}