headless_lms_models/
marketing_consents.rs

1use itertools::multiunzip;
2use serde_json::json;
3
4use crate::prelude::*;
5
/// A row of the `user_marketing_consents` table, as returned by
/// [`fetch_user_marketing_consent`] (`SELECT *`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct UserMarketingConsent {
    pub id: Uuid,
    pub course_id: Uuid,
    /// Together with `user_id` this is the conflict key used by [`upsert_marketing_consent`].
    pub course_language_group_id: Uuid,
    pub user_id: Uuid,
    /// Written by [`update_user_mailchimp_id_at_to_all_synced_users`]; `None` until then.
    pub user_mailchimp_id: Option<String>,
    pub consent: bool,
    // NOTE(review): the set of valid values is not visible in this file; it is stored as free text.
    pub email_subscription_in_mailchimp: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
    /// Set to `now()` by [`update_synced_to_mailchimp_at_to_all_synced_users`], or to the
    /// timestamp supplied to [`update_unsubscribed_users_from_mailchimp_in_bulk`].
    pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
}
21
/// A marketing consent row joined with user, course, completion and research consent data.
///
/// Produced by [`fetch_all_unsynced_user_marketing_consents_by_course_language_group_id`] and
/// [`fetch_user_marketing_consents_with_details_by_user_ids`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct UserMarketingConsentWithDetails {
    pub id: Uuid,
    pub course_id: Uuid,
    pub course_language_group_id: Uuid,
    pub user_id: Uuid,
    pub user_mailchimp_id: Option<String>,
    pub consent: bool,
    pub email_subscription_in_mailchimp: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
    pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
    /// `user_details.first_name`.
    pub first_name: Option<String>,
    /// `user_details.last_name`.
    pub last_name: Option<String>,
    /// `user_details.email`.
    pub email: String,
    /// `courses.name` of the course referenced by `course_id`.
    pub course_name: String,
    /// The Mailchimp language code mapped from the course's language code when such a mapping
    /// exists, otherwise the course's own `language_code`.
    pub locale: Option<String>,
    /// `completion_date` of the joined course module completion when its `passed` column is
    /// not NULL; `None` otherwise.
    pub completed_course_at: Option<DateTime<Utc>>,
    /// The course specific consent form answer when present, otherwise the user's general
    /// research consent.
    pub research_consent: Option<bool>,
    /// `user_details.country`.
    pub country: Option<String>,
}
45
/// A user's current email together with the Mailchimp related columns of their marketing
/// consent row, as returned by [`fetch_all_unsynced_updated_emails`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct UserEmailSubscription {
    pub user_id: Uuid,
    /// `user_details.email`.
    pub email: String,
    pub email_subscription_in_mailchimp: Option<String>,
    pub user_mailchimp_id: Option<String>,
}
54
/// A row of the `marketing_mailing_list_access_tokens` table, as returned by
/// [`fetch_all_marketing_mailing_list_access_tokens`].
// NOTE(review): `server_prefix` and `access_token` are presumably the Mailchimp API
// credentials for `mailchimp_mailing_list_id` — confirm against the code that uses them.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct MarketingMailingListAccessToken {
    pub id: Uuid,
    pub course_id: Uuid,
    pub mailchimp_mailing_list_id: String,
    pub course_language_group_id: Uuid,
    pub server_prefix: String,
    pub access_token: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
}
68
/// A tag attached to a course language group, stored in the `mailchimp_course_tags` table.
///
/// Rows are written by [`upsert_tag`] and soft-deleted (via `deleted_at`) by [`delete_tag`].
#[derive(Debug, FromRow)]
pub struct MailchimpCourseTag {
    pub id: Uuid,
    pub marketing_mailing_list_access_token_id: Uuid,
    pub course_language_group_id: Uuid,
    pub tag_name: String,
    // NOTE(review): presumably the identifier Mailchimp assigned to the tag — confirm.
    pub tag_id: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
}
80
/// Maps one of our language codes to the language code Mailchimp should receive, scoped to a
/// single mailing list access token.
///
/// The detail queries in this module join this table on
/// `marketing_mailing_list_access_token_id` and `our_language_code = courses.language_code`
/// to compute the `locale` of a consent.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct MailchimpLanguageCodeMapping {
    pub id: Uuid,
    pub marketing_mailing_list_access_token_id: Uuid,
    pub our_language_code: String,
    pub mailchimp_language_code: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
}
92
93pub async fn upsert_marketing_consent(
94    conn: &mut PgConnection,
95    course_id: Uuid,
96    course_language_group_id: Uuid,
97    user_id: &Uuid,
98    email_subscription: &str,
99    marketing_consent: bool,
100) -> sqlx::Result<Uuid> {
101    let result = sqlx::query!(
102        r#"
103      INSERT INTO user_marketing_consents (user_id, course_id, course_language_group_id, consent, email_subscription_in_mailchimp)
104      VALUES ($1, $2, $3, $4, $5)
105      ON CONFLICT (user_id, course_language_group_id)
106      DO UPDATE
107      SET
108        course_id = $2,
109        consent = $4,
110        email_subscription_in_mailchimp = $5
111      RETURNING id
112      "#,
113        user_id,
114        course_id,
115        course_language_group_id,
116        marketing_consent,
117        email_subscription
118    )
119    .fetch_one(conn)
120    .await?;
121
122    Ok(result.id)
123}
124
125pub async fn fetch_user_marketing_consent(
126    conn: &mut PgConnection,
127    course_id: Uuid,
128    user_id: &Uuid,
129) -> sqlx::Result<UserMarketingConsent> {
130    let result = sqlx::query_as!(
131        UserMarketingConsent,
132        "
133    SELECT *
134    FROM user_marketing_consents
135    WHERE user_id = $1 AND course_id = $2
136    ",
137        user_id,
138        course_id,
139    )
140    .fetch_one(conn)
141    .await?;
142
143    Ok(result)
144}
145
146/// Fetches all user marketing consents with detailed user information for a specific course language group, if they haven't been synced to Mailchimp or if there have been updates since the last sync.
147pub async fn fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
148    conn: &mut PgConnection,
149    course_language_group_id: Uuid,
150) -> sqlx::Result<Vec<UserMarketingConsentWithDetails>> {
151    let result = sqlx::query_as!(
152        UserMarketingConsentWithDetails,
153        r#"
154    SELECT
155        umc.id,
156        umc.course_id,
157        umc.course_language_group_id,
158        umc.user_id,
159        umc.user_mailchimp_id,
160        umc.consent,
161        umc.email_subscription_in_mailchimp,
162        umc.created_at,
163        umc.updated_at,
164        umc.deleted_at,
165        umc.synced_to_mailchimp_at,
166        u.first_name AS first_name,
167        u.last_name AS last_name,
168        u.country AS country,
169        u.email AS email,
170        c.name AS course_name,
171        COALESCE(
172            mlcm.mailchimp_language_code,
173            c.language_code
174        ) AS locale,
175        CASE WHEN cmc.passed IS NOT NULL THEN cmc.completion_date ELSE NULL END AS completed_course_at,
176        COALESCE(csfa.research_consent, urc.research_consent) AS research_consent
177    FROM user_marketing_consents AS umc
178    JOIN user_details AS u ON u.user_id = umc.user_id
179    JOIN courses AS c ON c.id = umc.course_id
180    LEFT JOIN course_module_completions AS cmc
181        ON cmc.user_id = umc.user_id AND cmc.course_id = umc.course_id
182    LEFT JOIN course_specific_consent_form_answers AS csfa
183        ON csfa.course_id = umc.course_id AND csfa.user_id = umc.user_id
184    LEFT JOIN user_research_consents AS urc
185        ON urc.user_id = umc.user_id
186    LEFT JOIN mailchimp_course_tags AS tags
187        ON tags.course_language_group_id = umc.course_language_group_id
188    LEFT JOIN marketing_mailing_list_access_tokens AS mmlat
189        ON mmlat.course_language_group_id = umc.course_language_group_id AND mmlat.deleted_at IS NULL
190    LEFT JOIN mailchimp_language_code_mappings AS mlcm
191        ON mlcm.marketing_mailing_list_access_token_id = mmlat.id
192        AND mlcm.our_language_code = c.language_code
193        AND mlcm.deleted_at IS NULL
194    WHERE umc.course_language_group_id = $1
195    AND (
196        umc.synced_to_mailchimp_at IS NULL
197        OR umc.synced_to_mailchimp_at < umc.updated_at
198        OR csfa.updated_at > umc.synced_to_mailchimp_at
199        OR urc.updated_at > umc.synced_to_mailchimp_at
200        OR cmc.updated_at > umc.synced_to_mailchimp_at
201        OR u.updated_at > umc.synced_to_mailchimp_at
202        OR c.updated_at > umc.synced_to_mailchimp_at
203        OR mlcm.updated_at > umc.synced_to_mailchimp_at
204        OR EXISTS (
205            SELECT 1
206            FROM mailchimp_course_tags
207            WHERE mailchimp_course_tags.course_language_group_id = umc.course_language_group_id
208            AND mailchimp_course_tags.updated_at > umc.synced_to_mailchimp_at
209            AND deleted_at IS NULL
210        )
211    )
212    "#,
213        course_language_group_id
214    )
215    .fetch_all(conn)
216    .await?;
217
218    Ok(result)
219}
220
221pub async fn fetch_user_marketing_consents_with_details_by_user_ids(
222    conn: &mut PgConnection,
223    course_language_group_id: Uuid,
224    user_ids: &[Uuid],
225) -> sqlx::Result<Vec<UserMarketingConsentWithDetails>> {
226    if user_ids.is_empty() {
227        return Ok(vec![]);
228    }
229
230    let result = sqlx::query_as!(
231        UserMarketingConsentWithDetails,
232        r#"
233    SELECT
234        umc.id,
235        umc.course_id,
236        umc.course_language_group_id,
237        umc.user_id,
238        umc.user_mailchimp_id,
239        umc.consent,
240        umc.email_subscription_in_mailchimp,
241        umc.created_at,
242        umc.updated_at,
243        umc.deleted_at,
244        umc.synced_to_mailchimp_at,
245        u.first_name AS first_name,
246        u.last_name AS last_name,
247        u.country AS country,
248        u.email AS email,
249        c.name AS course_name,
250        COALESCE(
251            mlcm.mailchimp_language_code,
252            c.language_code
253        ) AS locale,
254        CASE WHEN cmc.passed IS NOT NULL THEN cmc.completion_date ELSE NULL END AS completed_course_at,
255        COALESCE(csfa.research_consent, urc.research_consent) AS research_consent
256    FROM user_marketing_consents AS umc
257    JOIN user_details AS u ON u.user_id = umc.user_id
258    JOIN courses AS c ON c.id = umc.course_id
259    LEFT JOIN course_module_completions AS cmc
260        ON cmc.user_id = umc.user_id AND cmc.course_id = umc.course_id
261    LEFT JOIN course_specific_consent_form_answers AS csfa
262        ON csfa.course_id = umc.course_id AND csfa.user_id = umc.user_id
263    LEFT JOIN user_research_consents AS urc
264        ON urc.user_id = umc.user_id
265    LEFT JOIN marketing_mailing_list_access_tokens AS mmlat
266        ON mmlat.course_language_group_id = umc.course_language_group_id AND mmlat.deleted_at IS NULL
267    LEFT JOIN mailchimp_language_code_mappings AS mlcm
268        ON mlcm.marketing_mailing_list_access_token_id = mmlat.id
269        AND mlcm.our_language_code = c.language_code
270        AND mlcm.deleted_at IS NULL
271    WHERE umc.course_language_group_id = $1
272    AND umc.user_id = ANY($2::uuid[])
273    AND umc.deleted_at IS NULL
274    "#,
275        course_language_group_id,
276        user_ids
277    )
278    .fetch_all(conn)
279    .await?;
280
281    Ok(result)
282}
283
284/// Fetches user_id -> user_mailchimp_id for the given course language group and user IDs (e.g. after syncing to Mailchimp). Only includes entries with a non-null user_mailchimp_id.
285pub async fn fetch_user_mailchimp_id_mapping(
286    conn: &mut PgConnection,
287    course_language_group_id: Uuid,
288    user_ids: &[Uuid],
289) -> sqlx::Result<std::collections::HashMap<Uuid, String>> {
290    if user_ids.is_empty() {
291        return Ok(std::collections::HashMap::new());
292    }
293    let rows = sqlx::query!(
294        r#"
295    SELECT user_id, user_mailchimp_id
296    FROM user_marketing_consents
297    WHERE course_language_group_id = $1 AND user_id = ANY($2::uuid[]) AND deleted_at IS NULL
298    "#,
299        course_language_group_id,
300        user_ids
301    )
302    .fetch_all(conn)
303    .await?;
304
305    let map = rows
306        .into_iter()
307        .filter_map(|r| r.user_mailchimp_id.map(|id| (r.user_id, id)))
308        .collect();
309    Ok(map)
310}
311
312/// Fetches email, email subscription status and user ID for users whose details have been updated after their marketing consent was last synced to Mailchimp
313pub async fn fetch_all_unsynced_updated_emails(
314    conn: &mut PgConnection,
315    course_language_group_id: Uuid,
316) -> sqlx::Result<Vec<UserEmailSubscription>> {
317    let result = sqlx::query_as!(
318        UserEmailSubscription,
319        "
320    SELECT
321        umc.user_id,
322        u.email AS email,
323        umc.email_subscription_in_mailchimp,
324        umc.user_mailchimp_id
325    FROM user_marketing_consents AS umc
326    JOIN user_details AS u ON u.user_id = umc.user_id
327    WHERE umc.course_language_group_id = $1
328    AND umc.synced_to_mailchimp_at < u.updated_at
329    ",
330        course_language_group_id
331    )
332    .fetch_all(conn)
333    .await?;
334
335    Ok(result)
336}
337
338/// Used to update the synced_to_mailchimp_at to a list of users when they are successfully synced to mailchimp
339pub async fn update_synced_to_mailchimp_at_to_all_synced_users(
340    conn: &mut PgConnection,
341    ids: &[Uuid],
342) -> ModelResult<()> {
343    sqlx::query!(
344        "
345UPDATE user_marketing_consents
346SET synced_to_mailchimp_at = now()
347WHERE user_id IN (
348    SELECT UNNEST($1::uuid [])
349  )
350",
351        &ids
352    )
353    .execute(conn)
354    .await?;
355    Ok(())
356}
357
358/// Used to add the user_mailchimp_ids to a list of new users when they are successfully synced to mailchimp
359pub async fn update_user_mailchimp_id_at_to_all_synced_users(
360    pool: &mut PgConnection,
361    user_contact_pairs: Vec<(String, String)>,
362) -> ModelResult<()> {
363    let (user_ids_raw, user_mailchimp_ids): (Vec<_>, Vec<_>) =
364        user_contact_pairs.into_iter().unzip();
365
366    // Parse user_ids into Uuid
367    let user_ids: Vec<Uuid> = user_ids_raw
368        .into_iter()
369        .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
370        .collect();
371
372    sqlx::query!(
373        "
374UPDATE user_marketing_consents
375SET user_mailchimp_id = updated_data.user_mailchimp_id
376FROM (
377    SELECT UNNEST($1::uuid[]) AS user_id, UNNEST($2::text[]) AS user_mailchimp_id
378) AS updated_data
379WHERE user_marketing_consents.user_id = updated_data.user_id
380",
381        &user_ids,
382        &user_mailchimp_ids
383    )
384    .execute(pool)
385    .await?;
386    Ok(())
387}
388
389/// Updates user consents to false in bulk using Mailchimp data.
390pub async fn update_unsubscribed_users_from_mailchimp_in_bulk(
391    conn: &mut PgConnection,
392    mailchimp_data: Vec<(String, String, String, String)>,
393) -> anyhow::Result<()> {
394    let (
395        user_ids_raw,
396        timestamps_raw,
397        course_language_group_ids_raw,
398        email_subscriptions_in_mailchimp,
399    ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>) = multiunzip(mailchimp_data);
400
401    let user_ids: Vec<Uuid> = user_ids_raw
402        .into_iter()
403        .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
404        .collect();
405
406    let timestamps: Vec<DateTime<Utc>> = timestamps_raw
407        .into_iter()
408        .filter_map(|ts| {
409            DateTime::parse_from_rfc3339(&ts)
410                .ok()
411                .map(|dt| dt.with_timezone(&Utc))
412        })
413        .collect();
414
415    let course_language_group_ids: Vec<Uuid> = course_language_group_ids_raw
416        .into_iter()
417        .filter_map(|lang_id| Uuid::parse_str(&lang_id).ok())
418        .collect();
419
420    sqlx::query!(
421        "
422    UPDATE user_marketing_consents
423    SET consent = false,
424        email_subscription_in_mailchimp = updated_data.email_subscription_in_mailchimp,
425        synced_to_mailchimp_at = updated_data.last_updated
426    FROM (
427        SELECT UNNEST($1::Uuid[]) AS user_id,
428               UNNEST($2::timestamptz[]) AS last_updated,
429               UNNEST($3::Uuid[]) AS course_language_group_id,
430               UNNEST($4::text[]) AS email_subscription_in_mailchimp
431
432    ) AS updated_data
433    WHERE user_marketing_consents.user_id = updated_data.user_id
434      AND user_marketing_consents.consent = true
435      AND user_marketing_consents.synced_to_mailchimp_at < updated_data.last_updated
436      AND user_marketing_consents.course_language_group_id = updated_data.course_language_group_id
437    ",
438        &user_ids,
439        &timestamps,
440        &course_language_group_ids,
441        &email_subscriptions_in_mailchimp
442    )
443    .execute(conn)
444    .await?;
445    Ok(())
446}
447
448pub async fn fetch_all_marketing_mailing_list_access_tokens(
449    conn: &mut PgConnection,
450) -> sqlx::Result<Vec<MarketingMailingListAccessToken>> {
451    let results = sqlx::query_as!(
452        MarketingMailingListAccessToken,
453        "
454    SELECT
455      id,
456      course_id,
457      course_language_group_id,
458      server_prefix,
459      access_token,
460      mailchimp_mailing_list_id,
461      created_at,
462      updated_at,
463      deleted_at
464    FROM marketing_mailing_list_access_tokens
465    "
466    )
467    .fetch_all(conn)
468    .await?;
469
470    Ok(results)
471}
472
473pub async fn fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
474    conn: &mut PgConnection,
475    course_language_group_id: Uuid,
476    marketing_mailing_access_token_id: Uuid,
477) -> sqlx::Result<Vec<serde_json::Value>> {
478    let results = sqlx::query!(
479        "
480        SELECT
481          tag_name,
482          tag_id
483        FROM mailchimp_course_tags
484        WHERE course_language_group_id = $1
485        AND marketing_mailing_list_access_token_id = $2
486        AND deleted_at IS NULL
487        ",
488        course_language_group_id,
489        marketing_mailing_access_token_id
490    )
491    .fetch_all(conn)
492    .await?;
493
494    let tag_objects: Vec<serde_json::Value> = results
495        .into_iter()
496        .filter_map(|row| {
497            let tag_name = row.tag_name.trim();
498            let tag_id = row.tag_id.trim();
499
500            if tag_name.is_empty() {
501                return None;
502            }
503
504            Some(json!({
505                "name": tag_name,
506                "id": tag_id,
507                "status": "active"
508            }))
509        })
510        .collect();
511
512    Ok(tag_objects)
513}
514
515pub async fn upsert_tag(
516    conn: &mut PgConnection,
517    course_language_group_id: Uuid,
518    marketing_mailing_access_token_id: Uuid,
519    tag_id: String,
520    tag_name: String,
521) -> sqlx::Result<()> {
522    sqlx::query!(
523        "
524        INSERT INTO mailchimp_course_tags (
525          tag_name,
526          tag_id,
527          course_language_group_id,
528          marketing_mailing_list_access_token_id
529        )
530        VALUES ($1, $2, $3, $4)
531        ON CONFLICT (course_language_group_id, tag_name)
532        DO UPDATE
533        SET
534          tag_name = EXCLUDED.tag_name
535        ",
536        tag_name,
537        tag_id,
538        course_language_group_id,
539        marketing_mailing_access_token_id,
540    )
541    .execute(&mut *conn)
542    .await?;
543    Ok(())
544}
545
546pub async fn delete_tag(
547    conn: &mut PgConnection,
548    deleted_tag_id: String,
549    course_language_group_id: Uuid,
550) -> sqlx::Result<()> {
551    sqlx::query!(
552        "
553        UPDATE mailchimp_course_tags
554        SET deleted_at = now()
555        WHERE tag_id = $1
556        AND course_language_group_id = $2
557        AND deleted_at IS NULL
558        ",
559        deleted_tag_id,
560        course_language_group_id
561    )
562    .execute(&mut *conn)
563    .await?;
564
565    Ok(())
566}