1use itertools::multiunzip;
2use serde_json::json;
3
4use crate::prelude::*;
5
6#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
7#[cfg_attr(feature = "ts_rs", derive(TS))]
8pub struct UserMarketingConsent {
9 pub id: Uuid,
10 pub course_id: Uuid,
11 pub course_language_group_id: Uuid,
12 pub user_id: Uuid,
13 pub user_mailchimp_id: Option<String>,
14 pub consent: bool,
15 pub email_subscription_in_mailchimp: Option<String>,
16 pub created_at: DateTime<Utc>,
17 pub updated_at: DateTime<Utc>,
18 pub deleted_at: Option<DateTime<Utc>>,
19 pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
20}
21
22#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
23#[cfg_attr(feature = "ts_rs", derive(TS))]
24pub struct UserMarketingConsentWithDetails {
25 pub id: Uuid,
26 pub course_id: Uuid,
27 pub course_language_group_id: Uuid,
28 pub user_id: Uuid,
29 pub user_mailchimp_id: Option<String>,
30 pub consent: bool,
31 pub email_subscription_in_mailchimp: Option<String>,
32 pub created_at: DateTime<Utc>,
33 pub updated_at: DateTime<Utc>,
34 pub deleted_at: Option<DateTime<Utc>>,
35 pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
36 pub first_name: Option<String>,
37 pub last_name: Option<String>,
38 pub email: String,
39 pub course_name: String,
40 pub locale: Option<String>,
41 pub completed_course_at: Option<DateTime<Utc>>,
42 pub research_consent: Option<bool>,
43 pub country: Option<String>,
44}
45
46#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
47#[cfg_attr(feature = "ts_rs", derive(TS))]
48pub struct UserEmailSubscription {
49 pub user_id: Uuid,
50 pub email: String,
51 pub email_subscription_in_mailchimp: Option<String>,
52 pub user_mailchimp_id: Option<String>,
53}
54
55#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
56#[cfg_attr(feature = "ts_rs", derive(TS))]
57pub struct MarketingMailingListAccessToken {
58 pub id: Uuid,
59 pub course_id: Uuid,
60 pub mailchimp_mailing_list_id: String,
61 pub course_language_group_id: Uuid,
62 pub server_prefix: String,
63 pub access_token: String,
64 pub created_at: DateTime<Utc>,
65 pub updated_at: DateTime<Utc>,
66 pub deleted_at: Option<DateTime<Utc>>,
67}
68
69#[derive(Debug, FromRow)]
70pub struct MailchimpCourseTag {
71 pub id: Uuid,
72 pub marketing_mailing_list_access_token_id: Uuid,
73 pub course_language_group_id: Uuid,
74 pub tag_name: String,
75 pub tag_id: String,
76 pub created_at: DateTime<Utc>,
77 pub updated_at: DateTime<Utc>,
78 pub deleted_at: Option<DateTime<Utc>>,
79}
80
81#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
82#[cfg_attr(feature = "ts_rs", derive(TS))]
83pub struct MailchimpLanguageCodeMapping {
84 pub id: Uuid,
85 pub marketing_mailing_list_access_token_id: Uuid,
86 pub our_language_code: String,
87 pub mailchimp_language_code: String,
88 pub created_at: DateTime<Utc>,
89 pub updated_at: DateTime<Utc>,
90 pub deleted_at: Option<DateTime<Utc>>,
91}
92
93pub async fn upsert_marketing_consent(
94 conn: &mut PgConnection,
95 course_id: Uuid,
96 course_language_group_id: Uuid,
97 user_id: &Uuid,
98 email_subscription: &str,
99 marketing_consent: bool,
100) -> sqlx::Result<Uuid> {
101 let result = sqlx::query!(
102 r#"
103 INSERT INTO user_marketing_consents (user_id, course_id, course_language_group_id, consent, email_subscription_in_mailchimp)
104 VALUES ($1, $2, $3, $4, $5)
105 ON CONFLICT (user_id, course_language_group_id)
106 DO UPDATE
107 SET
108 course_id = $2,
109 consent = $4,
110 email_subscription_in_mailchimp = $5
111 RETURNING id
112 "#,
113 user_id,
114 course_id,
115 course_language_group_id,
116 marketing_consent,
117 email_subscription
118 )
119 .fetch_one(conn)
120 .await?;
121
122 Ok(result.id)
123}
124
125pub async fn fetch_user_marketing_consent(
126 conn: &mut PgConnection,
127 course_id: Uuid,
128 user_id: &Uuid,
129) -> sqlx::Result<UserMarketingConsent> {
130 let result = sqlx::query_as!(
131 UserMarketingConsent,
132 "
133 SELECT *
134 FROM user_marketing_consents
135 WHERE user_id = $1 AND course_id = $2
136 ",
137 user_id,
138 course_id,
139 )
140 .fetch_one(conn)
141 .await?;
142
143 Ok(result)
144}
145
146pub async fn fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
148 conn: &mut PgConnection,
149 course_language_group_id: Uuid,
150) -> sqlx::Result<Vec<UserMarketingConsentWithDetails>> {
151 let result = sqlx::query_as!(
152 UserMarketingConsentWithDetails,
153 r#"
154 SELECT
155 umc.id,
156 umc.course_id,
157 umc.course_language_group_id,
158 umc.user_id,
159 umc.user_mailchimp_id,
160 umc.consent,
161 umc.email_subscription_in_mailchimp,
162 umc.created_at,
163 umc.updated_at,
164 umc.deleted_at,
165 umc.synced_to_mailchimp_at,
166 u.first_name AS first_name,
167 u.last_name AS last_name,
168 u.country AS country,
169 u.email AS email,
170 c.name AS course_name,
171 COALESCE(
172 mlcm.mailchimp_language_code,
173 c.language_code
174 ) AS locale,
175 CASE WHEN cmc.passed IS NOT NULL THEN cmc.completion_date ELSE NULL END AS completed_course_at,
176 COALESCE(csfa.research_consent, urc.research_consent) AS research_consent
177 FROM user_marketing_consents AS umc
178 JOIN user_details AS u ON u.user_id = umc.user_id
179 JOIN courses AS c ON c.id = umc.course_id
180 LEFT JOIN course_module_completions AS cmc
181 ON cmc.user_id = umc.user_id AND cmc.course_id = umc.course_id
182 LEFT JOIN course_specific_consent_form_answers AS csfa
183 ON csfa.course_id = umc.course_id AND csfa.user_id = umc.user_id
184 LEFT JOIN user_research_consents AS urc
185 ON urc.user_id = umc.user_id
186 LEFT JOIN mailchimp_course_tags AS tags
187 ON tags.course_language_group_id = umc.course_language_group_id
188 LEFT JOIN marketing_mailing_list_access_tokens AS mmlat
189 ON mmlat.course_language_group_id = umc.course_language_group_id AND mmlat.deleted_at IS NULL
190 LEFT JOIN mailchimp_language_code_mappings AS mlcm
191 ON mlcm.marketing_mailing_list_access_token_id = mmlat.id
192 AND mlcm.our_language_code = c.language_code
193 AND mlcm.deleted_at IS NULL
194 WHERE umc.course_language_group_id = $1
195 AND (
196 umc.synced_to_mailchimp_at IS NULL
197 OR umc.synced_to_mailchimp_at < umc.updated_at
198 OR csfa.updated_at > umc.synced_to_mailchimp_at
199 OR urc.updated_at > umc.synced_to_mailchimp_at
200 OR cmc.updated_at > umc.synced_to_mailchimp_at
201 OR u.updated_at > umc.synced_to_mailchimp_at
202 OR c.updated_at > umc.synced_to_mailchimp_at
203 OR mlcm.updated_at > umc.synced_to_mailchimp_at
204 OR EXISTS (
205 SELECT 1
206 FROM mailchimp_course_tags
207 WHERE mailchimp_course_tags.course_language_group_id = umc.course_language_group_id
208 AND mailchimp_course_tags.updated_at > umc.synced_to_mailchimp_at
209 AND deleted_at IS NULL
210 )
211 )
212 "#,
213 course_language_group_id
214 )
215 .fetch_all(conn)
216 .await?;
217
218 Ok(result)
219}
220
221pub async fn fetch_all_unsynced_updated_emails(
223 conn: &mut PgConnection,
224 course_language_group_id: Uuid,
225) -> sqlx::Result<Vec<UserEmailSubscription>> {
226 let result = sqlx::query_as!(
227 UserEmailSubscription,
228 "
229 SELECT
230 umc.user_id,
231 u.email AS email,
232 umc.email_subscription_in_mailchimp,
233 umc.user_mailchimp_id
234 FROM user_marketing_consents AS umc
235 JOIN user_details AS u ON u.user_id = umc.user_id
236 WHERE umc.course_language_group_id = $1
237 AND umc.synced_to_mailchimp_at < u.updated_at
238 ",
239 course_language_group_id
240 )
241 .fetch_all(conn)
242 .await?;
243
244 Ok(result)
245}
246
247pub async fn update_synced_to_mailchimp_at_to_all_synced_users(
249 conn: &mut PgConnection,
250 ids: &[Uuid],
251) -> ModelResult<()> {
252 sqlx::query!(
253 "
254UPDATE user_marketing_consents
255SET synced_to_mailchimp_at = now()
256WHERE user_id IN (
257 SELECT UNNEST($1::uuid [])
258 )
259",
260 &ids
261 )
262 .execute(conn)
263 .await?;
264 Ok(())
265}
266
267pub async fn update_user_mailchimp_id_at_to_all_synced_users(
269 pool: &mut PgConnection,
270 user_contact_pairs: Vec<(String, String)>,
271) -> ModelResult<()> {
272 let (user_ids_raw, user_mailchimp_ids): (Vec<_>, Vec<_>) =
273 user_contact_pairs.into_iter().unzip();
274
275 let user_ids: Vec<Uuid> = user_ids_raw
277 .into_iter()
278 .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
279 .collect();
280
281 sqlx::query!(
282 "
283UPDATE user_marketing_consents
284SET user_mailchimp_id = updated_data.user_mailchimp_id
285FROM (
286 SELECT UNNEST($1::uuid[]) AS user_id, UNNEST($2::text[]) AS user_mailchimp_id
287) AS updated_data
288WHERE user_marketing_consents.user_id = updated_data.user_id
289",
290 &user_ids,
291 &user_mailchimp_ids
292 )
293 .execute(pool)
294 .await?;
295 Ok(())
296}
297
298pub async fn update_unsubscribed_users_from_mailchimp_in_bulk(
300 conn: &mut PgConnection,
301 mailchimp_data: Vec<(String, String, String, String)>,
302) -> anyhow::Result<()> {
303 let (
304 user_ids_raw,
305 timestamps_raw,
306 course_language_group_ids_raw,
307 email_subscriptions_in_mailchimp,
308 ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>) = multiunzip(mailchimp_data);
309
310 let user_ids: Vec<Uuid> = user_ids_raw
311 .into_iter()
312 .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
313 .collect();
314
315 let timestamps: Vec<DateTime<Utc>> = timestamps_raw
316 .into_iter()
317 .filter_map(|ts| {
318 DateTime::parse_from_rfc3339(&ts)
319 .ok()
320 .map(|dt| dt.with_timezone(&Utc))
321 })
322 .collect();
323
324 let course_language_group_ids: Vec<Uuid> = course_language_group_ids_raw
325 .into_iter()
326 .filter_map(|lang_id| Uuid::parse_str(&lang_id).ok())
327 .collect();
328
329 sqlx::query!(
330 "
331 UPDATE user_marketing_consents
332 SET consent = false,
333 email_subscription_in_mailchimp = updated_data.email_subscription_in_mailchimp,
334 synced_to_mailchimp_at = updated_data.last_updated
335 FROM (
336 SELECT UNNEST($1::Uuid[]) AS user_id,
337 UNNEST($2::timestamptz[]) AS last_updated,
338 UNNEST($3::Uuid[]) AS course_language_group_id,
339 UNNEST($4::text[]) AS email_subscription_in_mailchimp
340
341 ) AS updated_data
342 WHERE user_marketing_consents.user_id = updated_data.user_id
343 AND user_marketing_consents.consent = true
344 AND user_marketing_consents.synced_to_mailchimp_at < updated_data.last_updated
345 AND user_marketing_consents.course_language_group_id = updated_data.course_language_group_id
346 ",
347 &user_ids,
348 ×tamps,
349 &course_language_group_ids,
350 &email_subscriptions_in_mailchimp
351 )
352 .execute(conn)
353 .await?;
354 Ok(())
355}
356
357pub async fn fetch_all_marketing_mailing_list_access_tokens(
358 conn: &mut PgConnection,
359) -> sqlx::Result<Vec<MarketingMailingListAccessToken>> {
360 let results = sqlx::query_as!(
361 MarketingMailingListAccessToken,
362 "
363 SELECT
364 id,
365 course_id,
366 course_language_group_id,
367 server_prefix,
368 access_token,
369 mailchimp_mailing_list_id,
370 created_at,
371 updated_at,
372 deleted_at
373 FROM marketing_mailing_list_access_tokens
374 "
375 )
376 .fetch_all(conn)
377 .await?;
378
379 Ok(results)
380}
381
382pub async fn fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
383 conn: &mut PgConnection,
384 course_language_group_id: Uuid,
385 marketing_mailing_access_token_id: Uuid,
386) -> sqlx::Result<Vec<serde_json::Value>> {
387 let results = sqlx::query!(
388 "
389 SELECT
390 tag_name,
391 tag_id
392 FROM mailchimp_course_tags
393 WHERE course_language_group_id = $1
394 AND marketing_mailing_list_access_token_id = $2
395 AND deleted_at IS NULL
396 ",
397 course_language_group_id,
398 marketing_mailing_access_token_id
399 )
400 .fetch_all(conn)
401 .await?;
402
403 let tag_objects: Vec<serde_json::Value> = results
404 .into_iter()
405 .filter_map(|row| {
406 let tag_name = row.tag_name.trim();
407 let tag_id = row.tag_id.trim();
408
409 if tag_name.is_empty() {
410 return None;
411 }
412
413 Some(json!({
414 "name": tag_name,
415 "id": tag_id,
416 "status": "active"
417 }))
418 })
419 .collect();
420
421 Ok(tag_objects)
422}
423
424pub async fn upsert_tag(
425 conn: &mut PgConnection,
426 course_language_group_id: Uuid,
427 marketing_mailing_access_token_id: Uuid,
428 tag_id: String,
429 tag_name: String,
430) -> sqlx::Result<()> {
431 sqlx::query!(
432 "
433 INSERT INTO mailchimp_course_tags (
434 tag_name,
435 tag_id,
436 course_language_group_id,
437 marketing_mailing_list_access_token_id
438 )
439 VALUES ($1, $2, $3, $4)
440 ON CONFLICT (course_language_group_id, tag_name)
441 DO UPDATE
442 SET
443 tag_name = EXCLUDED.tag_name
444 ",
445 tag_name,
446 tag_id,
447 course_language_group_id,
448 marketing_mailing_access_token_id,
449 )
450 .execute(&mut *conn)
451 .await?;
452 Ok(())
453}
454
455pub async fn delete_tag(
456 conn: &mut PgConnection,
457 deleted_tag_id: String,
458 course_language_group_id: Uuid,
459) -> sqlx::Result<()> {
460 sqlx::query!(
461 "
462 UPDATE mailchimp_course_tags
463 SET deleted_at = now()
464 WHERE tag_id = $1
465 AND course_language_group_id = $2
466 AND deleted_at IS NULL
467 ",
468 deleted_tag_id,
469 course_language_group_id
470 )
471 .execute(&mut *conn)
472 .await?;
473
474 Ok(())
475}