headless_lms_models/
marketing_consents.rs

1use itertools::multiunzip;
2use serde_json::json;
3
4use crate::prelude::*;
5
6#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
7#[cfg_attr(feature = "ts_rs", derive(TS))]
8pub struct UserMarketingConsent {
9    pub id: Uuid,
10    pub course_id: Uuid,
11    pub course_language_group_id: Uuid,
12    pub user_id: Uuid,
13    pub user_mailchimp_id: Option<String>,
14    pub consent: bool,
15    pub email_subscription_in_mailchimp: Option<String>,
16    pub created_at: DateTime<Utc>,
17    pub updated_at: DateTime<Utc>,
18    pub deleted_at: Option<DateTime<Utc>>,
19    pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
20}
21
22#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
23#[cfg_attr(feature = "ts_rs", derive(TS))]
24pub struct UserMarketingConsentWithDetails {
25    pub id: Uuid,
26    pub course_id: Uuid,
27    pub course_language_group_id: Uuid,
28    pub user_id: Uuid,
29    pub user_mailchimp_id: Option<String>,
30    pub consent: bool,
31    pub email_subscription_in_mailchimp: Option<String>,
32    pub created_at: DateTime<Utc>,
33    pub updated_at: DateTime<Utc>,
34    pub deleted_at: Option<DateTime<Utc>>,
35    pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
36    pub first_name: Option<String>,
37    pub last_name: Option<String>,
38    pub email: String,
39    pub course_name: String,
40    pub locale: Option<String>,
41    pub completed_course_at: Option<DateTime<Utc>>,
42    pub research_consent: Option<bool>,
43    pub country: Option<String>,
44}
45
46#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
47#[cfg_attr(feature = "ts_rs", derive(TS))]
48pub struct UserEmailSubscription {
49    pub user_id: Uuid,
50    pub email: String,
51    pub email_subscription_in_mailchimp: Option<String>,
52    pub user_mailchimp_id: Option<String>,
53}
54
55#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
56#[cfg_attr(feature = "ts_rs", derive(TS))]
57pub struct MarketingMailingListAccessToken {
58    pub id: Uuid,
59    pub course_id: Uuid,
60    pub mailchimp_mailing_list_id: String,
61    pub course_language_group_id: Uuid,
62    pub server_prefix: String,
63    pub access_token: String,
64    pub created_at: DateTime<Utc>,
65    pub updated_at: DateTime<Utc>,
66    pub deleted_at: Option<DateTime<Utc>>,
67}
68
69#[derive(Debug, FromRow)]
70pub struct MailchimpCourseTag {
71    pub id: Uuid,
72    pub marketing_mailing_list_access_token_id: Uuid,
73    pub course_language_group_id: Uuid,
74    pub tag_name: String,
75    pub tag_id: String,
76    pub created_at: DateTime<Utc>,
77    pub updated_at: DateTime<Utc>,
78    pub deleted_at: Option<DateTime<Utc>>,
79}
80
81pub async fn upsert_marketing_consent(
82    conn: &mut PgConnection,
83    course_id: Uuid,
84    course_language_group_id: Uuid,
85    user_id: &Uuid,
86    email_subscription: &str,
87    marketing_consent: bool,
88) -> sqlx::Result<Uuid> {
89    let result = sqlx::query!(
90        r#"
91      INSERT INTO user_marketing_consents (user_id, course_id, course_language_group_id, consent, email_subscription_in_mailchimp)
92      VALUES ($1, $2, $3, $4, $5)
93      ON CONFLICT (user_id, course_language_group_id)
94      DO UPDATE
95      SET
96        consent = $4,
97        email_subscription_in_mailchimp = $5
98      RETURNING id
99      "#,
100        user_id,
101        course_id,
102        course_language_group_id,
103        marketing_consent,
104        email_subscription
105    )
106    .fetch_one(conn)
107    .await?;
108
109    Ok(result.id)
110}
111
112pub async fn fetch_user_marketing_consent(
113    conn: &mut PgConnection,
114    course_id: Uuid,
115    user_id: &Uuid,
116) -> sqlx::Result<UserMarketingConsent> {
117    let result = sqlx::query_as!(
118        UserMarketingConsent,
119        "
120    SELECT *
121    FROM user_marketing_consents
122    WHERE user_id = $1 AND course_id = $2
123    ",
124        user_id,
125        course_id,
126    )
127    .fetch_one(conn)
128    .await?;
129
130    Ok(result)
131}
132
133/// Fetches all user marketing consents with detailed user information for a specific course language group, if they haven't been synced to Mailchimp or if there have been updates since the last sync.
134pub async fn fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
135    conn: &mut PgConnection,
136    course_language_group_id: Uuid,
137) -> sqlx::Result<Vec<UserMarketingConsentWithDetails>> {
138    let result = sqlx::query_as!(
139        UserMarketingConsentWithDetails,
140        "
141    SELECT
142        umc.id,
143        umc.course_id,
144        umc.course_language_group_id,
145        umc.user_id,
146        umc.user_mailchimp_id,
147        umc.consent,
148        umc.email_subscription_in_mailchimp,
149        umc.created_at,
150        umc.updated_at,
151        umc.deleted_at,
152        umc.synced_to_mailchimp_at,
153        u.first_name AS first_name,
154        u.last_name AS last_name,
155        u.country AS country,
156        u.email AS email,
157        c.name AS course_name,
158        c.language_code AS locale,
159        CASE WHEN cmc.passed IS NOT NULL THEN cmc.completion_date ELSE NULL END AS completed_course_at,
160        COALESCE(csfa.research_consent, urc.research_consent) AS research_consent
161    FROM user_marketing_consents AS umc
162    JOIN user_details AS u ON u.user_id = umc.user_id
163    JOIN courses AS c ON c.id = umc.course_id
164    LEFT JOIN course_module_completions AS cmc
165        ON cmc.user_id = umc.user_id AND cmc.course_id = umc.course_id
166     LEFT JOIN course_specific_consent_form_answers AS csfa
167        ON csfa.course_id = umc.course_id AND csfa.user_id = umc.user_id
168    LEFT JOIN user_research_consents AS urc
169        ON urc.user_id = umc.user_id
170    LEFT JOIN mailchimp_course_tags AS tags
171        ON tags.course_language_group_id = umc.course_language_group_id
172    WHERE umc.course_language_group_id = $1
173    AND (
174        umc.synced_to_mailchimp_at IS NULL
175        OR umc.synced_to_mailchimp_at < umc.updated_at
176        OR csfa.updated_at > umc.synced_to_mailchimp_at
177        OR urc.updated_at > umc.synced_to_mailchimp_at
178        OR cmc.updated_at > umc.synced_to_mailchimp_at
179        OR u.updated_at > umc.synced_to_mailchimp_at
180        OR EXISTS (
181            SELECT 1
182            FROM mailchimp_course_tags
183            WHERE mailchimp_course_tags.course_language_group_id = umc.course_language_group_id
184            AND mailchimp_course_tags.updated_at > umc.synced_to_mailchimp_at
185            AND deleted_at IS NULL
186        )
187    )
188    ",
189        course_language_group_id
190    )
191    .fetch_all(conn)
192    .await?;
193
194    Ok(result)
195}
196
197/// Fetches email, email subscription status and user ID for users whose details have been updated after their marketing consent was last synced to Mailchimp
198pub async fn fetch_all_unsynced_updated_emails(
199    conn: &mut PgConnection,
200    course_language_group_id: Uuid,
201) -> sqlx::Result<Vec<UserEmailSubscription>> {
202    let result = sqlx::query_as!(
203        UserEmailSubscription,
204        "
205    SELECT
206        umc.user_id,
207        u.email AS email,
208        umc.email_subscription_in_mailchimp,
209        umc.user_mailchimp_id
210    FROM user_marketing_consents AS umc
211    JOIN user_details AS u ON u.user_id = umc.user_id
212    WHERE umc.course_language_group_id = $1
213    AND umc.synced_to_mailchimp_at < u.updated_at
214    ",
215        course_language_group_id
216    )
217    .fetch_all(conn)
218    .await?;
219
220    Ok(result)
221}
222
223/// Used to update the synced_to_mailchimp_at to a list of users when they are successfully synced to mailchimp
224pub async fn update_synced_to_mailchimp_at_to_all_synced_users(
225    conn: &mut PgConnection,
226    ids: &[Uuid],
227) -> ModelResult<()> {
228    sqlx::query!(
229        "
230UPDATE user_marketing_consents
231SET synced_to_mailchimp_at = now()
232WHERE user_id IN (
233    SELECT UNNEST($1::uuid [])
234  )
235",
236        &ids
237    )
238    .execute(conn)
239    .await?;
240    Ok(())
241}
242
243/// Used to add the user_mailchimp_ids to a list of new users when they are successfully synced to mailchimp
244pub async fn update_user_mailchimp_id_at_to_all_synced_users(
245    pool: &mut PgConnection,
246    user_contact_pairs: Vec<(String, String)>,
247) -> ModelResult<()> {
248    let (user_ids_raw, user_mailchimp_ids): (Vec<_>, Vec<_>) =
249        user_contact_pairs.into_iter().unzip();
250
251    // Parse user_ids into Uuid
252    let user_ids: Vec<Uuid> = user_ids_raw
253        .into_iter()
254        .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
255        .collect();
256
257    sqlx::query!(
258        "
259UPDATE user_marketing_consents
260SET user_mailchimp_id = updated_data.user_mailchimp_id
261FROM (
262    SELECT UNNEST($1::uuid[]) AS user_id, UNNEST($2::text[]) AS user_mailchimp_id
263) AS updated_data
264WHERE user_marketing_consents.user_id = updated_data.user_id
265",
266        &user_ids,
267        &user_mailchimp_ids
268    )
269    .execute(pool)
270    .await?;
271    Ok(())
272}
273
274/// Updates user consents to false in bulk using Mailchimp data.
275pub async fn update_unsubscribed_users_from_mailchimp_in_bulk(
276    conn: &mut PgConnection,
277    mailchimp_data: Vec<(String, String, String, String)>,
278) -> anyhow::Result<()> {
279    let (
280        user_ids_raw,
281        timestamps_raw,
282        course_language_group_ids_raw,
283        email_subscriptions_in_mailchimp,
284    ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>) = multiunzip(mailchimp_data);
285
286    let user_ids: Vec<Uuid> = user_ids_raw
287        .into_iter()
288        .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
289        .collect();
290
291    let timestamps: Vec<DateTime<Utc>> = timestamps_raw
292        .into_iter()
293        .filter_map(|ts| {
294            DateTime::parse_from_rfc3339(&ts)
295                .ok()
296                .map(|dt| dt.with_timezone(&Utc))
297        })
298        .collect();
299
300    let course_language_group_ids: Vec<Uuid> = course_language_group_ids_raw
301        .into_iter()
302        .filter_map(|lang_id| Uuid::parse_str(&lang_id).ok())
303        .collect();
304
305    sqlx::query!(
306        "
307    UPDATE user_marketing_consents
308    SET consent = false,
309        email_subscription_in_mailchimp = updated_data.email_subscription_in_mailchimp,
310        synced_to_mailchimp_at = updated_data.last_updated
311    FROM (
312        SELECT UNNEST($1::Uuid[]) AS user_id,
313               UNNEST($2::timestamptz[]) AS last_updated,
314               UNNEST($3::Uuid[]) AS course_language_group_id,
315               UNNEST($4::text[]) AS email_subscription_in_mailchimp
316
317    ) AS updated_data
318    WHERE user_marketing_consents.user_id = updated_data.user_id
319      AND user_marketing_consents.consent = true
320      AND user_marketing_consents.synced_to_mailchimp_at < updated_data.last_updated
321      AND user_marketing_consents.course_language_group_id = updated_data.course_language_group_id
322    ",
323        &user_ids,
324        &timestamps,
325        &course_language_group_ids,
326        &email_subscriptions_in_mailchimp
327    )
328    .execute(conn)
329    .await?;
330    Ok(())
331}
332
333pub async fn fetch_all_marketing_mailing_list_access_tokens(
334    conn: &mut PgConnection,
335) -> sqlx::Result<Vec<MarketingMailingListAccessToken>> {
336    let results = sqlx::query_as!(
337        MarketingMailingListAccessToken,
338        "
339    SELECT
340      id,
341      course_id,
342      course_language_group_id,
343      server_prefix,
344      access_token,
345      mailchimp_mailing_list_id,
346      created_at,
347      updated_at,
348      deleted_at
349    FROM marketing_mailing_list_access_tokens
350    "
351    )
352    .fetch_all(conn)
353    .await?;
354
355    Ok(results)
356}
357
358pub async fn fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
359    conn: &mut PgConnection,
360    course_language_group_id: Uuid,
361    marketing_mailing_access_token_id: Uuid,
362) -> sqlx::Result<Vec<serde_json::Value>> {
363    let results = sqlx::query!(
364        "
365        SELECT
366          tag_name,
367          tag_id
368        FROM mailchimp_course_tags
369        WHERE course_language_group_id = $1
370        AND marketing_mailing_list_access_token_id = $2
371        AND deleted_at IS NULL
372        ",
373        course_language_group_id,
374        marketing_mailing_access_token_id
375    )
376    .fetch_all(conn)
377    .await?;
378
379    let tag_objects: Vec<serde_json::Value> = results
380        .into_iter()
381        .filter_map(|row| {
382            let tag_name = row.tag_name.trim();
383            let tag_id = row.tag_id.trim();
384
385            if tag_name.is_empty() {
386                return None;
387            }
388
389            Some(json!({
390                "name": tag_name,
391                "id": tag_id,
392                "status": "active"
393            }))
394        })
395        .collect();
396
397    Ok(tag_objects)
398}
399
400pub async fn upsert_tag(
401    conn: &mut PgConnection,
402    course_language_group_id: Uuid,
403    marketing_mailing_access_token_id: Uuid,
404    tag_id: String,
405    tag_name: String,
406) -> sqlx::Result<()> {
407    sqlx::query!(
408        "
409        INSERT INTO mailchimp_course_tags (
410          tag_name,
411          tag_id,
412          course_language_group_id,
413          marketing_mailing_list_access_token_id
414        )
415        VALUES ($1, $2, $3, $4)
416        ON CONFLICT (course_language_group_id, tag_name)
417        DO UPDATE
418        SET
419          tag_name = EXCLUDED.tag_name
420        ",
421        tag_name,
422        tag_id,
423        course_language_group_id,
424        marketing_mailing_access_token_id,
425    )
426    .execute(&mut *conn)
427    .await?;
428    Ok(())
429}
430
431pub async fn delete_tag(
432    conn: &mut PgConnection,
433    deleted_tag_id: String,
434    course_language_group_id: Uuid,
435) -> sqlx::Result<()> {
436    sqlx::query!(
437        "
438        UPDATE mailchimp_course_tags
439        SET deleted_at = now()
440        WHERE tag_id = $1
441        AND course_language_group_id = $2
442        AND deleted_at IS NULL
443        ",
444        deleted_tag_id,
445        course_language_group_id
446    )
447    .execute(&mut *conn)
448    .await?;
449
450    Ok(())
451}