headless_lms_models/
marketing_consents.rs

1use itertools::multiunzip;
2use serde_json::json;
3
4use crate::prelude::*;
5
/// A row of the `user_marketing_consents` table: one user's marketing-consent
/// state plus the bookkeeping used when syncing that state with Mailchimp.
///
/// `upsert_marketing_consent` treats `(user_id, course_language_group_id)` as
/// the conflict key when writing these rows.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct UserMarketingConsent {
    pub id: Uuid,
    pub course_id: Uuid,
    pub course_language_group_id: Uuid,
    pub user_id: Uuid,
    /// Identifier of the user on the Mailchimp side; `None` until one has been
    /// stored by `update_user_mailchimp_id_at_to_all_synced_users`.
    pub user_mailchimp_id: Option<String>,
    /// Whether the user has given marketing consent.
    pub consent: bool,
    /// Email subscription status string as stored for Mailchimp.
    pub email_subscription_in_mailchimp: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
    /// Time of the last recorded Mailchimp sync; `None` if never synced.
    pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
}
21
/// A marketing consent row enriched with user, course and consent details, as
/// produced by
/// `fetch_all_unsynced_user_marketing_consents_by_course_language_group_id`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct UserMarketingConsentWithDetails {
    pub id: Uuid,
    pub course_id: Uuid,
    pub course_language_group_id: Uuid,
    pub user_id: Uuid,
    pub user_mailchimp_id: Option<String>,
    pub consent: bool,
    pub email_subscription_in_mailchimp: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
    pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
    /// From `user_details.first_name`.
    pub first_name: Option<String>,
    /// From `user_details.last_name`.
    pub last_name: Option<String>,
    /// From `user_details.email`.
    pub email: String,
    /// From `courses.name`.
    pub course_name: String,
    /// The Mailchimp language code mapped from the course's language code when
    /// a mapping exists, otherwise the course's own language code.
    pub locale: Option<String>,
    /// Completion date taken from `course_module_completions`, if any.
    pub completed_course_at: Option<DateTime<Utc>>,
    /// The course-specific research consent answer when present, otherwise the
    /// user's general research consent.
    pub research_consent: Option<bool>,
    /// From `user_details.country`.
    pub country: Option<String>,
}
45
/// The current email of a user together with the Mailchimp-related fields of
/// their marketing consent, as returned by `fetch_all_unsynced_updated_emails`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct UserEmailSubscription {
    pub user_id: Uuid,
    /// From `user_details.email`.
    pub email: String,
    pub email_subscription_in_mailchimp: Option<String>,
    pub user_mailchimp_id: Option<String>,
}
54
/// A row of the `marketing_mailing_list_access_tokens` table, tying a course
/// and its course language group to a Mailchimp mailing list.
// NOTE(review): `server_prefix` and `access_token` are presumably the Mailchimp
// API data-center prefix and credential — confirm against the syncing code.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct MarketingMailingListAccessToken {
    pub id: Uuid,
    pub course_id: Uuid,
    pub mailchimp_mailing_list_id: String,
    pub course_language_group_id: Uuid,
    pub server_prefix: String,
    pub access_token: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
}
68
/// A tag stored in the `mailchimp_course_tags` table for a course language
/// group and a mailing list access token.
///
/// Rows are soft-deleted by setting `deleted_at` (see `delete_tag`).
#[derive(Debug, FromRow)]
pub struct MailchimpCourseTag {
    pub id: Uuid,
    pub marketing_mailing_list_access_token_id: Uuid,
    pub course_language_group_id: Uuid,
    pub tag_name: String,
    // NOTE(review): presumably the identifier Mailchimp assigned to the tag — confirm.
    pub tag_id: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
}
80
/// A mapping from one of our language codes to the language code used in
/// Mailchimp, scoped to a mailing list access token.
///
/// The unsynced-consents query uses these mappings to compute `locale`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[cfg_attr(feature = "ts_rs", derive(TS))]
pub struct MailchimpLanguageCodeMapping {
    pub id: Uuid,
    pub marketing_mailing_list_access_token_id: Uuid,
    /// Language code as stored on our side (matched against `courses.language_code`).
    pub our_language_code: String,
    /// Language code to send to Mailchimp instead.
    pub mailchimp_language_code: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
}
92
93pub async fn upsert_marketing_consent(
94    conn: &mut PgConnection,
95    course_id: Uuid,
96    course_language_group_id: Uuid,
97    user_id: &Uuid,
98    email_subscription: &str,
99    marketing_consent: bool,
100) -> sqlx::Result<Uuid> {
101    let result = sqlx::query!(
102        r#"
103      INSERT INTO user_marketing_consents (user_id, course_id, course_language_group_id, consent, email_subscription_in_mailchimp)
104      VALUES ($1, $2, $3, $4, $5)
105      ON CONFLICT (user_id, course_language_group_id)
106      DO UPDATE
107      SET
108        course_id = $2,
109        consent = $4,
110        email_subscription_in_mailchimp = $5
111      RETURNING id
112      "#,
113        user_id,
114        course_id,
115        course_language_group_id,
116        marketing_consent,
117        email_subscription
118    )
119    .fetch_one(conn)
120    .await?;
121
122    Ok(result.id)
123}
124
125pub async fn fetch_user_marketing_consent(
126    conn: &mut PgConnection,
127    course_id: Uuid,
128    user_id: &Uuid,
129) -> sqlx::Result<UserMarketingConsent> {
130    let result = sqlx::query_as!(
131        UserMarketingConsent,
132        "
133    SELECT *
134    FROM user_marketing_consents
135    WHERE user_id = $1 AND course_id = $2
136    ",
137        user_id,
138        course_id,
139    )
140    .fetch_one(conn)
141    .await?;
142
143    Ok(result)
144}
145
146/// Fetches all user marketing consents with detailed user information for a specific course language group, if they haven't been synced to Mailchimp or if there have been updates since the last sync.
147pub async fn fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
148    conn: &mut PgConnection,
149    course_language_group_id: Uuid,
150) -> sqlx::Result<Vec<UserMarketingConsentWithDetails>> {
151    let result = sqlx::query_as!(
152        UserMarketingConsentWithDetails,
153        r#"
154    SELECT
155        umc.id,
156        umc.course_id,
157        umc.course_language_group_id,
158        umc.user_id,
159        umc.user_mailchimp_id,
160        umc.consent,
161        umc.email_subscription_in_mailchimp,
162        umc.created_at,
163        umc.updated_at,
164        umc.deleted_at,
165        umc.synced_to_mailchimp_at,
166        u.first_name AS first_name,
167        u.last_name AS last_name,
168        u.country AS country,
169        u.email AS email,
170        c.name AS course_name,
171        COALESCE(
172            mlcm.mailchimp_language_code,
173            c.language_code
174        ) AS locale,
175        CASE WHEN cmc.passed IS NOT NULL THEN cmc.completion_date ELSE NULL END AS completed_course_at,
176        COALESCE(csfa.research_consent, urc.research_consent) AS research_consent
177    FROM user_marketing_consents AS umc
178    JOIN user_details AS u ON u.user_id = umc.user_id
179    JOIN courses AS c ON c.id = umc.course_id
180    LEFT JOIN course_module_completions AS cmc
181        ON cmc.user_id = umc.user_id AND cmc.course_id = umc.course_id
182    LEFT JOIN course_specific_consent_form_answers AS csfa
183        ON csfa.course_id = umc.course_id AND csfa.user_id = umc.user_id
184    LEFT JOIN user_research_consents AS urc
185        ON urc.user_id = umc.user_id
186    LEFT JOIN mailchimp_course_tags AS tags
187        ON tags.course_language_group_id = umc.course_language_group_id
188    LEFT JOIN marketing_mailing_list_access_tokens AS mmlat
189        ON mmlat.course_language_group_id = umc.course_language_group_id AND mmlat.deleted_at IS NULL
190    LEFT JOIN mailchimp_language_code_mappings AS mlcm
191        ON mlcm.marketing_mailing_list_access_token_id = mmlat.id
192        AND mlcm.our_language_code = c.language_code
193        AND mlcm.deleted_at IS NULL
194    WHERE umc.course_language_group_id = $1
195    AND (
196        umc.synced_to_mailchimp_at IS NULL
197        OR umc.synced_to_mailchimp_at < umc.updated_at
198        OR csfa.updated_at > umc.synced_to_mailchimp_at
199        OR urc.updated_at > umc.synced_to_mailchimp_at
200        OR cmc.updated_at > umc.synced_to_mailchimp_at
201        OR u.updated_at > umc.synced_to_mailchimp_at
202        OR c.updated_at > umc.synced_to_mailchimp_at
203        OR mlcm.updated_at > umc.synced_to_mailchimp_at
204        OR EXISTS (
205            SELECT 1
206            FROM mailchimp_course_tags
207            WHERE mailchimp_course_tags.course_language_group_id = umc.course_language_group_id
208            AND mailchimp_course_tags.updated_at > umc.synced_to_mailchimp_at
209            AND deleted_at IS NULL
210        )
211    )
212    "#,
213        course_language_group_id
214    )
215    .fetch_all(conn)
216    .await?;
217
218    Ok(result)
219}
220
221/// Fetches email, email subscription status and user ID for users whose details have been updated after their marketing consent was last synced to Mailchimp
222pub async fn fetch_all_unsynced_updated_emails(
223    conn: &mut PgConnection,
224    course_language_group_id: Uuid,
225) -> sqlx::Result<Vec<UserEmailSubscription>> {
226    let result = sqlx::query_as!(
227        UserEmailSubscription,
228        "
229    SELECT
230        umc.user_id,
231        u.email AS email,
232        umc.email_subscription_in_mailchimp,
233        umc.user_mailchimp_id
234    FROM user_marketing_consents AS umc
235    JOIN user_details AS u ON u.user_id = umc.user_id
236    WHERE umc.course_language_group_id = $1
237    AND umc.synced_to_mailchimp_at < u.updated_at
238    ",
239        course_language_group_id
240    )
241    .fetch_all(conn)
242    .await?;
243
244    Ok(result)
245}
246
247/// Used to update the synced_to_mailchimp_at to a list of users when they are successfully synced to mailchimp
248pub async fn update_synced_to_mailchimp_at_to_all_synced_users(
249    conn: &mut PgConnection,
250    ids: &[Uuid],
251) -> ModelResult<()> {
252    sqlx::query!(
253        "
254UPDATE user_marketing_consents
255SET synced_to_mailchimp_at = now()
256WHERE user_id IN (
257    SELECT UNNEST($1::uuid [])
258  )
259",
260        &ids
261    )
262    .execute(conn)
263    .await?;
264    Ok(())
265}
266
267/// Used to add the user_mailchimp_ids to a list of new users when they are successfully synced to mailchimp
268pub async fn update_user_mailchimp_id_at_to_all_synced_users(
269    pool: &mut PgConnection,
270    user_contact_pairs: Vec<(String, String)>,
271) -> ModelResult<()> {
272    let (user_ids_raw, user_mailchimp_ids): (Vec<_>, Vec<_>) =
273        user_contact_pairs.into_iter().unzip();
274
275    // Parse user_ids into Uuid
276    let user_ids: Vec<Uuid> = user_ids_raw
277        .into_iter()
278        .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
279        .collect();
280
281    sqlx::query!(
282        "
283UPDATE user_marketing_consents
284SET user_mailchimp_id = updated_data.user_mailchimp_id
285FROM (
286    SELECT UNNEST($1::uuid[]) AS user_id, UNNEST($2::text[]) AS user_mailchimp_id
287) AS updated_data
288WHERE user_marketing_consents.user_id = updated_data.user_id
289",
290        &user_ids,
291        &user_mailchimp_ids
292    )
293    .execute(pool)
294    .await?;
295    Ok(())
296}
297
298/// Updates user consents to false in bulk using Mailchimp data.
299pub async fn update_unsubscribed_users_from_mailchimp_in_bulk(
300    conn: &mut PgConnection,
301    mailchimp_data: Vec<(String, String, String, String)>,
302) -> anyhow::Result<()> {
303    let (
304        user_ids_raw,
305        timestamps_raw,
306        course_language_group_ids_raw,
307        email_subscriptions_in_mailchimp,
308    ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>) = multiunzip(mailchimp_data);
309
310    let user_ids: Vec<Uuid> = user_ids_raw
311        .into_iter()
312        .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
313        .collect();
314
315    let timestamps: Vec<DateTime<Utc>> = timestamps_raw
316        .into_iter()
317        .filter_map(|ts| {
318            DateTime::parse_from_rfc3339(&ts)
319                .ok()
320                .map(|dt| dt.with_timezone(&Utc))
321        })
322        .collect();
323
324    let course_language_group_ids: Vec<Uuid> = course_language_group_ids_raw
325        .into_iter()
326        .filter_map(|lang_id| Uuid::parse_str(&lang_id).ok())
327        .collect();
328
329    sqlx::query!(
330        "
331    UPDATE user_marketing_consents
332    SET consent = false,
333        email_subscription_in_mailchimp = updated_data.email_subscription_in_mailchimp,
334        synced_to_mailchimp_at = updated_data.last_updated
335    FROM (
336        SELECT UNNEST($1::Uuid[]) AS user_id,
337               UNNEST($2::timestamptz[]) AS last_updated,
338               UNNEST($3::Uuid[]) AS course_language_group_id,
339               UNNEST($4::text[]) AS email_subscription_in_mailchimp
340
341    ) AS updated_data
342    WHERE user_marketing_consents.user_id = updated_data.user_id
343      AND user_marketing_consents.consent = true
344      AND user_marketing_consents.synced_to_mailchimp_at < updated_data.last_updated
345      AND user_marketing_consents.course_language_group_id = updated_data.course_language_group_id
346    ",
347        &user_ids,
348        &timestamps,
349        &course_language_group_ids,
350        &email_subscriptions_in_mailchimp
351    )
352    .execute(conn)
353    .await?;
354    Ok(())
355}
356
357pub async fn fetch_all_marketing_mailing_list_access_tokens(
358    conn: &mut PgConnection,
359) -> sqlx::Result<Vec<MarketingMailingListAccessToken>> {
360    let results = sqlx::query_as!(
361        MarketingMailingListAccessToken,
362        "
363    SELECT
364      id,
365      course_id,
366      course_language_group_id,
367      server_prefix,
368      access_token,
369      mailchimp_mailing_list_id,
370      created_at,
371      updated_at,
372      deleted_at
373    FROM marketing_mailing_list_access_tokens
374    "
375    )
376    .fetch_all(conn)
377    .await?;
378
379    Ok(results)
380}
381
382pub async fn fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
383    conn: &mut PgConnection,
384    course_language_group_id: Uuid,
385    marketing_mailing_access_token_id: Uuid,
386) -> sqlx::Result<Vec<serde_json::Value>> {
387    let results = sqlx::query!(
388        "
389        SELECT
390          tag_name,
391          tag_id
392        FROM mailchimp_course_tags
393        WHERE course_language_group_id = $1
394        AND marketing_mailing_list_access_token_id = $2
395        AND deleted_at IS NULL
396        ",
397        course_language_group_id,
398        marketing_mailing_access_token_id
399    )
400    .fetch_all(conn)
401    .await?;
402
403    let tag_objects: Vec<serde_json::Value> = results
404        .into_iter()
405        .filter_map(|row| {
406            let tag_name = row.tag_name.trim();
407            let tag_id = row.tag_id.trim();
408
409            if tag_name.is_empty() {
410                return None;
411            }
412
413            Some(json!({
414                "name": tag_name,
415                "id": tag_id,
416                "status": "active"
417            }))
418        })
419        .collect();
420
421    Ok(tag_objects)
422}
423
424pub async fn upsert_tag(
425    conn: &mut PgConnection,
426    course_language_group_id: Uuid,
427    marketing_mailing_access_token_id: Uuid,
428    tag_id: String,
429    tag_name: String,
430) -> sqlx::Result<()> {
431    sqlx::query!(
432        "
433        INSERT INTO mailchimp_course_tags (
434          tag_name,
435          tag_id,
436          course_language_group_id,
437          marketing_mailing_list_access_token_id
438        )
439        VALUES ($1, $2, $3, $4)
440        ON CONFLICT (course_language_group_id, tag_name)
441        DO UPDATE
442        SET
443          tag_name = EXCLUDED.tag_name
444        ",
445        tag_name,
446        tag_id,
447        course_language_group_id,
448        marketing_mailing_access_token_id,
449    )
450    .execute(&mut *conn)
451    .await?;
452    Ok(())
453}
454
455pub async fn delete_tag(
456    conn: &mut PgConnection,
457    deleted_tag_id: String,
458    course_language_group_id: Uuid,
459) -> sqlx::Result<()> {
460    sqlx::query!(
461        "
462        UPDATE mailchimp_course_tags
463        SET deleted_at = now()
464        WHERE tag_id = $1
465        AND course_language_group_id = $2
466        AND deleted_at IS NULL
467        ",
468        deleted_tag_id,
469        course_language_group_id
470    )
471    .execute(&mut *conn)
472    .await?;
473
474    Ok(())
475}